/* ** Zabbix ** Copyright (C) 2001-2023 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ #include "datasender.h" #include "zbxcommshigh.h" #include "log.h" #include "zbxnix.h" #include "zbxdbwrap.h" #include "zbxcachehistory.h" #include "zbxself.h" #include "zbxtasks.h" #include "zbxcompress.h" #include "zbxavailability.h" #include "zbxnum.h" #include "zbxtime.h" #include "../taskmanager/taskmanager.h" extern zbx_vector_ptr_t zbx_addrs; extern char *CONFIG_HOSTNAME; extern char *CONFIG_SOURCE_IP; #define ZBX_DATASENDER_AVAILABILITY 0x0001 #define ZBX_DATASENDER_HISTORY 0x0002 #define ZBX_DATASENDER_DISCOVERY 0x0004 #define ZBX_DATASENDER_AUTOREGISTRATION 0x0008 #define ZBX_DATASENDER_TASKS 0x0010 #define ZBX_DATASENDER_TASKS_RECV 0x0020 #define ZBX_DATASENDER_TASKS_REQUEST 0x8000 #define ZBX_DATASENDER_DB_UPDATE (ZBX_DATASENDER_HISTORY | ZBX_DATASENDER_DISCOVERY | \ ZBX_DATASENDER_AUTOREGISTRATION | ZBX_DATASENDER_TASKS | \ ZBX_DATASENDER_TASKS_RECV) /****************************************************************************** * * * Purpose: Get current history upload state (disabled/enabled) * * * * Parameters: buffer - [IN] the contents of a packet (JSON) * * * * Return value: SUCCEED - processed successfully * * FAIL - an error occurred * * * ******************************************************************************/ static void get_hist_upload_state(const char *buffer, int *state) { struct zbx_json_parse jp; char value[MAX_STRING_LEN]; if (NULL == buffer || '\0' == *buffer || SUCCEED != zbx_json_open(buffer, &jp)) return; if (SUCCEED == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_PROXY_UPLOAD, value, sizeof(value), NULL)) { if (0 == strcmp(value, ZBX_PROTO_VALUE_PROXY_UPLOAD_ENABLED)) *state = ZBX_PROXY_UPLOAD_ENABLED; else if (0 == strcmp(value, ZBX_PROTO_VALUE_PROXY_UPLOAD_DISABLED)) *state = ZBX_PROXY_UPLOAD_DISABLED; } } /****************************************************************************** * * * Purpose: collects host availability, history, discovery, autoregistration * * data and sends 'proxy data' request * * * ******************************************************************************/ static int proxy_data_sender(int *more, int now, int *hist_upload_state, const zbx_config_tls_t *config_tls, const zbx_thread_info_t *info, int config_timeout) { static int data_timestamp = 0, task_timestamp = 0, upload_state = SUCCEED; zbx_socket_t sock; struct zbx_json j; struct zbx_json_parse jp, jp_tasks; int availability_ts, history_records = 0, discovery_records = 0, areg_records = 0, more_history = 0, more_discovery = 0, more_areg = 0, proxy_delay, host_avail_records = 0; zbx_uint64_t history_lastid = 0, discovery_lastid = 0, areg_lastid = 0, flags = 0; zbx_timespec_t ts; char *error = NULL, *buffer = NULL; zbx_vector_tm_task_t tasks; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); *more = ZBX_PROXY_DATA_DONE; zbx_json_init(&j, 16 * ZBX_KIBIBYTE); zbx_json_addstring(&j, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_PROXY_DATA, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&j, ZBX_PROTO_TAG_HOST, CONFIG_HOSTNAME, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&j, ZBX_PROTO_TAG_SESSION, zbx_dc_get_session_token(), ZBX_JSON_TYPE_STRING); if (SUCCEED == upload_state && CONFIG_PROXYDATA_FREQUENCY <= now - data_timestamp && ZBX_PROXY_UPLOAD_DISABLED != *hist_upload_state) { if (SUCCEED == zbx_get_interface_availability_data(&j, &availability_ts)) flags |= ZBX_DATASENDER_AVAILABILITY; history_records = zbx_proxy_get_hist_data(&j, &history_lastid, &more_history); if (0 != history_lastid) flags |= ZBX_DATASENDER_HISTORY; discovery_records = zbx_proxy_get_dhis_data(&j, &discovery_lastid, &more_discovery); if (0 != discovery_records) flags |= ZBX_DATASENDER_DISCOVERY; areg_records = zbx_proxy_get_areg_data(&j, &areg_lastid, &more_areg); if (0 != areg_records) flags |= ZBX_DATASENDER_AUTOREGISTRATION; host_avail_records = zbx_proxy_get_host_active_availability(&j); if (ZBX_PROXY_DATA_MORE != more_history && ZBX_PROXY_DATA_MORE != more_discovery && ZBX_PROXY_DATA_MORE != more_areg) { data_timestamp = now; } } zbx_vector_tm_task_create(&tasks); if (SUCCEED == upload_state && ZBX_TASK_UPDATE_FREQUENCY <= now - task_timestamp) { task_timestamp = now; zbx_tm_get_remote_tasks(&tasks, 0, 0); if (0 != tasks.values_num) { zbx_tm_json_serialize_tasks(&j, &tasks); flags |= ZBX_DATASENDER_TASKS; } flags |= ZBX_DATASENDER_TASKS_REQUEST; } if (SUCCEED != upload_state) flags |= ZBX_DATASENDER_TASKS_REQUEST; if (0 != flags) { size_t buffer_size, reserved; if (ZBX_PROXY_DATA_MORE == more_history || ZBX_PROXY_DATA_MORE == more_discovery || ZBX_PROXY_DATA_MORE == more_areg) { zbx_json_adduint64(&j, ZBX_PROTO_TAG_MORE, ZBX_PROXY_DATA_MORE); *more = ZBX_PROXY_DATA_MORE; } zbx_json_addstring(&j, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING); zbx_timespec(&ts); zbx_json_adduint64(&j, ZBX_PROTO_TAG_CLOCK, ts.sec); zbx_json_adduint64(&j, ZBX_PROTO_TAG_NS, ts.ns); if (0 != (flags & ZBX_DATASENDER_HISTORY) && 0 != (proxy_delay = zbx_proxy_get_delay(history_lastid))) zbx_json_adduint64(&j, ZBX_PROTO_TAG_PROXY_DELAY, proxy_delay); if (SUCCEED != zbx_compress(j.buffer, j.buffer_size, &buffer, &buffer_size)) { zabbix_log(LOG_LEVEL_ERR,"cannot compress data: %s", zbx_compress_strerror()); goto clean; } reserved = j.buffer_size; zbx_json_free(&j); /* json buffer can be large, free as fast as possible */ zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE); /* retry till have a connection */ if (FAIL == zbx_connect_to_server(&sock, CONFIG_SOURCE_IP, &zbx_addrs, 600, config_timeout, CONFIG_PROXYDATA_FREQUENCY, LOG_LEVEL_WARNING, config_tls)) { zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); goto clean; } zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); upload_state = zbx_put_data_to_server(&sock, &buffer, buffer_size, reserved, &error); get_hist_upload_state(sock.buffer, hist_upload_state); if (SUCCEED != upload_state) { *more = ZBX_PROXY_DATA_DONE; if (ZBX_PROXY_UPLOAD_DISABLED != *hist_upload_state) { zabbix_log(LOG_LEVEL_WARNING, "cannot send proxy data to server at \"%s\": %s", sock.peer, error); } zbx_free(error); } else { if (0 != (flags & ZBX_DATASENDER_AVAILABILITY)) zbx_set_availability_diff_ts(availability_ts); if (SUCCEED == zbx_json_open(sock.buffer, &jp)) { if (SUCCEED == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_TASKS, &jp_tasks)) flags |= ZBX_DATASENDER_TASKS_RECV; } if (0 != (flags & ZBX_DATASENDER_DB_UPDATE)) { zbx_db_begin(); if (0 != (flags & ZBX_DATASENDER_TASKS)) { zbx_tm_update_task_status(&tasks, ZBX_TM_STATUS_DONE); zbx_vector_tm_task_clear_ext(&tasks, zbx_tm_task_free); } if (0 != (flags & ZBX_DATASENDER_TASKS_RECV)) { zbx_tm_json_deserialize_tasks(&jp_tasks, &tasks); zbx_tm_save_tasks(&tasks); } if (0 != (flags & ZBX_DATASENDER_HISTORY)) { zbx_uint64_t history_maxid; DB_RESULT result; DB_ROW row; result = zbx_db_select("select max(id) from proxy_history"); if (NULL == (row = zbx_db_fetch(result)) || SUCCEED == zbx_db_is_null(row[0])) history_maxid = history_lastid; else ZBX_STR2UINT64(history_maxid, row[0]); zbx_db_free_result(result); reset_proxy_history_count(history_maxid - history_lastid); zbx_proxy_set_hist_lastid(history_lastid); } if (0 != (flags & ZBX_DATASENDER_DISCOVERY)) zbx_proxy_set_dhis_lastid(discovery_lastid); if (0 != (flags & ZBX_DATASENDER_AUTOREGISTRATION)) zbx_proxy_set_areg_lastid(areg_lastid); zbx_db_commit(); } } zbx_disconnect_from_server(&sock); } clean: zbx_vector_tm_task_clear_ext(&tasks, zbx_tm_task_free); zbx_vector_tm_task_destroy(&tasks); zbx_json_free(&j); zbx_free(buffer); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s more:%d flags:0x" ZBX_FS_UX64, __func__, zbx_result_string(upload_state), *more, flags); return history_records + discovery_records + areg_records + host_avail_records; } /****************************************************************************** * * * Purpose: periodically sends history and events to the server * * * ******************************************************************************/ ZBX_THREAD_ENTRY(datasender_thread, args) { zbx_thread_datasender_args *datasender_args_in = (zbx_thread_datasender_args *) (((zbx_thread_args_t *)args)->args); int records = 0, hist_upload_state = ZBX_PROXY_UPLOAD_ENABLED, more; double time_start, time_diff = 0.0, time_now; const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; unsigned char process_type = info->process_type; int server_num = info->server_num; int process_num = info->process_num; zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), server_num, get_process_type_string(process_type), process_num); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) zbx_tls_init_child(datasender_args_in->zbx_config_tls, datasender_args_in->zbx_get_program_type_cb_arg); #endif zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type)); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); while (ZBX_IS_RUNNING()) { time_now = zbx_time(); zbx_update_env(get_process_type_string(process_type), time_now); zbx_setproctitle("%s [sent %d values in " ZBX_FS_DBL " sec, sending data]", get_process_type_string(process_type), records, time_diff); records = 0; time_start = time_now; do { records += proxy_data_sender(&more, (int)time_now, &hist_upload_state, datasender_args_in->zbx_config_tls, info, datasender_args_in->config_timeout); time_now = zbx_time(); time_diff = time_now - time_start; } while (ZBX_PROXY_DATA_MORE == more && time_diff < SEC_PER_MIN && ZBX_IS_RUNNING()); zbx_setproctitle("%s [sent %d values in " ZBX_FS_DBL " sec, idle %d sec]", get_process_type_string(process_type), records, time_diff, ZBX_PROXY_DATA_MORE != more ? ZBX_TASK_UPDATE_FREQUENCY : 0); if (ZBX_PROXY_DATA_MORE != more) zbx_sleep_loop(info, ZBX_TASK_UPDATE_FREQUENCY); } zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num); while (1) zbx_sleep(SEC_PER_MIN); }