/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "zbxdbwrap.h" #include "zbxdbhigh.h" #include "zbxsysinfo.h" #include "zbxexpression.h" #include "zbxtasks.h" #include "zbxdiscovery.h" #include "zbxalgo.h" #include "zbxcrypto.h" #include "zbxavailability.h" #include "zbx_availability_constants.h" #include "zbxcommshigh.h" #include "zbxnum.h" #include "zbxtime.h" #include "zbxip.h" #include "version.h" #include "zbxversion.h" #include "zbx_host_constants.h" #include "zbx_item_constants.h" #include "zbxcachehistory.h" #include "zbxcacheconfig.h" #include "zbxcomms.h" #include "zbxdb.h" #include "zbxdbschema.h" #include "zbxexpr.h" #include "zbxipcservice.h" #include "zbxjson.h" #include "zbxstr.h" /* the space reserved in json buffer to hold at least one record plus service data */ #define ZBX_DATA_JSON_RESERVED (ZBX_HISTORY_TEXT_VALUE_LEN * 4 + ZBX_KIBIBYTE * 4) #define ZBX_DATA_JSON_RECORD_LIMIT (ZBX_MAX_RECV_DATA_SIZE - ZBX_DATA_JSON_RESERVED) #define ZBX_DATA_JSON_BATCH_LIMIT ((ZBX_MAX_RECV_DATA_SIZE - ZBX_DATA_JSON_RESERVED) / 2) /* the maximum number of values processed in one batch */ #define ZBX_HISTORY_VALUES_MAX 256 typedef struct { zbx_uint64_t druleid; zbx_vector_uint64_t dcheckids; zbx_vector_ptr_t ips; } zbx_drule_t; ZBX_PTR_VECTOR_DECL(dservice_ptr, zbx_dservice_t *) ZBX_PTR_VECTOR_IMPL(dservice_ptr, zbx_dservice_t *) typedef struct { char ip[ZBX_INTERFACE_IP_LEN_MAX]; zbx_vector_dservice_ptr_t services; } zbx_drule_ip_t; typedef struct { const char *field; const char *tag; zbx_json_type_t jt; const char *default_value; } zbx_history_field_t; typedef struct { const char *table, *lastidfield; zbx_history_field_t fields[ZBX_MAX_FIELDS]; } zbx_history_table_t; typedef int (*zbx_client_item_validator_t)(zbx_history_recv_item_t *item, zbx_socket_t *sock, void *args, char **error); typedef struct { zbx_uint64_t hostid; int value; } zbx_host_rights_t; static zbx_lld_process_agent_result_func_t lld_process_agent_result_cb = NULL; static zbx_preprocess_item_value_func_t preprocess_item_value_cb = NULL; static zbx_preprocessor_flush_func_t preprocessor_flush_cb = NULL; void zbx_init_library_dbwrap(zbx_lld_process_agent_result_func_t lld_process_agent_result_func, zbx_preprocess_item_value_func_t preprocess_item_value_func, zbx_preprocessor_flush_func_t preprocessor_flush_func) { lld_process_agent_result_cb = lld_process_agent_result_func; preprocess_item_value_cb = preprocess_item_value_func; preprocessor_flush_cb = preprocessor_flush_func; } /****************************************************************************** * * * Purpose: check proxy connection permissions (encryption configuration and * * if peer proxy address is allowed) * * * * Parameters: * * proxy - [IN] the proxy data * * sock - [IN] connection socket context * * error - [OUT] error message * * * * Return value: * * SUCCEED - connection permission check was successful * * FAIL - otherwise * * * ******************************************************************************/ int zbx_proxy_check_permissions(const zbx_dc_proxy_t *proxy, const zbx_socket_t *sock, char **error) { #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) zbx_tls_conn_attr_t attr; #endif if ('\0' != *proxy->allowed_addresses && FAIL == zbx_tcp_check_allowed_peers(sock, proxy->allowed_addresses)) { *error = zbx_strdup(*error, "connection is not allowed"); return FAIL; } #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type) { if (SUCCEED != zbx_tls_get_attr_cert(sock, &attr)) { *error = zbx_strdup(*error, "internal error: cannot get connection attributes"); THIS_SHOULD_NEVER_HAPPEN; return FAIL; } } #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK)) else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type) { if (SUCCEED != zbx_tls_get_attr_psk(sock, &attr)) { *error = zbx_strdup(*error, "internal error: cannot get connection attributes"); THIS_SHOULD_NEVER_HAPPEN; return FAIL; } } #endif else if (ZBX_TCP_SEC_UNENCRYPTED != sock->connection_type) { *error = zbx_strdup(*error, "internal error: invalid connection type"); THIS_SHOULD_NEVER_HAPPEN; return FAIL; } #endif if (0 == ((unsigned int)proxy->tls_accept & sock->connection_type)) { *error = zbx_dsprintf(NULL, "connection of type \"%s\" is not allowed for proxy \"%s\"", zbx_tcp_connection_type_name(sock->connection_type), proxy->name); return FAIL; } #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type) { /* simplified match, not compliant with RFC 4517, 4518 */ if ('\0' != *proxy->tls_issuer && 0 != strcmp(proxy->tls_issuer, attr.issuer)) { *error = zbx_dsprintf(*error, "proxy \"%s\" certificate issuer does not match", proxy->name); return FAIL; } /* simplified match, not compliant with RFC 4517, 4518 */ if ('\0' != *proxy->tls_subject && 0 != strcmp(proxy->tls_subject, attr.subject)) { *error = zbx_dsprintf(*error, "proxy \"%s\" certificate subject does not match", proxy->name); return FAIL; } } #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK)) else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type) { if (strlen(proxy->tls_psk_identity) != attr.psk_identity_len || 0 != memcmp(proxy->tls_psk_identity, attr.psk_identity, attr.psk_identity_len)) { *error = zbx_dsprintf(*error, "proxy \"%s\" is using false PSK identity", proxy->name); return FAIL; } } #endif #endif return SUCCEED; } /****************************************************************************** * * * Purpose: checks host connection permissions (encryption configuration) * * * * Parameters: * * host - [IN] the host data * * sock - [IN] connection socket context * * error - [OUT] error message * * * * Return value: * * SUCCEED - connection permission check was successful * * FAIL - otherwise * * * ******************************************************************************/ static int zbx_host_check_permissions(const zbx_history_recv_host_t *host, const zbx_socket_t *sock, char **error) { #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) zbx_tls_conn_attr_t attr; if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type) { if (SUCCEED != zbx_tls_get_attr_cert(sock, &attr)) { *error = zbx_strdup(*error, "internal error: cannot get connection attributes"); THIS_SHOULD_NEVER_HAPPEN; return FAIL; } } #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK)) else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type) { if (SUCCEED != zbx_tls_get_attr_psk(sock, &attr)) { *error = zbx_strdup(*error, "internal error: cannot get connection attributes"); THIS_SHOULD_NEVER_HAPPEN; return FAIL; } } #endif else if (ZBX_TCP_SEC_UNENCRYPTED != sock->connection_type) { *error = zbx_strdup(*error, "internal error: invalid connection type"); THIS_SHOULD_NEVER_HAPPEN; return FAIL; } #endif if (0 == ((unsigned int)host->tls_accept & sock->connection_type)) { *error = zbx_dsprintf(NULL, "connection of type \"%s\" is not allowed for host \"%s\"", zbx_tcp_connection_type_name(sock->connection_type), host->host); return FAIL; } #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type) { /* simplified match, not compliant with RFC 4517, 4518 */ if ('\0' != *host->tls_issuer && 0 != strcmp(host->tls_issuer, attr.issuer)) { *error = zbx_dsprintf(*error, "host \"%s\" certificate issuer does not match", host->host); return FAIL; } /* simplified match, not compliant with RFC 4517, 4518 */ if ('\0' != *host->tls_subject && 0 != strcmp(host->tls_subject, attr.subject)) { *error = zbx_dsprintf(*error, "host \"%s\" certificate subject does not match", host->host); return FAIL; } } #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK)) else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type) { if (strlen(host->tls_psk_identity) != attr.psk_identity_len || 0 != memcmp(host->tls_psk_identity, attr.psk_identity, attr.psk_identity_len)) { *error = zbx_dsprintf(*error, "host \"%s\" is using false PSK identity", host->host); return FAIL; } } #endif #endif return SUCCEED; } /****************************************************************************** * * * Purpose: * * Extract a proxy name from JSON and find the proxy ID in configuration * * cache, and check access rights. The proxy must be configured in active * * mode. * * * * Parameters: * * jp - [IN] JSON with the proxy name * * proxy - [OUT] the proxy data * * error - [OUT] error message * * * * Return value: * * SUCCEED - proxy ID was found in database * * FAIL - an error occurred (e.g. an unknown proxy, the proxy is * * configured in passive mode or access denied) * * * ******************************************************************************/ int zbx_get_active_proxy_from_request(const struct zbx_json_parse *jp, zbx_dc_proxy_t *proxy, char **error) { char *ch_error, host[ZBX_HOSTNAME_BUF_LEN]; if (SUCCEED != zbx_json_value_by_name(jp, ZBX_PROTO_TAG_HOST, host, sizeof(host), NULL)) { *error = zbx_strdup(*error, "missing name of proxy"); return FAIL; } if (SUCCEED != zbx_check_hostname(host, &ch_error)) { *error = zbx_dsprintf(*error, "invalid proxy name \"%s\": %s", host, ch_error); zbx_free(ch_error); return FAIL; } return zbx_dc_get_active_proxy_by_name(host, proxy, error); } /****************************************************************************** * * * Purpose: * * Check access rights to a passive proxy for the given connection and * * send a response if denied. * * * * Parameters: * * sock - [IN] connection socket context * * send_response - [IN] to send or not to send a response to server. * * Value: ZBX_SEND_RESPONSE or * * ZBX_DO_NOT_SEND_RESPONSE * * req - [IN] request, included into error message * * config_tls - [IN] configured requirements to allow access * * config_timeout - [IN] * * server - [IN] * * * * Return value: * * SUCCEED - access is allowed * * FAIL - access is denied * * * ******************************************************************************/ int zbx_check_access_passive_proxy(zbx_socket_t *sock, int send_response, const char *req, const zbx_config_tls_t *config_tls, int config_timeout, const char *server) { char *msg = NULL; if (FAIL == zbx_tcp_check_allowed_peers(sock, server)) { zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" is not allowed: %s", req, sock->peer, zbx_socket_strerror()); if (ZBX_SEND_RESPONSE == send_response) zbx_send_proxy_response(sock, FAIL, "connection is not allowed", config_timeout); return FAIL; } if (0 == (config_tls->accept_modes & sock->connection_type)) { msg = zbx_dsprintf(NULL, "%s over connection of type \"%s\" is not allowed", req, zbx_tcp_connection_type_name(sock->connection_type)); zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" by proxy configuration parameter \"TLSAccept\"", msg, sock->peer); if (ZBX_SEND_RESPONSE == send_response) zbx_send_proxy_response(sock, FAIL, msg, config_timeout); zbx_free(msg); return FAIL; } #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL) if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type) { if (SUCCEED == zbx_check_server_issuer_subject(sock, config_tls->server_cert_issuer, config_tls->server_cert_subject, &msg)) { return SUCCEED; } zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" is not allowed: %s", req, sock->peer, msg); if (ZBX_SEND_RESPONSE == send_response) zbx_send_proxy_response(sock, FAIL, "certificate issuer or subject mismatch", config_timeout); zbx_free(msg); return FAIL; } else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type) { if (0 != (ZBX_PSK_FOR_PROXY & zbx_tls_get_psk_usage())) return SUCCEED; zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" is not allowed: it used PSK which is not" " configured for proxy communication with server", req, sock->peer); if (ZBX_SEND_RESPONSE == send_response) zbx_send_proxy_response(sock, FAIL, "wrong PSK used", config_timeout); return FAIL; } #endif return SUCCEED; } /****************************************************************************** * * * Return value: SUCCEED - processed successfully * * FAIL - no interface availability has been changed * * * ******************************************************************************/ int zbx_get_interface_availability_data(struct zbx_json *json, int *ts) { int i, ret = FAIL; zbx_vector_availability_ptr_t interfaces; zbx_interface_availability_t *ia; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_availability_ptr_create(&interfaces); if (SUCCEED != zbx_dc_get_interfaces_availability(&interfaces, ts)) goto out; zbx_json_addarray(json, ZBX_PROTO_TAG_INTERFACE_AVAILABILITY); for (i = 0; i < interfaces.values_num; i++) { ia = interfaces.values[i]; zbx_json_addobject(json, NULL); zbx_json_adduint64(json, ZBX_PROTO_TAG_INTERFACE_ID, ia->interfaceid); zbx_json_adduint64(json, ZBX_PROTO_TAG_AVAILABLE, ia->agent.available); zbx_json_addstring(json, ZBX_PROTO_TAG_ERROR, ia->agent.error, ZBX_JSON_TYPE_STRING); zbx_json_close(json); } zbx_json_close(json); ret = SUCCEED; out: zbx_vector_availability_ptr_clear_ext(&interfaces, zbx_interface_availability_free); zbx_vector_availability_ptr_destroy(&interfaces); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: parses interfaces availability data contents and processes it * * * * Return value: SUCCEED - processed successfully * * FAIL - an error occurred * * * ******************************************************************************/ static int process_interfaces_availability_contents(struct zbx_json_parse *jp_data, char **error) { zbx_uint64_t interfaceid; struct zbx_json_parse jp_row; const char *p = NULL; char *tmp; size_t tmp_alloc = 129; zbx_interface_availability_t *ia = NULL; zbx_vector_availability_ptr_t interfaces; int ret; tmp = (char *)zbx_malloc(NULL, tmp_alloc); zbx_vector_availability_ptr_create(&interfaces); while (NULL != (p = zbx_json_next(jp_data, p))) /* iterate the interface entries */ { if (SUCCEED != (ret = zbx_json_brackets_open(p, &jp_row))) { *error = zbx_strdup(*error, zbx_json_strerror()); goto out; } if (SUCCEED != (ret = zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_INTERFACE_ID, &tmp, &tmp_alloc, NULL))) { *error = zbx_strdup(*error, zbx_json_strerror()); goto out; } if (SUCCEED != (ret = zbx_is_uint64(tmp, &interfaceid))) { *error = zbx_strdup(*error, "interfaceid is not a valid numeric"); goto out; } ia = (zbx_interface_availability_t *)zbx_malloc(NULL, sizeof(zbx_interface_availability_t)); zbx_interface_availability_init(ia, interfaceid); if (SUCCEED == zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_AVAILABLE, &tmp, &tmp_alloc, NULL)) { ia->agent.available = atoi(tmp); ia->agent.flags |= ZBX_FLAGS_AGENT_STATUS_AVAILABLE; } if (SUCCEED == zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_ERROR, &tmp, &tmp_alloc, NULL)) { ia->agent.error = zbx_strdup(NULL, tmp); ia->agent.flags |= ZBX_FLAGS_AGENT_STATUS_ERROR; } if (SUCCEED != (ret = zbx_interface_availability_is_set(ia))) { zbx_free(ia); *error = zbx_dsprintf(*error, "no availability data for \"interfaceid\":" ZBX_FS_UI64, interfaceid); goto out; } zbx_vector_availability_ptr_append(&interfaces, ia); } if (0 < interfaces.values_num && SUCCEED == zbx_dc_set_interfaces_availability(&interfaces)) zbx_availabilities_flush(&interfaces); ret = SUCCEED; out: zbx_vector_availability_ptr_clear_ext(&interfaces, zbx_interface_availability_free); zbx_vector_availability_ptr_destroy(&interfaces); zbx_free(tmp); return ret; } int zbx_proxy_get_delay(const zbx_uint64_t lastid) { zbx_db_result_t result; zbx_db_row_t row; char *sql = NULL; int ts = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s() [lastid=" ZBX_FS_UI64 "]", __func__, lastid); sql = zbx_dsprintf(sql, "select write_clock from proxy_history where id>" ZBX_FS_UI64 " order by id asc", lastid); result = zbx_db_select_n(sql, 1); zbx_free(sql); if (NULL != (row = zbx_db_fetch(result))) ts = (int)time(NULL) - atoi(row[0]); zbx_db_free_result(result); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ts; } int zbx_proxy_get_host_active_availability(struct zbx_json *j) { zbx_ipc_message_t response; int records_num = 0; zbx_ipc_message_init(&response); zbx_availability_send(ZBX_IPC_AVAILMAN_ACTIVE_HOSTDATA, 0, 0, &response); if (0 != response.size) { zbx_vector_proxy_hostdata_ptr_t hostdata; zbx_vector_proxy_hostdata_ptr_create(&hostdata); zbx_availability_deserialize_hostdata(response.data, &hostdata); zbx_availability_serialize_json_hostdata(&hostdata, j); records_num = hostdata.values_num; zbx_vector_proxy_hostdata_ptr_clear_ext(&hostdata, (zbx_proxy_hostdata_ptr_free_func_t)zbx_ptr_free); zbx_vector_proxy_hostdata_ptr_destroy(&hostdata); } zbx_ipc_message_clean(&response); return records_num; } /****************************************************************************** * * * Purpose: processes item value depending on proxy/flags settings * * * * Parameters: item - [IN] item to process * * result - [IN] item result * * ts - [IN] value timestamp * * h_num - [OUT] number of history entries * * error - [OUT] * * * * Comments: Values gathered by server are sent to the preprocessing manager, * * while values received from proxy are already preprocessed and * * must be either directly stored to history cache or sent to lld * * manager. * * * ******************************************************************************/ static void process_item_value(const zbx_history_recv_item_t *item, AGENT_RESULT *result, zbx_timespec_t *ts, int *h_num, char *error) { if (HOST_MONITORED_BY_SERVER == item->host.monitored_by) { preprocess_item_value_cb(item->itemid, item->host.hostid, item->value_type, item->flags, result, ts, item->state, error); *h_num = 0; } else { if (0 != (ZBX_FLAG_DISCOVERY_RULE & item->flags)) { if (NULL != lld_process_agent_result_cb) lld_process_agent_result_cb(item->itemid, item->host.hostid, result, ts, error); *h_num = 0; } else { zbx_dc_add_history(item->itemid, item->value_type, item->flags, result, ts, item->state, error); *h_num = 1; } } } /****************************************************************************** * * * Purpose: process single value from incoming history data * * * * Parameters: item - [IN] the item to process * * value - [IN] the value to process * * hval - [OUT] indication that value was added to history * * * * Return value: SUCCEED - the value was processed successfully * * FAIL - otherwise * * * ******************************************************************************/ static int process_history_data_value(zbx_history_recv_item_t *item, zbx_agent_value_t *value, int *h_num) { if (ITEM_STATUS_ACTIVE != item->status) return FAIL; if (HOST_STATUS_MONITORED != item->host.status) return FAIL; /* update item nextcheck during maintenance */ if (SUCCEED == zbx_in_maintenance_without_data_collection(item->host.maintenance_status, item->host.maintenance_type, item->type) && item->host.maintenance_from <= value->ts.sec) { return SUCCEED; } if (NULL == value->value && ITEM_STATE_NOTSUPPORTED == value->state) { THIS_SHOULD_NEVER_HAPPEN; return FAIL; } if (ITEM_STATE_NOTSUPPORTED == value->state || (NULL != value->value && 0 == strcmp(value->value, ZBX_NOTSUPPORTED))) { zabbix_log(LOG_LEVEL_DEBUG, "hostid:" ZBX_FS_UI64 " item %s error: %s", item->host.hostid, item->key_orig, value->value); item->state = ITEM_STATE_NOTSUPPORTED; process_item_value(item, NULL, &value->ts, h_num, value->value); } else { AGENT_RESULT result; zbx_init_agent_result(&result); if (NULL != value->value) { if (ITEM_VALUE_TYPE_LOG == item->value_type) { zbx_log_t *log; log = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t)); log->value = zbx_strdup(NULL, value->value); zbx_replace_invalid_utf8(log->value); if (0 == value->timestamp) { log->timestamp = 0; zbx_calc_timestamp(log->value, &log->timestamp, item->logtimefmt); } else log->timestamp = value->timestamp; log->logeventid = value->logeventid; log->severity = value->severity; if (NULL != value->source) { log->source = zbx_strdup(NULL, value->source); zbx_replace_invalid_utf8(log->source); } else log->source = NULL; SET_LOG_RESULT(&result, log); } else zbx_set_agent_result_type(&result, ITEM_VALUE_TYPE_TEXT, value->value); } if (0 != value->meta) zbx_set_agent_result_meta(&result, value->lastlogsize, value->mtime); item->state = ITEM_STATE_NORMAL; process_item_value(item, &result, &value->ts, h_num, NULL); zbx_free_agent_result(&result); } return SUCCEED; } /****************************************************************************** * * * Purpose: process new item values * * * * Parameters: items - [IN] the items to process * * values - [IN] the item values value to process * * errcodes - [IN/OUT] in - item configuration error code * * (FAIL - item/host was not found) * * out - value processing result * * (SUCCEED - processed, FAIL - error) * * values_num - [IN] the number of items/values to process * * nodata_win - [IN/OUT] proxy communication delay info * * * * Return value: the number of processed values * * * ******************************************************************************/ int zbx_process_history_data(zbx_history_recv_item_t *items, zbx_agent_value_t *values, int *errcodes, size_t values_num, zbx_proxy_suppress_t *nodata_win) { size_t i; int processed_num = 0, history_num; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); for (i = 0; i < values_num; i++) { if (SUCCEED != errcodes[i]) continue; history_num = 0; if (SUCCEED != process_history_data_value(&items[i], &values[i], &history_num)) { /* clean failed items to avoid updating their runtime data */ errcodes[i] = FAIL; continue; } if (HOST_MONITORED_BY_SERVER != items[i].host.monitored_by && NULL != nodata_win && 0 != (nodata_win->flags & ZBX_PROXY_SUPPRESS_ACTIVE) && 0 < history_num) { if (values[i].ts.sec <= nodata_win->period_end) { nodata_win->values_num++; } else { nodata_win->flags &= (~ZBX_PROXY_SUPPRESS_MORE); } zabbix_log(LOG_LEVEL_TRACE, "%s() flags:%d values_num:%d value_time:%d period_end:%d", __func__, nodata_win->flags, nodata_win->values_num, values[i].ts.sec, nodata_win->period_end); } processed_num++; } if (0 < processed_num) zbx_dc_items_update_nextcheck(items, values, errcodes, values_num); preprocessor_flush_cb(); zbx_dc_flush_history(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num); return processed_num; } /****************************************************************************** * * * Purpose: frees resources allocated to store agent values * * * * Parameters: values - [IN] the values to clean * * values_num - [IN] the number of items in values array * * * ******************************************************************************/ static void zbx_agent_values_clean(zbx_agent_value_t *values, size_t values_num) { size_t i; for (i = 0; i < values_num; i++) { zbx_free(values[i].value); zbx_free(values[i].source); } } /****************************************************************************** * * * Purpose: calculates difference between server and client (proxy, active * * agent or sender) time and log it * * * * Parameters: level - [IN] log level * * jp - [IN] JSON with clock, [ns] fields * * ts_recv - [IN] the connection timestamp * * * ******************************************************************************/ static void log_client_timediff(int level, struct zbx_json_parse *jp, const zbx_timespec_t *ts_recv) { char tmp[32]; zbx_timespec_t client_timediff; int sec, ns; if (SUCCEED != ZBX_CHECK_LOG_LEVEL(level)) return; if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp), NULL)) { sec = atoi(tmp); client_timediff.sec = ts_recv->sec - sec; if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_NS, tmp, sizeof(tmp), NULL)) { ns = atoi(tmp); client_timediff.ns = ts_recv->ns - ns; if (client_timediff.sec > 0 && client_timediff.ns < 0) { client_timediff.sec--; client_timediff.ns += 1000000000; } else if (client_timediff.sec < 0 && client_timediff.ns > 0) { client_timediff.sec++; client_timediff.ns -= 1000000000; } zabbix_log(level, "%s(): timestamp from json %d seconds and %d nanosecond, " "delta time from json %d seconds and %d nanosecond", __func__, sec, ns, client_timediff.sec, client_timediff.ns); } else { zabbix_log(level, "%s(): timestamp from json %d seconds, " "delta time from json %d seconds", __func__, sec, client_timediff.sec); } } } /****************************************************************************** * * * Purpose: parses agent value from history data json row * * * * Parameters: jp_row - [IN] JSON with history data row * * unique_shift - [IN/OUT] auto increment nanoseconds to ensure * * unique value of timestamps * * av - [OUT] the agent value * * * * Return value: SUCCEED - the value was parsed successfully * * FAIL - otherwise * * * ******************************************************************************/ static int parse_history_data_row_value(const struct zbx_json_parse *jp_row, zbx_timespec_t *unique_shift, zbx_agent_value_t *av) { char *tmp = NULL; size_t tmp_alloc = 0; int ret = FAIL; memset(av, 0, sizeof(zbx_agent_value_t)); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_CLOCK, &tmp, &tmp_alloc, NULL)) { if (FAIL == zbx_is_uint31(tmp, &av->ts.sec)) goto out; if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_NS, &tmp, &tmp_alloc, NULL)) { if (FAIL == zbx_is_uint_n_range(tmp, tmp_alloc, &av->ts.ns, sizeof(av->ts.ns), 0LL, 999999999LL)) { goto out; } } else { /* ensure unique value timestamp (clock, ns) if only clock is available */ av->ts.sec += unique_shift->sec; av->ts.ns = unique_shift->ns++; if (unique_shift->ns > 999999999) { unique_shift->sec++; unique_shift->ns = 0; } } } else zbx_timespec(&av->ts); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_STATE, &tmp, &tmp_alloc, NULL)) av->state = (unsigned char)atoi(tmp); /* Unsupported item meta information must be ignored for backwards compatibility. */ /* New agents will not send meta information for items in unsupported state. */ if (ITEM_STATE_NOTSUPPORTED != av->state) { if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LASTLOGSIZE, &tmp, &tmp_alloc, NULL)) { av->meta = 1; /* contains meta information */ zbx_is_uint64(tmp, &av->lastlogsize); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_MTIME, &tmp, &tmp_alloc, NULL)) av->mtime = atoi(tmp); } } if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_VALUE, &tmp, &tmp_alloc, NULL)) av->value = zbx_strdup(av->value, tmp); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGTIMESTAMP, &tmp, &tmp_alloc, NULL)) av->timestamp = atoi(tmp); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGSOURCE, &tmp, &tmp_alloc, NULL)) av->source = zbx_strdup(av->source, tmp); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGSEVERITY, &tmp, &tmp_alloc, NULL)) av->severity = atoi(tmp); if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGEVENTID, &tmp, &tmp_alloc, NULL)) av->logeventid = atoi(tmp); if (SUCCEED != zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_ID, &tmp, &tmp_alloc, NULL) || SUCCEED != zbx_is_uint64(tmp, &av->id)) { av->id = 0; } zbx_free(tmp); ret = SUCCEED; out: return ret; } /****************************************************************************** * * * Purpose: parses item identifier from history data json row * * * * Parameters: jp_row - [IN] JSON with history data row * * itemid - [OUT] the item identifier * * * * Return value: SUCCEED - the item identifier was parsed successfully * * FAIL - otherwise * * * ******************************************************************************/ static int parse_history_data_row_itemid(const struct zbx_json_parse *jp_row, zbx_uint64_t *itemid) { char buffer[MAX_ID_LEN + 1]; if (SUCCEED != zbx_json_value_by_name(jp_row, ZBX_PROTO_TAG_ITEMID, buffer, sizeof(buffer), NULL)) return FAIL; if (SUCCEED != zbx_is_uint64(buffer, itemid)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: parses host,key pair from history data json row * * * * Parameters: jp_row - [IN] JSON with history data row * * hk - [OUT] the host,key pair * * * * Return value: SUCCEED - the host,key pair was parsed successfully * * FAIL - otherwise * * * ******************************************************************************/ static int parse_history_data_row_hostkey(const struct zbx_json_parse *jp_row, zbx_host_key_t *hk) { size_t str_alloc; str_alloc = 0; zbx_free(hk->host); if (SUCCEED != zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_HOST, &hk->host, &str_alloc, NULL)) return FAIL; str_alloc = 0; zbx_free(hk->key); if (SUCCEED != zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_KEY, &hk->key, &str_alloc, NULL)) { zbx_free(hk->host); return FAIL; } return SUCCEED; } /****************************************************************************** * * * Purpose: parses up to ZBX_HISTORY_VALUES_MAX item values and host,key * * pairs from history data json * * * * Parameters: jp_data - [IN] JSON with history data array * * pnext - [IN/OUT] the pointer to the next item in json, * * NULL - no more data left * * values - [OUT] the item values * * hostkeys - [OUT] the corresponding host,key pairs * * values_num - [OUT] number of elements in values and hostkeys * * arrays * * parsed_num - [OUT] the number of values parsed * * unique_shift - [IN/OUT] auto increment nanoseconds to ensure * * unique value of timestamps * * * * Return value: SUCCEED - values were parsed successfully * * FAIL - an error occurred * * * ******************************************************************************/ static int parse_history_data(struct zbx_json_parse *jp_data, const char **pnext, zbx_agent_value_t *values, zbx_host_key_t *hostkeys, int *values_num, int *parsed_num, zbx_timespec_t *unique_shift) { struct zbx_json_parse jp_row; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); *values_num = 0; *parsed_num = 0; if (NULL == *pnext) { if (NULL == (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX) { ret = SUCCEED; goto out; } } /* iterate the history data rows */ do { if (FAIL == zbx_json_brackets_open(*pnext, &jp_row)) { zabbix_log(LOG_LEVEL_WARNING, "%s", zbx_json_strerror()); goto out; } (*parsed_num)++; if (SUCCEED != parse_history_data_row_hostkey(&jp_row, &hostkeys[*values_num])) continue; if (SUCCEED != parse_history_data_row_value(&jp_row, unique_shift, &values[*values_num])) continue; (*values_num)++; } while (NULL != (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX); ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s processed:%d/%d", __func__, zbx_result_string(ret), *values_num, *parsed_num); return ret; } /****************************************************************************** * * * Purpose: parses up to ZBX_HISTORY_VALUES_MAX item values and item * * identifiers from history data json * * * * Parameters: jp_data - [IN] JSON with history data array * * pnext - [IN/OUT] the pointer to the next item in * * json, NULL - no more data left * * values - [OUT] the item values * * itemids - [OUT] the corresponding item identifiers * * values_num - [OUT] number of elements in values and itemids * * arrays * * parsed_num - [OUT] the number of values parsed * * unique_shift - [IN/OUT] auto increment nanoseconds to ensure * * unique value of timestamps * * info - [OUT] address of a pointer to the info string * * (should be freed by the caller) * * * * Return value: SUCCEED - values were parsed successfully * * FAIL - an error occurred * * * * Comments: This function is used to parse the new proxy history data * * protocol introduced in Zabbix v3.3. * * * ******************************************************************************/ static int parse_history_data_by_itemids(struct zbx_json_parse *jp_data, const char **pnext, zbx_agent_value_t *values, zbx_uint64_t *itemids, int *values_num, int *parsed_num, zbx_timespec_t *unique_shift, char **error) { struct zbx_json_parse jp_row; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); *values_num = 0; *parsed_num = 0; if (NULL == *pnext) { if (NULL == (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX) { ret = SUCCEED; goto out; } } /* iterate the history data rows */ do { if (FAIL == zbx_json_brackets_open(*pnext, &jp_row)) { *error = zbx_strdup(*error, zbx_json_strerror()); goto out; } (*parsed_num)++; if (SUCCEED != parse_history_data_row_itemid(&jp_row, &itemids[*values_num])) continue; if (SUCCEED != parse_history_data_row_value(&jp_row, unique_shift, &values[*values_num])) continue; (*values_num)++; } while (NULL != (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX); ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s processed:%d/%d", __func__, zbx_result_string(ret), *values_num, *parsed_num); return ret; } /****************************************************************************** * * * Purpose: validates item received from proxy * * * * Parameters: item - [IN/OUT] the item data * * sock - [IN] the connection socket * * args - [IN] the validator arguments * * error - unused * * * * Return value: SUCCEED - the validation was successful * * FAIL - otherwise * * * ******************************************************************************/ static int proxy_item_validator(zbx_history_recv_item_t *item, zbx_socket_t *sock, void *args, char **error) { zbx_uint64_t *proxyid = (zbx_uint64_t *)args; ZBX_UNUSED(sock); ZBX_UNUSED(error); /* don't process item if its host was assigned to another proxy */ if (item->host.proxyid != *proxyid) return FAIL; /* don't process aggregate/calculated items coming from proxy */ if (ITEM_TYPE_CALCULATED == item->type) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: parses history data array and process the data * * * * * * Parameters: sock - [IN] socket for host permission validation * * validator_func - [IN] function to validate item permission * * validator_args - [IN] validator function arguments * * jp_data - [IN] JSON with history data array * * session - [IN] the data session * * nodata_win - [OUT] counter of delayed values * * info - [OUT] address of a pointer to the info * * string (should be freed by the caller) * * mode - [IN] item retrieve mode is used to retrieve * * only necessary data to reduce time * * spent holding read lock * * * * Return value: SUCCEED - processed successfully * * FAIL - an error occurred * * * * Comments: This function is used to parse the new proxy history data * * protocol introduced in Zabbix v3.3. * * * ******************************************************************************/ static int process_history_data_by_itemids(zbx_socket_t *sock, zbx_client_item_validator_t validator_func, void *validator_args, struct zbx_json_parse *jp_data, zbx_session_t *session, zbx_proxy_suppress_t *nodata_win, char **info, unsigned int mode) { const char *pnext = NULL; int ret = SUCCEED, processed_num = 0, total_num = 0, values_num, read_num, i, *errcodes; double sec; zbx_history_recv_item_t *items; char *error = NULL; zbx_uint64_t itemids[ZBX_HISTORY_VALUES_MAX], last_valueid = 0; zbx_agent_value_t values[ZBX_HISTORY_VALUES_MAX]; zbx_timespec_t unique_shift = {0, 0}; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); items = (zbx_history_recv_item_t *)zbx_malloc(NULL, sizeof(zbx_history_recv_item_t) * ZBX_HISTORY_VALUES_MAX); errcodes = (int *)zbx_malloc(NULL, sizeof(int) * ZBX_HISTORY_VALUES_MAX); sec = zbx_time(); while (SUCCEED == parse_history_data_by_itemids(jp_data, &pnext, values, itemids, &values_num, &read_num, &unique_shift, &error) && 0 != values_num) { zbx_dc_config_history_recv_get_items_by_itemids(items, itemids, errcodes, (size_t)values_num, mode); for (i = 0; i < values_num; i++) { if (SUCCEED != errcodes[i]) continue; /* check and discard if duplicate data */ if (NULL != session && 0 != values[i].id && values[i].id <= session->last_id) { errcodes[i] = FAIL; continue; } if (SUCCEED != validator_func(&items[i], sock, validator_args, &error)) { if (NULL != error) { zabbix_log(LOG_LEVEL_WARNING, "%s", error); zbx_free(error); } errcodes[i] = FAIL; } } processed_num += zbx_process_history_data(items, values, errcodes, values_num, nodata_win); total_num += read_num; last_valueid = values[values_num - 1].id; zbx_agent_values_clean(values, values_num); if (NULL == pnext) break; } if (NULL != session && 0 != last_valueid) { if (session->last_id > last_valueid) { zabbix_log(LOG_LEVEL_WARNING, "received id:" ZBX_FS_UI64 " is less than last id:" ZBX_FS_UI64, last_valueid, session->last_id); } else session->last_id = last_valueid; } zbx_free(errcodes); zbx_free(items); if (NULL == error) { ret = SUCCEED; *info = zbx_dsprintf(*info, "processed: %d; failed: %d; total: %d; seconds spent: " ZBX_FS_DBL, processed_num, total_num - processed_num, total_num, zbx_time() - sec); } else { zbx_free(*info); *info = error; } zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: validates item received from active agent * * * * Parameters: item - [IN] the item data * * sock - [IN] the connection socket * * args - [IN] the validator arguments * * error - [OUT] the error message * * * * Return value: SUCCEED - the validation was successful * * FAIL - otherwise * * * ******************************************************************************/ static int agent_item_validator(zbx_history_recv_item_t *item, zbx_socket_t *sock, void *args, char **error) { zbx_host_rights_t *rights = (zbx_host_rights_t *)args; if (0 != item->host.proxyid) return FAIL; if (ITEM_TYPE_ZABBIX_ACTIVE != item->type) return FAIL; if (rights->hostid != item->host.hostid) { rights->hostid = item->host.hostid; rights->value = zbx_host_check_permissions(&item->host, sock, error); } return rights->value; } /****************************************************************************** * * * Purpose: validates item received from sender * * * * Parameters: item - [IN] the item data * * sock - [IN] the connection socket * * args - [IN] the validator arguments * * error - [OUT] the error message * * * * Return value: SUCCEED - the validation was successful * * FAIL - otherwise * * * ******************************************************************************/ static int sender_item_validator(zbx_history_recv_item_t *item, zbx_socket_t *sock, void *args, char **error) { zbx_host_rights_t *rights; char key_short[VALUE_ERRMSG_MAX * ZBX_MAX_BYTES_IN_UTF8_CHAR + 1]; if (HOST_MONITORED_BY_SERVER != item->host.monitored_by) { *error = zbx_dsprintf(*error, "cannot process item \"%s\" trap:" " host is monitored by a proxy or proxy group", zbx_truncate_itemkey(item->key_orig, VALUE_ERRMSG_MAX, key_short, sizeof(key_short))); return FAIL; } switch (item->type) { case ITEM_TYPE_HTTPAGENT: if (0 == item->allow_traps) { *error = zbx_dsprintf(*error, "cannot process HTTP agent item \"%s\" trap:" " trapping is not enabled", zbx_truncate_itemkey(item->key_orig, VALUE_ERRMSG_MAX, key_short, sizeof(key_short))); return FAIL; } break; case ITEM_TYPE_TRAPPER: break; default: *error = zbx_dsprintf(*error, "cannot process item \"%s\" trap:" " item type \"%d\" cannot be used with traps", zbx_truncate_itemkey(item->key_orig, VALUE_ERRMSG_MAX, key_short, sizeof(key_short)), item->type); return FAIL; } if ('\0' != *item->trapper_hosts) /* list of allowed hosts not empty */ { char *allowed_peers; int ret; allowed_peers = zbx_strdup(NULL, item->trapper_hosts); zbx_substitute_simple_macros_allowed_hosts(item, &allowed_peers); ret = zbx_tcp_check_allowed_peers(sock, allowed_peers); zbx_free(allowed_peers); if (FAIL == ret) { *error = zbx_dsprintf(*error, "cannot process item \"%s\" trap: %s", zbx_truncate_itemkey(item->key_orig, VALUE_ERRMSG_MAX, key_short, sizeof(key_short)), zbx_socket_strerror()); return FAIL; } } rights = (zbx_host_rights_t *)args; if (rights->hostid != item->host.hostid) { rights->hostid = item->host.hostid; rights->value = zbx_host_check_permissions(&item->host, sock, error); } return rights->value; } static void process_history_data_by_keys(zbx_socket_t *sock, zbx_client_item_validator_t validator_func, void *validator_args, char **info, struct zbx_json_parse *jp_data, const char *token) { int values_num, read_num, processed_num = 0, total_num = 0, i; zbx_timespec_t unique_shift = {0, 0}; const char *pnext = NULL; char *error = NULL; zbx_host_key_t *hostkeys; zbx_history_recv_item_t *items; zbx_session_t *session = NULL; zbx_uint64_t last_hostid = 0; zbx_agent_value_t values[ZBX_HISTORY_VALUES_MAX]; int errcodes[ZBX_HISTORY_VALUES_MAX]; double sec; sec = zbx_time(); items = (zbx_history_recv_item_t *)zbx_malloc(NULL, sizeof(zbx_history_recv_item_t) * ZBX_HISTORY_VALUES_MAX); hostkeys = (zbx_host_key_t *)zbx_malloc(NULL, sizeof(zbx_host_key_t) * ZBX_HISTORY_VALUES_MAX); memset(hostkeys, 0, sizeof(zbx_host_key_t) * ZBX_HISTORY_VALUES_MAX); while (SUCCEED == parse_history_data(jp_data, &pnext, values, hostkeys, &values_num, &read_num, &unique_shift) && 0 != values_num) { zbx_dc_config_history_recv_get_items_by_keys(items, hostkeys, errcodes, (size_t)values_num); for (i = 0; i < values_num; i++) { if (SUCCEED != errcodes[i]) { zabbix_log(LOG_LEVEL_DEBUG, "cannot retrieve key \"%s\" on host \"%s\" from " "configuration cache", hostkeys[i].key, hostkeys[i].host); continue; } if (last_hostid != items[i].host.hostid) { last_hostid = items[i].host.hostid; if (NULL != token) { session = zbx_dc_get_or_create_session(last_hostid, token, ZBX_SESSION_TYPE_DATA); } } /* check and discard if duplicate data */ if (NULL != session && 0 != values[i].id && values[i].id <= session->last_id) { errcodes[i] = FAIL; continue; } if (SUCCEED != validator_func(&items[i], sock, validator_args, &error)) { if (NULL != error) { zabbix_log(LOG_LEVEL_WARNING, "%s", error); zbx_free(error); } else { zabbix_log(LOG_LEVEL_DEBUG, "unknown validation error for item \"%s\"", (NULL == items[i].key) ? items[i].key_orig : items[i].key); } errcodes[i] = FAIL; } if (NULL != session) session->last_id = values[i].id; } processed_num += zbx_process_history_data(items, values, errcodes, values_num, NULL); total_num += read_num; zbx_agent_values_clean(values, values_num); if (NULL == pnext) break; } for (i = 0; i < ZBX_HISTORY_VALUES_MAX; i++) { zbx_free(hostkeys[i].host); zbx_free(hostkeys[i].key); } zbx_free(hostkeys); zbx_free(items); *info = zbx_dsprintf(*info, "processed: %d; failed: %d; total: %d; seconds spent: " ZBX_FS_DBL, processed_num, total_num - processed_num, total_num, zbx_time() - sec); } /****************************************************************************** * * * Purpose: peek first host name in host:key history data array * * * * Parameters: jp_data - [IN] JSON with history data * * host - [OUT] host of first host:key record * * host_len - [IN] host buffer length * * error - [OUT] error message. * * * * Return value: SUCCEED - host name was returned successfully * * FAIL - no history records (in this case error is NULL) * * or history records have invalid format * * * ******************************************************************************/ static int peek_hostkey_host(const struct zbx_json_parse *jp_data, char *host, size_t host_len, char **error) { const char *pnext = NULL; struct zbx_json_parse jp_row; if (NULL == (pnext = zbx_json_next(jp_data, pnext))) { *error = NULL; return FAIL; } if (FAIL == zbx_json_brackets_open(pnext, &jp_row)) { *error = zbx_dsprintf(*error, "cannot open \"%s\" token", ZBX_PROTO_TAG_DATA); return FAIL; } if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_HOST, host, host_len, NULL)) { *error = zbx_dsprintf(*error, "cannot find \"%s\" token in data", ZBX_PROTO_TAG_HOST); return FAIL; } return SUCCEED; } /****************************************************************************** * * * Purpose: process history data received from Zabbix active agent * * * * Parameters: sock - [IN] connection socket * * jp - [IN] JSON with history data * * ts - [IN] connection timestamp * * info - [OUT] address of a pointer to the info string * * (should be freed by the caller) * * * * Return value: SUCCEED - processed successfully * * FAIL - an error occurred * * * ******************************************************************************/ int zbx_process_agent_history_data(zbx_socket_t *sock, struct zbx_json_parse *jp, zbx_timespec_t *ts, char **info) { zbx_comms_redirect_t redirect; struct zbx_json_parse jp_data; int ret = FAIL, version; char *token = NULL; zbx_session_t *session; zbx_uint64_t hostid; size_t token_alloc = 0; zbx_host_rights_t rights = {0}; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); log_client_timediff(LOG_LEVEL_DEBUG, jp, ts); if (SUCCEED != zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DATA, &jp_data)) { ret = SUCCEED; goto out; } if (SUCCEED == zbx_json_value_by_name_dyn(jp, ZBX_PROTO_TAG_SESSION, &token, &token_alloc, NULL)) { size_t token_len; if (ZBX_SESSION_TOKEN_SIZE != (token_len = strlen(token))) { *info = zbx_dsprintf(*info, "invalid session token length %d", (int)token_len); goto out; } } char tmp[MAX_STRING_LEN]; if (SUCCEED != zbx_json_value_by_name(jp, ZBX_PROTO_TAG_VERSION, tmp, sizeof(tmp), NULL) || FAIL == (version = zbx_get_component_version_without_patch(tmp))) { version = ZBX_COMPONENT_VERSION(4, 2, 0); } if (ZBX_COMPONENT_VERSION(4, 4, 0) > version) { if (SUCCEED != peek_hostkey_host(&jp_data, tmp, sizeof(tmp), info)) { if (NULL == *info) ret = SUCCEED; goto out; } } else { if (SUCCEED != zbx_json_value_by_name(jp, ZBX_PROTO_TAG_HOST, tmp, sizeof(tmp), NULL)) { *info = zbx_dsprintf(*info, "cannot find \"%s\" token", ZBX_PROTO_TAG_HOST); goto out; } } if (FAIL == (ret = zbx_dc_config_get_hostid_by_name(tmp, sock, &hostid, &redirect))) { *info = zbx_dsprintf(*info, "unknown host '%s'", tmp); /* send success response so agent will not retry upload with the same non-existing host */ ret = SUCCEED; goto out; } if (SUCCEED_PARTIAL == ret) { struct zbx_json j; zbx_json_init(&j, 1024); zbx_add_redirect_response(&j, &redirect); *info = zbx_strdup(NULL, j.buffer); zbx_json_free(&j); goto out; } if (ZBX_COMPONENT_VERSION(4, 4, 0) <= version) { if (NULL == token) session = NULL; else session = zbx_dc_get_or_create_session(hostid, token, ZBX_SESSION_TYPE_DATA); ret = process_history_data_by_itemids(sock, agent_item_validator, &rights, &jp_data, session, NULL, info, ZBX_ITEM_GET_DEFAULT); } else { process_history_data_by_keys(sock, agent_item_validator, &rights, info, &jp_data, token); ret = SUCCEED; } out: zbx_free(token); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: process history data received from Zabbix sender * * * * Parameters: sock - [IN] connection socket * * jp - [IN] JSON with history data * * ts - [IN] connection timestamp * * info - [OUT] address of a pointer to the info string * * (should be freed by the caller) * * * * Return value: SUCCEED - processed successfully * * FAIL - an error occurred * * * ******************************************************************************/ int zbx_process_sender_history_data(zbx_socket_t *sock, struct zbx_json_parse *jp, zbx_timespec_t *ts, char **info) { zbx_host_rights_t rights = {0}; int ret = FAIL; zbx_dc_um_handle_t *um_handle; struct zbx_json_parse jp_data; char host[ZBX_HOSTNAME_BUF_LEN]; if (SUCCEED == zbx_vps_monitor_capped()) { *info = zbx_strdup(*info, "data collection has been paused"); return FAIL; } log_client_timediff(LOG_LEVEL_DEBUG, jp, ts); um_handle = zbx_dc_open_user_macros(); if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DATA, &jp_data)) { if (SUCCEED == (ret = peek_hostkey_host(&jp_data, host, sizeof(host), info))) { zbx_comms_redirect_t redirect; zbx_uint64_t hostid; if (SUCCEED_PARTIAL == (ret = zbx_dc_config_get_hostid_by_name(host, sock, &hostid, &redirect))) { struct zbx_json j; zbx_json_init(&j, 1024); zbx_add_redirect_response(&j, &redirect); *info = zbx_strdup(NULL, j.buffer); zbx_json_free(&j); goto out; } } process_history_data_by_keys(sock, sender_item_validator, &rights, info, &jp_data, NULL); ret = SUCCEED; } else *info = zbx_dsprintf(*info, "cannot open \"%s\" token", ZBX_PROTO_TAG_DATA); out: zbx_dc_close_user_macros(um_handle); return ret; } static void zbx_dservice_ptr_free(zbx_dservice_t *service) { zbx_free(service); } static void zbx_drule_ip_free(zbx_drule_ip_t *ip) { zbx_vector_dservice_ptr_clear_ext(&ip->services, zbx_dservice_ptr_free); zbx_vector_dservice_ptr_destroy(&ip->services); zbx_free(ip); } static void zbx_drule_free(zbx_drule_t *drule) { zbx_vector_ptr_clear_ext(&drule->ips, (zbx_clean_func_t)zbx_drule_ip_free); zbx_vector_ptr_destroy(&drule->ips); zbx_vector_uint64_destroy(&drule->dcheckids); zbx_free(drule); } /****************************************************************************** * * * Purpose: process services discovered on IP address * * * ******************************************************************************/ static int process_services(const zbx_vector_dservice_ptr_t *services, const char *ip, const zbx_add_event_func_t add_event_cb, zbx_uint64_t druleid, zbx_vector_uint64_t *dcheckids, zbx_uint64_t unique_dcheckid, int *processed_num, int ip_idx, zbx_discovery_update_host_func_t discovery_update_host_cb, zbx_discovery_update_service_func_t discovery_update_service_cb, zbx_discovery_update_service_down_func_t discovery_update_service_down_cb, zbx_discovery_find_host_func_t discovery_find_host_cb) { zbx_db_dhost dhost; zbx_dservice_t *service; int services_num, ret = FAIL, i, dchecks = 0; zbx_vector_dservice_ptr_t services_old; zbx_vector_uint64_t dserviceids; zbx_db_drule drule = {.druleid = druleid, .unique_dcheckid = unique_dcheckid}; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); memset(&dhost, 0, sizeof(dhost)); zbx_vector_dservice_ptr_create(&services_old); zbx_vector_uint64_create(&dserviceids); /* find host update */ for (i = *processed_num; i < services->values_num; i++) { service = services->values[i]; zabbix_log(LOG_LEVEL_DEBUG, "%s() druleid:" ZBX_FS_UI64 " dcheckid:" ZBX_FS_UI64 " unique_dcheckid:" ZBX_FS_UI64 " time:'%s %s' ip:'%s' dns:'%s' port:%hu status:%d value:'%s'", __func__, drule.druleid, service->dcheckid, drule.unique_dcheckid, zbx_date2str(service->itemtime, NULL), zbx_time2str(service->itemtime, NULL), ip, service->dns, service->port, service->status, service->value); if (0 == service->dcheckid) break; dchecks++; } /* stop processing current discovery rule and save proxy history until host update is available */ if (i == services->values_num) { zbx_db_insert_t db_insert; zbx_db_insert_prepare(&db_insert, "proxy_dhistory", "id", "clock", "druleid", "ip", "port", "value", "status", "dcheckid", "dns", "error", (char *)NULL); for (i = *processed_num; i < services->values_num; i++) { zbx_db_insert_add_values(&db_insert, __UINT64_C(0), (int)service->itemtime, drule.druleid, ip, service->port, service->value, service->status, service->dcheckid, service->dns, ""); } zbx_db_insert_autoincrement(&db_insert, "id"); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); goto fail; } services_num = i; if (0 == *processed_num && 0 == ip_idx) { zbx_db_result_t result; zbx_db_row_t row; zbx_uint64_t dcheckid; result = zbx_db_select( "select dcheckid,clock,port,value,status,dns,ip" " from proxy_dhistory" " where druleid=" ZBX_FS_UI64 " order by id", drule.druleid); for (i = 0; NULL != (row = zbx_db_fetch(result)); i++) { if (SUCCEED == zbx_db_is_null(row[0])) continue; ZBX_STR2UINT64(dcheckid, row[0]); if (0 == strcmp(ip, row[6])) { service = (zbx_dservice_t *)zbx_malloc(NULL, sizeof(zbx_dservice_t)); service->dcheckid = dcheckid; service->itemtime = (time_t)atoi(row[1]); service->port = atoi(row[2]); zbx_strlcpy_utf8(service->value, row[3], ZBX_MAX_DISCOVERED_VALUE_SIZE); service->status = atoi(row[4]); zbx_strlcpy(service->dns, row[5], ZBX_INTERFACE_DNS_LEN_MAX); zbx_vector_dservice_ptr_append(&services_old, service); zbx_vector_uint64_append(dcheckids, service->dcheckid); dchecks++; } } zbx_db_free_result(result); if (0 != i) { zbx_db_execute("delete from proxy_dhistory" " where druleid=" ZBX_FS_UI64, drule.druleid); } zbx_vector_uint64_sort(dcheckids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_uniq(dcheckids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); if (SUCCEED != zbx_db_lock_druleid(drule.druleid)) { zabbix_log(LOG_LEVEL_DEBUG, "druleid:" ZBX_FS_UI64 " does not exist", drule.druleid); goto fail; } if (0 != dchecks && SUCCEED != zbx_db_lock_ids("dchecks", "dcheckid", dcheckids)) { zabbix_log(LOG_LEVEL_DEBUG, "checks are not available for druleid:" ZBX_FS_UI64, drule.druleid); goto fail; } } if (0 == dchecks) { discovery_find_host_cb(druleid, ip, &dhost); /* we will mark all services as DOWN */ if (0 == dhost.dhostid) { (*processed_num)++; zabbix_log(LOG_LEVEL_DEBUG, "cannot process update of unknown host without services"); goto out; } } else { for (i = 0; i < services_old.values_num; i++) { service = services_old.values[i]; if (FAIL == zbx_vector_uint64_bsearch(dcheckids, service->dcheckid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)) { zabbix_log(LOG_LEVEL_DEBUG, "dcheckid:" ZBX_FS_UI64 " does not exist", service->dcheckid); continue; } discovery_update_service_cb(NULL, drule.druleid, service->dcheckid, drule.unique_dcheckid, &dhost, ip, service->dns, service->port, service->status, service->value, service->itemtime, &dserviceids, add_event_cb); } for (;*processed_num < services_num; (*processed_num)++) { service = services->values[*processed_num]; if (FAIL == zbx_vector_uint64_bsearch(dcheckids, service->dcheckid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)) { zabbix_log(LOG_LEVEL_DEBUG, "dcheckid:" ZBX_FS_UI64 " does not exist", service->dcheckid); continue; } discovery_update_service_cb(NULL, drule.druleid, service->dcheckid, drule.unique_dcheckid, &dhost, ip, service->dns, service->port, service->status, service->value, service->itemtime, &dserviceids, add_event_cb); } } service = services->values[(*processed_num)++]; if (0 != dhost.dhostid) discovery_update_service_down_cb(dhost.dhostid, service->itemtime, &dserviceids); discovery_update_host_cb(NULL, 0, &dhost, NULL, NULL, service->status, service->itemtime, add_event_cb); out: ret = SUCCEED; fail: zbx_vector_dservice_ptr_clear_ext(&services_old, zbx_dservice_ptr_free); zbx_vector_dservice_ptr_destroy(&services_old); zbx_vector_uint64_destroy(&dserviceids); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /********************************************************************************* * * * Purpose: parses discovery data contents and processes it * * * * Parameters: * * jp_data - [IN] JSON with discovery data * * events_cbs - [IN] * * discovery_update_host_cb - [IN] * * discovery_update_service_cb - [IN] * * error - [OUT] address of pointer to info string * * (should be freed by the caller) * * * * Return value: SUCCEED - processed successfully * * FAIL - error occurred * * * *********************************************************************************/ static int process_discovery_data_contents(struct zbx_json_parse *jp_data, const zbx_events_funcs_t *events_cbs, zbx_discovery_update_host_func_t discovery_update_host_cb, zbx_discovery_update_service_func_t discovery_update_service_cb, zbx_discovery_update_service_down_func_t discovery_update_service_down_cb, zbx_discovery_find_host_func_t discovery_find_host_cb, zbx_discovery_update_drule_func_t discovery_update_drule_cb, char **error) { zbx_db_result_t result; zbx_db_row_t row; zbx_uint64_t dcheckid, druleid; struct zbx_json_parse jp_row; int status, ret = SUCCEED, i, j; unsigned short port; const char *p = NULL; char ip[ZBX_INTERFACE_IP_LEN_MAX], tmp[MAX_STRING_LEN], dns[ZBX_INTERFACE_DNS_LEN_MAX], *value = NULL; time_t itemtime; size_t value_alloc = ZBX_MAX_DISCOVERED_VALUE_SIZE; zbx_vector_ptr_t drules; zbx_drule_t *drule; zbx_drule_ip_t *drule_ip; zbx_dservice_t *service; zbx_vector_discoverer_drule_error_t drule_errors; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); value = (char *)zbx_malloc(value, value_alloc); zbx_vector_ptr_create(&drules); zbx_vector_discoverer_drule_error_create(&drule_errors); while (NULL != (p = zbx_json_next(jp_data, p))) { if (FAIL == zbx_json_brackets_open(p, &jp_row)) goto json_parse_error; if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp), NULL)) goto json_parse_error; itemtime = atoi(tmp); if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DRULE, tmp, sizeof(tmp), NULL)) goto json_parse_error; ZBX_STR2UINT64(druleid, tmp); if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_STATUS, tmp, sizeof(tmp), NULL)) status = atoi(tmp); else status = 0; if (DOBJECT_STATUS_FINALIZED == status) { zbx_discoverer_drule_error_t dre_val = {.druleid = druleid, .error = NULL}; if (FAIL != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_ERROR, tmp, sizeof(tmp), NULL) && '\0' != tmp[0]) { dre_val.error = zbx_strdup(NULL, tmp); } zbx_vector_discoverer_drule_error_append(&drule_errors, dre_val); continue; } if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DCHECK, tmp, sizeof(tmp), NULL)) goto json_parse_error; if ('\0' != *tmp) ZBX_STR2UINT64(dcheckid, tmp); else dcheckid = 0; if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_IP, ip, sizeof(ip), NULL)) goto json_parse_error; if (SUCCEED != zbx_is_ip(ip)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid IP address", __func__, ip); continue; } if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_PORT, tmp, sizeof(tmp), NULL)) { port = 0; } else if (FAIL == zbx_is_ushort(tmp, &port)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid port", __func__, tmp); continue; } if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_VALUE, &value, &value_alloc, NULL)) *value = '\0'; if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DNS, dns, sizeof(dns), NULL)) { *dns = '\0'; } else if ('\0' != *dns && FAIL == zbx_validate_hostname(dns)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid hostname", __func__, dns); continue; } if (FAIL == (i = zbx_vector_ptr_search(&drules, &druleid, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC))) { drule = (zbx_drule_t *)zbx_malloc(NULL, sizeof(zbx_drule_t)); drule->druleid = druleid; zbx_vector_ptr_create(&drule->ips); zbx_vector_uint64_create(&drule->dcheckids); zbx_vector_ptr_append(&drules, drule); } else drule = drules.values[i]; if (FAIL == (i = zbx_vector_ptr_search(&drule->ips, ip, ZBX_DEFAULT_STR_COMPARE_FUNC))) { drule_ip = (zbx_drule_ip_t *)zbx_malloc(NULL, sizeof(zbx_drule_ip_t)); zbx_strlcpy(drule_ip->ip, ip, ZBX_INTERFACE_IP_LEN_MAX); zbx_vector_dservice_ptr_create(&drule_ip->services); zbx_vector_ptr_append(&drule->ips, drule_ip); } else drule_ip = drule->ips.values[i]; service = (zbx_dservice_t *)zbx_malloc(NULL, sizeof(zbx_dservice_t)); if (0 != (service->dcheckid = dcheckid)) zbx_vector_uint64_append(&drule->dcheckids, service->dcheckid); service->port = port; service->status = status; zbx_strlcpy_utf8(service->value, value, ZBX_MAX_DISCOVERED_VALUE_SIZE); zbx_strlcpy(service->dns, dns, ZBX_INTERFACE_DNS_LEN_MAX); service->itemtime = itemtime; zbx_vector_dservice_ptr_append(&drule_ip->services, service); continue; json_parse_error: *error = zbx_strdup(*error, zbx_json_strerror()); ret = FAIL; goto json_parse_return; } for (i = 0; i < drules.values_num; i++) { zbx_uint64_t unique_dcheckid; int ret2 = SUCCEED; drule = (zbx_drule_t *)drules.values[i]; zbx_db_begin(); result = zbx_db_select( "select dcheckid" " from dchecks" " where druleid=" ZBX_FS_UI64 " and uniq=1", drule->druleid); if (NULL != (row = zbx_db_fetch(result))) ZBX_STR2UINT64(unique_dcheckid, row[0]); else unique_dcheckid = 0; zbx_db_free_result(result); for (j = 0; j < drule->ips.values_num && SUCCEED == ret2; j++) { int processed_num = 0; drule_ip = (zbx_drule_ip_t *)drule->ips.values[j]; while (processed_num != drule_ip->services.values_num) { if (FAIL == (ret2 = process_services(&drule_ip->services, drule_ip->ip, events_cbs->add_event_cb, drule->druleid, &drule->dcheckids, unique_dcheckid, &processed_num, j, discovery_update_host_cb, discovery_update_service_cb, discovery_update_service_down_cb, discovery_find_host_cb))) { break; } } } if (NULL != events_cbs->process_events_cb) events_cbs->process_events_cb(NULL, NULL, NULL); if (NULL != events_cbs->clean_events_cb) events_cbs->clean_events_cb(); zbx_db_commit(); } for (i = 0; i < drule_errors.values_num; i++) { discovery_update_drule_cb(NULL, drule_errors.values[i].druleid, drule_errors.values[i].error, 0); } json_parse_return: zbx_free(value); zbx_vector_ptr_clear_ext(&drules, (zbx_clean_func_t)zbx_drule_free); zbx_vector_ptr_destroy(&drules); zbx_vector_discoverer_drule_error_clear_ext(&drule_errors, zbx_discoverer_drule_error_free); zbx_vector_discoverer_drule_error_destroy(&drule_errors); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: parse autoregistration data contents and process it * * * * Parameters: * * jp_data - [IN] JSON with autoregistration data * * proxy - [IN] * * events_cbs - [IN] * * autoreg_host_free_cb - [IN] * * autoreg_flush_hosts_cb - [IN] * * autoreg_prepare_host_cb - [IN] * * error - [OUT] address of a pointer to the info * * string (should be freed by the caller) * * * * Return value: SUCCEED - processed successfully * * FAIL - an error occurred * * * ******************************************************************************/ static int process_autoregistration_contents(struct zbx_json_parse *jp_data, const zbx_dc_proxy_t *proxy, const zbx_events_funcs_t *events_cbs, zbx_autoreg_host_free_func_t autoreg_host_free_cb, zbx_autoreg_flush_hosts_func_t autoreg_flush_hosts_cb, zbx_autoreg_prepare_host_func_t autoreg_prepare_host_cb, char **error) { struct zbx_json_parse jp_row; int ret = SUCCEED; const char *p = NULL; time_t itemtime; char host[ZBX_HOSTNAME_BUF_LEN], ip[ZBX_INTERFACE_IP_LEN_MAX], dns[ZBX_INTERFACE_DNS_LEN_MAX], tmp[MAX_STRING_LEN], *host_metadata = NULL; unsigned short port; size_t host_metadata_alloc = 1; /* for at least NUL-terminating string */ zbx_vector_autoreg_host_ptr_t autoreg_hosts; zbx_conn_flags_t flags = ZBX_CONN_DEFAULT; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (0 == zbx_dc_get_auto_registration_action_count()) { zabbix_log(LOG_LEVEL_DEBUG, "cannot process auto registration contents, all autoregistration actions" " are disabled"); goto out; } zbx_vector_autoreg_host_ptr_create(&autoreg_hosts); host_metadata = (char *)zbx_malloc(host_metadata, host_metadata_alloc); while (NULL != (p = zbx_json_next(jp_data, p))) { unsigned int connection_type; if (FAIL == (ret = zbx_json_brackets_open(p, &jp_row))) break; if (FAIL == (ret = zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp), NULL))) break; itemtime = atoi(tmp); if (FAIL == (ret = zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_HOST, host, sizeof(host), NULL))) break; if (FAIL == zbx_check_hostname(host, NULL)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid Zabbix host name", __func__, host); continue; } if (FAIL == zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_HOST_METADATA, &host_metadata, &host_metadata_alloc, NULL)) { *host_metadata = '\0'; } if (FAIL != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_FLAGS, tmp, sizeof(tmp), NULL)) { int flags_int; flags_int = atoi(tmp); switch (flags_int) { case ZBX_CONN_DEFAULT: case ZBX_CONN_IP: case ZBX_CONN_DNS: flags = (zbx_conn_flags_t)flags_int; break; default: flags = ZBX_CONN_DEFAULT; zabbix_log(LOG_LEVEL_WARNING, "wrong flags value: %d for host \"%s\":", flags_int, host); } } if (FAIL == (ret = zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_IP, ip, sizeof(ip), NULL))) { if (ZBX_CONN_DNS == flags) { *ip = '\0'; ret = SUCCEED; } else break; } else if (SUCCEED != zbx_is_ip(ip)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid IP address", __func__, ip); continue; } if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DNS, dns, sizeof(dns), NULL)) { *dns = '\0'; } else if ('\0' != *dns && FAIL == zbx_validate_hostname(dns)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid hostname", __func__, dns); continue; } if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_PORT, tmp, sizeof(tmp), NULL)) { port = ZBX_DEFAULT_AGENT_PORT; } else if (FAIL == zbx_is_ushort(tmp, &port)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid port", __func__, tmp); continue; } if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_TLS_ACCEPTED, tmp, sizeof(tmp), NULL)) { connection_type = ZBX_TCP_SEC_UNENCRYPTED; } else if (FAIL == zbx_is_uint32(tmp, &connection_type) || (ZBX_TCP_SEC_UNENCRYPTED != connection_type && ZBX_TCP_SEC_TLS_PSK != connection_type && ZBX_TCP_SEC_TLS_CERT != connection_type)) { zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid value for \"" ZBX_PROTO_TAG_TLS_ACCEPTED "\"", __func__, tmp); continue; } autoreg_prepare_host_cb(&autoreg_hosts, host, ip, dns, port, connection_type, host_metadata, (unsigned short)flags, (int)itemtime); } if (0 != autoreg_hosts.values_num) { zbx_db_begin(); autoreg_flush_hosts_cb(&autoreg_hosts, proxy, events_cbs); zbx_db_commit(); zbx_autoreg_host_invalidate_cache(&autoreg_hosts); } zbx_free(host_metadata); zbx_vector_autoreg_host_ptr_clear_ext(&autoreg_hosts, autoreg_host_free_cb); zbx_vector_autoreg_host_ptr_destroy(&autoreg_hosts); if (SUCCEED != ret) *error = zbx_strdup(*error, zbx_json_strerror()); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: parse tasks contents and saves the received tasks * * * * Parameters: jp_tasks - [IN] JSON with tasks data * * * ******************************************************************************/ static void process_tasks_contents(struct zbx_json_parse *jp_tasks) { zbx_vector_tm_task_t tasks; zbx_vector_tm_task_create(&tasks); zbx_tm_json_deserialize_tasks(jp_tasks, &tasks); zbx_db_begin(); zbx_tm_save_tasks(&tasks); zbx_db_commit(); zbx_vector_tm_task_clear_ext(&tasks, zbx_tm_task_free); zbx_vector_tm_task_destroy(&tasks); } /****************************************************************************** * * * Purpose: appends text to the string on a new line * * * ******************************************************************************/ static void zbx_strcatnl_alloc(char **info, size_t *info_alloc, size_t *info_offset, const char *text) { if (0 != *info_offset) zbx_chrcpy_alloc(info, info_alloc, info_offset, '\n'); zbx_strcpy_alloc(info, info_alloc, info_offset, text); } /********************************************************************************** * * * Purpose: detect lost connection with proxy and calculate suppression * * window if possible * * * * Parameters: ts - [IN] timestamp when the proxy connection was * * established * * proxy_staus - [IN] - active or passive proxy * * proxydata_frequency - [IN] * * diff - [IN/OUT] the properties to update * * * *********************************************************************************/ static void check_proxy_nodata(const zbx_timespec_t *ts, unsigned char proxy_status, int proxydata_frequency, zbx_proxy_diff_t *diff) { int delay; if (0 != (diff->nodata_win.flags & ZBX_PROXY_SUPPRESS_ACTIVE)) { diff->nodata_win.values_num = 0; /* reset counter of new suppress values received from proxy */ return; /* only for current packet */ } delay = ts->sec - diff->lastaccess; if ((PROXY_OPERATING_MODE_PASSIVE == proxy_status && (2 * proxydata_frequency) < delay && NET_DELAY_MAX < delay) || (PROXY_OPERATING_MODE_ACTIVE == proxy_status && NET_DELAY_MAX < delay)) { diff->nodata_win.values_num = 0; diff->nodata_win.period_end = ts->sec; diff->flags |= ZBX_FLAGS_PROXY_DIFF_UPDATE_SUPPRESS_WIN; diff->nodata_win.flags |= ZBX_PROXY_SUPPRESS_ENABLE; } } /****************************************************************************** * * * Purpose: detect lack of data during lost connectivity * * * * Parameters: ts - [IN] timestamp when the proxy connection was * * established * * proxy_staus - [IN] - active or passive proxy * * diff - [IN/OUT] the properties to update * * * ******************************************************************************/ static void check_proxy_nodata_empty(const zbx_timespec_t *ts, unsigned char proxy_status, zbx_proxy_diff_t *diff) { int delay_empty; if (0 != (diff->nodata_win.flags & ZBX_PROXY_SUPPRESS_EMPTY) && 0 != diff->nodata_win.values_num) diff->nodata_win.flags &= (~ZBX_PROXY_SUPPRESS_EMPTY); if (0 == (diff->nodata_win.flags & ZBX_PROXY_SUPPRESS_EMPTY) || 0 != diff->nodata_win.values_num) return; delay_empty = ts->sec - diff->nodata_win.period_end; if (PROXY_OPERATING_MODE_PASSIVE == proxy_status || (PROXY_OPERATING_MODE_ACTIVE == proxy_status && NET_DELAY_MAX < delay_empty)) { diff->nodata_win.period_end = 0; diff->nodata_win.flags = ZBX_PROXY_SUPPRESS_DISABLE; } } /***************************************************************************** * * * Purpose: processes 'proxy data' request * * * * Parameters: * * proxy - [IN] source proxy * * jp - [IN] JSON with proxy data * * ts - [IN] timestamp when proxy connection was * * established * * proxy_status - [IN] active or passive proxy mode * * events_cbs - [IN] * * proxydata_frequency - [IN] * * discovery_update_host_cb - [IN] * * discovery_update_service_cb - [IN] * * autoreg_host_free_cb - [IN] * * autoreg_flush_hosts_cb - [IN] * * autoreg_prepare_host_cb - [IN] * * more - [OUT] available data flag * * error - [OUT] address of pointer to info string * * (should be freed by the caller) * * * * Return value: SUCCEED - processed successfully * * FAIL - error occurred * * * *****************************************************************************/ int zbx_process_proxy_data(const zbx_dc_proxy_t *proxy, const struct zbx_json_parse *jp, const zbx_timespec_t *ts, unsigned char proxy_status, const zbx_events_funcs_t *events_cbs, int proxydata_frequency, zbx_discovery_update_host_func_t discovery_update_host_cb, zbx_discovery_update_service_func_t discovery_update_service_cb, zbx_discovery_update_service_down_func_t discovery_update_service_down_cb, zbx_discovery_find_host_func_t discovery_find_host_cb, zbx_discovery_update_drule_func_t discovery_update_drule_cb, zbx_autoreg_host_free_func_t autoreg_host_free_cb, zbx_autoreg_flush_hosts_func_t autoreg_flush_hosts_cb, zbx_autoreg_prepare_host_func_t autoreg_prepare_host_cb, int *more, char **error) { struct zbx_json_parse jp_data; int ret = SUCCEED, flags_old, lastaccess; char *error_step = NULL, value[MAX_STRING_LEN]; size_t error_alloc = 0, error_offset = 0; zbx_proxy_diff_t proxy_diff; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); proxy_diff.flags = ZBX_FLAGS_PROXY_DIFF_UNSET; proxy_diff.hostid = proxy->proxyid; if (SUCCEED != (ret = zbx_dc_get_proxy_nodata_win(proxy_diff.hostid, &proxy_diff.nodata_win, &lastaccess))) { zabbix_log(LOG_LEVEL_WARNING, "cannot get proxy communication delay"); ret = FAIL; goto out; } proxy_diff.lastaccess = lastaccess; if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_MORE, value, sizeof(value), NULL)) proxy_diff.more_data = atoi(value); else proxy_diff.more_data = ZBX_PROXY_DATA_DONE; if (NULL != more) *more = proxy_diff.more_data; if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_PROXY_DELAY, value, sizeof(value), NULL)) proxy_diff.proxy_delay = atoi(value); else proxy_diff.proxy_delay = 0; proxy_diff.flags |= ZBX_FLAGS_PROXY_DIFF_UPDATE_PROXYDELAY; flags_old = proxy_diff.nodata_win.flags; /* first packet can be empty for active proxy */ check_proxy_nodata(ts, proxy_status, proxydata_frequency, &proxy_diff); zabbix_log(LOG_LEVEL_DEBUG, "%s() flag_win:%d/%d flag:%d proxy_status:%d period_end:%d delay:" ZBX_FS_TIME_T " timestamp:%d lastaccess:" ZBX_FS_TIME_T " proxy_delay:%d more:%d", __func__, proxy_diff.nodata_win.flags, flags_old, (int)proxy_diff.flags, proxy_status, proxy_diff.nodata_win.period_end, (zbx_fs_time_t)(ts->sec - proxy_diff.lastaccess), ts->sec, (zbx_fs_time_t)proxy_diff.lastaccess, proxy_diff.proxy_delay, proxy_diff.more_data); if (ZBX_FLAGS_PROXY_DIFF_UNSET != proxy_diff.flags) zbx_dc_update_proxy(&proxy_diff); if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_INTERFACE_AVAILABILITY, &jp_data)) { if (SUCCEED != (ret = process_interfaces_availability_contents(&jp_data, &error_step))) zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step); } flags_old = proxy_diff.nodata_win.flags; if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_HISTORY_DATA, &jp_data)) { zbx_session_t *session = NULL; if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_SESSION, value, sizeof(value), NULL)) { size_t token_len; if (ZBX_SESSION_TOKEN_SIZE != (token_len = strlen(value))) { *error = zbx_dsprintf(*error, "invalid session token length %d", (int)token_len); ret = FAIL; goto out; } session = zbx_dc_get_or_create_session(proxy->proxyid, value, ZBX_SESSION_TYPE_DATA); } if (SUCCEED != (ret = process_history_data_by_itemids(NULL, proxy_item_validator, (void *)&proxy->proxyid, &jp_data, session, &proxy_diff.nodata_win, &error_step, ZBX_ITEM_GET_PROCESS))) { zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step); } } if (0 != (proxy_diff.nodata_win.flags & ZBX_PROXY_SUPPRESS_ACTIVE)) { check_proxy_nodata_empty(ts, proxy_status, &proxy_diff); if (0 < proxy_diff.nodata_win.values_num || flags_old != proxy_diff.nodata_win.flags) proxy_diff.flags |= ZBX_FLAGS_PROXY_DIFF_UPDATE_SUPPRESS_WIN; zabbix_log(LOG_LEVEL_DEBUG, "Result of %s() flag_win:%d/%d flag:%d values_num:%d", __func__, proxy_diff.nodata_win.flags, flags_old, (int)proxy_diff.flags, proxy_diff.nodata_win.values_num); } if (ZBX_FLAGS_PROXY_DIFF_UNSET != proxy_diff.flags) zbx_dc_update_proxy(&proxy_diff); if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DISCOVERY_DATA, &jp_data)) { if (SUCCEED != (ret = process_discovery_data_contents(&jp_data, events_cbs, discovery_update_host_cb, discovery_update_service_cb, discovery_update_service_down_cb, discovery_find_host_cb, discovery_update_drule_cb, &error_step))) { zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step); } } if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_AUTOREGISTRATION, &jp_data)) { if (SUCCEED != (ret = process_autoregistration_contents(&jp_data, proxy, events_cbs, autoreg_host_free_cb, autoreg_flush_hosts_cb, autoreg_prepare_host_cb, &error_step))) { zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step); } } if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_TASKS, &jp_data)) process_tasks_contents(&jp_data); if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_PROXY_ACTIVE_AVAIL_DATA, &jp_data)) { const char *ptr; zbx_vector_proxy_hostdata_ptr_t host_avails; struct zbx_json_parse jp_host; char buffer[ZBX_KIBIBYTE]; zbx_vector_proxy_hostdata_ptr_create(&host_avails); for (ptr = NULL; NULL != (ptr = zbx_json_next(&jp_data, ptr));) { zbx_proxy_hostdata_t *host; if (SUCCEED != zbx_json_brackets_open(ptr, &jp_host)) continue; if (SUCCEED == zbx_json_value_by_name(&jp_host, ZBX_PROTO_TAG_HOSTID, buffer, sizeof(buffer), NULL)) { host = (zbx_proxy_hostdata_t *)zbx_malloc(NULL, sizeof(zbx_proxy_hostdata_t)); host->hostid = atoi(buffer); } else continue; if (FAIL == zbx_json_value_by_name(&jp_host, ZBX_PROTO_TAG_ACTIVE_STATUS, buffer, sizeof(buffer), NULL)) { zbx_free(host); continue; } host->status = atoi(buffer); zbx_vector_proxy_hostdata_ptr_append(&host_avails, host); } if (0 != host_avails.values_num) { unsigned char *data = NULL; zbx_uint32_t data_len; zbx_dc_host_t *hosts; int i, *errcodes; zbx_vector_uint64_t hostids; zbx_vector_proxy_hostdata_ptr_t proxy_host_avails; zbx_vector_uint64_create(&hostids); for (i = 0; i < host_avails.values_num; i++) zbx_vector_uint64_append(&hostids, host_avails.values[i]->hostid); hosts = (zbx_dc_host_t *)zbx_malloc(NULL, sizeof(zbx_dc_host_t) * host_avails.values_num); errcodes = (int *)zbx_malloc(NULL, sizeof(int) * host_avails.values_num); zbx_dc_config_get_hosts_by_hostids(hosts, hostids.values, errcodes, hostids.values_num); zbx_vector_uint64_destroy(&hostids); zbx_vector_proxy_hostdata_ptr_create(&proxy_host_avails); for (i = 0; i < host_avails.values_num; i++) { if (SUCCEED == errcodes[i] && hosts[i].proxyid == proxy->proxyid) zbx_vector_proxy_hostdata_ptr_append(&proxy_host_avails, host_avails.values[i]); } zbx_free(errcodes); zbx_free(hosts); data_len = zbx_availability_serialize_proxy_hostdata(&data, &proxy_host_avails, proxy->proxyid); zbx_availability_send(ZBX_IPC_AVAILMAN_PROCESS_PROXY_HOSTDATA, data, data_len, NULL); zbx_vector_proxy_hostdata_ptr_destroy(&proxy_host_avails); zbx_vector_proxy_hostdata_ptr_clear_ext(&host_avails, (zbx_proxy_hostdata_ptr_free_func_t)zbx_ptr_free); zbx_free(data); } zbx_vector_proxy_hostdata_ptr_destroy(&host_avails); } out: zbx_free(error_step); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: flushes lastaccess changes for proxies every * * ZBX_PROXY_LASTACCESS_UPDATE_FREQUENCY seconds * * * ******************************************************************************/ static void zbx_db_flush_proxy_lastaccess(void) { zbx_vector_uint64_pair_t lastaccess; zbx_vector_uint64_pair_create(&lastaccess); zbx_dc_get_proxy_lastaccess(&lastaccess); if (0 != lastaccess.values_num) { char *sql; size_t sql_alloc = 256, sql_offset = 0; int i; sql = (char *)zbx_malloc(NULL, sql_alloc); zbx_db_begin(); zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); for (i = 0; i < lastaccess.values_num; i++) { zbx_uint64_pair_t *pair = &lastaccess.values[i]; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update proxy_rtdata" " set lastaccess=%d" " where proxyid=" ZBX_FS_UI64 ";\n", (int)pair->second, pair->first); zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset); } zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (16 < sql_offset) /* in ORACLE always present begin..end; */ zbx_db_execute("%s", sql); zbx_db_commit(); zbx_free(sql); } zbx_vector_uint64_pair_destroy(&lastaccess); } /****************************************************************************** * * * Purpose: updates proxy version and compatibility with server in database * * * * Parameters: proxy - [IN] the proxy to update version for * * diff - [IN] indicates changes to the proxy * * * ******************************************************************************/ static void db_update_proxy_version(zbx_dc_proxy_t *proxy, zbx_proxy_diff_t *diff) { if (0 != (diff->flags & ZBX_FLAGS_PROXY_DIFF_UPDATE_VERSION)) { if (0 != proxy->version_int) { zabbix_log(LOG_LEVEL_DEBUG, "proxy \"%s\" protocol version updated from %u.%u to %u.%u", proxy->name, ZBX_COMPONENT_VERSION_MAJOR(proxy->version_int), ZBX_COMPONENT_VERSION_MINOR(proxy->version_int), ZBX_COMPONENT_VERSION_MAJOR(diff->version_int), ZBX_COMPONENT_VERSION_MINOR(diff->version_int)); } if (ZBX_DB_OK > zbx_db_execute( "update proxy_rtdata" " set version=%u,compatibility=%u" " where proxyid=" ZBX_FS_UI64, ZBX_COMPONENT_VERSION_TO_DEC_FORMAT(diff->version_int), diff->compatibility, diff->hostid)) { zabbix_log(LOG_LEVEL_WARNING, "Failed to update proxy version and compatibility with server for" " proxy '%s'.", proxy->name); } } } /****************************************************************************** * * * Purpose: gets proxy version compatibility with server version * * * * Parameters: proxy_version - [IN] proxy_version * * * * Return value: proxy version compatibility with server version * * * ******************************************************************************/ static zbx_proxy_compatibility_t zbx_get_proxy_compatibility(int proxy_version) { #define SERVER_VERSION ZBX_COMPONENT_VERSION(ZABBIX_VERSION_MAJOR, ZABBIX_VERSION_MINOR, 0) if (0 == proxy_version) return ZBX_PROXY_VERSION_UNDEFINED; proxy_version = ZBX_COMPONENT_VERSION_WITHOUT_PATCH(proxy_version); if (SERVER_VERSION == proxy_version) return ZBX_PROXY_VERSION_CURRENT; if (SERVER_VERSION < proxy_version) return ZBX_PROXY_VERSION_UNSUPPORTED; #if (ZABBIX_VERSION_MINOR == 0) if (ZABBIX_VERSION_MAJOR == 1 + ZBX_COMPONENT_VERSION_MAJOR(proxy_version)) return ZBX_PROXY_VERSION_OUTDATED; #elif (ZABBIX_VERSION_MINOR > 0) if (ZABBIX_VERSION_MAJOR == ZBX_COMPONENT_VERSION_MAJOR(proxy_version)) return ZBX_PROXY_VERSION_OUTDATED; #endif return ZBX_PROXY_VERSION_UNSUPPORTED; #undef SERVER_VERSION } /****************************************************************************** * * * Purpose: updates proxy runtime properties in cache and database. * * * * Parameters: proxy - [IN/OUT] the proxy * * version_str - [IN] the proxy version as string * * version_int - [IN] the proxy version in numeric representation * * lastaccess - [IN] the last proxy access time * * compress - [IN] 1 if proxy is using data compression, * * 0 otherwise * * flags_add - [IN] additional flags for update proxy * * * * Comments: The proxy parameter properties are also updated. * * * ******************************************************************************/ void zbx_update_proxy_data(zbx_dc_proxy_t *proxy, char *version_str, int version_int, time_t lastaccess, zbx_uint64_t flags_add) { zbx_proxy_diff_t diff; zbx_proxy_compatibility_t compatibility; compatibility = zbx_get_proxy_compatibility(version_int); diff.hostid = proxy->proxyid; diff.flags = ZBX_FLAGS_PROXY_DIFF_UPDATE | flags_add; diff.version_str = version_str; diff.version_int = version_int; diff.compatibility = compatibility; diff.lastaccess = lastaccess; zbx_dc_update_proxy(&diff); db_update_proxy_version(proxy, &diff); zbx_strlcpy(proxy->version_str, version_str, sizeof(proxy->version_str)); proxy->version_int = version_int; proxy->compatibility = compatibility; proxy->lastaccess = lastaccess; zbx_db_flush_proxy_lastaccess(); } /****************************************************************************** * * * Purpose: flushes last_version_error_time changes runtime * * variable for proxies structures * * * ******************************************************************************/ static void zbx_update_proxy_lasterror(zbx_dc_proxy_t *proxy) { zbx_proxy_diff_t diff; diff.hostid = proxy->proxyid; diff.flags = ZBX_FLAGS_PROXY_DIFF_UPDATE_LASTERROR; diff.lastaccess = time(NULL); diff.last_version_error_time = proxy->last_version_error_time; zbx_dc_update_proxy(&diff); } /****************************************************************************** * * * Purpose: check server and proxy versions and compatibility rules * * * * Parameters: * * proxy - [IN] the source proxy * * version - [IN] the version of proxy * * * * Return value: * * SUCCEED - no compatibility issue * * FAIL - compatibility check fault * * * ******************************************************************************/ int zbx_check_protocol_version(zbx_dc_proxy_t *proxy, int version) { zbx_proxy_compatibility_t compatibility; compatibility = zbx_get_proxy_compatibility(version); /* warn if another proxy version is used and proceed with compatibility rules*/ if (ZBX_PROXY_VERSION_CURRENT != compatibility) { time_t now = time(NULL); int print_log = 0; if (proxy->last_version_error_time <= now) { print_log = 1; proxy->last_version_error_time = now + 5 * SEC_PER_MIN; zbx_update_proxy_lasterror(proxy); } if (ZBX_PROXY_VERSION_UNSUPPORTED == compatibility) { if (1 == print_log) { zabbix_log(LOG_LEVEL_WARNING, "Proxy \"%s\" version %u.%u.%u is not supported by server" " version %d.%d.%d.", proxy->name, ZBX_COMPONENT_VERSION_MAJOR(version), ZBX_COMPONENT_VERSION_MINOR(version), ZBX_COMPONENT_VERSION_PATCH(version), ZABBIX_VERSION_MAJOR, ZABBIX_VERSION_MINOR, ZABBIX_VERSION_PATCH); } return FAIL; } else if (ZBX_PROXY_VERSION_OUTDATED == compatibility && 1 == print_log) { zabbix_log(LOG_LEVEL_WARNING, "Proxy \"%s\" version %u.%u.%u is outdated, only data collection" " and remote execution is available with server version %d.%d.%d.", proxy->name, ZBX_COMPONENT_VERSION_MAJOR(version), ZBX_COMPONENT_VERSION_MINOR(version), ZBX_COMPONENT_VERSION_PATCH(version), ZABBIX_VERSION_MAJOR, ZABBIX_VERSION_MINOR, ZABBIX_VERSION_PATCH); } else if (ZBX_PROXY_VERSION_UNDEFINED == compatibility) return FAIL; } return SUCCEED; }