/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "pg_manager.h" #include "zbxcommon.h" #include "pg_cache.h" #include "pg_service.h" #include "zbx_host_constants.h" #include "zbxalgo.h" #include "zbxcacheconfig.h" #include "zbxdb.h" #include "zbxdbhigh.h" #include "zbxnix.h" #include "zbxnum.h" #include "zbxself.h" #include "zbxstr.h" #include "zbxtime.h" #include "zbxtypes.h" /****************************************************************************** * * * Purpose: initialize proxy group manager * * * ******************************************************************************/ static void pgm_init(zbx_pg_cache_t *cache) { zbx_db_row_t row; zbx_db_result_t result; zbx_uint64_t map_revision = 0; result = zbx_db_select("select nextid from ids where table_name='host_proxy' and field_name='revision'"); if (NULL != (row = zbx_db_fetch(result))) ZBX_DBROW2UINT64(map_revision, row[0]); zbx_db_free_result(result); pg_cache_init(cache, map_revision); } /****************************************************************************** * * * Purpose: update hosts and host-proxy group assignments from database * * * ******************************************************************************/ static void pgm_db_get_hosts(zbx_pg_cache_t *cache) { zbx_db_row_t row; zbx_db_result_t result; result = zbx_db_select("select h.hostid,hp.proxyid,hp.revision,hp.hostproxyid,h.monitored_by,h.proxy_groupid" " from hosts h" " left join host_proxy hp" " on h.hostid=hp.hostid"); pg_cache_lock(cache); while (NULL != (row = zbx_db_fetch(result))) { zbx_uint64_t hostid, proxyid, revision, hostproxyid, proxy_groupid; zbx_pg_proxy_t *proxy; zbx_pg_group_t *group; unsigned char monitored_by; ZBX_DBROW2UINT64(hostid, row[0]); ZBX_DBROW2UINT64(proxyid, row[1]); ZBX_DBROW2UINT64(revision, row[2]); ZBX_DBROW2UINT64(hostproxyid, row[3]); ZBX_STR2UCHAR(monitored_by, row[4]); ZBX_DBROW2UINT64(proxy_groupid, row[5]); if (HOST_MONITORED_BY_PROXY_GROUP != monitored_by || 0 == proxy_groupid || NULL == (group = (zbx_pg_group_t *)zbx_hashset_search(&cache->groups, &proxy_groupid))) { /* remove hostmap if host is not monitored by proxy group */ if (0 != proxyid) pg_cache_set_host_proxy(cache, hostid, 0); continue; } zbx_hashset_insert(&group->hostids, &hostid, sizeof(hostid)); if (0 == proxyid) { zbx_vector_uint64_append(&group->unassigned_hostids, hostid); continue; } if (NULL == (proxy = (zbx_pg_proxy_t *)zbx_hashset_search(&cache->proxies, &proxyid)) || group != proxy->group) { zbx_vector_uint64_append(&group->unassigned_hostids, hostid); pg_cache_set_host_proxy(cache, hostid, 0); continue; } zbx_pg_host_t host_local = { .hostid = hostid, .proxyid = proxyid, .revision = revision, .hostproxyid = hostproxyid }; zbx_pg_host_ref_t ref_local; ref_local.host = (zbx_pg_host_t *)zbx_hashset_insert(&cache->hostmap, &host_local, sizeof(host_local)); zbx_hashset_insert(&proxy->hosts, &ref_local, sizeof(ref_local)); if (group->hostmap_revision < revision) group->hostmap_revision = revision; } pg_cache_unlock(cache); zbx_db_free_result(result); } /****************************************************************************** * * * Purpose: update configuration and re-calculate proxy group/proxy statuses * * * ******************************************************************************/ static void pgm_update(zbx_pg_cache_t *cache) { int now, flags; zbx_dc_um_handle_t *um_handle; um_handle = zbx_dc_open_user_macros(); pg_cache_lock(cache); if (SUCCEED == pg_cache_update_groups(cache)) flags = ZBX_PG_PROXY_FETCH_FORCE; else flags = ZBX_PG_PROXY_FETCH_REVISION; pg_cache_update_proxies(cache, flags); now = (int)time(NULL); pg_cache_update_proxy_state(cache, um_handle, now); pg_cache_update_group_state(cache, um_handle, now); pg_cache_unlock(cache); zbx_dc_close_user_macros(um_handle); } /****************************************************************************** * * * Purpose: flush proxy group updates to database * * * ******************************************************************************/ static void pgm_db_flush_group_updates(char **sql, size_t *sql_alloc, size_t *sql_offset, zbx_vector_pg_update_t *group_updates) { for (int i = 0; i < group_updates->values_num; i++) { zbx_snprintf_alloc(sql, sql_alloc, sql_offset, "update proxy_group_rtdata set state=%d where proxy_groupid=" ZBX_FS_UI64 ";\n", group_updates->values[i].state, group_updates->values[i].objectid); } zbx_db_execute_overflowed_sql(sql, sql_alloc, sql_offset); } /****************************************************************************** * * * Purpose: flush proxy updates to database * * * ******************************************************************************/ static void pgm_db_flush_proxy_updates(char **sql, size_t *sql_alloc, size_t *sql_offset, zbx_vector_pg_update_t *proxies) { for (int i = 0; i < proxies->values_num; i++) { if (0 == (proxies->values[i].flags & ZBX_PG_PROXY_UPDATE_STATE)) continue; zbx_snprintf_alloc(sql, sql_alloc, sql_offset, "update proxy_rtdata set state=%d where proxyid=" ZBX_FS_UI64 ";\n", proxies->values[i].state, proxies->values[i].objectid); zbx_db_execute_overflowed_sql(sql, sql_alloc, sql_offset); } } /****************************************************************************** * * * Purpose: flush host-proxy mapping changes to database * * * ******************************************************************************/ static void pgm_db_flush_host_proxy_updates(char **sql, size_t *sql_alloc, size_t *sql_offset, zbx_vector_pg_host_t *hosts) { for (int i = 0; i < hosts->values_num; i++) { zbx_snprintf_alloc(sql, sql_alloc, sql_offset, "update host_proxy set proxyid=" ZBX_FS_UI64 ",revision=" ZBX_FS_UI64 " where hostid=" ZBX_FS_UI64 ";\n", hosts->values[i].proxyid, hosts->values[i].revision, hosts->values[i].hostid); zbx_db_execute_overflowed_sql(sql, sql_alloc, sql_offset); zabbix_log(LOG_LEVEL_DEBUG, "re-assigned hostid " ZBX_FS_UI64 " to proxyid " ZBX_FS_UI64, hosts->values[i].hostid, hosts->values[i].proxyid); } } /****************************************************************************** * * * Purpose: delete removed host-proxy mapping records from database * * * ******************************************************************************/ static void pgm_db_flush_host_proxy_deletes(char **sql, size_t *sql_alloc, size_t *sql_offset, zbx_vector_pg_host_t *hosts) { if (0 == hosts->values_num) return; zbx_vector_uint64_t hostids; zbx_vector_uint64_create(&hostids); for (int i = 0; i < hosts->values_num; i++) { zbx_vector_uint64_append(&hostids, hosts->values[i].hostid); zabbix_log(LOG_LEVEL_DEBUG, "unassigned hostid " ZBX_FS_UI64 " from proxyid " ZBX_FS_UI64, hosts->values[i].hostid, hosts->values[i].proxyid); } zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_snprintf_alloc(sql, sql_alloc, sql_offset, "delete from host_proxy where "); zbx_db_add_condition_alloc(sql, sql_alloc, sql_offset, "hostid", hostids.values, hostids.values_num); zbx_snprintf_alloc(sql, sql_alloc, sql_offset, ";\n"); zbx_db_execute_overflowed_sql(sql, sql_alloc, sql_offset); zbx_vector_uint64_destroy(&hostids); } /****************************************************************************** * * * Purpose: get record identifiers from database and lock them * * * * Parameters: ids - [IN] vector with identifier to lock * * table - [IN] target table * * field - [IN] record identifier field name * * index - [OUT] locked identifiers * * * ******************************************************************************/ static void pgm_db_get_recids_for_update(zbx_vector_uint64_t *ids, const char *table, const char *field, zbx_hashset_t *index) { zbx_db_row_t row; zbx_db_result_t result; zbx_uint64_t id; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_vector_uint64_sort(ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_uniq(ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select %s from %s where ", field, table); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, field, ids->values, ids->values_num); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ZBX_FOR_UPDATE); result = zbx_db_select("%s", sql); zbx_free(sql); while (NULL != (row = zbx_db_fetch(result))) { ZBX_DBROW2UINT64(id, row[0]); zbx_hashset_insert(index, &id, sizeof(id)); } zbx_db_free_result(result); } /****************************************************************************** * * * Purpose: flush new host-mapping record batch to database * * * ******************************************************************************/ static void pgm_db_flush_host_proxy_insert_batch(zbx_pg_host_t *hosts, int hosts_num) { zbx_vector_uint64_t hostids, proxyids; zbx_hashset_t host_index, proxy_index; zbx_vector_uint64_create(&hostids); zbx_vector_uint64_create(&proxyids); zbx_hashset_create(&host_index, (size_t)hosts_num, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_hashset_create(&proxy_index, (size_t)hosts_num, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); for (int i = 0; i < hosts_num; i++) { zbx_vector_uint64_append(&hostids, hosts[i].hostid); zbx_vector_uint64_append(&proxyids, hosts[i].proxyid); } pgm_db_get_recids_for_update(&hostids, "hosts", "hostid", &host_index); pgm_db_get_recids_for_update(&proxyids, "proxy", "proxyid", &proxy_index); zbx_db_insert_t db_insert; zbx_db_insert_prepare(&db_insert, "host_proxy", "hostproxyid", "hostid", "proxyid", "revision", NULL); for (int i = 0; i < hosts_num; i++) { if (NULL == zbx_hashset_search(&host_index, &hosts[i].hostid)) continue; if (NULL == zbx_hashset_search(&proxy_index, &hosts[i].proxyid)) continue; zbx_db_insert_add_values(&db_insert, hosts[i].hostproxyid, hosts[i].hostid, hosts[i].proxyid, hosts[i].revision); } zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); zbx_hashset_destroy(&proxy_index); zbx_hashset_destroy(&host_index); zbx_vector_uint64_destroy(&proxyids); zbx_vector_uint64_destroy(&hostids); } /****************************************************************************** * * * Purpose: flush new host-mapping records to database * * * ******************************************************************************/ static void pgm_db_flush_host_proxy_inserts(zbx_vector_pg_host_t *hosts) { #define PGM_INSERT_BATCH_SIZE 1000 for (int i = 0; i < hosts->values_num; i += PGM_INSERT_BATCH_SIZE) { int size = hosts->values_num - i; if (PGM_INSERT_BATCH_SIZE < size) size = PGM_INSERT_BATCH_SIZE; pgm_db_flush_host_proxy_insert_batch(hosts->values + i, size); zabbix_log(LOG_LEVEL_DEBUG, "assigned hostid " ZBX_FS_UI64 " to proxyid " ZBX_FS_UI64, hosts->values[i].hostid, hosts->values[i].proxyid); } #undef PGM_INSERT_BATCH_SIZE } /****************************************************************************** * * * Purpose: flush host-proxy mapping revision to database * * * ******************************************************************************/ static void pgm_db_flush_host_proxy_revision(zbx_uint64_t revision) { zbx_db_row_t row; zbx_db_result_t result; result = zbx_db_select("select nextid from ids where table_name='host_proxy' and field_name='revision'"); if (NULL == (row = zbx_db_fetch(result))) { zbx_db_insert_t db_insert; zbx_db_insert_prepare(&db_insert, "ids", "table_name", "field_name", "nextid", NULL); zbx_db_insert_add_values(&db_insert, "host_proxy", "revision", revision); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } else { zbx_uint64_t nextid; ZBX_DBROW2UINT64(nextid, row[0]); if (nextid != revision) { zbx_db_execute("update ids set nextid=" ZBX_FS_UI64 " where table_name='host_proxy' and field_name='revision'", revision); } } zbx_db_free_result(result); } /****************************************************************************** * * * Purpose: flush proxy group and host-proxy mapping updates to database * * * ******************************************************************************/ static void pgm_rebalance_and_flush_updates(zbx_pg_cache_t *cache) { zbx_vector_pg_update_t group_updates, proxy_updates; zbx_vector_pg_host_t hosts_new, hosts_mod, hosts_del; zbx_vector_uint64_t groupids; zbx_uint64_t hostmap_revision; zbx_dc_um_handle_t *um_handle; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_pg_update_create(&group_updates); zbx_vector_pg_update_create(&proxy_updates); zbx_vector_pg_host_create(&hosts_new); zbx_vector_pg_host_create(&hosts_mod); zbx_vector_pg_host_create(&hosts_del); zbx_vector_uint64_create(&groupids); um_handle = zbx_dc_open_user_macros(); pg_cache_lock(cache); hostmap_revision = cache->hostmap_revision; pg_cache_rebalance_groups(cache, um_handle); pg_cache_get_group_and_proxy_updates(cache, &group_updates, &proxy_updates); pg_cache_get_hostmap_updates(cache, &hosts_new, &hosts_mod, &hosts_del, &groupids); pg_cache_add_new_hostmaps(cache, &hosts_new, &groupids); pg_cache_add_deleted_hostmaps(cache, &hosts_del); pg_cache_unlock(cache); zbx_dc_close_user_macros(um_handle); if (0 != group_updates.values_num || 0 != proxy_updates.values_num || 0 != hosts_new.values_num || 0 != hosts_mod.values_num || 0 != hosts_del.values_num) { char *sql = NULL; size_t sql_alloc = 0; int ret; do { size_t sql_offset = 0; zbx_db_begin(); zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); pgm_db_flush_group_updates(&sql, &sql_alloc, &sql_offset, &group_updates); pgm_db_flush_proxy_updates(&sql, &sql_alloc, &sql_offset, &proxy_updates); pgm_db_flush_host_proxy_updates(&sql, &sql_alloc, &sql_offset, &hosts_mod); pgm_db_flush_host_proxy_deletes(&sql, &sql_alloc, &sql_offset, &hosts_del); zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (16 < sql_offset) zbx_db_execute("%s", sql); pgm_db_flush_host_proxy_inserts(&hosts_new); if (hostmap_revision != cache->hostmap_revision) pgm_db_flush_host_proxy_revision(cache->hostmap_revision); } while (ZBX_DB_DOWN == (ret = zbx_db_commit())); zbx_free(sql); if (ZBX_DB_OK <= ret && 0 != groupids.values_num) { zbx_vector_uint64_sort(&groupids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_uniq(&groupids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); pg_cache_lock(cache); pg_cache_update_hostmap_revision(cache, &groupids); pg_cache_unlock(cache); } if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE)) pg_cache_dump(cache); } zbx_vector_uint64_destroy(&groupids); zbx_vector_pg_host_destroy(&hosts_del); zbx_vector_pg_host_destroy(&hosts_mod); zbx_vector_pg_host_destroy(&hosts_new); zbx_vector_pg_update_destroy(&proxy_updates); zbx_vector_pg_update_destroy(&group_updates); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /* * main process loop */ ZBX_THREAD_ENTRY(pg_manager_thread, args) { zbx_pg_service_t pgs; char *error = NULL; const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; zbx_pg_cache_t cache; double time_update = 0; zbx_setproctitle("%s #%d starting", get_process_type_string(info->process_type), info->process_num); zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), info->server_num, get_process_type_string(info->process_type), info->process_num); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); pgm_init(&cache); if (FAIL == pg_service_init(&pgs, &cache, &error)) { zabbix_log(LOG_LEVEL_CRIT, "cannot start proxy group manager service: %s", error); zbx_free(error); exit(EXIT_FAILURE); } pgm_update(&cache); pgm_db_get_hosts(&cache); if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE)) pg_cache_dump(&cache); time_update = zbx_time(); zbx_setproctitle("%s #%d started", get_process_type_string(info->process_type), info->process_num); while (ZBX_IS_RUNNING()) { double time_now; time_now = zbx_time(); if (PG_STATE_CHECK_INTERVAL < time_now - time_update) { pgm_update(&cache); time_update = time_now; } zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE); zbx_sleep_loop(info, 1); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); if (0 != cache.group_updates.values_num) pgm_rebalance_and_flush_updates(&cache); } pg_service_destroy(&pgs); zbx_db_close(); pg_cache_destroy(&cache); zbx_setproctitle("%s #%d [terminated]", get_process_type_string(info->process_type), info->process_num); while (1) zbx_sleep(SEC_PER_MIN); }