/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "proxyconfig.h" #include "proxyconfigwrite/proxyconfigwrite.h" #include "zbxlog.h" #include "zbxnix.h" #include "zbxcachehistory.h" #include "zbxself.h" #include "zbxtime.h" #include "zbxcompress.h" #include "zbxrtc.h" #include "zbxcommshigh.h" #include "zbx_rtc_constants.h" #include "zbx_host_constants.h" #include "zbxstr.h" #include "zbxalgo.h" static void process_configuration_sync(size_t *data_size, zbx_synced_new_config_t *synced, const zbx_thread_info_t *thread_info, zbx_thread_proxyconfig_args *args) { zbx_socket_t sock; struct zbx_json_parse jp, jp_kvs_paths = {0}; char value[16], *error = NULL, *buffer = NULL; size_t buffer_size, reserved; struct zbx_json j; int ret = FAIL; zbx_uint64_t config_revision, hostmap_revision; zbx_proxyconfig_write_status_t status = ZBX_PROXYCONFIG_WRITE_STATUS_DATA; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); /* reset the performance metric */ *data_size = 0; zbx_dc_get_upstream_revision(&config_revision, &hostmap_revision); zbx_json_init(&j, 128); zbx_json_addstring(&j, "request", ZBX_PROTO_VALUE_PROXY_CONFIG, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&j, "host", args->config_hostname, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&j, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&j, ZBX_PROTO_TAG_SESSION, zbx_dc_get_session_token(), ZBX_JSON_TYPE_STRING); zbx_json_adduint64(&j, ZBX_PROTO_TAG_CONFIG_REVISION, config_revision); if (0 != hostmap_revision) zbx_json_adduint64(&j, ZBX_PROTO_TAG_HOSTMAP_REVISION, hostmap_revision); if (SUCCEED != zbx_compress(j.buffer, j.buffer_size, &buffer, &buffer_size)) { zabbix_log(LOG_LEVEL_ERR,"cannot compress data: %s", zbx_compress_strerror()); goto out; } reserved = j.buffer_size; zbx_json_free(&j); zbx_update_selfmon_counter(thread_info, ZBX_PROCESS_STATE_IDLE); #define CONFIG_PROXYCONFIG_RETRY 10 /* seconds */ if (FAIL == zbx_connect_to_server(&sock, args->config_source_ip, args->config_server_addrs, 600, args->config_timeout, CONFIG_PROXYCONFIG_RETRY, LOG_LEVEL_WARNING, args->config_tls)) /* retry till have a connection */ { zbx_update_selfmon_counter(thread_info, ZBX_PROCESS_STATE_BUSY); goto out; } #undef CONFIG_PROXYCONFIG_RETRY zbx_update_selfmon_counter(thread_info, ZBX_PROCESS_STATE_BUSY); if (SUCCEED != zbx_get_data_from_server(&sock, &buffer, buffer_size, reserved, &error)) { zabbix_log(LOG_LEVEL_WARNING, "cannot obtain configuration data from server at \"%s\": %s", sock.peer, error); goto error; } if ('\0' == *sock.buffer) { zabbix_log(LOG_LEVEL_WARNING, "cannot obtain configuration data from server at \"%s\": %s", sock.peer, "empty string received"); goto error; } if (SUCCEED != zbx_json_open(sock.buffer, &jp)) { zabbix_log(LOG_LEVEL_WARNING, "cannot obtain configuration data from server at \"%s\": %s", sock.peer, zbx_json_strerror()); goto error; } *data_size = (size_t)(jp.end - jp.start + 1); /* performance metric */ /* if the answer is short then most likely it is a negative answer "response":"failed" */ if (128 > *data_size && SUCCEED == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_RESPONSE, value, sizeof(value), NULL) && 0 == strcmp(value, ZBX_PROTO_VALUE_FAILED)) { char *info = NULL; size_t info_alloc = 0; if (SUCCEED != zbx_json_value_by_name_dyn(&jp, ZBX_PROTO_TAG_INFO, &info, &info_alloc, NULL)) info = zbx_dsprintf(info, "negative response \"%s\"", value); zabbix_log(LOG_LEVEL_WARNING, "cannot obtain configuration data from server at \"%s\": %s", sock.peer, info); zbx_free(info); goto error; } if (SUCCEED == (ret = zbx_proxyconfig_process(sock.peer, &jp, &status, &error))) { zbx_dc_sync_configuration(ZBX_DBSYNC_UPDATE, *synced, NULL, args->config_vault, args->config_proxyconfig_frequency); *synced = ZBX_SYNCED_NEW_CONFIG_YES; if (SUCCEED == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_MACRO_SECRETS, &jp_kvs_paths)) { zbx_dc_sync_kvs_paths(&jp_kvs_paths, args->config_vault, args->config_source_ip, args->config_ssl_ca_location, args->config_ssl_cert_location, args->config_ssl_key_location); } zbx_dc_update_interfaces_availability(); } else { zabbix_log(LOG_LEVEL_WARNING, "cannot process received configuration data from server at \"%s\": %s", sock.peer, error); zbx_free(error); } zbx_dc_set_proxy_lastonline((int)time(NULL)); error: zbx_disconnect_from_server(&sock); if (SUCCEED != ret) { /* reset received config_revision to force full resync after data transfer failure */ zbx_dc_set_upstream_revision(0, 0); zbx_addrs_failover(args->config_server_addrs); } out: zbx_free(error); zbx_free(buffer); zbx_json_free(&j); #ifdef HAVE_MALLOC_TRIM /* avoid memory not being released back to the system if large proxy configuration is retrieved from database */ if (ZBX_PROXYCONFIG_WRITE_STATUS_DATA == status) malloc_trim(ZBX_MALLOC_TRIM); #endif zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static void proxyconfig_remove_unused_templates(void) { zbx_vector_uint64_t hostids, templateids; zbx_hashset_t templates; int removed_num; zbx_db_row_t row; zbx_db_result_t result; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_uint64_create(&hostids); zbx_vector_uint64_create(&templateids); zbx_hashset_create(&templates, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); result = zbx_db_select("select hostid,status from hosts"); while (NULL != (row = zbx_db_fetch(result))) { zbx_uint64_t hostid; unsigned char status; ZBX_STR2UINT64(hostid, row[0]); ZBX_STR2UCHAR(status, row[1]); if (HOST_STATUS_TEMPLATE == status) zbx_hashset_insert(&templates, &hostid, sizeof(hostid)); else zbx_vector_uint64_append(&hostids, hostid); } zbx_db_free_result(result); zbx_dc_get_unused_macro_templates(&templates, &hostids, &templateids); if (0 != templateids.values_num) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_db_begin(); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "delete from hosts_templates where"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "hostid", templateids.values, templateids.values_num); if (ZBX_DB_OK > zbx_db_execute("%s", sql)) goto fail; sql_offset = 0; zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "delete from hostmacro where"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "hostid", templateids.values, templateids.values_num); if (ZBX_DB_OK > zbx_db_execute("%s", sql)) goto fail; sql_offset = 0; zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "delete from hosts where"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "hostid", templateids.values, templateids.values_num); if (ZBX_DB_OK > zbx_db_execute("%s", sql)) goto fail; fail: zbx_db_commit(); zbx_free(sql); } removed_num = templateids.values_num; zbx_hashset_destroy(&templates); zbx_vector_uint64_destroy(&templateids); zbx_vector_uint64_destroy(&hostids); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() removed:%d", __func__, removed_num); } /****************************************************************************** * * * Purpose: periodically request config data * * * * Comments: never returns * * * ******************************************************************************/ ZBX_THREAD_ENTRY(proxyconfig_thread, args) { zbx_thread_proxyconfig_args *proxyconfig_args_in = (zbx_thread_proxyconfig_args *) (((zbx_thread_args_t *)args)->args); size_t data_size; double sec, last_template_cleanup_sec = 0, interval; zbx_ipc_async_socket_t rtc; int sleeptime; zbx_synced_new_config_t synced = ZBX_SYNCED_NEW_CONFIG_NO; zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; int server_num = ((zbx_thread_args_t *)args)->info.server_num; int process_num = ((zbx_thread_args_t *)args)->info.process_num; unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type; zbx_uint32_t rtc_msgs[] = {ZBX_RTC_CONFIG_CACHE_RELOAD}; zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), server_num, get_process_type_string(process_type), process_num); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) zbx_tls_init_child(proxyconfig_args_in->config_tls, proxyconfig_args_in->zbx_get_program_type_cb_arg, zbx_dc_get_psk_by_identity); #endif zbx_rtc_subscribe(process_type, process_num, rtc_msgs, ARRSIZE(rtc_msgs), proxyconfig_args_in->config_timeout, &rtc); zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type)); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); zbx_setproctitle("%s [syncing configuration]", get_process_type_string(process_type)); zbx_dc_sync_configuration(ZBX_DBSYNC_INIT, ZBX_SYNCED_NEW_CONFIG_NO, NULL, proxyconfig_args_in->config_vault, proxyconfig_args_in->config_proxyconfig_frequency); zbx_rtc_notify_finished_sync(proxyconfig_args_in->config_timeout, ZBX_RTC_CONFIG_SYNC_NOTIFY, get_process_type_string(process_type), &rtc); sleeptime = (ZBX_PROGRAM_TYPE_PROXY_PASSIVE == info->program_type ? ZBX_IPC_WAIT_FOREVER : 0); while (ZBX_IS_RUNNING()) { zbx_uint32_t rtc_cmd; unsigned char *rtc_data; int config_cache_reload = 0; while (SUCCEED == zbx_rtc_wait(&rtc, info, &rtc_cmd, &rtc_data, sleeptime) && 0 != rtc_cmd) { if (ZBX_RTC_CONFIG_CACHE_RELOAD == rtc_cmd) config_cache_reload = 1; else if (ZBX_RTC_SHUTDOWN == rtc_cmd) goto stop; sleeptime = 0; } sec = zbx_time(); zbx_update_env(get_process_type_string(process_type), sec); if (ZBX_PROGRAM_TYPE_PROXY_PASSIVE == info->program_type) { if (0 != config_cache_reload) { zbx_setproctitle("%s [loading configuration]", get_process_type_string(process_type)); zbx_dc_sync_configuration(ZBX_DBSYNC_UPDATE, synced, NULL, proxyconfig_args_in->config_vault, proxyconfig_args_in->config_proxyconfig_frequency); synced = ZBX_SYNCED_NEW_CONFIG_YES; zbx_dc_update_interfaces_availability(); zbx_rtc_notify_finished_sync(proxyconfig_args_in->config_timeout, ZBX_RTC_CONFIG_SYNC_NOTIFY, get_process_type_string(process_type), &rtc); if (SEC_PER_HOUR < sec - last_template_cleanup_sec) { proxyconfig_remove_unused_templates(); last_template_cleanup_sec = sec; } zbx_setproctitle("%s [synced config in " ZBX_FS_DBL " sec]", get_process_type_string(process_type), zbx_time() - sec); } sleeptime = ZBX_IPC_WAIT_FOREVER; continue; } if (1 == config_cache_reload) zabbix_log(LOG_LEVEL_WARNING, "forced reloading of the configuration cache"); zbx_setproctitle("%s [loading configuration]", get_process_type_string(process_type)); process_configuration_sync(&data_size, &synced, info, proxyconfig_args_in); interval = zbx_time() - sec; zbx_setproctitle("%s [synced config " ZBX_FS_SIZE_T " bytes in " ZBX_FS_DBL " sec, idle %d sec]", get_process_type_string(process_type), (zbx_fs_size_t)data_size, interval, proxyconfig_args_in->config_proxyconfig_frequency); if (SEC_PER_HOUR < sec - last_template_cleanup_sec) { proxyconfig_remove_unused_templates(); last_template_cleanup_sec = sec; } sleeptime = proxyconfig_args_in->config_proxyconfig_frequency; } stop: zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num); while (1) zbx_sleep(SEC_PER_MIN); }