/* ** Zabbix ** Copyright (C) 2001-2022 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ #include "common.h" #include "daemon.h" #include "zbxself.h" #include "log.h" #include "db.h" #include "dbcache.h" #include "zbxtasks.h" #include "../events.h" #include "../actions.h" #include "export.h" #include "taskmanager.h" #include "zbxdiag.h" #define ZBX_TM_PROCESS_PERIOD 5 #define ZBX_TM_CLEANUP_PERIOD SEC_PER_HOUR #define ZBX_TASKMANAGER_TIMEOUT 5 extern unsigned char process_type, program_type; extern int server_num, process_num; /****************************************************************************** * * * Function: tm_execute_task_close_problem * * * * Purpose: close the specified problem event and remove task * * * * Parameters: triggerid - [IN] the source trigger id * * eventid - [IN] the problem eventid to close * * userid - [IN] the user that requested to close the * * problem * * * ******************************************************************************/ static void tm_execute_task_close_problem(zbx_uint64_t taskid, zbx_uint64_t triggerid, zbx_uint64_t eventid, zbx_uint64_t userid) { DB_RESULT result; zabbix_log(LOG_LEVEL_DEBUG, "In %s() eventid:" ZBX_FS_UI64, __func__, eventid); result = DBselect("select null from problem where eventid=" ZBX_FS_UI64 " and r_eventid is null", eventid); /* check if the task hasn't been already closed by another process */ if (NULL != DBfetch(result)) zbx_close_problem(triggerid, eventid, userid); DBfree_result(result); DBexecute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Function: tm_try_task_close_problem * * * * Purpose: try to close problem by event acknowledgement action * * * * Parameters: taskid - [IN] the task identifier * * * * Return value: SUCCEED - task was executed and removed * * FAIL - otherwise * * * ******************************************************************************/ static int tm_try_task_close_problem(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; int ret = FAIL; zbx_uint64_t userid, triggerid, eventid; zbx_vector_uint64_t triggerids, locked_triggerids; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); zbx_vector_uint64_create(&triggerids); zbx_vector_uint64_create(&locked_triggerids); result = DBselect("select a.userid,a.eventid,e.objectid" " from task_close_problem tcp" " left join acknowledges a" " on tcp.acknowledgeid=a.acknowledgeid" " left join events e" " on a.eventid=e.eventid" " where tcp.taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = DBfetch(result))) { if (SUCCEED == DBis_null(row[0])) { zabbix_log(LOG_LEVEL_DEBUG, "cannot process close problem task because related event" " was removed"); DBexecute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); ret = SUCCEED; } else { ZBX_STR2UINT64(triggerid, row[2]); zbx_vector_uint64_append(&triggerids, triggerid); DCconfig_lock_triggers_by_triggerids(&triggerids, &locked_triggerids); /* close the problem if source trigger was successfully locked or */ /* if the trigger doesn't exist, but event still exists */ if (0 != locked_triggerids.values_num) { ZBX_STR2UINT64(userid, row[0]); ZBX_STR2UINT64(eventid, row[1]); tm_execute_task_close_problem(taskid, triggerid, eventid, userid); DCconfig_unlock_triggers(&locked_triggerids); ret = SUCCEED; } else if (FAIL == DCconfig_trigger_exists(triggerid)) { DBexecute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); ret = SUCCEED; } } } DBfree_result(result); zbx_vector_uint64_destroy(&locked_triggerids); zbx_vector_uint64_destroy(&triggerids); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Function: tm_expire_remote_command * * * * Purpose: process expired remote command task * * * ******************************************************************************/ static void tm_expire_remote_command(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; zbx_uint64_t alertid; char *error; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); DBbegin(); result = DBselect("select alertid from task_remote_command where taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = DBfetch(result))) { if (SUCCEED != DBis_null(row[0])) { ZBX_STR2UINT64(alertid, row[0]); error = DBdyn_escape_string_len("Remote command has been expired.", ALERT_ERROR_LEN); DBexecute("update alerts set error='%s',status=%d where alertid=" ZBX_FS_UI64, error, ALERT_STATUS_FAILED, alertid); zbx_free(error); } } DBfree_result(result); DBexecute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_EXPIRED, taskid); DBcommit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Function: tm_process_remote_command_result * * * * Purpose: process remote command result task * * * * Return value: SUCCEED - the task was processed successfully * * FAIL - otherwise * * * ******************************************************************************/ static int tm_process_remote_command_result(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; zbx_uint64_t alertid, parent_taskid = 0; int status, ret = FAIL; char *error, *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); DBbegin(); result = DBselect("select r.status,r.info,a.alertid,r.parent_taskid" " from task_remote_command_result r" " left join task_remote_command c" " on c.taskid=r.parent_taskid" " left join alerts a" " on a.alertid=c.alertid" " where r.taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = DBfetch(result))) { ZBX_STR2UINT64(parent_taskid, row[3]); if (SUCCEED != DBis_null(row[2])) { ZBX_STR2UINT64(alertid, row[2]); status = atoi(row[0]); if (SUCCEED == status) { DBexecute("update alerts set status=%d where alertid=" ZBX_FS_UI64, ALERT_STATUS_SENT, alertid); } else { error = DBdyn_escape_string_len(row[1], ALERT_ERROR_LEN); DBexecute("update alerts set error='%s',status=%d where alertid=" ZBX_FS_UI64, error, ALERT_STATUS_FAILED, alertid); zbx_free(error); } } ret = SUCCEED; } DBfree_result(result); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); if (0 != parent_taskid) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " or taskid=" ZBX_FS_UI64, parent_taskid); DBexecute("%s", sql); zbx_free(sql); DBcommit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Function: tm_process_data_result * * * * Purpose: process data task result * * * ******************************************************************************/ static void tm_process_data_result(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; zbx_uint64_t parent_taskid = 0; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); DBbegin(); result = DBselect("select parent_taskid" " from task_result" " where taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = DBfetch(result))) ZBX_STR2UINT64(parent_taskid, row[0]); DBfree_result(result); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); if (0 != parent_taskid) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " or taskid=" ZBX_FS_UI64, parent_taskid); DBexecute("%s", sql); zbx_free(sql); DBcommit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Function: tm_process_acknowledgements * * * * Purpose: process acknowledgements for alerts sending * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_acknowledgements(zbx_vector_uint64_t *ack_taskids) { DB_ROW row; DB_RESULT result; int processed_num = 0; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_ptr_t ack_tasks; zbx_ack_task_t *ack_task; zabbix_log(LOG_LEVEL_DEBUG, "In %s() tasks_num:%d", __func__, ack_taskids->values_num); zbx_vector_uint64_sort(ack_taskids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_ptr_create(&ack_tasks); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select a.eventid,ta.acknowledgeid,ta.taskid" " from task_acknowledge ta" " left join acknowledges a" " on ta.acknowledgeid=a.acknowledgeid" " left join events e" " on a.eventid=e.eventid" " left join task t" " on ta.taskid=t.taskid" " where t.status=%d and", ZBX_TM_STATUS_NEW); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.taskid", ack_taskids->values, ack_taskids->values_num); result = DBselect("%s", sql); while (NULL != (row = DBfetch(result))) { if (SUCCEED == DBis_null(row[0])) { zabbix_log(LOG_LEVEL_DEBUG, "cannot process acknowledge tasks because related event" " was removed"); continue; } ack_task = (zbx_ack_task_t *)zbx_malloc(NULL, sizeof(zbx_ack_task_t)); ZBX_STR2UINT64(ack_task->eventid, row[0]); ZBX_STR2UINT64(ack_task->acknowledgeid, row[1]); ZBX_STR2UINT64(ack_task->taskid, row[2]); zbx_vector_ptr_append(&ack_tasks, ack_task); } DBfree_result(result); if (0 < ack_tasks.values_num) { zbx_vector_ptr_sort(&ack_tasks, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); processed_num = process_actions_by_acknowledgements(&ack_tasks); } sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset , "update task set status=%d where", ZBX_TM_STATUS_DONE); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", ack_taskids->values, ack_taskids->values_num); DBexecute("%s", sql); zbx_free(sql); zbx_vector_ptr_clear_ext(&ack_tasks, zbx_ptr_free); zbx_vector_ptr_destroy(&ack_tasks); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Function: tm_process_check_now * * * * Purpose: process check now tasks for item rescheduling * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_check_now(zbx_vector_uint64_t *taskids) { DB_ROW row; DB_RESULT result; int i, processed_num = 0; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_ptr_t tasks; zbx_vector_uint64_t done_taskids, itemids; zbx_uint64_t taskid, itemid, proxy_hostid, *proxy_hostids; zbx_tm_task_t *task; zbx_tm_check_now_t *data; zabbix_log(LOG_LEVEL_DEBUG, "In %s() tasks_num:%d", __func__, taskids->values_num); zbx_vector_ptr_create(&tasks); zbx_vector_uint64_create(&done_taskids); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "select t.taskid,t.status,t.proxy_hostid,td.itemid" " from task t" " left join task_check_now td" " on t.taskid=td.taskid" " where"); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.taskid", taskids->values, taskids->values_num); result = DBselect("%s", sql); while (NULL != (row = DBfetch(result))) { ZBX_STR2UINT64(taskid, row[0]); if (SUCCEED == DBis_null(row[3])) { zbx_vector_uint64_append(&done_taskids, taskid); continue; } ZBX_DBROW2UINT64(proxy_hostid, row[2]); if (0 != proxy_hostid) { if (ZBX_TM_STATUS_INPROGRESS == atoi(row[1])) { /* task has been sent to proxy, mark as done */ zbx_vector_uint64_append(&done_taskids, taskid); continue; } } ZBX_STR2UINT64(itemid, row[3]); /* zbx_task_t here is used only to store taskid, proxyhostid, data->itemid - */ /* the rest of task properties are not used */ task = zbx_tm_task_create(taskid, ZBX_TM_TASK_CHECK_NOW, 0, 0, 0, proxy_hostid); task->data = (void *)zbx_tm_check_now_create(itemid); zbx_vector_ptr_append(&tasks, task); } DBfree_result(result); if (0 != tasks.values_num) { zbx_vector_uint64_create(&itemids); for (i = 0; i < tasks.values_num; i++) { task = (zbx_tm_task_t *)tasks.values[i]; data = (zbx_tm_check_now_t *)task->data; zbx_vector_uint64_append(&itemids, data->itemid); } proxy_hostids = (zbx_uint64_t *)zbx_malloc(NULL, tasks.values_num * sizeof(zbx_uint64_t)); zbx_dc_reschedule_items(&itemids, time(NULL), proxy_hostids); sql_offset = 0; DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset); for (i = 0; i < tasks.values_num; i++) { task = (zbx_tm_task_t *)tasks.values[i]; if (0 != proxy_hostids[i] && task->proxy_hostid == proxy_hostids[i]) continue; zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset , "update task set"); if (0 == proxy_hostids[i]) { /* close tasks managed by server - */ /* items either have been rescheduled or not cached */ zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " status=%d", ZBX_TM_STATUS_DONE); if (0 != task->proxy_hostid) zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",proxy_hostid=null"); processed_num++; } else { /* update target proxy hostid */ zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " proxy_hostid=" ZBX_FS_UI64, proxy_hostids[i]); } zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " where taskid=" ZBX_FS_UI64 ";\n", task->taskid); DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset); } DBend_multiple_update(&sql, &sql_alloc, &sql_offset); if (16 < sql_offset) /* in ORACLE always present begin..end; */ DBexecute("%s", sql); zbx_vector_uint64_destroy(&itemids); zbx_free(proxy_hostids); zbx_vector_ptr_clear_ext(&tasks, (zbx_clean_func_t)zbx_tm_task_free); } if (0 != done_taskids.values_num) { sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where", ZBX_TM_STATUS_DONE); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", done_taskids.values, done_taskids.values_num); DBexecute("%s", sql); } zbx_free(sql); zbx_vector_uint64_destroy(&done_taskids); zbx_vector_ptr_destroy(&tasks); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Function: tm_process_diaginfo * * * * Purpose: process diaginfo task * * * ******************************************************************************/ static void tm_process_diaginfo(zbx_uint64_t taskid, const char *data) { zbx_tm_task_t *task; int ret; char *info = NULL; struct zbx_json_parse jp_data; task = zbx_tm_task_create(0, ZBX_TM_TASK_DATA_RESULT, ZBX_TM_STATUS_NEW, time(NULL), 0, 0); if (SUCCEED == zbx_json_open(data, &jp_data)) { ret = zbx_diag_get_info(&jp_data, &info); task->data = zbx_tm_data_result_create(taskid, ret, info); zbx_free(info); } else task->data = zbx_tm_data_result_create(taskid, FAIL, zbx_json_strerror()); zbx_tm_save_task(task); zbx_tm_task_free(task); } /****************************************************************************** * * * Function: tm_process_data * * * * Purpose: process data tasks * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_data(zbx_vector_uint64_t *taskids) { DB_ROW row; DB_RESULT result; int processed_num = 0, data_type; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_uint64_t done_taskids; zbx_uint64_t taskid; zabbix_log(LOG_LEVEL_DEBUG, "In %s() tasks_num:%d", __func__, taskids->values_num); DBbegin(); zbx_vector_uint64_create(&done_taskids); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "select t.taskid,td.type,td.data" " from task t" " left join task_data td" " on t.taskid=td.taskid" " where t.proxy_hostid is null" " and"); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.taskid", taskids->values, taskids->values_num); result = DBselect("%s", sql); while (NULL != (row = DBfetch(result))) { ZBX_STR2UINT64(taskid, row[0]); if (SUCCEED == DBis_null(row[1])) { zbx_vector_uint64_append(&done_taskids, taskid); continue; } data_type = atoi(row[1]); switch (data_type) { case ZBX_TM_DATA_TYPE_DIAGINFO: tm_process_diaginfo(taskid, row[2]); zbx_vector_uint64_append(&done_taskids, taskid); break; default: THIS_SHOULD_NEVER_HAPPEN; } } DBfree_result(result); if (0 != (processed_num = done_taskids.values_num)) { sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where", ZBX_TM_STATUS_DONE); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", done_taskids.values, done_taskids.values_num); DBexecute("%s", sql); } zbx_free(sql); zbx_vector_uint64_destroy(&done_taskids); DBcommit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Function: tm_expire_generic_tasks * * * * Purpose: expires tasks that don't require specific expiration handling * * * * Return value: The number of successfully expired tasks * * * ******************************************************************************/ static int tm_expire_generic_tasks(zbx_vector_uint64_t *taskids) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where", ZBX_TM_STATUS_EXPIRED); DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", taskids->values, taskids->values_num); DBexecute("%s", sql); zbx_free(sql); return taskids->values_num; } /****************************************************************************** * * * Function: tm_process_tasks * * * * Purpose: process task manager tasks depending on task type * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_tasks(int now) { DB_ROW row; DB_RESULT result; int type, processed_num = 0, expired_num = 0, clock, ttl; zbx_uint64_t taskid; zbx_vector_uint64_t ack_taskids, check_now_taskids, expire_taskids, data_taskids; zbx_vector_uint64_create(&ack_taskids); zbx_vector_uint64_create(&check_now_taskids); zbx_vector_uint64_create(&expire_taskids); zbx_vector_uint64_create(&data_taskids); result = DBselect("select taskid,type,clock,ttl" " from task" " where status in (%d,%d)" " order by taskid", ZBX_TM_STATUS_NEW, ZBX_TM_STATUS_INPROGRESS); while (NULL != (row = DBfetch(result))) { ZBX_STR2UINT64(taskid, row[0]); ZBX_STR2UCHAR(type, row[1]); clock = atoi(row[2]); ttl = atoi(row[3]); switch (type) { case ZBX_TM_TASK_CLOSE_PROBLEM: /* close problem tasks will never have 'in progress' status */ if (SUCCEED == tm_try_task_close_problem(taskid)) processed_num++; break; case ZBX_TM_TASK_REMOTE_COMMAND: /* both - 'new' and 'in progress' remote tasks should expire */ if (0 != ttl && clock + ttl < now) { tm_expire_remote_command(taskid); expired_num++; } break; case ZBX_TM_TASK_REMOTE_COMMAND_RESULT: /* close problem tasks will never have 'in progress' status */ if (SUCCEED == tm_process_remote_command_result(taskid)) processed_num++; break; case ZBX_TM_TASK_ACKNOWLEDGE: zbx_vector_uint64_append(&ack_taskids, taskid); break; case ZBX_TM_TASK_CHECK_NOW: if (0 != ttl && clock + ttl < now) zbx_vector_uint64_append(&expire_taskids, taskid); else zbx_vector_uint64_append(&check_now_taskids, taskid); break; case ZBX_TM_TASK_DATA: /* both - 'new' and 'in progress' tasks should expire */ if (0 != ttl && clock + ttl < now) zbx_vector_uint64_append(&expire_taskids, taskid); else zbx_vector_uint64_append(&data_taskids, taskid); break; case ZBX_TM_TASK_DATA_RESULT: tm_process_data_result(taskid); processed_num++; break; default: THIS_SHOULD_NEVER_HAPPEN; break; } } DBfree_result(result); if (0 < ack_taskids.values_num) processed_num += tm_process_acknowledgements(&ack_taskids); if (0 < check_now_taskids.values_num) processed_num += tm_process_check_now(&check_now_taskids); if (0 < data_taskids.values_num) processed_num += tm_process_data(&data_taskids); if (0 < expire_taskids.values_num) expired_num += tm_expire_generic_tasks(&expire_taskids); zbx_vector_uint64_destroy(&data_taskids); zbx_vector_uint64_destroy(&expire_taskids); zbx_vector_uint64_destroy(&check_now_taskids); zbx_vector_uint64_destroy(&ack_taskids); return processed_num + expired_num; } /****************************************************************************** * * * Function: tm_remove_old_tasks * * * * Purpose: remove old done/expired tasks * * * ******************************************************************************/ static void tm_remove_old_tasks(int now) { DBbegin(); DBexecute("delete from task where status in (%d,%d) and clock<=%d", ZBX_TM_STATUS_DONE, ZBX_TM_STATUS_EXPIRED, now - ZBX_TM_CLEANUP_TASK_AGE); DBcommit(); } ZBX_THREAD_ENTRY(taskmanager_thread, args) { static int cleanup_time = 0; double sec1, sec2; int tasks_num, sleeptime, nextcheck; process_type = ((zbx_thread_args_t *)args)->process_type; server_num = ((zbx_thread_args_t *)args)->server_num; process_num = ((zbx_thread_args_t *)args)->process_num; zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type), server_num, get_process_type_string(process_type), process_num); update_selfmon_counter(ZBX_PROCESS_STATE_BUSY); zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type)); DBconnect(ZBX_DB_CONNECT_NORMAL); if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS)) zbx_problems_export_init("task-manager", process_num); sec1 = zbx_time(); sleeptime = ZBX_TM_PROCESS_PERIOD - (int)sec1 % ZBX_TM_PROCESS_PERIOD; zbx_setproctitle("%s [started, idle %d sec]", get_process_type_string(process_type), sleeptime); while (ZBX_IS_RUNNING()) { zbx_sleep_loop(sleeptime); sec1 = zbx_time(); zbx_update_env(sec1); zbx_setproctitle("%s [processing tasks]", get_process_type_string(process_type)); tasks_num = tm_process_tasks((int)sec1); if (ZBX_TM_CLEANUP_PERIOD <= sec1 - cleanup_time) { tm_remove_old_tasks((int)sec1); cleanup_time = sec1; } sec2 = zbx_time(); nextcheck = (int)sec1 - (int)sec1 % ZBX_TM_PROCESS_PERIOD + ZBX_TM_PROCESS_PERIOD; if (0 > (sleeptime = nextcheck - (int)sec2)) sleeptime = 0; zbx_setproctitle("%s [processed %d task(s) in " ZBX_FS_DBL " sec, idle %d sec]", get_process_type_string(process_type), tasks_num, sec2 - sec1, sleeptime); } zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num); while (1) zbx_sleep(SEC_PER_MIN); }