/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "dbconfigworker.h" #include "zbx_host_constants.h" #include "zbxlog.h" #include "zbxself.h" #include "zbxipcservice.h" #include "zbxnix.h" #include "zbxnum.h" #include "zbxtime.h" #include "zbxcacheconfig.h" #include "zbxalgo.h" #include "zbxdbhigh.h" static void dbsync_item_rtname(zbx_vector_uint64_t *hostids, int *processed_num, int *updated_num, int *macro_used) { #define ZBX_DBCONFIG_BATCH_SIZE 1000 zbx_db_result_t result; zbx_db_row_t row; zbx_dc_um_handle_t *um_handle; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; char *sql_select = NULL; size_t sql_select_alloc = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (0 == hostids->values[0]) { if (0 == *macro_used) zbx_vector_uint64_remove(hostids, 0); else hostids->values_num = 1; if (0 == hostids->values_num) goto out; } um_handle = zbx_dc_open_user_macros(); zbx_db_begin(); zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); for (zbx_uint64_t *batch = hostids->values; batch < hostids->values + hostids->values_num; batch += ZBX_DBCONFIG_BATCH_SIZE) { int batch_size = MIN(ZBX_DBCONFIG_BATCH_SIZE, hostids->values + hostids->values_num - batch); size_t sql_select_offset = 0; zbx_strcpy_alloc(&sql_select, &sql_select_alloc, &sql_select_offset, "select i.itemid,i.hostid,i.name,ir.name_resolved" " from items i,item_rtname ir" " where i.name like '%%{$%%' and ir.itemid=i.itemid"); if (0 != batch[0]) { zbx_strcpy_alloc(&sql_select, &sql_select_alloc, &sql_select_offset, " and"); zbx_db_add_condition_alloc(&sql_select, &sql_select_alloc, &sql_select_offset, "i.hostid", batch, batch_size); } zbx_strcpy_alloc(&sql_select, &sql_select_alloc, &sql_select_offset, " order by i.itemid"); result = zbx_db_select("%s", sql_select); zabbix_log(LOG_LEVEL_DEBUG, "fetch started"); while (NULL != (row = zbx_db_fetch(result))) { zbx_uint64_t hostid; char *name; (*processed_num)++; ZBX_STR2UINT64(hostid, row[1]); name = zbx_strdup(NULL, row[2]); (void)zbx_dc_expand_user_and_func_macros(um_handle, &name, &hostid, 1, NULL); if (0 != strcmp(row[3], name)) { char *name_esc; name_esc = zbx_db_dyn_escape_string(name); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update item_rtname set" " name_resolved='%s',name_resolved_upper=upper('%s')" " where itemid=%s;\n", name_esc, name_esc, row[0]); zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset); zbx_free(name_esc); (*updated_num)++; } zbx_free(name); } zbx_db_free_result(result); } zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (sql_offset > 16) /* In ORACLE always present begin..end; */ zbx_db_execute("%s", sql); zbx_free(sql_select); zbx_free(sql); zbx_dc_close_user_macros(um_handle); zbx_db_commit(); if (0 == hostids->values[0] || 0 != *processed_num) *macro_used = *processed_num; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); #undef ZBX_DBCONFIG_BATCH_SIZE } ZBX_THREAD_ENTRY(zbx_dbconfig_worker_thread, args) { #define ZBX_DBCONFIG_WORKER_DELAY 1 zbx_ipc_service_t service; char *error = NULL; zbx_ipc_client_t *client; zbx_ipc_message_t *message; int processed_num = 0, updated_num = 0, macro_used = 1; double sec = 0; zbx_timespec_t timeout = {ZBX_DBCONFIG_WORKER_DELAY, 0}; const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; int server_num = ((zbx_thread_args_t *)args)->info.server_num; int process_num = ((zbx_thread_args_t *)args)->info.process_num; unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type; zbx_vector_uint64_t hostids; zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), server_num, get_process_type_string(process_type), process_num); zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type)); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_DBCONFIG_WORKER, &error)) { zabbix_log(LOG_LEVEL_CRIT, "cannot start %s service: %s", get_process_type_string(process_type), error); zbx_free(error); exit(EXIT_FAILURE); } zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num); zbx_vector_uint64_create(&hostids); zbx_vector_uint64_append(&hostids, 0); while (ZBX_IS_RUNNING()) { double time_now; zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE); (void)zbx_ipc_service_recv(&service, &timeout, &client, &message); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); if (NULL != message) { switch (message->code) { case ZBX_IPC_DBCONFIG_WORKER_REQUEST: zbx_dbconfig_worker_deserialize_ids(message->data, message->size, &hostids); break; default: THIS_SHOULD_NEVER_HAPPEN; } zbx_ipc_message_free(message); } if (NULL != client) zbx_ipc_client_release(client); time_now = zbx_time(); zbx_update_env(get_process_type_string(process_type), time_now); if (0 != hostids.values_num) { zbx_setproctitle("%s [synced %d, updated %d item names in " ZBX_FS_DBL " sec, syncing]", get_process_type_string(process_type), processed_num, updated_num, sec); processed_num = 0; updated_num = 0; sec = time_now; zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); dbsync_item_rtname(&hostids, &processed_num, &updated_num, ¯o_used); zbx_vector_uint64_clear(&hostids); sec = zbx_time() - sec; zbx_setproctitle("%s [synced %d, updated %d item names in " ZBX_FS_DBL " sec, idle]", get_process_type_string(process_type), processed_num, updated_num, sec); } } zbx_vector_uint64_destroy(&hostids); zbx_ipc_service_close(&service); exit(EXIT_SUCCESS); #undef ZBX_DBCONFIG_WORKER_DELAY } static void dbconfig_worker_send(zbx_uint32_t code, unsigned char *data, zbx_uint32_t size) { static zbx_ipc_socket_t socket; /* configuration syncer process has a permanent connection to worker */ if (0 == socket.fd) { char *error = NULL; if (FAIL == zbx_ipc_socket_open(&socket, ZBX_IPC_SERVICE_DBCONFIG_WORKER, SEC_PER_MIN, &error)) { zabbix_log(LOG_LEVEL_CRIT, "cannot connect to connector manager service: %s", error); exit(EXIT_FAILURE); } } if (FAIL == zbx_ipc_socket_write(&socket, code, data, size)) { zabbix_log(LOG_LEVEL_CRIT, "cannot send data to connector manager service"); exit(EXIT_FAILURE); } } void zbx_dbconfig_worker_send_ids(const zbx_vector_uint64_t *hostids) { if (0 != hostids->values_num) { unsigned char *data = NULL; size_t data_offset = 0; zbx_dbconfig_worker_serialize_ids(&data, &data_offset, hostids); dbconfig_worker_send(ZBX_IPC_DBCONFIG_WORKER_REQUEST, data, data_offset); free(data); } }