/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

#include "proxydata.h"

#include "zbxdbwrap.h"
#include "zbxcachehistory.h"
#include "log.h"
#include "zbxtasks.h"
#include "zbxmutexs.h"
#include "zbxnix.h"
#include "zbxcompress.h"
#include "zbxcommshigh.h"
#include "zbxavailability.h"
#include "zbxnum.h"
#include "zbx_host_constants.h"
#include "../taskmanager/taskmanager.h"

extern unsigned char	program_type;

static zbx_mutex_t	proxy_lock = ZBX_MUTEX_NULL;

/* The proxy history lock is taken only when the program runs as a passive proxy. */
/* The macros are wrapped in do/while (0) so that each of them expands to exactly */
/* one statement and stays safe when used inside an unbraced if/else.             */
#define LOCK_PROXY_HISTORY \
	do \
	{ \
		if (0 != (program_type & ZBX_PROGRAM_TYPE_PROXY_PASSIVE)) \
			zbx_mutex_lock(proxy_lock); \
	} \
	while (0)

#define UNLOCK_PROXY_HISTORY \
	do \
	{ \
		if (0 != (program_type & ZBX_PROGRAM_TYPE_PROXY_PASSIVE)) \
			zbx_mutex_unlock(proxy_lock); \
	} \
	while (0)

/******************************************************************************
 *                                                                            *
 * Purpose: sends response to 'proxy data' request received from proxy        *
 *                                                                            *
 * Parameters: proxy         - [IN] proxy that sent the request               *
 *             sock          - [IN] connection socket                         *
 *             info          - [IN] optional info text (can be NULL or empty) *
 *             status        - [IN] SUCCEED if the request was processed      *
 *             upload_status - [IN] ZBX_PROXY_UPLOAD_* value to report        *
 *                                                                            *
 * Return value: result of zbx_tcp_send_ext()                                 *
 *                                                                            *
 * Comments: remote tasks are attached only to successful responses and       *
 *           are marked as in progress once the response has been sent        *
 *                                                                            *
 ******************************************************************************/
int	zbx_send_proxy_data_response(const DC_PROXY *proxy, zbx_socket_t *sock, const char *info, int status,
		int upload_status)
{
	struct zbx_json		json;
	zbx_vector_tm_task_t	tasks;
	int			ret, flags = ZBX_TCP_PROTOCOL;

	zbx_vector_tm_task_create(&tasks);

	zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);

	switch (upload_status)
	{
		case ZBX_PROXY_UPLOAD_DISABLED:
			zbx_json_addstring(&json, ZBX_PROTO_TAG_PROXY_UPLOAD, ZBX_PROTO_VALUE_PROXY_UPLOAD_DISABLED,
					ZBX_JSON_TYPE_STRING);
			break;
		case ZBX_PROXY_UPLOAD_ENABLED:
			zbx_json_addstring(&json, ZBX_PROTO_TAG_PROXY_UPLOAD, ZBX_PROTO_VALUE_PROXY_UPLOAD_ENABLED,
					ZBX_JSON_TYPE_STRING);
			break;
		default:
			/* any other value - the upload tag is not added to the response */
			break;
	}

	if (SUCCEED == status)
	{
		zbx_json_addstring(&json, ZBX_PROTO_TAG_RESPONSE, ZBX_PROTO_VALUE_SUCCESS, ZBX_JSON_TYPE_STRING);
		zbx_tm_get_remote_tasks(&tasks, proxy->hostid, proxy->compatibility);
	}
	else
		zbx_json_addstring(&json, ZBX_PROTO_TAG_RESPONSE, ZBX_PROTO_VALUE_FAILED, ZBX_JSON_TYPE_STRING);

	if (NULL != info && '\0' != *info)
		zbx_json_addstring(&json, ZBX_PROTO_TAG_INFO, info, ZBX_JSON_TYPE_STRING);

	if (0 != tasks.values_num)
		zbx_tm_json_serialize_tasks(&json, &tasks);

	if (0 != proxy->auto_compress)
		flags |= ZBX_TCP_COMPRESS;

	if (SUCCEED == (ret = zbx_tcp_send_ext(sock, json.buffer, strlen(json.buffer), 0, flags, 0)))
	{
		/* the task status is changed only when the tasks were actually delivered */
		if (0 != tasks.values_num)
			zbx_tm_update_task_status(&tasks, ZBX_TM_STATUS_INPROGRESS);
	}

	zbx_json_free(&json);

	zbx_vector_tm_task_clear_ext(&tasks, zbx_tm_task_free);
	zbx_vector_tm_task_destroy(&tasks);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if the 'proxy data' packet has historical data              *
 *                                                                            *
 * Return value: SUCCEED - the 'proxy data' contains no historical records    *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 ******************************************************************************/
static int	proxy_data_no_history(const struct zbx_json_parse *jp)
{
	struct zbx_json_parse	jp_data;

	if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_HISTORY_DATA, &jp_data))
		return FAIL;

	if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DISCOVERY_DATA, &jp_data))
		return FAIL;

	if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_AUTOREGISTRATION, &jp_data))
		return FAIL;

	if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_INTERFACE_AVAILABILITY, &jp_data))
		return FAIL;

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: receive 'proxy data' request from proxy                           *
 *                                                                            *
 * Parameters: sock           - [IN] the connection socket                    *
 *             jp             - [IN] the received JSON data                   *
 *             ts             - [IN] the connection timestamp                 *
 *             config_timeout - [IN] timeout for sending failure response     *
 *                                                                            *
 ******************************************************************************/
void	zbx_recv_proxy_data(zbx_socket_t *sock, struct zbx_json_parse *jp, zbx_timespec_t *ts, int config_timeout)
{
	int		ret = FAIL, upload_status = 0, status, version_int, responded = 0;
	char		*error = NULL, *version_str = NULL;
	DC_PROXY	proxy;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED != (status = zbx_get_active_proxy_from_request(jp, &proxy, &error)))
	{
		zabbix_log(LOG_LEVEL_WARNING, "cannot parse proxy data from active proxy at \"%s\": %s",
				sock->peer, error);
		goto out;
	}

	if (SUCCEED != (status = zbx_proxy_check_permissions(&proxy, sock, &error)))
	{
		zabbix_log(LOG_LEVEL_WARNING, "cannot accept connection from proxy \"%s\" at \"%s\", allowed address:"
				" \"%s\": %s", proxy.host, sock->peer, proxy.proxy_address, error);
		goto out;
	}

	/* from this point 'status' is SUCCEED, so version_str/version_int are always set */
	/* before they are used in the proxy data update below                            */
	version_str = zbx_get_proxy_protocol_version_str(jp);
	version_int = zbx_get_proxy_protocol_version_int(version_str);

	if (SUCCEED != zbx_check_protocol_version(&proxy, version_int))
	{
		upload_status = ZBX_PROXY_UPLOAD_DISABLED;
		error = zbx_strdup(error, "current proxy version is not supported by server");
		goto reply;
	}

	if (FAIL == (ret = zbx_hc_check_proxy(proxy.hostid)))
	{
		/* upload is disabled - the request is processed only if it carries no historical data */
		upload_status = ZBX_PROXY_UPLOAD_DISABLED;
		ret = proxy_data_no_history(jp);
	}
	else
		upload_status = ZBX_PROXY_UPLOAD_ENABLED;

	if (SUCCEED == ret)
	{
		if (SUCCEED != (ret = zbx_process_proxy_data(&proxy, jp, ts, HOST_STATUS_PROXY_ACTIVE, NULL, &error)))
		{
			zabbix_log(LOG_LEVEL_WARNING, "received invalid proxy data from proxy \"%s\" at \"%s\": %s",
					proxy.host, sock->peer, error);
			goto out;
		}
	}

	if (!ZBX_IS_RUNNING())
	{
		error = zbx_strdup(error, "Zabbix server shutdown in progress");
		zabbix_log(LOG_LEVEL_WARNING, "cannot process proxy data from active proxy at \"%s\": %s",
				sock->peer, error);
		ret = FAIL;
		goto out;
	}
reply:
	zbx_send_proxy_data_response(&proxy, sock, error, ret, upload_status);
	responded = 1;
out:
	if (SUCCEED == status)	/* moved the unpredictable long operation to the end */
				/* we are trying to save info about lastaccess to detect communication problem */
	{
		int	lastaccess;

		if (ZBX_PROXY_UPLOAD_DISABLED == upload_status)
			lastaccess = time(NULL);
		else
			lastaccess = ts->sec;

		zbx_update_proxy_data(&proxy, version_str, version_int, lastaccess,
				(0 != (sock->protocol & ZBX_TCP_COMPRESS) ? 1 : 0), 0);
	}

	/* the failure paths jump over the 'reply' label, a plain response is sent for them here */
	if (0 == responded)
	{
		int	flags = ZBX_TCP_PROTOCOL;

		if (0 != (sock->protocol & ZBX_TCP_COMPRESS))
			flags |= ZBX_TCP_COMPRESS;

		zbx_send_response_ext(sock, ret, error, NULL, flags, config_timeout);
	}

	zbx_free(error);
	zbx_free(version_str);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
}

/******************************************************************************
 *                                                                            *
 * Purpose: sends data from proxy to server                                   *
 *                                                                            *
 * Parameters: sock           - [IN] the connection socket                    *
 *             buffer         - [IN/OUT] data to send, freed after sending    *
 *             buffer_size    - [IN] size of the data to send                 *
 *             reserved       - [IN] uncompressed size of the data            *
 *             config_timeout - [IN] communication timeout                    *
 *             error          - [OUT] the error message                       *
 *                                                                            *
 * Return value: SUCCEED - data was sent and zbx_recv_response() succeeded    *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 * Comments: if sending fails the buffer is left for the caller to release    *
 *                                                                            *
 ******************************************************************************/
static int	send_data_to_server(zbx_socket_t *sock, char **buffer, size_t buffer_size, size_t reserved,
		int config_timeout, char **error)
{
	if (SUCCEED != zbx_tcp_send_ext(sock, *buffer, buffer_size, reserved, ZBX_TCP_PROTOCOL | ZBX_TCP_COMPRESS,
			config_timeout))
	{
		*error = zbx_strdup(*error, zbx_socket_strerror());
		return FAIL;
	}

	zbx_free(*buffer);

	if (SUCCEED != zbx_recv_response(sock, config_timeout, error))
		return FAIL;

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: completes task exchange after data has been sent to server        *
 *                                                                            *
 * Parameters: sock  - [IN] connection socket holding server response         *
 *             tasks - [IN/OUT] tasks that were sent; replaced by the tasks   *
 *                              received in the response (if any)             *
 *                                                                            *
 * Comments: sent tasks are marked as done, tasks from the response are       *
 *           saved; callers invoke this inside a database transaction         *
 *                                                                            *
 ******************************************************************************/
static void	proxy_data_process_response_tasks(zbx_socket_t *sock, zbx_vector_tm_task_t *tasks)
{
	struct zbx_json_parse	jp, jp_tasks;

	if (0 != tasks->values_num)
	{
		zbx_tm_update_task_status(tasks, ZBX_TM_STATUS_DONE);
		zbx_vector_tm_task_clear_ext(tasks, zbx_tm_task_free);
	}

	if (SUCCEED == zbx_json_open(sock->buffer, &jp))
	{
		if (SUCCEED == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_TASKS, &jp_tasks))
		{
			zbx_tm_json_deserialize_tasks(&jp_tasks, tasks);
			zbx_tm_save_tasks(tasks);
		}
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: sends 'proxy data' request to server                              *
 *                                                                            *
 * Parameters: sock         - [IN] connection socket                          *
 *             ts           - [IN] connection timestamp                       *
 *             config_comms - [IN] proxy configuration for communication      *
 *                                 with server                                *
 *                                                                            *
 * Comments: history/discovery/autoregistration last ids are stored only      *
 *           after the data was sent successfully                             *
 *                                                                            *
 ******************************************************************************/
void	zbx_send_proxy_data(zbx_socket_t *sock, zbx_timespec_t *ts, const zbx_config_comms_args_t *config_comms)
{
	struct zbx_json		j;
	zbx_uint64_t		areg_lastid = 0, history_lastid = 0, discovery_lastid = 0;
	char			*error = NULL, *buffer = NULL;
	int			availability_ts, more_history, more_discovery, more_areg, proxy_delay;
	zbx_vector_tm_task_t	tasks;
	size_t			buffer_size, reserved;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED != zbx_check_access_passive_proxy(sock, ZBX_DO_NOT_SEND_RESPONSE, "proxy data request",
			config_comms->config_tls, config_comms->config_timeout))
	{
		/* do not send any reply to server in this case as the server expects proxy data */
		goto out;
	}

	LOCK_PROXY_HISTORY;

	zbx_json_init(&j, ZBX_JSON_STAT_BUF_LEN);

	zbx_json_addstring(&j, ZBX_PROTO_TAG_SESSION, zbx_dc_get_session_token(), ZBX_JSON_TYPE_STRING);
	zbx_get_interface_availability_data(&j, &availability_ts);
	zbx_proxy_get_hist_data(&j, &history_lastid, &more_history);
	zbx_proxy_get_dhis_data(&j, &discovery_lastid, &more_discovery);
	zbx_proxy_get_areg_data(&j, &areg_lastid, &more_areg);
	zbx_proxy_get_host_active_availability(&j);

	zbx_vector_tm_task_create(&tasks);
	zbx_tm_get_remote_tasks(&tasks, 0, 0);

	if (0 != tasks.values_num)
		zbx_tm_json_serialize_tasks(&j, &tasks);

	if (ZBX_PROXY_DATA_MORE == more_history || ZBX_PROXY_DATA_MORE == more_discovery ||
			ZBX_PROXY_DATA_MORE == more_areg)
	{
		zbx_json_adduint64(&j, ZBX_PROTO_TAG_MORE, ZBX_PROXY_DATA_MORE);
	}

	zbx_json_addstring(&j, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING);
	zbx_json_adduint64(&j, ZBX_PROTO_TAG_CLOCK, ts->sec);
	zbx_json_adduint64(&j, ZBX_PROTO_TAG_NS, ts->ns);

	if (0 != history_lastid && 0 != (proxy_delay = zbx_proxy_get_delay(history_lastid)))
		zbx_json_adduint64(&j, ZBX_PROTO_TAG_PROXY_DELAY, proxy_delay);

	if (SUCCEED != zbx_compress(j.buffer, j.buffer_size, &buffer, &buffer_size))
	{
		zabbix_log(LOG_LEVEL_ERR, "cannot compress data: %s", zbx_compress_strerror());
		goto clean;
	}

	reserved = j.buffer_size;
	zbx_json_free(&j);	/* json buffer can be large, free as fast as possible */

	if (SUCCEED == send_data_to_server(sock, &buffer, buffer_size, reserved, config_comms->config_timeout,
			&error))
	{
		zbx_set_availability_diff_ts(availability_ts);

		zbx_db_begin();

		if (0 != history_lastid)
		{
			zbx_uint64_t	history_maxid;
			DB_RESULT	result;
			DB_ROW		row;

			result = zbx_db_select("select max(id) from proxy_history");

			if (NULL == (row = zbx_db_fetch(result)) || SUCCEED == zbx_db_is_null(row[0]))
				history_maxid = history_lastid;
			else
				ZBX_STR2UINT64(history_maxid, row[0]);

			zbx_db_free_result(result);

			/* the difference is the number of history records that are still not sent */
			reset_proxy_history_count(history_maxid - history_lastid);
			zbx_proxy_set_hist_lastid(history_lastid);
		}

		if (0 != discovery_lastid)
			zbx_proxy_set_dhis_lastid(discovery_lastid);

		if (0 != areg_lastid)
			zbx_proxy_set_areg_lastid(areg_lastid);

		proxy_data_process_response_tasks(sock, &tasks);

		zbx_db_commit();
	}
	else
	{
		zabbix_log(LOG_LEVEL_WARNING, "cannot send proxy data to server at \"%s\": %s", sock->peer, error);
		zbx_free(error);
	}
clean:
	zbx_vector_tm_task_clear_ext(&tasks, zbx_tm_task_free);
	zbx_vector_tm_task_destroy(&tasks);

	zbx_json_free(&j);
	UNLOCK_PROXY_HISTORY;
out:
	zbx_free(buffer);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: sends remote task data to server                                  *
 *                                                                            *
 * Parameters: sock         - [IN] connection socket                          *
 *             ts           - [IN] connection timestamp                       *
 *             config_comms - [IN] proxy configuration for communication      *
 *                                 with server                                *
 *                                                                            *
 * Comments: unlike zbx_send_proxy_data() no history, discovery or            *
 *           autoregistration data is sent and no history lock is taken       *
 *                                                                            *
 ******************************************************************************/
void	zbx_send_task_data(zbx_socket_t *sock, zbx_timespec_t *ts, const zbx_config_comms_args_t *config_comms)
{
	struct zbx_json		j;
	char			*error = NULL, *buffer = NULL;
	zbx_vector_tm_task_t	tasks;
	size_t			buffer_size, reserved;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED != zbx_check_access_passive_proxy(sock, ZBX_DO_NOT_SEND_RESPONSE, "proxy data request",
			config_comms->config_tls, config_comms->config_timeout))
	{
		/* do not send any reply to server in this case as the server expects proxy data */
		goto out;
	}

	zbx_json_init(&j, ZBX_JSON_STAT_BUF_LEN);

	zbx_vector_tm_task_create(&tasks);
	zbx_tm_get_remote_tasks(&tasks, 0, 0);

	if (0 != tasks.values_num)
		zbx_tm_json_serialize_tasks(&j, &tasks);

	zbx_json_addstring(&j, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING);
	zbx_json_adduint64(&j, ZBX_PROTO_TAG_CLOCK, ts->sec);
	zbx_json_adduint64(&j, ZBX_PROTO_TAG_NS, ts->ns);

	if (SUCCEED != zbx_compress(j.buffer, j.buffer_size, &buffer, &buffer_size))
	{
		zabbix_log(LOG_LEVEL_ERR, "cannot compress data: %s", zbx_compress_strerror());
		goto clean;
	}

	reserved = j.buffer_size;
	zbx_json_free(&j);	/* json buffer can be large, free as fast as possible */

	if (SUCCEED == send_data_to_server(sock, &buffer, buffer_size, reserved, config_comms->config_timeout,
			&error))
	{
		zbx_db_begin();

		proxy_data_process_response_tasks(sock, &tasks);

		zbx_db_commit();
	}
	else
	{
		zabbix_log(LOG_LEVEL_WARNING, "cannot send task data to server at \"%s\": %s", sock->peer, error);
		zbx_free(error);
	}
clean:
	zbx_vector_tm_task_clear_ext(&tasks, zbx_tm_task_free);
	zbx_vector_tm_task_destroy(&tasks);

	zbx_json_free(&j);
out:
	zbx_free(buffer);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: creates the proxy history lock (passive proxy only)               *
 *                                                                            *
 * Parameters: error - [OUT] the error message                                *
 *                                                                            *
 * Return value: result of zbx_mutex_create() for passive proxy,              *
 *               SUCCEED otherwise                                            *
 *                                                                            *
 ******************************************************************************/
int	init_proxy_history_lock(char **error)
{
	if (0 != (program_type & ZBX_PROGRAM_TYPE_PROXY_PASSIVE))
		return zbx_mutex_create(&proxy_lock, ZBX_MUTEX_PROXY_HISTORY, error);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: destroys the proxy history lock (passive proxy only)              *
 *                                                                            *
 ******************************************************************************/
void	free_proxy_history_lock(void)
{
	if (0 != (program_type & ZBX_PROGRAM_TYPE_PROXY_PASSIVE))
		zbx_mutex_destroy(&proxy_lock);
}