/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "zbxavailability.h" #include "zbx_availability_constants.h" #include "zbxlog.h" #include "zbxself.h" #include "zbxipcservice.h" #include "zbxnix.h" #include "zbxnum.h" #include "zbxtime.h" #include "zbxalgo.h" #include "zbxdb.h" #include "zbxdbhigh.h" #include "zbxthreads.h" #define AVAILABILITY_MANAGER_PROXY_ACTIVE_AVAIL_DELAY_SEC (SEC_PER_MIN * 10) typedef struct { zbx_uint64_t hostid; time_t lastaccess; } zbx_active_avail_proxy_t; static sigset_t orig_mask; static int interface_availability_compare(const void *d1, const void *d2) { const zbx_interface_availability_t *ia1 = *(const zbx_interface_availability_t * const *)d1; const zbx_interface_availability_t *ia2 = *(const zbx_interface_availability_t * const *)d2; ZBX_RETURN_IF_NOT_EQUAL(ia1->interfaceid, ia2->interfaceid); return ia1->id - ia2->id; } static void process_new_active_check_heartbeat(zbx_avail_active_hb_cache_t *cache, zbx_host_active_avail_t *avail_new) { zbx_host_active_avail_t *avail_cached, *avail_queued; if (NULL == (avail_cached = zbx_hashset_search(&cache->hosts, &avail_new->hostid))) { avail_new->active_status = ZBX_INTERFACE_AVAILABLE_TRUE; zbx_hashset_insert(&cache->hosts, avail_new, sizeof(zbx_host_active_avail_t)); if (NULL == (avail_queued = zbx_hashset_search(&cache->queue, &avail_new->hostid))) { zbx_hashset_insert(&cache->queue, avail_new, sizeof(zbx_host_active_avail_t)); } else { avail_queued->active_status = ZBX_INTERFACE_AVAILABLE_TRUE; } } else { avail_cached->heartbeat_freq = avail_new->heartbeat_freq; avail_cached->lastaccess_active = avail_new->lastaccess_active; } } static void calculate_cached_active_check_availabilities(zbx_avail_active_hb_cache_t *cache) { zbx_hashset_iter_t iter; zbx_host_active_avail_t *host; zbx_hashset_iter_reset(&cache->hosts, &iter); while (NULL != (host = (zbx_host_active_avail_t *)zbx_hashset_iter_next(&iter))) { int now, prev_status; now = (int)time(NULL); prev_status = host->active_status; if (now - host->lastaccess_active <= host->heartbeat_freq * 2) { host->active_status = ZBX_INTERFACE_AVAILABLE_TRUE; } else host->active_status = ZBX_INTERFACE_AVAILABLE_FALSE; if (prev_status != host->active_status) { zbx_host_active_avail_t *queued_host; if (NULL == (queued_host = zbx_hashset_search(&cache->queue, &host->hostid))) zbx_hashset_insert(&cache->queue, host, sizeof(zbx_host_active_avail_t)); else queued_host->active_status = host->active_status; } } } static void db_update_active_check_status(zbx_vector_uint64_t *hostids, int status) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; if (0 == hostids->values_num) return; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update host_rtdata set active_available=%i where", status); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "hostid", hostids->values, hostids->values_num); zbx_db_execute("%s", sql); zbx_free(sql); } static void flush_active_hb_queue(zbx_avail_active_hb_cache_t *cache) { zbx_hashset_iter_t iter; zbx_host_active_avail_t *host; zbx_vector_uint64_t status_unknown, status_available, status_unavailable; if (0 == cache->queue.num_data) return; zbx_vector_uint64_create(&status_unknown); zbx_vector_uint64_create(&status_available); zbx_vector_uint64_create(&status_unavailable); zbx_hashset_iter_reset(&cache->queue, &iter); zbx_db_begin(); while (NULL != (host = (zbx_host_active_avail_t *)zbx_hashset_iter_next(&iter))) { if (host->active_status == ZBX_INTERFACE_AVAILABLE_UNKNOWN) { zbx_vector_uint64_append(&status_unknown, host->hostid); } else if (host->active_status == ZBX_INTERFACE_AVAILABLE_TRUE) { zbx_vector_uint64_append(&status_available, host->hostid); } else if (host->active_status == ZBX_INTERFACE_AVAILABLE_FALSE) zbx_vector_uint64_append(&status_unavailable, host->hostid); } db_update_active_check_status(&status_unknown, ZBX_INTERFACE_AVAILABLE_UNKNOWN); db_update_active_check_status(&status_available, ZBX_INTERFACE_AVAILABLE_TRUE); db_update_active_check_status(&status_unavailable, ZBX_INTERFACE_AVAILABLE_FALSE); if (ZBX_DB_OK == zbx_db_commit()) zbx_hashset_clear(&cache->queue); zbx_vector_uint64_destroy(&status_unknown); zbx_vector_uint64_destroy(&status_available); zbx_vector_uint64_destroy(&status_unavailable); } static void process_active_hb(zbx_avail_active_hb_cache_t *cache, zbx_ipc_message_t *message) { zbx_host_active_avail_t avail; zbx_availability_deserialize_active_hb(message->data, &avail); avail.lastaccess_active = (int)time(NULL); process_new_active_check_heartbeat(cache, &avail); } static void send_hostdata_response(zbx_avail_active_hb_cache_t *cache, zbx_ipc_client_t *client) { unsigned char *data = NULL; zbx_uint32_t data_len; data_len = zbx_availability_serialize_hostdata(&data, &cache->queue); zbx_ipc_client_send(client, ZBX_IPC_AVAILMAN_ACTIVE_HOSTDATA, data, data_len); zbx_free(data); } static void send_avail_check_status_response(zbx_avail_active_hb_cache_t *cache, zbx_ipc_client_t *client, zbx_ipc_message_t *message) { unsigned char *data = NULL; zbx_uint32_t data_len; zbx_uint64_t hostid; int status = ZBX_INTERFACE_AVAILABLE_UNKNOWN; zbx_host_active_avail_t *host; zbx_availability_deserialize_active_status_request(message->data, &hostid); if (NULL == (host = zbx_hashset_search(&cache->queue, &hostid))) { if (NULL != (host = zbx_hashset_search(&cache->hosts, &hostid))) status = host->active_status; } else status = host->active_status; data_len = zbx_availability_serialize_active_status_response(&data, status); zbx_ipc_client_send(client, ZBX_IPC_AVAILMAN_ACTIVE_STATUS, data, data_len); zbx_free(data); } static void process_confsync_diff(zbx_avail_active_hb_cache_t *cache, zbx_ipc_message_t *message) { zbx_vector_uint64_t hostids; zbx_vector_uint64_create(&hostids); zbx_availability_deserialize_hostids(message->data, &hostids); for (int i = 0; i < hostids.values_num; i++) { zbx_host_active_avail_t *queued_host; zbx_uint64_t hostid; hostid = hostids.values[i]; zbx_hashset_remove(&cache->hosts, &hostid); if (NULL != (queued_host = zbx_hashset_search(&cache->queue, &hostid))) { queued_host->active_status = ZBX_INTERFACE_AVAILABLE_UNKNOWN; } else { zbx_host_active_avail_t host_local; host_local.active_status = ZBX_INTERFACE_AVAILABLE_UNKNOWN; host_local.hostid = hostid; host_local.lastaccess_active = 0; host_local.heartbeat_freq = 0; zbx_hashset_insert(&cache->queue, &host_local, sizeof(zbx_host_active_avail_t)); } } zbx_vector_uint64_destroy(&hostids); } static void init_active_availability(zbx_avail_active_hb_cache_t *cache, unsigned char program_type) { if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) { if (ZBX_DB_OK > zbx_db_execute("update host_rtdata hr set active_available=%i where exists " "(select null from hosts h where h.hostid=hr.hostid and proxyid is null)", ZBX_INTERFACE_AVAILABLE_UNKNOWN)) { zabbix_log(LOG_LEVEL_WARNING, "Failed to reset availability status for active checks"); } } else { zbx_db_result_t result; zbx_db_row_t row; result = zbx_db_select("select hostid from hosts"); while (NULL != (row = zbx_db_fetch(result))) { zbx_host_active_avail_t avail_local; ZBX_STR2UINT64(avail_local.hostid, row[0]); avail_local.active_status = ZBX_INTERFACE_AVAILABLE_UNKNOWN; avail_local.lastaccess_active = 0; avail_local.heartbeat_freq = 0; zbx_hashset_insert(&cache->queue, &avail_local, sizeof(zbx_host_active_avail_t)); } zbx_db_free_result(result); } } static void flush_proxy_hostdata(zbx_avail_active_hb_cache_t *cache, zbx_ipc_message_t *message) { zbx_uint64_t proxyid; zbx_vector_proxy_hostdata_ptr_t hosts; zbx_proxy_hostdata_t *host; zbx_vector_uint64_t status_unknown, status_available, status_unavailable; zbx_active_avail_proxy_t *proxy_avail; zbx_vector_proxy_hostdata_ptr_create(&hosts); zbx_availability_deserialize_proxy_hostdata(message->data, &hosts, &proxyid); zbx_vector_uint64_create(&status_unknown); zbx_vector_uint64_create(&status_available); zbx_vector_uint64_create(&status_unavailable); for (int i = 0; i < hosts.values_num; i++) { host = hosts.values[i]; if (host->status == ZBX_INTERFACE_AVAILABLE_UNKNOWN) { zbx_vector_uint64_append(&status_unknown, host->hostid); } else if (host->status == ZBX_INTERFACE_AVAILABLE_TRUE) { zbx_vector_uint64_append(&status_available, host->hostid); } else if (host->status == ZBX_INTERFACE_AVAILABLE_FALSE) zbx_vector_uint64_append(&status_unavailable, host->hostid); } zbx_db_begin(); db_update_active_check_status(&status_unknown, ZBX_INTERFACE_AVAILABLE_UNKNOWN); db_update_active_check_status(&status_available, ZBX_INTERFACE_AVAILABLE_TRUE); db_update_active_check_status(&status_unavailable, ZBX_INTERFACE_AVAILABLE_FALSE); if (ZBX_DB_OK == zbx_db_commit()) zbx_hashset_clear(&cache->queue); if (NULL == (proxy_avail = zbx_hashset_search(&cache->proxy_avail, &proxyid))) { zbx_active_avail_proxy_t proxy_avail_local; proxy_avail_local.hostid = proxyid; proxy_avail_local.lastaccess = time(NULL); zbx_hashset_insert(&cache->proxy_avail, &proxy_avail_local, sizeof(zbx_active_avail_proxy_t)); } else proxy_avail->lastaccess = time(NULL); zbx_vector_uint64_destroy(&status_unknown); zbx_vector_uint64_destroy(&status_available); zbx_vector_uint64_destroy(&status_unavailable); zbx_vector_proxy_hostdata_ptr_clear_ext(&hosts, (zbx_proxy_hostdata_ptr_free_func_t)zbx_ptr_free); zbx_vector_proxy_hostdata_ptr_destroy(&hosts); } static void flush_all_hosts(zbx_avail_active_hb_cache_t *cache) { zbx_hashset_iter_t iter; zbx_host_active_avail_t *host; if (0 == cache->hosts.num_data) return; zbx_hashset_iter_reset(&cache->hosts, &iter); while (NULL != (host = (zbx_host_active_avail_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&cache->queue, &host->hostid)) { zbx_hashset_insert(&cache->queue, host, sizeof(zbx_host_active_avail_t)); } } } static void active_checks_calculate_proxy_availability(zbx_avail_active_hb_cache_t *cache) { zbx_hashset_iter_t iter; zbx_active_avail_proxy_t *proxy_avail; time_t now; if (0 == cache->proxy_avail.num_data) return; now = time(NULL); zbx_hashset_iter_reset(&cache->proxy_avail, &iter); while (NULL != (proxy_avail = (zbx_active_avail_proxy_t *)zbx_hashset_iter_next(&iter))) { if (proxy_avail->lastaccess + AVAILABILITY_MANAGER_PROXY_ACTIVE_AVAIL_DELAY_SEC <= now) { if (ZBX_DB_OK > zbx_db_execute("update host_rtdata set active_available=%i " "where hostid in (select hostid from hosts where proxyid=" ZBX_FS_UI64 ")", ZBX_INTERFACE_AVAILABLE_UNKNOWN, proxy_avail->hostid)) { continue; } zbx_hashset_iter_remove(&iter); } } } static void update_proxy_heartbeat(zbx_avail_active_hb_cache_t *cache, zbx_ipc_message_t *message) { zbx_active_avail_proxy_t *proxy_avail; zbx_uint64_t proxyid; zbx_availability_deserialize_active_proxy_hb_update(message->data, &proxyid); if (NULL != (proxy_avail = zbx_hashset_search(&cache->proxy_avail, &proxyid))) proxy_avail->lastaccess = time(NULL); } ZBX_THREAD_ENTRY(zbx_availability_manager_thread, args) { #define AVAILABILITY_MANAGER_DELAY 1 #define AVAILABILITY_MANAGER_FLUSH_DELAY_SEC 5 #define AVAILABILITY_MANAGER_ACTIVE_HEARTBEAT_DELAY_SEC 10 #define AVAILABILITY_MANAGER_PROXY_ACTIVE_AUTOFLUSH_DELAY (SEC_PER_MIN * 5) #define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */ /* once in STAT_INTERVAL seconds */ zbx_ipc_service_t service; char *error = NULL; zbx_ipc_client_t *client; zbx_ipc_message_t *message; int ret, processed_num = 0; double time_stat, time_idle = 0, time_now, time_flush, sec, last_proxy_flush; zbx_vector_availability_ptr_t interface_availabilities; zbx_timespec_t timeout = {AVAILABILITY_MANAGER_DELAY, 0}; zbx_avail_active_hb_cache_t active_hb_cache; const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; int server_num = ((zbx_thread_args_t *)args)->info.server_num; int process_num = ((zbx_thread_args_t *)args)->info.process_num; unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type; zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), server_num, get_process_type_string(process_type), process_num); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); zbx_setproctitle("%s #%d [connecting to the database]", get_process_type_string(process_type), process_num); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_AVAILABILITY, &error)) { zabbix_log(LOG_LEVEL_CRIT, "cannot start availability manager service: %s", error); zbx_free(error); exit(EXIT_FAILURE); } /* initialize statistics */ time_stat = last_proxy_flush = zbx_time(); time_flush = time_stat; active_hb_cache.last_proxy_avail_refresh = active_hb_cache.last_status_refresh = zbx_time(); zbx_vector_availability_ptr_create(&interface_availabilities); zbx_hashset_create(&active_hb_cache.queue, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_hashset_create(&active_hb_cache.hosts, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_hashset_create(&active_hb_cache.proxy_avail, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num); init_active_availability(&active_hb_cache, info->program_type); while (ZBX_IS_RUNNING()) { time_now = zbx_time(); if (STAT_INTERVAL < time_now - time_stat) { zbx_setproctitle("%s #%d [queued %d, processed %d values, idle " ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num, interface_availabilities.values_num, processed_num, time_idle, time_now - time_stat); time_stat = time_now; time_idle = 0; processed_num = 0; } zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE); ret = zbx_ipc_service_recv(&service, &timeout, &client, &message); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); sec = zbx_time(); zbx_update_env(get_process_type_string(process_type), sec); if (ZBX_IPC_RECV_IMMEDIATE != ret) time_idle += sec - time_now; if (NULL != message) { switch (message->code) { case ZBX_IPC_AVAILABILITY_REQUEST: zbx_availability_deserialize(message->data, message->size, &interface_availabilities); break; case ZBX_IPC_AVAILMAN_ACTIVE_HB: process_active_hb(&active_hb_cache, message); break; case ZBX_IPC_AVAILMAN_ACTIVE_HOSTDATA: send_hostdata_response(&active_hb_cache, client); zbx_hashset_clear(&active_hb_cache.queue); break; case ZBX_IPC_AVAILMAN_ACTIVE_STATUS: send_avail_check_status_response(&active_hb_cache, client, message); break; case ZBX_IPC_AVAILMAN_CONFSYNC_DIFF: process_confsync_diff(&active_hb_cache, message); break; case ZBX_IPC_AVAILMAN_PROCESS_PROXY_HOSTDATA: flush_proxy_hostdata(&active_hb_cache, message); break; case ZBX_IPC_AVAILMAN_ACTIVE_PROXY_HB_UPDATE: update_proxy_heartbeat(&active_hb_cache, message); break; default: THIS_SHOULD_NEVER_HAPPEN; } zbx_ipc_message_free(message); } if (NULL != client) zbx_ipc_client_release(client); if (AVAILABILITY_MANAGER_ACTIVE_HEARTBEAT_DELAY_SEC < time_now - active_hb_cache.last_status_refresh) { active_hb_cache.last_status_refresh = time_now; calculate_cached_active_check_availabilities(&active_hb_cache); } if (AVAILABILITY_MANAGER_FLUSH_DELAY_SEC < time_now - time_flush) { time_flush = time_now; if (0 != interface_availabilities.values_num) { zbx_block_signals(&orig_mask); zbx_vector_availability_ptr_sort(&interface_availabilities, interface_availability_compare); zbx_db_update_interface_availabilities(&interface_availabilities); zbx_unblock_signals(&orig_mask); processed_num = interface_availabilities.values_num; zbx_vector_availability_ptr_clear_ext(&interface_availabilities, zbx_interface_availability_free); } if (0 != active_hb_cache.queue.num_data && 0 != (info->program_type & ZBX_PROGRAM_TYPE_SERVER)) { flush_active_hb_queue(&active_hb_cache); zbx_hashset_clear(&active_hb_cache.queue); } } if (AVAILABILITY_MANAGER_PROXY_ACTIVE_AVAIL_DELAY_SEC < time_now - active_hb_cache.last_proxy_avail_refresh && 0 != (info->program_type & ZBX_PROGRAM_TYPE_SERVER)) { active_hb_cache.last_proxy_avail_refresh = time_now; active_checks_calculate_proxy_availability(&active_hb_cache); } if (0 != (info->program_type & ZBX_PROGRAM_TYPE_PROXY) && last_proxy_flush + AVAILABILITY_MANAGER_PROXY_ACTIVE_AUTOFLUSH_DELAY <= time_now) { flush_all_hosts(&active_hb_cache); last_proxy_flush = time_now; } } zbx_block_signals(&orig_mask); if (0 != interface_availabilities.values_num) { zbx_vector_availability_ptr_sort(&interface_availabilities, interface_availability_compare); zbx_db_update_interface_availabilities(&interface_availabilities); } zbx_db_close(); zbx_unblock_signals(&orig_mask); exit(EXIT_SUCCESS); #undef STAT_INTERVAL #undef AVAILABILITY_MANAGER_DELAY #undef AVAILABILITY_MANAGER_FLUSH_DELAY_SEC #undef AVAILABILITY_MANAGER_ACTIVE_HEARTBEAT_DELAY_SEC #undef AVAILABILITY_MANAGER_PROXY_ACTIVE_AUTOFLUSH_DELAY }