/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "proxybuffer.h"
#include "pb_autoreg.h"
#include "pb_discovery.h"
#include "pb_history.h"

#include "zbxalgo.h"
#include "zbxcommon.h"
#include "zbxdb.h"
#include "zbxdbhigh.h"
#include "zbxmutexs.h"
#include "zbxnum.h"
#include "zbxproxybuffer.h"
#include "zbxshmem.h"
#include "zbxstr.h"

#define PB_DB_FLUSH_DISABLED	0
#define PB_DB_FLUSH_ENABLED	1

ZBX_PTR_VECTOR_IMPL(pb_history_ptr, zbx_pb_history_t *)
ZBX_PTR_VECTOR_IMPL(pb_discovery_ptr, zbx_pb_discovery_t *)

/* proxy buffer control structure and the shared memory segment it is allocated from */
static zbx_pb_t		*pb_data = NULL;
static zbx_shmem_info_t	*pb_mem = NULL;

ZBX_SHMEM_FUNC_IMPL(__pb, pb_mem)

static void	pb_init_state(zbx_pb_t *pb);

/* remap states to incoming data destination - database or memory */
zbx_pb_state_t	pb_dst[] = {PB_DATABASE, PB_MEMORY, PB_MEMORY, PB_DATABASE};

/* returns the incoming data destination for state index i (no bounds check is performed) */
zbx_pb_state_t	get_pb_dst(int i)
{
	return pb_dst[i];
}

/* remap states to outgoing data source - database or memory */
zbx_pb_state_t	pb_src[] = {PB_DATABASE, PB_DATABASE, PB_MEMORY, PB_MEMORY};

/* returns the outgoing data source for state index i (no bounds check is performed) */
zbx_pb_state_t	get_pb_src(int i)
{
	return pb_src[i];
}

/* state descriptions used in log messages, indexed by state value */
const char	*pb_state_desc[] = {"database", "database->memory", "memory", "memory->database"};

/* acquire the proxy buffer mutex */
void	pb_lock(void)
{
	zbx_mutex_lock(pb_data->mutex);
}

/* release the proxy buffer mutex */
void	pb_unlock(void)
{
	zbx_mutex_unlock(pb_data->mutex);
}

/* allocate size bytes with the proxy buffer shared memory allocator */
void	*pb_malloc(size_t size)
{
	return __pb_shmem_malloc_func(NULL, size);
}

/* release memory obtained from the proxy buffer shared memory allocator */
void	pb_free(void *ptr)
{
	__pb_shmem_free_func(ptr);
}

/******************************************************************************
 *
 * Purpose: duplicate string using the proxy buffer shared memory allocator
 *
 * Parameters: str - [IN] NUL terminated string to copy (must not be NULL)
 *
 * Return value: copy of the string or NULL if the allocation failed
 *
 ******************************************************************************/
char	*pb_strdup(const char *str)
{
	size_t	len;
	char	*cpy;

	/* the terminating NUL is copied together with the string */
	len = strlen(str) + 1;

	if (NULL == (cpy = (char *)__pb_shmem_malloc_func(NULL, len)))
		return NULL;

	memcpy(cpy, str, len);

	return cpy;
}

/* returns the free_size value of the proxy buffer shared memory segment */
size_t	pb_get_free_size(void)
{
	return (size_t)pb_mem->free_size;
}

/******************************************************************************
 *
 * Purpose: free the required number of bytes in proxy memory buffer
 *
 * Parameters: pb   - [IN] proxy buffer
 *             size - [IN] number of bytes to free
 *
 * Return value: SUCCEED - the space was freed successfully
 *               FAIL    - all records were discarded without freeing enough
 *                         space
 *
 * Comments: The space freed by discarding record is estimated and also the
 *           memory might be fragmented. So there is possibility that new
 *           record allocation will still fail. However calling this function
 *           in loop would eventually discard all records - so that would
 *           indicate that cache is too small to hold the new record.
 *
 *           On each iteration the heads of history, discovery and auto
 *           registration lists are compared by clock and the record with
 *           the smallest clock is discarded. Consecutive discovery records
 *           sharing the same handleid are discarded together.
 *
 ******************************************************************************/
int	pb_free_space(zbx_pb_t *pb, size_t size)
{
	int	ret = SUCCEED;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() size:" ZBX_FS_SIZE_T, __func__, (zbx_fs_size_t)size);

	/* signed counter - the estimated row sizes can exceed the requested size */
	ssize_t	size_left = (ssize_t)size;

	while (0 < size_left && SUCCEED == ret)
	{
		zbx_pb_history_t	*hrow;
		zbx_pb_discovery_t	*drow;
		zbx_pb_autoreg_t	*arow;
		int			clock;

		if (SUCCEED == zbx_list_peek(&pb->history, (void **)&hrow))
		{
			clock = hrow->ts.sec;
		}
		else
		{
			hrow = NULL;
			clock = INT_MAX;
		}

		/* discovery record replaces history candidate only when it has smaller clock */
		if (SUCCEED == zbx_list_peek(&pb->discovery, (void **)&drow) && drow->clock < clock)
		{
			clock = drow->clock;
			hrow = NULL;
		}
		else
			drow = NULL;

		if (SUCCEED == zbx_list_peek(&pb->autoreg, (void **)&arow) && arow->clock < clock)
		{
			zabbix_log(LOG_LEVEL_TRACE, "%s() discarding auto registration record from proxy memory buffer,"
					" id:" ZBX_FS_UI64 " clock:%d", __func__, arow->id, arow->clock);

			zbx_list_pop(&pb->autoreg, NULL);
			size_left -= (ssize_t)pb_autoreg_estimate_row_size(arow->host, arow->host_metadata,
					arow->listen_ip, arow->listen_dns);
			pb_list_free_autoreg(&pb->autoreg, arow);

			continue;
		}

		if (NULL != hrow)
		{
			zabbix_log(LOG_LEVEL_TRACE, "%s() discarding history record from proxy memory buffer,"
					" id:" ZBX_FS_UI64 " clock:%d", __func__, hrow->id, hrow->ts.sec);

			zbx_list_pop(&pb->history, NULL);
			size_left -= (ssize_t)pb_history_estimate_row_size(hrow->value, hrow->source);
			pb_list_free_history(&pb->history, hrow);

			continue;
		}

		if (NULL != drow)
		{
			zbx_uint64_t	handleid = drow->handleid;

			/* discard all leading discovery records having the same handleid */
			do
			{
				zabbix_log(LOG_LEVEL_TRACE, "%s() discarding discovery record from proxy memory buffer,"
						" id:" ZBX_FS_UI64 " clock:%d", __func__, drow->id, drow->clock);

				zbx_list_pop(&pb->discovery, NULL);
				size_left -= (ssize_t)pb_discovery_estimate_row_size(drow->value, drow->ip, drow->dns,
						drow->error);
				pb_list_free_discovery(&pb->discovery, drow);
			}
			while (SUCCEED == zbx_list_peek(&pb->discovery, (void **)&drow) && drow->handleid == handleid);

			continue;
		}

		/* all three lists are empty - nothing more can be discarded */
		ret = FAIL;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}

/******************************************************************************
 *
 * Purpose: check if proxy history table has unsent data and update lastid
 *          if necessary
 *
 * Parameters: table  - [IN] history table
 *             field  - [IN] key field name
 *             lastid - [OUT] id of the last uploaded row
 *             maxid  - [OUT] max id of the row in database
 *
 * Return value: SUCCEED - table has unsent data
 *               FAIL    - otherwise
 *
 * Comments: If the stored lastid is greater than the maximum id found in
 *           the table, the stored lastid is reset to that maximum id.
 *
 ******************************************************************************/
static int	pb_check_unsent_rows(const char *table, const char *field, zbx_uint64_t *lastid, zbx_uint64_t *maxid)
{
	zbx_db_result_t	result;
	zbx_db_row_t	row;
	int		ret;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:%s", __func__, table);

	result = zbx_db_select("select nextid from ids where table_name='%s' and field_name='%s'", table, field);

	if (NULL == (row = zbx_db_fetch(result)))
		*lastid = 0;
	else
		ZBX_STR2UINT64(*lastid, row[0]);

	zbx_db_free_result(result);

	result = zbx_db_select("select max(id) from %s", table);

	if (NULL != (row = zbx_db_fetch(result)))
		ZBX_DBROW2UINT64(*maxid, row[0]);
	else
		*maxid = 0;

	zbx_db_free_result(result);

	if (*lastid < *maxid)
	{
		ret = (0 < *maxid ? SUCCEED : FAIL);
	}
	else
	{
		ret = FAIL;

		if (*lastid > *maxid)
		{
			*lastid = *maxid;
			pb_set_lastid(table, field, *maxid);
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s " ZBX_FS_UI64 "/" ZBX_FS_UI64, __func__, zbx_result_string(ret),
			*lastid, *maxid);

	return ret;
}

/******************************************************************************
 *
 * Purpose: set cache state and log the changes
 *
 * Parameters: pb      - [IN/OUT] proxy buffer
 *             state   - [IN] new state
 *             message - [IN] reason of the change, appended to log message
 *
 * Comments: The change counter is incremented only when the current state
 *           is PB_DATABASE or PB_MEMORY.
 *
 ******************************************************************************/
void	pb_set_state(zbx_pb_t *pb, zbx_pb_state_t state, const char *message)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s() %s => %s", __func__, pb_state_desc[pb->state], pb_state_desc[state]);

	if (PB_DATABASE == pb->state || PB_MEMORY == pb->state)
		pb->changes_num++;

	switch (state)
	{
		case PB_DATABASE:
			zabbix_log(LOG_LEVEL_DEBUG, "switched proxy buffer to database state: %s", message);
			break;
		case PB_DATABASE_MEMORY:
			zabbix_log(LOG_LEVEL_DEBUG, "initiated proxy buffer transition to memory state: %s", message);
			break;
		case PB_MEMORY:
			zabbix_log(LOG_LEVEL_DEBUG, "switched proxy buffer to memory state: %s", message);
			break;
		case PB_MEMORY_DATABASE:
			zabbix_log(LOG_LEVEL_DEBUG, "initiated proxy buffer transition to database state: %s",
					message);
			break;
	}

	pb->state = state;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *
 * Purpose: set initial buffer working state based on existing history data
 *
 * Parameters: pb - [IN/OUT] proxy buffer
 *
 * Comments: In memory mode the state is set to memory without querying the
 *           database. In disk mode the state is always database. Otherwise
 *           the state is database if any of the proxy_history,
 *           proxy_dhistory or proxy_autoreg_host tables has unsent rows
 *           and memory if none has.
 *
 ******************************************************************************/
static void	pb_init_state(zbx_pb_t *pb)
{
	int		history_ret, discovery_ret, autoreg_ret;
	zbx_uint64_t	lastid, maxid;

	if (ZBX_PB_MODE_MEMORY == pb->mode)
	{
		pb_set_state(pb, PB_MEMORY, "proxy buffer initialized in memory mode");
		return;
	}

	history_ret = pb_check_unsent_rows("proxy_history", "history_lastid", &lastid, &maxid);

	/* only the history ids are cached here, discovery and auto registration ids are not stored */
	pb->history_lastid_db = maxid;
	pb->history_lastid_sent = lastid;

	discovery_ret = pb_check_unsent_rows("proxy_dhistory", "dhistory_lastid", &lastid, &maxid);
	autoreg_ret = pb_check_unsent_rows("proxy_autoreg_host", "autoreg_host_lastid", &lastid, &maxid);

	if (ZBX_PB_MODE_DISK == pb->mode)
		pb_set_state(pb, PB_DATABASE, "proxy buffer initialized in disk mode");
	else if (SUCCEED == history_ret || SUCCEED == discovery_ret || SUCCEED == autoreg_ret)
		pb_set_state(pb, PB_DATABASE, "unsent database records found");
	else
		pb_set_state(pb, PB_MEMORY, "no unsent database records found");
}

/* returns the proxy buffer control structure (NULL until zbx_pb_create() assigns it) */
zbx_pb_t	*get_pb_data(void)
{
	return pb_data;
}

/******************************************************************************
 *
 * Purpose: read stored last id from the ids table
 *
 * Parameters: table_name  - [IN] value matched against ids.table_name
 *             lastidfield - [IN] value matched against ids.field_name
 *
 * Return value: the stored nextid value or 0 if there is no matching row
 *
 ******************************************************************************/
zbx_uint64_t	pb_get_lastid(const char *table_name, const char *lastidfield)
{
	zbx_db_result_t	result;
	zbx_db_row_t	row;
	zbx_uint64_t	lastid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() field:'%s.%s'", __func__, table_name, lastidfield);

	result = zbx_db_select("select nextid from ids where table_name='%s' and field_name='%s'",
			table_name, lastidfield);

	if (NULL == (row = zbx_db_fetch(result)))
		lastid = 0;
	else
		ZBX_STR2UINT64(lastid, row[0]);

	zbx_db_free_result(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():" ZBX_FS_UI64, __func__, lastid);

	return lastid;
}

/******************************************************************************
 *
 * Purpose: Get discovery/auto registration data from the database.
 *
 * Parameters: j           - [IN/OUT] JSON buffer the rows are added to
 *             proto_tag   - [IN] tag of the array opened before the first
 *                                record is added
 *             ht          - [IN] table name and field descriptions
 *             lastid      - [OUT] id of the last row read
 *             id          - [IN/OUT] rows with greater id are selected;
 *                                    updated after each row that did not
 *                                    stop the loop
 *             records_num - [IN/OUT] number of records added to JSON
 *             more        - [OUT] ZBX_PROXY_DATA_MORE if the JSON size
 *                                 limit was exceeded or ZBX_MAX_HRECORDS
 *                                 rows were added, ZBX_PROXY_DATA_DONE
 *                                 otherwise
 *
 * Comments: When a gap in row ids is detected the query is retried once
 *           after a 0.1 second delay.
 *
 ******************************************************************************/
void	pb_get_rows_db(struct zbx_json *j, const char *proto_tag, const zbx_history_table_t *ht,
		zbx_uint64_t *lastid, zbx_uint64_t *id, int *records_num, int *more)
{
	size_t		offset = 0;
	int		f, records_num_last = *records_num, retries = 1;
	char		sql[MAX_STRING_LEN];
	zbx_db_result_t	result;
	zbx_db_row_t	row;
	struct timespec	t_sleep = { 0, 100000000L }, t_rem;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s'", __func__, ht->table);

	*more = ZBX_PROXY_DATA_DONE;

	/* the field list is built once, only the part after it is rewritten on retry */
	offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, "select id");

	for (f = 0; NULL != ht->fields[f].field; f++)
		offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, ",%s", ht->fields[f].field);
try_again:
	zbx_snprintf(sql + offset, sizeof(sql) - offset, " from %s where id>" ZBX_FS_UI64 " order by id",
			ht->table, *id);

	result = zbx_db_select_n(sql, ZBX_MAX_HRECORDS);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		ZBX_STR2UINT64(*lastid, row[0]);

		if (1 < *lastid - *id)
		{
			/* At least one record is missing. It can happen if some DB syncer process has */
			/* started but not yet committed a transaction or a rollback occurred in a DB syncer. */
			if (0 < retries--)
			{
				zbx_db_free_result(result);
				zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing."
						" Waiting " ZBX_FS_DBL " sec, retrying.",
						__func__, *lastid - *id - 1,
						(double)t_sleep.tv_sec + (double)t_sleep.tv_nsec / 1e9);
				nanosleep(&t_sleep, &t_rem);
				goto try_again;
			}
			else
			{
				zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing. No more retries.",
						__func__, *lastid - *id - 1);
			}
		}

		if (0 == *records_num)
			zbx_json_addarray(j, proto_tag);

		zbx_json_addobject(j, NULL);

		/* row[0] is the id column, the configured fields start at row[1] */
		for (f = 0; NULL != ht->fields[f].field; f++)
		{
			/* fields holding their default value are not added to JSON */
			if (NULL != ht->fields[f].default_value && 0 == strcmp(row[f + 1], ht->fields[f].default_value))
				continue;

			zbx_json_addstring(j, ht->fields[f].tag, row[f + 1], ht->fields[f].jt);
		}

		(*records_num)++;

		zbx_json_close(j);

		/* stop gathering data to avoid exceeding the maximum packet size */
		if (ZBX_DATA_JSON_RECORD_LIMIT < j->buffer_offset)
		{
			*more = ZBX_PROXY_DATA_MORE;
			break;
		}

		*id = *lastid;
	}
	zbx_db_free_result(result);

	if (ZBX_MAX_HRECORDS == *records_num - records_num_last)
		*more = ZBX_PROXY_DATA_MORE;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d lastid:" ZBX_FS_UI64 " more:%d size:" ZBX_FS_SIZE_T,
			__func__, *records_num - records_num_last, *lastid, *more,
			(zbx_fs_size_t)j->buffer_offset);
}

/******************************************************************************
 *
 * Purpose: store last id in the ids table
 *
 * Parameters: table_name  - [IN] value matched against ids.table_name
 *             lastidfield - [IN] value matched against ids.field_name
 *             lastid      - [IN] value to store; 0 deletes the matching row
 *
 * Comments: A non-zero lastid is inserted if no matching row exists and
 *           updated otherwise.
 *
 ******************************************************************************/
void	pb_set_lastid(const char *table_name, const char *lastidfield, const zbx_uint64_t lastid)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s() [%s.%s:" ZBX_FS_UI64 "]", __func__, table_name, lastidfield, lastid);

	if (0 == lastid)
	{
		zbx_db_execute("delete from ids where table_name='%s' and field_name='%s'", table_name, lastidfield);
	}
	else
	{
		zbx_db_result_t	result;

		result = zbx_db_select("select 1 from ids where table_name='%s' and field_name='%s'",
				table_name, lastidfield);

		if (NULL == zbx_db_fetch(result))
		{
			zbx_db_execute("insert into ids (table_name,field_name,nextid) values ('%s','%s'," ZBX_FS_UI64 ")",
					table_name, lastidfield, lastid);
		}
		else
		{
			zbx_db_execute("update ids set nextid=" ZBX_FS_UI64 " where table_name='%s' and field_name='%s'",
					lastid, table_name, lastidfield);
		}

		zbx_db_free_result(result);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *
 * Purpose: flush cached lastids to the database
 *
 * Parameters: pb - [IN] proxy buffer
 *
 * Comments: Zero lastids are skipped.
 *
 ******************************************************************************/
static void	pb_flush_lastids(zbx_pb_t *pb)
{
	if (0 != pb->history_lastid_sent)
		pb_history_set_lastid(pb->history_lastid_sent);

	if (0 != pb->discovery_lastid_sent)
		pb_discovery_set_lastid(pb->discovery_lastid_sent);

	if (0 != pb->autoreg_lastid_sent)
		pb_autoreg_set_lastid(pb->autoreg_lastid_sent);
}

/******************************************************************************
 *
 * Purpose: write memory records to the database and clear the memory lists
 *
 * Parameters: pb - [IN/OUT] proxy buffer
 *
 * Comments: The database write is skipped in memory mode or when no list
 *           has memory rows; the lists are cleared in every case. The
 *           transaction is repeated while commit returns ZBX_DB_DOWN.
 *
 ******************************************************************************/
static void	pb_flush(zbx_pb_t *pb)
{
	if (ZBX_PB_MODE_MEMORY != pb->mode && (SUCCEED == pb_history_has_mem_rows(pb) ||
			SUCCEED == pb_discovery_has_mem_rows(pb) || SUCCEED == pb_autoreg_has_mem_rows(pb)))
	{
		do
		{
			zbx_db_begin();

			pb_autoreg_flush(pb);
			pb_discovery_flush(pb);
			pb_history_flush(pb);
			pb_flush_lastids(pb);
		}
		while (ZBX_DB_DOWN == zbx_db_commit());
	}

	pb_history_clear(pb, UINT64_MAX);
	pb_discovery_clear(pb, UINT64_MAX);
	pb_autoreg_clear(pb, UINT64_MAX);
}

/* flush memory records and switch the proxy buffer to database state, logging the reason */
void	pd_fallback_to_database(zbx_pb_t *pb, const char *message)
{
	pb_flush(pb);
	pb_set_state(pb, PB_DATABASE, message);
}

/******************************************************************************
 *
 * Purpose: update proxy buffer state after successful upload based on
 *          records left and handles pending
 *
 * Parameters: pb   - [IN/OUT] proxy buffer
 *             more - [IN] ZBX_PROXY_DATA_DONE if there are no more records
 *                         to upload
 *
 * Comments: Does nothing unless the buffer works in hybrid mode.
 *
 ******************************************************************************/
static void	pb_update_state(zbx_pb_t *pb, int more)
{
	if (ZBX_PB_MODE_HYBRID != pb->mode)
		return;

	switch (pb->state)
	{
		case PB_MEMORY:
			/* memory state can switch to database when:    */
			/*   1) no space left to cache data             */
			/*   2) cached data exceeds maximum age         */
			/* Both cases are checked when adding data to cache. */
			break;
		case PB_MEMORY_DATABASE:
			/* transition completes when no memory rows are left in any list */
			if (SUCCEED != pb_history_has_mem_rows(pb) && SUCCEED != pb_discovery_has_mem_rows(pb) &&
					SUCCEED != pb_autoreg_has_mem_rows(pb))
			{
				zbx_db_begin();
				pb_flush_lastids(pb);
				zbx_db_commit();

				pb_set_state(pb, PB_DATABASE, "memory records have been uploaded");
			}
			break;
		case PB_DATABASE:
			if (ZBX_PROXY_DATA_DONE == more)
				pb_set_state(pb, PB_DATABASE_MEMORY, "no more database records to upload");
			break;
		case PB_DATABASE_MEMORY:
			/* transition completes when no database handles are open and */
			/* every database lastid has been reached by the sent lastid  */
			if (0 == pb->db_handles_num && pb->history_lastid_db <= pb->history_lastid_sent &&
					pb->discovery_lastid_db <= pb->discovery_lastid_sent &&
					pb->autoreg_lastid_db <= pb->autoreg_lastid_sent)
			{
				pb_set_state(pb, PB_MEMORY, "database records have been uploaded");
			}
			break;
	}
}

/* increment the handle id counter and return the new value */
zbx_uint64_t	pb_get_next_handleid(zbx_pb_t *pb)
{
	return ++pb->handleid;
}

/******************************************************************************
 *
 * Purpose: register opened data handle
 *
 * Parameters: pb        - [IN/OUT] proxy buffer providing the handle id
 *             handleids - [IN/OUT] vector the new handle id is appended to
 *
 * Return value: the registered handle id
 *
 ******************************************************************************/
zbx_uint64_t	pb_register_handle(zbx_pb_t *pb, zbx_vector_uint64_t *handleids)
{
	zbx_uint64_t	handleid;

	zabbix_log(LOG_LEVEL_TRACE, "In %s()", __func__);

	handleid = pb_get_next_handleid(pb);
	zbx_vector_uint64_append(handleids, handleid);

	zabbix_log(LOG_LEVEL_TRACE, "End of %s() handleid:" ZBX_FS_UI64 " handles:%d", __func__, handleid,
			handleids->values_num);

	return handleid;
}

/******************************************************************************
 *
 * Purpose: deregister data handle
 *
 * Parameters: handleids - [IN/OUT] vector of registered handle ids
 *             handleid  - [IN] handle id to remove
 *
 * Comments: Nothing is done if the handle id is not found. The order of
 *           the remaining ids is not preserved.
 *
 ******************************************************************************/
void	pb_deregister_handle(zbx_vector_uint64_t *handleids, zbx_uint64_t handleid)
{
	int	i;

	zabbix_log(LOG_LEVEL_TRACE, "In %s() handleid:" ZBX_FS_UI64, __func__, handleid);

	if (FAIL != (i = zbx_vector_uint64_search(handleids, handleid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
		zbx_vector_uint64_remove_noorder(handleids, i);

	zabbix_log(LOG_LEVEL_TRACE, "End of %s() handles:%d", __func__, handleids->values_num);
}

/******************************************************************************
 *
 * Purpose: wait for the opened data handles to be closed
 *
 * Parameters: handleids - [IN] handle list to wait on
 *
 * Comments: With NULL handle list the function sleeps 0.1 second and
 *           returns. Otherwise the largest handle id present at the call
 *           time is remembered and the list is polled every 0.1 second
 *           until it contains no id less than or equal to that value.
 *
 ******************************************************************************/
void	pb_wait_handles(const zbx_vector_uint64_t *handleids)
{
	int		i;
	zbx_uint64_t	handleid = 0;
	struct timespec	delay = { 0, 100000000L };

	if (NULL == handleids)
	{
		nanosleep(&delay, NULL);
		return;
	}

	pb_lock();

	for (i = 0; i < handleids->values_num; i++)
	{
		if (handleids->values[i] > handleid)
			handleid = handleids->values[i];
	}

	pb_unlock();

	/* zero handle id means the list was empty - nothing to wait for */
	if (0 != handleid)
	{
		int	wait_handles;

		do
		{
			wait_handles = 0;

			nanosleep(&delay, NULL);

			pb_lock();

			for (i = 0; i < handleids->values_num; i++)
			{
				if (handleids->values[i] <= handleid)
				{
					wait_handles = 1;
					break;
				}
			}

			pb_unlock();
		}
		while (0 != wait_handles);
	}
}

/* public api */

/******************************************************************************
 *
 * Purpose: create proxy buffer
 *
 * Parameters: mode           - [IN]
 *             size           - [IN] cache size in bytes
 *             age            - [IN] maximum allowed data age
 *             offline_buffer - [IN] offline buffer in seconds
 *             error          - [OUT] error message
 *
 * Return value: SUCCEED - proxy buffer was created successfully
 *               FAIL    - otherwise
 *
 * Comments: In disk mode the requested size is replaced with 16 KiB.
 *
 ******************************************************************************/
int	zbx_pb_create(int mode, zbx_uint64_t size, int age, int offline_buffer, char **error)
{
	int	ret = FAIL, allow_oom;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() mode:%d", __func__, mode);

	if (ZBX_PB_MODE_DISK == mode)
	{
		/* allocate proxy buffer only to store statistics and track opened history handles */
		size = ZBX_KIBIBYTE * 16;
		allow_oom = 0;
	}
	else
		allow_oom = 1;

	if (FAIL == zbx_shmem_create(&pb_mem, size, "proxy memory buffer size", "ProxyMemoryBufferSize", allow_oom,
			error))
	{
		goto out;
	}

	/* the allocator result is checked the same way pb_strdup() checks it - a NULL */
	/* return must not reach memset()                                              */
	if (NULL == (pb_data = (zbx_pb_t *)__pb_shmem_malloc_func(NULL, sizeof(zbx_pb_t))))
	{
		*error = zbx_strdup(NULL, "cannot allocate proxy buffer data in shared memory");
		goto out;
	}

	memset(pb_data, 0, sizeof(zbx_pb_t));

	if (SUCCEED != zbx_mutex_create(&pb_data->mutex, ZBX_MUTEX_PROXY_BUFFER, error))
		goto out;

	zbx_list_create_ext(&pb_data->history, __pb_shmem_malloc_func, __pb_shmem_free_func);
	zbx_list_create_ext(&pb_data->discovery, __pb_shmem_malloc_func, __pb_shmem_free_func);
	zbx_list_create_ext(&pb_data->autoreg, __pb_shmem_malloc_func, __pb_shmem_free_func);

	zbx_vector_uint64_create_ext(&pb_data->history_handleids, __pb_shmem_malloc_func, __pb_shmem_realloc_func,
			__pb_shmem_free_func);

	/* preallocate handle tracking vector to avoid handling memory allocation errors later */
	zbx_vector_uint64_reserve(&pb_data->history_handleids, 100);

	pb_data->mode = mode;
	pb_data->max_age = age;
	pb_data->offline_buffer = offline_buffer;

	ret = SUCCEED;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s(): %s", __func__, ZBX_NULL2EMPTY_STR(*error));

	return ret;
}

/******************************************************************************
 *
 * Purpose: initialize proxy buffer
 *
 * Comments: Sets the initial working state of the buffer created by
 *           zbx_pb_create().
 *
 ******************************************************************************/
void	zbx_pb_init(void)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	pb_init_state(pb_data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s(): state:%d", __func__, pb_data->state);
}

/******************************************************************************
 *
 * Purpose: destroy proxy buffer
 *
 * Comments: Destroys the buffer mutex and then the shared memory segment.
 *
 ******************************************************************************/
void	zbx_pb_destroy(void)
{
	zbx_mutex_destroy(&pb_data->mutex);
	zbx_shmem_destroy(pb_mem);
}

/******************************************************************************
 *
 * Purpose: parse proxy buffer mode configuration string
 *
 * Parameters: str  - [IN] "disk", "memory" or "hybrid"; NULL and empty
 *                         string select disk mode
 *             mode - [OUT] parsed mode, not modified on failure
 *
 * Return value: SUCCEED - the mode was parsed
 *               FAIL    - unknown mode string
 *
 ******************************************************************************/
int	zbx_pb_parse_mode(const char *str, int *mode)
{
	if (NULL == str || '\0' == *str || 0 == strcmp(str, "disk"))
		*mode = ZBX_PB_MODE_DISK;
	else if (0 == strcmp(str, "memory"))
		*mode = ZBX_PB_MODE_MEMORY;
	else if (0 == strcmp(str, "hybrid"))
		*mode = ZBX_PB_MODE_HYBRID;
	else
		return FAIL;

	return SUCCEED;
}

/******************************************************************************
 *
 * Purpose: update proxy buffer state based on records left and handles
 *          pending
 *
 * Parameters: more - [IN] ZBX_PROXY_DATA_DONE if there are no more records
 *                         to upload
 *
 * Comments: The state is updated while holding the proxy buffer mutex.
 *
 ******************************************************************************/
void	zbx_pb_update_state(int more)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s() more:%d", __func__, more);

	pb_lock();
	pb_update_state(pb_data, more);
	pb_unlock();

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/* switch a hybrid mode buffer to disk mode and database state; other modes are left unchanged */
void	zbx_pb_disable(void)
{
	pb_lock();

	if (ZBX_PB_MODE_HYBRID == pb_data->mode)
	{
		/* the state is assigned directly, so the change is neither logged nor counted */
		pb_data->state = PB_DATABASE;
		pb_data->mode = ZBX_PB_MODE_DISK;
	}

	pb_unlock();
}

/******************************************************************************
 *
 * Purpose: flush cache to database
 *
 * Comments: The proxy buffer mutex is not acquired by this function.
 *
 ******************************************************************************/
void	zbx_pb_flush(void)
{
	pb_flush(pb_data);
}

/******************************************************************************
 *
 * Purpose: get proxy buffer memory information
 *
 * Parameters: info  - [OUT] total and used shared memory size
 *             error - [OUT] error message, allocated on failure
 *
 * Return value: SUCCEED - the information was retrieved
 *               FAIL    - proxy buffer works in disk mode
 *
 ******************************************************************************/
int	zbx_pb_get_mem_info(zbx_pb_mem_info_t *info, char **error)
{
	if (ZBX_PB_MODE_DISK == pb_data->mode)
	{
		*error = zbx_strdup(NULL, "Proxy memory buffer is disabled.");
		return FAIL;
	}

	pb_lock();

	info->mem_total = pb_mem->total_size;
	info->mem_used = pb_mem->total_size - pb_mem->free_size;

	pb_unlock();

	return SUCCEED;
}

/******************************************************************************
 *
 * Purpose: get proxy buffer state information
 *
 * Parameters: info - [OUT] state (1 if incoming data destination is
 *                          memory, 0 otherwise) and number of state
 *                          changes; both are 0 in disk mode
 *
 ******************************************************************************/
void	zbx_pb_get_state_info(zbx_pb_state_info_t *info)
{
	if (ZBX_PB_MODE_DISK == pb_data->mode)
	{
		info->changes_num = 0;
		info->state = 0;
	}
	else
	{
		pb_lock();

		info->state = (PB_MEMORY == pb_dst[pb_data->state] ? 1 : 0);
		info->changes_num = pb_data->changes_num;

		pb_unlock();
	}
}

/******************************************************************************
 *
 * Purpose: get shared memory allocator statistics
 *
 * Parameters: stats - [OUT] statistics of the proxy buffer memory segment
 *
 ******************************************************************************/
void	zbx_pb_get_mem_stats(zbx_shmem_stats_t *stats)
{
	pb_lock();
	zbx_shmem_get_stats(pb_mem, stats);
	pb_unlock();
}

/******************************************************************************
 *
 * Purpose: add field value to JSON unless it matches the field default
 *
 * Parameters: j             - [IN/OUT] JSON buffer
 *             history_table - [IN] table description with field list
 *             fld_name      - [IN] name of the field to add
 *             value         - [IN] pointer to the value: char * for
 *                                  ZBX_TYPE_CHAR, int for ZBX_TYPE_INT,
 *                                  zbx_uint64_t for ZBX_TYPE_UINT
 *             type          - [IN] value type (ZBX_TYPE_*)
 *
 * Comments: Unknown field name or unsupported type is logged as warning
 *           and nothing is added.
 *
 ******************************************************************************/
void	pb_add_json_field(struct zbx_json *j, zbx_history_table_t *history_table, const char *fld_name, void *value,
		int type)
{
	union
	{
		int		val_int;
		zbx_uint64_t	val_u64;
	}
	def;

	zbx_history_field_t	*fld = history_table->fields;

	/* the field list is terminated by an entry with NULL field name */
	while (NULL != fld->field && 0 != strcmp(fld->field, fld_name))
		fld++;

	if (NULL == fld->field)
	{
		zabbix_log(LOG_LEVEL_WARNING, "%s() error of table:%s unknown field name:%s", __func__,
				history_table->table, fld_name);
		THIS_SHOULD_NEVER_HAPPEN;
		return;
	}

	switch (type)
	{
		case ZBX_TYPE_CHAR:
			if (NULL != fld->default_value && (0 == strcmp(fld->default_value, *(char**)value)))
				break;

			zbx_json_addstring(j, fld->tag, *(char**)value, ZBX_JSON_TYPE_STRING);
			break;
		case ZBX_TYPE_INT:
			if (NULL != fld->default_value && (SUCCEED == zbx_is_int(fld->default_value, &def.val_int) &&
					def.val_int == *(int*)value))
			{
				break;
			}

			zbx_json_addint64(j, fld->tag, *(int*)value);
			break;
		case ZBX_TYPE_UINT:
			if (NULL != fld->default_value &&
					(SUCCEED == zbx_is_uint64(fld->default_value, &def.val_u64) &&
					def.val_u64 == *(zbx_uint64_t*)value))
			{
				break;
			}

			zbx_json_adduint64(j, fld->tag, *(zbx_uint64_t*)value);
			break;
		default:
			zabbix_log(LOG_LEVEL_WARNING, "%s() unsupported type:%d", __func__, type);
			THIS_SHOULD_NEVER_HAPPEN;
	}
}