/* ** Zabbix ** Copyright (C) 2001-2023 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ #include "taskmanager.h" #include "../db_lengths.h" #include "zbxnix.h" #include "zbxself.h" #include "log.h" #include "zbxcacheconfig.h" #include "zbxtasks.h" #include "../events.h" #include "../actions.h" #include "zbxexport.h" #include "zbxdiag.h" #include "zbxservice.h" #include "zbxjson.h" #include "zbxrtc.h" #include "audit/zbxaudit.h" #include "audit/zbxaudit_proxy.h" #include "zbxnum.h" #include "zbxtime.h" #include "zbxversion.h" #include "zbx_rtc_constants.h" #include "zbx_host_constants.h" #include "zbxdbwrap.h" #include "zbxserver.h" #define ZBX_TM_PROCESS_PERIOD 5 #define ZBX_TM_CLEANUP_PERIOD SEC_PER_HOUR #define ZBX_TASKMANAGER_TIMEOUT 5 #define ZBX_TM_TEMP_SUPPRESION_ACTION_SUPPRESS 32 #define ZBX_TM_TEMP_SUPPRESION_ACTION_UNSUPPRESS 64 #define ZBX_TM_TEMP_SUPPRESION_INDEFINITE_TIME 0 zbx_export_file_t *problems_export = NULL; static zbx_export_file_t *get_problems_export(void) { return problems_export; } /****************************************************************************** * * * Purpose: close the specified problem event and remove task * * * * Parameters: triggerid - [IN] the source trigger id * * eventid - [IN] the problem eventid to close * * userid - [IN] the user that requested to close the * * problem * * * ******************************************************************************/ static void tm_execute_task_close_problem(zbx_uint64_t taskid, zbx_uint64_t triggerid, zbx_uint64_t eventid, zbx_uint64_t userid) { DB_RESULT result; zabbix_log(LOG_LEVEL_DEBUG, "In %s() eventid:" ZBX_FS_UI64, __func__, eventid); result = zbx_db_select("select null from problem where eventid=" ZBX_FS_UI64 " and r_eventid is null", eventid); /* check if the task hasn't been already closed by another process */ if (NULL != zbx_db_fetch(result)) zbx_close_problem(triggerid, eventid, userid); zbx_db_free_result(result); zbx_db_execute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: try to close problem by event acknowledgment action * * * * Parameters: taskid - [IN] the task identifier * * * * Return value: SUCCEED - task was executed and removed * * FAIL - otherwise * * * ******************************************************************************/ static int tm_try_task_close_problem(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; int ret = FAIL; zbx_uint64_t userid, triggerid, eventid; zbx_vector_uint64_t triggerids, locked_triggerids; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); zbx_vector_uint64_create(&triggerids); zbx_vector_uint64_create(&locked_triggerids); result = zbx_db_select("select a.userid,a.eventid,e.objectid" " from task_close_problem tcp" " left join acknowledges a" " on tcp.acknowledgeid=a.acknowledgeid" " left join events e" " on a.eventid=e.eventid" " where tcp.taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = zbx_db_fetch(result))) { if (SUCCEED == zbx_db_is_null(row[0])) { zabbix_log(LOG_LEVEL_DEBUG, "cannot process close problem task because related event" " was removed"); zbx_db_execute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); ret = SUCCEED; } else { ZBX_STR2UINT64(triggerid, row[2]); zbx_vector_uint64_append(&triggerids, triggerid); DCconfig_lock_triggers_by_triggerids(&triggerids, &locked_triggerids); /* close the problem if source trigger was successfully locked or */ /* if the trigger doesn't exist, but event still exists */ if (0 != locked_triggerids.values_num) { ZBX_STR2UINT64(userid, row[0]); ZBX_STR2UINT64(eventid, row[1]); tm_execute_task_close_problem(taskid, triggerid, eventid, userid); DCconfig_unlock_triggers(&locked_triggerids); ret = SUCCEED; } else if (FAIL == DCconfig_trigger_exists(triggerid)) { zbx_db_execute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); ret = SUCCEED; } } } zbx_db_free_result(result); zbx_vector_uint64_destroy(&locked_triggerids); zbx_vector_uint64_destroy(&triggerids); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: process expired remote command task * * * ******************************************************************************/ static void tm_expire_remote_command(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; zbx_uint64_t alertid; char *error; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); zbx_db_begin(); result = zbx_db_select("select alertid from task_remote_command where taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = zbx_db_fetch(result))) { if (SUCCEED != zbx_db_is_null(row[0])) { ZBX_STR2UINT64(alertid, row[0]); error = zbx_db_dyn_escape_string_len("Remote command has been expired.", ALERT_ERROR_LEN); zbx_db_execute("update alerts set error='%s',status=%d where alertid=" ZBX_FS_UI64, error, ALERT_STATUS_FAILED, alertid); zbx_free(error); } } zbx_db_free_result(result); zbx_db_execute("update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_EXPIRED, taskid); zbx_db_commit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: process remote command result task * * * * Return value: SUCCEED - the task was processed successfully * * FAIL - otherwise * * * ******************************************************************************/ static int tm_process_remote_command_result(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; zbx_uint64_t alertid, parent_taskid = 0; int status, ret = FAIL; char *error, *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); zbx_db_begin(); result = zbx_db_select("select r.status,r.info,a.alertid,r.parent_taskid" " from task_remote_command_result r" " left join task_remote_command c" " on c.taskid=r.parent_taskid" " left join alerts a" " on a.alertid=c.alertid" " where r.taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = zbx_db_fetch(result))) { ZBX_STR2UINT64(parent_taskid, row[3]); if (SUCCEED != zbx_db_is_null(row[2])) { ZBX_STR2UINT64(alertid, row[2]); status = atoi(row[0]); if (SUCCEED == status) { zbx_db_execute("update alerts set status=%d where alertid=" ZBX_FS_UI64, ALERT_STATUS_SENT, alertid); } else { error = zbx_db_dyn_escape_string_len(row[1], ALERT_ERROR_LEN); zbx_db_execute("update alerts set error='%s',status=%d where alertid=" ZBX_FS_UI64, error, ALERT_STATUS_FAILED, alertid); zbx_free(error); } } ret = SUCCEED; } zbx_db_free_result(result); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); if (0 != parent_taskid) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " or taskid=" ZBX_FS_UI64, parent_taskid); zbx_db_execute("%s", sql); zbx_free(sql); zbx_db_commit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: process data task result * * * ******************************************************************************/ static void tm_process_data_result(zbx_uint64_t taskid) { DB_ROW row; DB_RESULT result; zbx_uint64_t parent_taskid = 0; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid:" ZBX_FS_UI64, __func__, taskid); zbx_db_begin(); result = zbx_db_select("select parent_taskid" " from task_result" " where taskid=" ZBX_FS_UI64, taskid); if (NULL != (row = zbx_db_fetch(result))) ZBX_STR2UINT64(parent_taskid, row[0]); zbx_db_free_result(result); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where taskid=" ZBX_FS_UI64, ZBX_TM_STATUS_DONE, taskid); if (0 != parent_taskid) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " or taskid=" ZBX_FS_UI64, parent_taskid); zbx_db_execute("%s", sql); zbx_free(sql); zbx_db_commit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: rank event/problem as cause * * * * Parameters: eventid - [IN] the event/problem, which should be ranked * * as cause * * * * Return value: SUCCEED - if there are no database errors * * FAIL - otherwise * * * ******************************************************************************/ static int tm_rank_event_as_cause(zbx_uint64_t eventid) { int ret = SUCCEED; zabbix_log(LOG_LEVEL_DEBUG, "In %s() eventid: " ZBX_FS_UI64, __func__, eventid); if (ZBX_DB_OK > zbx_db_execute("update problem set cause_eventid=null where eventid=" ZBX_FS_UI64, eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to convert problem (eventid:" ZBX_FS_UI64 ") from symptom to" " cause", eventid); ret = FAIL; goto out; } if (ZBX_DB_OK > zbx_db_execute("delete from event_symptom where eventid=" ZBX_FS_UI64, eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to convert event (id:" ZBX_FS_UI64 ") from symptom to cause", eventid); ret = FAIL; } out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: rank event/problem as symptom * * * * Parameters: * * eventid - [IN] event id of the new symptom * * cause_eventid - [IN] event id of the new cause * * old_cause_eventid - [IN] event id of the old cause before the ranking * * * * Return value: SUCCEED - if there are no database errors * * FAIL - otherwise * * * *****************************************************************************/ static int tm_rank_event_as_symptom(zbx_uint64_t eventid, zbx_uint64_t cause_eventid, zbx_uint64_t old_cause_eventid) { int ret = SUCCEED; zabbix_log(LOG_LEVEL_DEBUG, "In %s() eventid: " ZBX_FS_UI64 ", cause_eventid: " ZBX_FS_UI64, __func__, eventid, cause_eventid); if (ZBX_DB_OK > zbx_db_execute( "update problem" " set cause_eventid=" ZBX_FS_UI64 " where eventid=" ZBX_FS_UI64 " or cause_eventid=" ZBX_FS_UI64, cause_eventid, eventid, eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to set new cause (eventid:" ZBX_FS_UI64 ") for problem" " (eventid:" ZBX_FS_UI64 ")", cause_eventid, eventid); ret = FAIL; goto out; } if (ZBX_DB_OK > zbx_db_execute( "update event_symptom" " set cause_eventid=" ZBX_FS_UI64 " where eventid=" ZBX_FS_UI64 " or cause_eventid=" ZBX_FS_UI64, cause_eventid, eventid, eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to set new cause (eventid:" ZBX_FS_UI64 ") for event" " (eventid:" ZBX_FS_UI64 ")", cause_eventid, eventid); ret = FAIL; goto out; } if (0 == old_cause_eventid && ZBX_DB_OK > zbx_db_execute( "insert into event_symptom (eventid,cause_eventid)" " values (" ZBX_FS_UI64 "," ZBX_FS_UI64 ")", eventid, cause_eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to convert cause event " ZBX_FS_UI64 " to symptom of " ZBX_FS_UI64, eventid, cause_eventid); ret = FAIL; } out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: rank event task * * * * Parameters: taskid - [IN] * * data - [IN] JSON with with acknowledge id, action, event id * * for all actions and cause_eventid for rank to symptom * * action * * * * Comments: Logic of this function is described in comments to test cases in * * the integration test testEventsCauseAndSymptoms * * * ******************************************************************************/ static void tm_process_rank_event(zbx_uint64_t taskid, const char *data) { zbx_uint64_t acknowledgeid, eventid, action; struct zbx_json_parse jp; char tmp[MAX_ID_LEN]; zabbix_log(LOG_LEVEL_DEBUG, "In %s() taskid: " ZBX_FS_UI64 ", data: '%s'", __func__, taskid, data); if (FAIL == zbx_json_open(data, &jp)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse process rank event task data"); goto fail; } if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_ACKNOWLEDGEID, tmp, sizeof(tmp), NULL) || FAIL == zbx_is_uint64(tmp, &acknowledgeid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse process rank event task data: failed to retrieve" " \"%s\" tag", ZBX_PROTO_TAG_ACKNOWLEDGEID); goto fail; } if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_ACTION, tmp, sizeof(tmp), NULL) || FAIL == zbx_is_uint64(tmp, &action)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse process rank event task data: failed to retrieve" " \"%s\" tag", ZBX_PROTO_TAG_ACTION); goto fail; } if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_EVENTID, tmp, sizeof(tmp), NULL) || FAIL == zbx_is_uint64(tmp, &eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse process rank event task data: failed to retrieve" " \"%s\" tag", ZBX_PROTO_TAG_EVENTID); goto fail; } if (0 != (action & ZBX_PROBLEM_UPDATE_RANK_TO_CAUSE)) { if (SUCCEED != tm_rank_event_as_cause(eventid)) goto fail; } else if (0 != (action & ZBX_PROBLEM_UPDATE_RANK_TO_SYMPTOM)) { int should_swap; zbx_uint64_t requested_cause_eventid, target_cause_eventid, old_cause_eventid, target_cause_triggerid; if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_CAUSE_EVENTID, tmp, sizeof(tmp), NULL) || FAIL == zbx_is_uint64(tmp, &requested_cause_eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse process rank event task data: failed to retrieve" " \"%s\" tag", ZBX_PROTO_TAG_CAUSE_EVENTID); goto fail; } /* the event specified in task data by cause_eventid might be a symptom, find the actual target cause */ if (0 == (target_cause_eventid = zbx_db_get_cause_eventid(requested_cause_eventid))) target_cause_eventid = requested_cause_eventid; if (target_cause_eventid == eventid) { /* cause and its symptom should be swapped */ should_swap = 1; target_cause_eventid = requested_cause_eventid; old_cause_eventid = 0; } else { should_swap = 0; old_cause_eventid = zbx_db_get_cause_eventid(eventid); } if (0 == (target_cause_triggerid = zbx_get_objectid_by_eventid(target_cause_eventid))) { zabbix_log(LOG_LEVEL_WARNING, "trigger id should never be '0' for target cause event (eventid: " ZBX_FS_UI64 ")", target_cause_eventid); goto fail; } else if (SUCCEED != zbx_db_lock_triggerid(target_cause_triggerid)) { zabbix_log(LOG_LEVEL_DEBUG, "the trigger (triggerid: " ZBX_FS_UI64 "), which generated the" " target cause event (eventid: " ZBX_FS_UI64 ") was deleted, skip ranking" " events as symptoms", target_cause_triggerid, target_cause_eventid); goto skip; } /* start swap by turning the symptom into a cause */ if (1 == should_swap && SUCCEED != tm_rank_event_as_cause(requested_cause_eventid)) goto fail; if (SUCCEED != tm_rank_event_as_symptom(eventid, target_cause_eventid, old_cause_eventid)) goto fail; } skip: if (ZBX_DB_OK > zbx_db_execute("update acknowledges set taskid=null where acknowledgeid=" ZBX_FS_UI64, acknowledgeid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to change taskid from " ZBX_FS_UI64 " to null in table" " acknowledges where acknowledgeid is " ZBX_FS_UI64, taskid, acknowledgeid); } fail: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: notify service manager about problem severity changes * * * ******************************************************************************/ static void notify_service_manager(const zbx_vector_ptr_t *ack_tasks) { int i; zbx_vector_ptr_t event_severities; zbx_vector_ptr_create(&event_severities); for (i = 0; i < ack_tasks->values_num; i++) { zbx_ack_task_t *ack_task = (zbx_ack_task_t *)ack_tasks->values[i]; if (ack_task->old_severity != ack_task->new_severity) { zbx_event_severity_t *es; es = (zbx_event_severity_t *)zbx_malloc(NULL, sizeof(zbx_event_severity_t)); es->eventid = ack_task->eventid; es->severity = ack_task->new_severity; zbx_vector_ptr_append(&event_severities, es); } } if (0 != event_severities.values_num) { unsigned char *data; zbx_uint32_t size; size = zbx_service_serialize_event_severities(&data, &event_severities); zbx_service_send(ZBX_IPC_SERVICE_EVENT_SEVERITIES, data, size, NULL); zbx_free(data); } zbx_vector_ptr_clear_ext(&event_severities, zbx_ptr_free); zbx_vector_ptr_destroy(&event_severities); } /****************************************************************************** * * * Purpose: process acknowledgments for alerts sending * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_acknowledgments(zbx_vector_uint64_t *ack_taskids) { DB_ROW row; DB_RESULT result; int processed_num = 0; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_ptr_t ack_tasks; zbx_ack_task_t *ack_task; zabbix_log(LOG_LEVEL_DEBUG, "In %s() tasks_num:%d", __func__, ack_taskids->values_num); zbx_vector_uint64_sort(ack_taskids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_ptr_create(&ack_tasks); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select a.eventid,ta.acknowledgeid,ta.taskid,a.old_severity,a.new_severity,a.action" " from task_acknowledge ta" " left join acknowledges a" " on ta.acknowledgeid=a.acknowledgeid" " left join events e" " on a.eventid=e.eventid" " left join task t" " on ta.taskid=t.taskid" " where t.status=%d and", ZBX_TM_STATUS_NEW); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.taskid", ack_taskids->values, ack_taskids->values_num); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { if (SUCCEED == zbx_db_is_null(row[0])) { zabbix_log(LOG_LEVEL_DEBUG, "cannot process acknowledge tasks because related event" " was removed"); continue; } /* do not notify about rank changes */ if (0 != (atoi(row[5]) & (ZBX_PROBLEM_UPDATE_RANK_TO_CAUSE | ZBX_PROBLEM_UPDATE_RANK_TO_SYMPTOM))) continue; ack_task = (zbx_ack_task_t *)zbx_malloc(NULL, sizeof(zbx_ack_task_t)); ZBX_STR2UINT64(ack_task->eventid, row[0]); ZBX_STR2UINT64(ack_task->acknowledgeid, row[1]); ZBX_STR2UINT64(ack_task->taskid, row[2]); ack_task->old_severity = atoi(row[3]); ack_task->new_severity = atoi(row[4]); zbx_vector_ptr_append(&ack_tasks, ack_task); } zbx_db_free_result(result); if (0 < ack_tasks.values_num) { zbx_vector_ptr_sort(&ack_tasks, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); processed_num = process_actions_by_acknowledgments(&ack_tasks); notify_service_manager(&ack_tasks); } sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset , "update task set status=%d where", ZBX_TM_STATUS_DONE); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", ack_taskids->values, ack_taskids->values_num); zbx_db_execute("%s", sql); zbx_free(sql); zbx_vector_ptr_clear_ext(&ack_tasks, zbx_ptr_free); zbx_vector_ptr_destroy(&ack_tasks); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Purpose: process check now tasks for item rescheduling * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_check_now(zbx_vector_uint64_t *taskids) { DB_ROW row; DB_RESULT result; int i, processed_num = 0; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_ptr_t tasks; zbx_vector_uint64_t done_taskids, itemids; zbx_uint64_t taskid, itemid, proxy_hostid, *proxy_hostids; zbx_tm_task_t *task; zbx_tm_check_now_t *data; zabbix_log(LOG_LEVEL_DEBUG, "In %s() tasks_num:%d", __func__, taskids->values_num); zbx_vector_ptr_create(&tasks); zbx_vector_uint64_create(&done_taskids); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "select t.taskid,t.status,t.proxy_hostid,td.itemid" " from task t" " left join task_check_now td" " on t.taskid=td.taskid" " where"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.taskid", taskids->values, taskids->values_num); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { ZBX_STR2UINT64(taskid, row[0]); if (SUCCEED == zbx_db_is_null(row[3])) { zbx_vector_uint64_append(&done_taskids, taskid); continue; } ZBX_DBROW2UINT64(proxy_hostid, row[2]); if (0 != proxy_hostid) { if (ZBX_TM_STATUS_INPROGRESS == atoi(row[1])) { /* task has been sent to proxy, mark as done */ zbx_vector_uint64_append(&done_taskids, taskid); continue; } } ZBX_STR2UINT64(itemid, row[3]); /* zbx_task_t here is used only to store taskid, proxyhostid, data->itemid - */ /* the rest of task properties are not used */ task = zbx_tm_task_create(taskid, ZBX_TM_TASK_CHECK_NOW, 0, 0, 0, proxy_hostid); task->data = (void *)zbx_tm_check_now_create(itemid); zbx_vector_ptr_append(&tasks, task); } zbx_db_free_result(result); if (0 != tasks.values_num) { zbx_vector_uint64_create(&itemids); for (i = 0; i < tasks.values_num; i++) { task = (zbx_tm_task_t *)tasks.values[i]; data = (zbx_tm_check_now_t *)task->data; zbx_vector_uint64_append(&itemids, data->itemid); } proxy_hostids = (zbx_uint64_t *)zbx_malloc(NULL, tasks.values_num * sizeof(zbx_uint64_t)); zbx_dc_reschedule_items(&itemids, time(NULL), proxy_hostids); sql_offset = 0; zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); for (i = 0; i < tasks.values_num; i++) { task = (zbx_tm_task_t *)tasks.values[i]; if (0 != proxy_hostids[i] && task->proxy_hostid == proxy_hostids[i]) continue; zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset , "update task set"); if (0 == proxy_hostids[i]) { /* close tasks managed by server - */ /* items either have been rescheduled or not cached */ zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " status=%d", ZBX_TM_STATUS_DONE); if (0 != task->proxy_hostid) zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",proxy_hostid=null"); processed_num++; } else { /* update target proxy hostid */ zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " proxy_hostid=" ZBX_FS_UI64, proxy_hostids[i]); } zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " where taskid=" ZBX_FS_UI64 ";\n", task->taskid); zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset); } zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (16 < sql_offset) /* in ORACLE always present begin..end; */ zbx_db_execute("%s", sql); zbx_vector_uint64_destroy(&itemids); zbx_free(proxy_hostids); zbx_vector_ptr_clear_ext(&tasks, (zbx_clean_func_t)zbx_tm_task_free); } if (0 != done_taskids.values_num) { sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where", ZBX_TM_STATUS_DONE); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", done_taskids.values, done_taskids.values_num); zbx_db_execute("%s", sql); } zbx_free(sql); zbx_vector_uint64_destroy(&done_taskids); zbx_vector_ptr_destroy(&tasks); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Purpose: process diaginfo task * * * ******************************************************************************/ static void tm_process_diaginfo(zbx_uint64_t taskid, const char *data) { zbx_tm_task_t *task; int ret; char *info = NULL; struct zbx_json_parse jp_data; task = zbx_tm_task_create(0, ZBX_TM_TASK_DATA_RESULT, ZBX_TM_STATUS_NEW, time(NULL), 0, 0); if (SUCCEED == zbx_json_open(data, &jp_data)) { ret = zbx_diag_get_info(&jp_data, &info); task->data = zbx_tm_data_result_create(taskid, ret, info); zbx_free(info); } else task->data = zbx_tm_data_result_create(taskid, FAIL, zbx_json_strerror()); zbx_tm_save_task(task); zbx_tm_task_free(task); } /****************************************************************************** * * * Purpose: create config cache reload task to be sent to active proxy * * * ******************************************************************************/ static zbx_tm_task_t *tm_create_active_proxy_reload_task(zbx_uint64_t proxyid) { zbx_tm_task_t *task; task = zbx_tm_task_create(0, ZBX_TM_TASK_DATA, ZBX_TM_STATUS_NEW, (int)time(NULL), ZBX_DATA_ACTIVE_PROXY_CONFIG_RELOAD_TTL, proxyid); task->data = zbx_tm_data_create(0, "", 0, ZBX_TM_DATA_TYPE_ACTIVE_PROXY_CONFIG_RELOAD); return task; } /****************************************************************************** * * * Purpose: process task for reload of configuration cache on proxies * * * * Parameters: rtc - [IN] the RTC service * * data - [IN] the JSON with request * * * ******************************************************************************/ static void tm_process_proxy_config_reload_task(zbx_ipc_async_socket_t *rtc, const char *data) { struct zbx_json_parse jp, jp_data; const char *ptr; char buffer[MAX_ID_LEN + 1]; int passive_proxy_count = 0; zbx_vector_tm_task_t tasks_active; zbx_vector_str_t proxynames_log; if (FAIL == zbx_json_open(data, &jp)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse proxy config cache reload task data"); return; } if (FAIL == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_PROXY_HOSTIDS, &jp_data)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse proxy config cache reload task data: field " ZBX_PROTO_TAG_PROXY_HOSTIDS " not found"); return; } zbx_vector_tm_task_create(&tasks_active); zbx_vector_str_create(&proxynames_log); for (ptr = NULL; NULL != (ptr = zbx_json_next(&jp_data, ptr));) { if (NULL != zbx_json_decodevalue(ptr, buffer, sizeof(buffer), NULL)) { zbx_uint64_t proxyid; int type; char *name; ZBX_STR2UINT64(proxyid, buffer); if (FAIL == zbx_dc_get_proxy_name_type_by_id(proxyid, &type, &name)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache " "for proxy " ZBX_FS_UI64 ": proxy is not in cache", proxyid); continue; } if (HOST_STATUS_PROXY_ACTIVE == type) { zbx_tm_task_t *task; task = tm_create_active_proxy_reload_task(proxyid); zbx_vector_tm_task_append(&tasks_active, task); zbx_vector_str_append(&proxynames_log, name); } else if (HOST_STATUS_PROXY_PASSIVE == type) { if (FAIL == zbx_dc_update_passive_proxy_nextcheck(proxyid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache on proxy " "with id " ZBX_FS_UI64 ": failed to update nextcheck", proxyid); zbx_free(name); } else { passive_proxy_count++; zbx_vector_str_append(&proxynames_log, name); } } else { zbx_free(name); THIS_SHOULD_NEVER_HAPPEN; } } } if (1 == proxynames_log.values_num) { zabbix_log(LOG_LEVEL_WARNING, "reloading configuration on proxy \"%s\"", proxynames_log.values[0]); } else if (1 < proxynames_log.values_num) { int i = 0; char *names_success = NULL; while (1) { if (i + 1 == proxynames_log.values_num) { names_success = zbx_strdcatf(names_success, "\"%s\"", proxynames_log.values[i]); break; } else { names_success = zbx_strdcatf(names_success, "\"%s\", ", proxynames_log.values[i]); i++; } } zabbix_log(LOG_LEVEL_WARNING, "reloading configuration on proxies %s", names_success); zbx_free(names_success); } if (0 < tasks_active.values_num) { zbx_tm_save_tasks(&tasks_active); zbx_vector_tm_task_clear_ext(&tasks_active, zbx_tm_task_free); } zbx_vector_str_clear_ext(&proxynames_log, zbx_str_free); zbx_vector_str_destroy(&proxynames_log); zbx_vector_tm_task_destroy(&tasks_active); if (passive_proxy_count > 0) zbx_ipc_async_socket_send(rtc, ZBX_RTC_PROXYPOLLER_PROCESS, NULL, 0); } /****************************************************************************** * * * Purpose: process task for reload of configuration cache on passive proxy * * (received from that passive proxy) * * * * Parameters: rtc - [IN] the RTC service * * data - [IN] the JSON with request * * * ******************************************************************************/ static void tm_process_passive_proxy_cache_reload_request(zbx_ipc_async_socket_t *rtc, const char *data) { struct zbx_json_parse jp; char hostname[ZBX_MAX_HOSTNAME_LEN * ZBX_MAX_BYTES_IN_UTF8_CHAR + 1]; zbx_uint64_t proxyid; if (FAIL == zbx_json_open(data, &jp)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse passive proxy config cache reload request"); return; } if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_PROXY_NAME, hostname, sizeof(hostname), NULL)) { zabbix_log(LOG_LEVEL_WARNING, "broken passive proxy config cache reload request was received"); return; } if (FAIL == zbx_dc_get_proxyid_by_name(hostname, &proxyid, NULL)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache on proxy '%s': proxy is not in " "cache", hostname); return; } if (FAIL == zbx_dc_update_passive_proxy_nextcheck(proxyid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache on proxy " "with id " ZBX_FS_UI64 ": failed to update nextcheck", proxyid); } else { zabbix_log(LOG_LEVEL_WARNING, "reloading configuration cache on proxy '%s'", hostname); zbx_ipc_async_socket_send(rtc, ZBX_RTC_PROXYPOLLER_PROCESS, NULL, 0); } zbx_audit_prepare(); zbx_audit_proxy_config_reload(proxyid, hostname); zbx_audit_flush(); } static void tm_process_temp_suppression(const char *data) { struct zbx_json_parse jp; char tmp_eventid[MAX_ID_LEN], tmp_userid[MAX_ID_LEN], tmp_ts[MAX_ID_LEN], tmp_action[12]; zbx_uint64_t eventid, userid, action; unsigned int ts; if (FAIL == zbx_json_open(data, &jp)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse temporary suppression data request"); return; } if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_EVENTID, tmp_eventid, sizeof(tmp_eventid), NULL) || FAIL == zbx_is_uint64(tmp_eventid, &eventid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse temporary suppression data request: failed to retrieve " " \"%s\" tag", ZBX_PROTO_TAG_EVENTID); return; } if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_USERID, tmp_userid, sizeof(tmp_userid), NULL) || FAIL == zbx_is_uint64(tmp_userid, &userid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse temporary suppression data request: failed to retrieve " " \"%s\" tag", ZBX_PROTO_TAG_USERID); return; } if (SUCCEED == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_ACTION, tmp_action, sizeof(tmp_action), NULL)) { if (0 == strcmp(ZBX_PROTO_VALUE_SUPPRESSION_SUPPRESS, tmp_action)) { action = ZBX_TM_TEMP_SUPPRESION_ACTION_SUPPRESS; } else if (0 == strcmp(ZBX_PROTO_VALUE_SUPPRESSION_UNSUPPRESS, tmp_action)) { action = ZBX_TM_TEMP_SUPPRESION_ACTION_UNSUPPRESS; } else { zabbix_log(LOG_LEVEL_WARNING, "failed to parse temporary suppression data request: failed to " "retrieve \"%s\" tag's value '%s'", ZBX_PROTO_TAG_ACTION, tmp_action); return; } } else { zabbix_log(LOG_LEVEL_WARNING, "failed to parse temporary suppression data request: failed to retrieve " " \"%s\" tag", ZBX_PROTO_TAG_ACTION); return; } if (ZBX_TM_TEMP_SUPPRESION_ACTION_SUPPRESS == action) { if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_SUPPRESS_UNTIL, tmp_ts, sizeof(tmp_ts), NULL) || FAIL == zbx_is_uint32(tmp_ts, &ts)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse temporary suppression data request: failed to retrieve " " \"%s\" tag", ZBX_PROTO_TAG_SUPPRESS_UNTIL); return; } } if (SUCCEED != zbx_db_lock_record("users", userid, NULL, 0)) return; if (ZBX_TM_TEMP_SUPPRESION_ACTION_UNSUPPRESS == action || (ZBX_TM_TEMP_SUPPRESION_INDEFINITE_TIME != ts && time(NULL) >= ts)) { zbx_db_execute("delete from event_suppress where eventid=" ZBX_FS_UI64 " and maintenanceid is null", eventid); } else if (ZBX_TM_TEMP_SUPPRESION_ACTION_SUPPRESS == action) { DB_ROW row; DB_RESULT result; if (SUCCEED != zbx_db_lock_record("events", eventid, NULL, 0)) return; result = zbx_db_select("select event_suppressid,suppress_until from event_suppress where eventid=" ZBX_FS_UI64 " and maintenanceid is null" ZBX_FOR_UPDATE, eventid); if (NULL != (row = zbx_db_fetch(result))) { zbx_db_execute("update event_suppress set suppress_until=%u,userid=" ZBX_FS_UI64 " where event_suppressid=%s", ts, userid, row[0]); } else { zbx_db_insert_t db_insert; zbx_db_insert_prepare(&db_insert, "event_suppress", "event_suppressid", "eventid", "suppress_until", "userid", NULL); zbx_db_insert_add_values(&db_insert, __UINT64_C(0), eventid, ts, userid); zbx_db_insert_autoincrement(&db_insert, "event_suppressid"); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } zbx_db_free_result(result); } else THIS_SHOULD_NEVER_HAPPEN; } /****************************************************************************** * * * Purpose: process data tasks * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_data(zbx_ipc_async_socket_t *rtc, zbx_vector_uint64_t *taskids) { DB_ROW row; DB_RESULT result; int processed_num = 0, data_type; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_uint64_t done_taskids; zbx_uint64_t taskid; zabbix_log(LOG_LEVEL_DEBUG, "In %s() tasks_num:%d", __func__, taskids->values_num); zbx_db_begin(); zbx_vector_uint64_create(&done_taskids); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "select t.taskid,td.type,td.data" " from task t" " left join task_data td" " on t.taskid=td.taskid" " where t.proxy_hostid is null" " and"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.taskid", taskids->values, taskids->values_num); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { ZBX_STR2UINT64(taskid, row[0]); if (SUCCEED == zbx_db_is_null(row[1])) { zbx_vector_uint64_append(&done_taskids, taskid); continue; } data_type = atoi(row[1]); switch (data_type) { case ZBX_TM_DATA_TYPE_DIAGINFO: tm_process_diaginfo(taskid, row[2]); zbx_vector_uint64_append(&done_taskids, taskid); break; case ZBX_TM_DATA_TYPE_PROXY_HOSTIDS: tm_process_proxy_config_reload_task(rtc, row[2]); zbx_vector_uint64_append(&done_taskids, taskid); break; case ZBX_TM_DATA_TYPE_PROXY_HOSTNAME: tm_process_passive_proxy_cache_reload_request(rtc, row[2]); zbx_vector_uint64_append(&done_taskids, taskid); break; case ZBX_TM_DATA_TYPE_TEMP_SUPPRESSION: tm_process_temp_suppression(row[2]); zbx_vector_uint64_append(&done_taskids, taskid); break; case ZBX_TM_DATA_TYPE_RANK_EVENT: tm_process_rank_event(taskid, row[2]); zbx_vector_uint64_append(&done_taskids, taskid); break; default: THIS_SHOULD_NEVER_HAPPEN; } } zbx_db_free_result(result); if (0 != (processed_num = done_taskids.values_num)) { sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where", ZBX_TM_STATUS_DONE); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", done_taskids.values, done_taskids.values_num); zbx_db_execute("%s", sql); } zbx_free(sql); zbx_vector_uint64_destroy(&done_taskids); zbx_db_commit(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Purpose: expires tasks that don't require specific expiration handling * * * * Return value: The number of successfully expired tasks * * * ******************************************************************************/ static int tm_expire_generic_tasks(zbx_vector_uint64_t *taskids) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update task set status=%d where", ZBX_TM_STATUS_EXPIRED); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "taskid", taskids->values, taskids->values_num); zbx_db_execute("%s", sql); zbx_free(sql); return taskids->values_num; } /****************************************************************************** * * * Purpose: get proxy version compatibility with server version * * * ******************************************************************************/ static zbx_proxy_compatibility_t tm_get_proxy_compatibility(zbx_uint64_t proxy_hostid) { zbx_proxy_compatibility_t compatibility = ZBX_PROXY_VERSION_UNDEFINED; if (0 < proxy_hostid) { DB_ROW row; DB_RESULT result; result = zbx_db_select( "select compatibility" " from host_rtdata" " where hostid=" ZBX_FS_UI64, proxy_hostid); if (NULL != (row = zbx_db_fetch(result))) compatibility = (zbx_proxy_compatibility_t)atoi(row[0]); zbx_db_free_result(result); } return compatibility; } /****************************************************************************** * * * Purpose: process task manager tasks depending on task type * * * * Return value: The number of successfully processed tasks * * * ******************************************************************************/ static int tm_process_tasks(zbx_ipc_async_socket_t *rtc, int now) { DB_ROW row; DB_RESULT result; int type, processed_num = 0, expired_num = 0, clock, ttl; zbx_uint64_t taskid, proxy_hostid; zbx_vector_uint64_t ack_taskids, check_now_taskids, expire_taskids, data_taskids; zbx_vector_uint64_create(&ack_taskids); zbx_vector_uint64_create(&check_now_taskids); zbx_vector_uint64_create(&expire_taskids); zbx_vector_uint64_create(&data_taskids); result = zbx_db_select("select taskid,type,clock,ttl,proxy_hostid" " from task" " where status in (%d,%d)" " order by taskid", ZBX_TM_STATUS_NEW, ZBX_TM_STATUS_INPROGRESS); while (NULL != (row = zbx_db_fetch(result))) { zbx_proxy_compatibility_t compatibility; ZBX_STR2UINT64(taskid, row[0]); ZBX_STR2UCHAR(type, row[1]); clock = atoi(row[2]); ttl = atoi(row[3]); ZBX_DBROW2UINT64(proxy_hostid, row[4]); switch (type) { case ZBX_TM_TASK_CLOSE_PROBLEM: /* close problem tasks will never have 'in progress' status */ if (SUCCEED == tm_try_task_close_problem(taskid)) processed_num++; break; case ZBX_TM_TASK_REMOTE_COMMAND: compatibility = tm_get_proxy_compatibility(proxy_hostid); if (ZBX_PROXY_VERSION_UNSUPPORTED == compatibility) { zbx_tm_task_t *task; const char *error = "Remote commands are disabled on unsupported proxies."; zabbix_log(LOG_LEVEL_WARNING, "%s", error); task = zbx_tm_task_create(0, ZBX_TM_TASK_REMOTE_COMMAND_RESULT, ZBX_TM_STATUS_NEW, zbx_time(), 0, 0); task->data = zbx_tm_remote_command_result_create(taskid, FAIL, error); zbx_tm_save_task(task); zbx_tm_task_free(task); } /* both - 'new' and 'in progress' remote tasks should expire */ if ((0 != ttl && clock + ttl < now) || (ZBX_PROXY_VERSION_UNSUPPORTED == compatibility)) { tm_expire_remote_command(taskid); expired_num++; } break; case ZBX_TM_TASK_REMOTE_COMMAND_RESULT: /* close problem tasks will never have 'in progress' status */ if (SUCCEED == tm_process_remote_command_result(taskid)) processed_num++; break; case ZBX_TM_TASK_ACKNOWLEDGE: zbx_vector_uint64_append(&ack_taskids, taskid); break; case ZBX_TM_TASK_CHECK_NOW: compatibility = tm_get_proxy_compatibility(proxy_hostid); if (ZBX_PROXY_VERSION_UNSUPPORTED == compatibility) { zabbix_log(LOG_LEVEL_WARNING, "Execute now task is disabled on unsupported" " proxies."); } if ((0 != ttl && clock + ttl < now) || (ZBX_PROXY_VERSION_UNSUPPORTED == compatibility)) zbx_vector_uint64_append(&expire_taskids, taskid); else zbx_vector_uint64_append(&check_now_taskids, taskid); break; case ZBX_TM_TASK_DATA: compatibility = tm_get_proxy_compatibility(proxy_hostid); if (ZBX_PROXY_VERSION_OUTDATED == compatibility || ZBX_PROXY_VERSION_UNSUPPORTED == compatibility) { zbx_tm_task_t *task; const char *error = "The requested task is disabled. Proxy major" " version does not match server major version."; zabbix_log(LOG_LEVEL_WARNING, "%s", error); task = zbx_tm_task_create(0, ZBX_TM_TASK_DATA_RESULT, ZBX_TM_STATUS_NEW, zbx_time(), 0, 0); task->data = zbx_tm_data_result_create(taskid, FAIL, error); zbx_tm_save_task(task); zbx_tm_task_free(task); zbx_vector_uint64_append(&expire_taskids, taskid); break; } ZBX_FALLTHROUGH; case ZBX_TM_PROXYDATA: /* both - 'new' and 'in progress' tasks should expire */ if (0 != ttl && clock + ttl < now) zbx_vector_uint64_append(&expire_taskids, taskid); else zbx_vector_uint64_append(&data_taskids, taskid); break; case ZBX_TM_TASK_DATA_RESULT: tm_process_data_result(taskid); processed_num++; break; default: THIS_SHOULD_NEVER_HAPPEN; break; } } zbx_db_free_result(result); if (0 < ack_taskids.values_num) processed_num += tm_process_acknowledgments(&ack_taskids); if (0 < check_now_taskids.values_num) processed_num += tm_process_check_now(&check_now_taskids); if (0 < data_taskids.values_num) processed_num += tm_process_data(rtc, &data_taskids); if (0 < expire_taskids.values_num) expired_num += tm_expire_generic_tasks(&expire_taskids); zbx_vector_uint64_destroy(&data_taskids); zbx_vector_uint64_destroy(&expire_taskids); zbx_vector_uint64_destroy(&check_now_taskids); zbx_vector_uint64_destroy(&ack_taskids); return processed_num + expired_num; } /****************************************************************************** * * * Purpose: remove old done/expired tasks * * * ******************************************************************************/ static void tm_remove_old_tasks(int now) { zbx_db_begin(); zbx_db_execute("delete from task where status in (%d,%d) and clock<=%d", ZBX_TM_STATUS_DONE, ZBX_TM_STATUS_EXPIRED, now - ZBX_TM_CLEANUP_TASK_AGE); zbx_db_commit(); } static void tm_reload_each_proxy_cache(zbx_ipc_async_socket_t *rtc) { int i, notify_proxypollers = 0; zbx_vector_cached_proxy_ptr_t proxies; zbx_vector_tm_task_t tasks_active; zbx_vector_cached_proxy_ptr_create(&proxies); zbx_vector_tm_task_create(&tasks_active); zbx_dc_get_all_proxies(&proxies); zabbix_log(LOG_LEVEL_WARNING, "reloading configuration cache on all proxies"); zbx_audit_prepare(); for (i = 0; i < proxies.values_num; i++) { zbx_tm_task_t *task; zbx_cached_proxy_t *proxy; proxy = proxies.values[i]; if (HOST_STATUS_PROXY_ACTIVE == proxy->status) { task = tm_create_active_proxy_reload_task(proxy->hostid); zbx_vector_tm_task_append(&tasks_active, task); } else if (HOST_STATUS_PROXY_PASSIVE == proxy->status) { if (FAIL == zbx_dc_update_passive_proxy_nextcheck(proxy->hostid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache on proxy " "with id " ZBX_FS_UI64 " [%s]: failed to update nextcheck", proxy->hostid, proxy->name); } else notify_proxypollers = 1; } zbx_audit_proxy_config_reload(proxy->hostid, proxy->name); } zbx_audit_flush(); if (0 != notify_proxypollers) zbx_ipc_async_socket_send(rtc, ZBX_RTC_PROXYPOLLER_PROCESS, NULL, 0); if (0 < tasks_active.values_num) { zbx_db_begin(); zbx_tm_save_tasks(&tasks_active); zbx_db_commit(); zbx_vector_tm_task_clear_ext(&tasks_active, zbx_tm_task_free); } zbx_vector_tm_task_destroy(&tasks_active); zbx_vector_cached_proxy_ptr_clear_ext(&proxies, zbx_cached_proxy_free); zbx_vector_cached_proxy_ptr_destroy(&proxies); } /****************************************************************************** * * * Purpose: reload configuration cache on proxies using given proxy names * * * * Parameters: rtc - [IN] the RTC service * * data - [IN] the JSON with request * * * ******************************************************************************/ static void tm_reload_proxy_cache_by_names(zbx_ipc_async_socket_t *rtc, const unsigned char *data) { struct zbx_json_parse jp, jp_data; const char *ptr; char name[ZBX_MAX_HOSTNAME_LEN * ZBX_MAX_BYTES_IN_UTF8_CHAR + 1]; zbx_vector_tm_task_t tasks_active; zbx_vector_str_t proxynames_log; char *names_success = NULL; zbx_vector_str_create(&proxynames_log); if (FAIL == zbx_json_open((const char *)data, &jp)) { zabbix_log(LOG_LEVEL_WARNING, "failed to parse proxy config cache reload data"); return; } if (FAIL == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_PROXY_NAMES, &jp_data)) { tm_reload_each_proxy_cache(rtc); return; } zbx_vector_tm_task_create(&tasks_active); zbx_audit_prepare(); for (ptr = NULL; NULL != (ptr = zbx_json_next(&jp_data, ptr));) { if (NULL != zbx_json_decodevalue(ptr, name, sizeof(name), NULL)) { zbx_uint64_t proxyid; unsigned char type; if (FAIL == zbx_dc_get_proxyid_by_name(name, &proxyid, &type)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache " "on proxy '%s': proxy is not in cache", name); continue; } if (HOST_STATUS_PROXY_ACTIVE == type) { zbx_tm_task_t *task; task = tm_create_active_proxy_reload_task(proxyid); zbx_vector_tm_task_append(&tasks_active, task); zbx_vector_str_append(&proxynames_log, zbx_strdup(NULL, name)); } else if (HOST_STATUS_PROXY_PASSIVE == type) { if (FAIL == zbx_dc_update_passive_proxy_nextcheck(proxyid)) { zabbix_log(LOG_LEVEL_WARNING, "failed to reload configuration cache on proxy " "with id " ZBX_FS_UI64 ": failed to update nextcheck", proxyid); } else zbx_vector_str_append(&proxynames_log, zbx_strdup(NULL, name)); } zbx_audit_proxy_config_reload(proxyid, name); } } zbx_audit_flush(); if (1 == proxynames_log.values_num) { zabbix_log(LOG_LEVEL_WARNING, "reloading configuration on proxy \"%s\"", proxynames_log.values[0]); } else if (1 < proxynames_log.values_num) { int i = 0; while (1) { if (i + 1 == proxynames_log.values_num) { names_success = zbx_strdcatf(names_success, "\"%s\"", proxynames_log.values[i]); break; } else { names_success = zbx_strdcatf(names_success, "\"%s\", ", proxynames_log.values[i]); i++; } } zabbix_log(LOG_LEVEL_WARNING, "reloading configuration on proxies %s", names_success); zbx_free(names_success); zbx_ipc_async_socket_send(rtc, ZBX_RTC_PROXYPOLLER_PROCESS, NULL, 0); } zbx_vector_str_clear_ext(&proxynames_log, zbx_str_free); zbx_vector_str_destroy(&proxynames_log); if (0 < tasks_active.values_num) { zbx_db_begin(); zbx_tm_save_tasks(&tasks_active); zbx_db_commit(); zbx_vector_tm_task_clear_ext(&tasks_active, zbx_tm_task_free); } zbx_vector_tm_task_destroy(&tasks_active); } ZBX_THREAD_ENTRY(taskmanager_thread, args) { static int cleanup_time = 0; double sec1, sec2; int tasks_num, sleeptime, nextcheck; zbx_ipc_async_socket_t rtc; const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; int server_num = ((zbx_thread_args_t *)args)->info.server_num; int process_num = ((zbx_thread_args_t *)args)->info.process_num; unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type; zbx_uint32_t rtc_msgs[] = {ZBX_RTC_PROXY_CONFIG_CACHE_RELOAD}; zbx_thread_taskmanager_args *taskmanager_args_in = (zbx_thread_taskmanager_args *) ((((zbx_thread_args_t *)args))->args); zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), server_num, get_process_type_string(process_type), process_num); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type)); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS)) problems_export = zbx_problems_export_init(get_problems_export, "task-manager", process_num); sec1 = zbx_time(); sleeptime = ZBX_TM_PROCESS_PERIOD - (int)sec1 % ZBX_TM_PROCESS_PERIOD; zbx_setproctitle("%s [started, idle %d sec]", get_process_type_string(process_type), sleeptime); zbx_rtc_subscribe(process_type, process_num, rtc_msgs, ARRSIZE(rtc_msgs), taskmanager_args_in->config_timeout, &rtc); while (ZBX_IS_RUNNING()) { zbx_uint32_t rtc_cmd; unsigned char *rtc_data = NULL; if (SUCCEED == zbx_rtc_wait(&rtc, info, &rtc_cmd, &rtc_data, sleeptime) && 0 != rtc_cmd) { if (ZBX_RTC_PROXY_CONFIG_CACHE_RELOAD == rtc_cmd) tm_reload_proxy_cache_by_names(&rtc, rtc_data); zbx_free(rtc_data); if (ZBX_RTC_SHUTDOWN == rtc_cmd) break; } sec1 = zbx_time(); zbx_update_env(get_process_type_string(process_type), sec1); zbx_setproctitle("%s [processing tasks]", get_process_type_string(process_type)); tasks_num = tm_process_tasks(&rtc, (int)sec1); if (ZBX_TM_CLEANUP_PERIOD <= sec1 - cleanup_time) { tm_remove_old_tasks((int)sec1); cleanup_time = sec1; } sec2 = zbx_time(); nextcheck = (int)sec1 - (int)sec1 % ZBX_TM_PROCESS_PERIOD + ZBX_TM_PROCESS_PERIOD; if (0 > (sleeptime = nextcheck - (int)sec2)) sleeptime = 0; zbx_setproctitle("%s [processed %d task(s) in " ZBX_FS_DBL " sec, idle %d sec]", get_process_type_string(process_type), tasks_num, sec2 - sec1, sleeptime); } if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS)) zbx_export_deinit(problems_export); zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num); while (1) zbx_sleep(SEC_PER_MIN); }