/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "pb_history.h" #include "proxybuffer.h" #include "zbx_host_constants.h" #include "zbx_item_constants.h" #include "zbxcacheconfig.h" #include "zbxcachehistory.h" #include "zbxcommon.h" #include "zbxdb.h" #include "zbxdbhigh.h" #include "zbxjson.h" #include "zbxnum.h" #include "zbxproxybuffer.h" #include "zbxshmem.h" #include "zbxtime.h" static void pb_history_add_rows_db(zbx_list_t *rows, zbx_list_item_t *next, zbx_uint64_t *lastid); struct zbx_pb_history_data { zbx_pb_state_t state; zbx_list_t rows; int rows_num; zbx_db_insert_t db_insert; zbx_uint64_t handleid; }; void pb_list_free_history(zbx_list_t *list, zbx_pb_history_t *row) { if (NULL != row->value) list->mem_free_func(row->value); if (NULL != row->source) list->mem_free_func(row->source); list->mem_free_func(row); } /****************************************************************************** * * * Purpose: estimate approximate history row size in cache * * * ******************************************************************************/ size_t pb_history_estimate_row_size(const char *value, const char *source) { size_t size = 0; size += zbx_shmem_required_chunk_size(sizeof(zbx_pb_history_t)); size += zbx_shmem_required_chunk_size(sizeof(zbx_list_item_t)); size += zbx_shmem_required_chunk_size(strlen(value) + 1); size += zbx_shmem_required_chunk_size(strlen(source) + 1); return size; } static void pb_history_free(zbx_pb_history_t *row) { if (0 == (row->flags & ZBX_PROXY_HISTORY_FLAG_NOVALUE)) { zbx_free(row->value); zbx_free(row->source); } zbx_free(row); } static void pb_history_add_value(zbx_pb_history_data_t *data, zbx_uint64_t itemid, int state, const char *value, const zbx_timespec_t *ts, int flags, zbx_uint64_t lastlogsize, int mtime, int timestamp, int logeventid, int severity, const char *source, time_t now) { if (PB_MEMORY == data->state) { zbx_pb_history_t *row; row = (zbx_pb_history_t *)zbx_malloc(NULL, sizeof(zbx_pb_history_t)); row->id = 0; row->itemid = itemid; row->state = state; row->ts = *ts; row->flags = flags; row->lastlogsize = lastlogsize; row->mtime = mtime; row->timestamp = timestamp; row->logeventid = logeventid; row->severity = severity; row->value = zbx_strdup(NULL, value); row->source = zbx_strdup(NULL, source); row->write_clock = now; zbx_list_append(&data->rows, row, NULL); data->rows_num++; } else { zbx_db_insert_add_values(&data->db_insert, __UINT64_C(0), itemid, ts->sec, timestamp, source, severity, value, logeventid, ts->ns, state, lastlogsize, mtime, flags, (int)now); } } /****************************************************************************** * * * Purpose: read proxy history data from the database * * * * Parameters: lastid - [IN] id of last processed proxy history * * record * * rows - [OUT] read proxy history rows * * more - [OUT] set to ZBX_PROXY_DATA_MORE if there * * might be more data to read * * * * Return value: The number of records read. * * * ******************************************************************************/ static int pb_history_get_rows_db(zbx_uint64_t lastid, zbx_vector_pb_history_ptr_t *rows, int *more) { zbx_db_result_t result; zbx_db_row_t row; char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_uint64_t id, gapid = 0; zbx_pb_history_t *hist; zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, lastid); try_again: zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select id,itemid,clock,ns,timestamp,source,severity," "value,logeventid,state,lastlogsize,mtime,flags" " from proxy_history" " where id>" ZBX_FS_UI64 " order by id", lastid); result = zbx_db_select_n(sql, ZBX_MAX_HRECORDS - rows->values_num); zbx_free(sql); while (NULL != (row = zbx_db_fetch(result))) { ZBX_STR2UINT64(id, row[0]); if (1 < id - lastid) { /* At least one record is missing. It can happen if some DB syncer process has */ /* started but not yet committed a transaction or a rollback occurred in a DB syncer. */ if (id != gapid) { zbx_db_free_result(result); gapid = id; pb_wait_handles(&get_pb_data()->history_handleids); goto try_again; } else { zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing. No more retries.", __func__, id - lastid - 1); } } hist = (zbx_pb_history_t *)zbx_malloc(NULL, sizeof(zbx_pb_history_t)); hist->id = id; ZBX_STR2UINT64(hist->itemid, row[1]); ZBX_STR2UCHAR(hist->flags, row[12]); hist->ts.sec = atoi(row[2]); hist->ts.ns = atoi(row[3]); if (ZBX_PROXY_HISTORY_FLAG_NOVALUE != (hist->flags & ZBX_PROXY_HISTORY_MASK_NOVALUE)) { ZBX_STR2UCHAR(hist->state, row[9]); if (0 == (hist->flags & ZBX_PROXY_HISTORY_FLAG_NOVALUE)) { hist->timestamp = atoi(row[4]); hist->severity = atoi(row[6]); hist->logeventid = atoi(row[8]); hist->source = zbx_strdup(NULL, row[5]); hist->value = zbx_strdup(NULL, row[7]); } if (0 != (hist->flags & ZBX_PROXY_HISTORY_FLAG_META)) { ZBX_STR2UINT64(hist->lastlogsize, row[10]); hist->mtime = atoi(row[11]); } } zbx_vector_pb_history_ptr_append(rows, hist); lastid = id; } zbx_db_free_result(result); if (ZBX_MAX_HRECORDS != rows->values_num) *more = ZBX_PROXY_DATA_DONE; zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows:%d", __func__, rows->values_num); return rows->values_num; } /****************************************************************************** * * * Purpose: add history records to output json * * * * Parameters: j - [IN/OUT] json output buffer * * rows - [IN] history rows to export * * lastid - [OUT] id of last added record * * * * Return value: The total number of records exported. * * * ******************************************************************************/ static int pb_history_export(struct zbx_json *j, int records_num, const zbx_vector_pb_history_ptr_t *rows, zbx_uint64_t *lastid) { int i, *errcodes; zbx_pb_history_t *row; zbx_vector_pb_history_ptr_t records; zbx_vector_uint64_t itemids; zbx_hashset_t nodata_itemids; zbx_dc_item_t *dc_items; zbx_vector_pb_history_ptr_create(&records); zbx_vector_pb_history_ptr_reserve(&records, (size_t)rows->values_num); zbx_vector_uint64_create(&itemids); zbx_vector_uint64_reserve(&itemids, (size_t)rows->values_num); zbx_hashset_create(&nodata_itemids, (size_t)rows->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); /* filter out duplicate novalue updates */ for (i = rows->values_num - 1; i >= 0; i--) { if (ZBX_PROXY_HISTORY_FLAG_NOVALUE == (rows->values[i]->flags & ZBX_PROXY_HISTORY_MASK_NOVALUE)) { if (NULL != zbx_hashset_search(&nodata_itemids, &rows->values[i]->itemid)) continue; zbx_hashset_insert(&nodata_itemids, &rows->values[i]->itemid, sizeof(rows->values[i]->itemid)); } zbx_vector_pb_history_ptr_append(&records, rows->values[i]); zbx_vector_uint64_append(&itemids, rows->values[i]->itemid); } dc_items = (zbx_dc_item_t *)zbx_malloc(NULL, (size_t)records.values_num * sizeof(zbx_dc_item_t)); errcodes = (int *)zbx_malloc(NULL, (size_t)records.values_num * sizeof(int)); zbx_dc_config_get_items_by_itemids(dc_items, itemids.values, errcodes, (size_t)itemids.values_num); for (i = records.values_num - 1; i >= 0; i--) { row = records.values[i]; *lastid = row->id; if (SUCCEED != errcodes[i]) continue; if (ITEM_STATUS_ACTIVE != dc_items[i].status) continue; if (HOST_STATUS_MONITORED != dc_items[i].host.status) continue; if (0 == records_num) zbx_json_addarray(j, ZBX_PROTO_TAG_HISTORY_DATA); zbx_json_addobject(j, NULL); zbx_json_adduint64(j, ZBX_PROTO_TAG_ID, row->id); zbx_json_adduint64(j, ZBX_PROTO_TAG_ITEMID, row->itemid); zbx_json_addint64(j, ZBX_PROTO_TAG_CLOCK, row->ts.sec); zbx_json_addint64(j, ZBX_PROTO_TAG_NS, row->ts.ns); if (ZBX_PROXY_HISTORY_FLAG_NOVALUE != (row->flags & ZBX_PROXY_HISTORY_MASK_NOVALUE)) { if (ITEM_STATE_NORMAL != row->state) zbx_json_addint64(j, ZBX_PROTO_TAG_STATE, row->state); if (0 == (row->flags & ZBX_PROXY_HISTORY_FLAG_NOVALUE)) { if (0 != row->timestamp) zbx_json_addint64(j, ZBX_PROTO_TAG_LOGTIMESTAMP, row->timestamp); if ('\0' != *row->source) { zbx_json_addstring(j, ZBX_PROTO_TAG_LOGSOURCE, row->source, ZBX_JSON_TYPE_STRING); } if (0 != row->severity) zbx_json_addint64(j, ZBX_PROTO_TAG_LOGSEVERITY, row->severity); if (0 != row->logeventid) zbx_json_addint64(j, ZBX_PROTO_TAG_LOGEVENTID, row->logeventid); zbx_json_addstring(j, ZBX_PROTO_TAG_VALUE, row->value, ZBX_JSON_TYPE_STRING); } if (0 != (row->flags & ZBX_PROXY_HISTORY_FLAG_META)) { zbx_json_adduint64(j, ZBX_PROTO_TAG_LASTLOGSIZE, row->lastlogsize); zbx_json_addint64(j, ZBX_PROTO_TAG_MTIME, row->mtime); } } zbx_json_close(j); records_num++; /* stop gathering data to avoid exceeding the maximum packet size */ if (ZBX_DATA_JSON_RECORD_LIMIT < j->buffer_offset) break; } zbx_dc_config_clean_items(dc_items, errcodes, (size_t)itemids.values_num); zbx_free(errcodes); zbx_free(dc_items); zbx_hashset_destroy(&nodata_itemids); zbx_vector_uint64_destroy(&itemids); zbx_vector_pb_history_ptr_destroy(&records); return records_num; } static int pb_history_get_db(struct zbx_json *j, zbx_uint64_t *lastid, int *more) { int records_num = 0; zbx_uint64_t id; zbx_vector_pb_history_ptr_t rows; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_pb_history_ptr_create(&rows); *more = ZBX_PROXY_DATA_MORE; id = pb_get_lastid("proxy_history", "history_lastid"); /* get history data in batches by ZBX_MAX_HRECORDS records and stop if: */ /* 1) there are no more data to read */ /* 2) we have retrieved more than the total maximum number of records */ /* 3) we have gathered more than half of the maximum packet size */ while (ZBX_DATA_JSON_BATCH_LIMIT > j->buffer_offset && ZBX_MAX_HRECORDS_TOTAL > records_num && 0 != pb_history_get_rows_db(id, &rows, more)) { records_num = pb_history_export(j, records_num, &rows, lastid); /* got less data than requested - either no more data to read or the history is full of */ /* holes. In this case send retrieved data before attempting to read/wait for more data */ if (ZBX_MAX_HRECORDS > rows.values_num) break; id = *lastid; zbx_vector_pb_history_ptr_clear_ext(&rows, pb_history_free); } if (0 != records_num) zbx_json_close(j); zbx_vector_pb_history_ptr_clear_ext(&rows, pb_history_free); zbx_vector_pb_history_ptr_destroy(&rows); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() lastid:" ZBX_FS_UI64 " records_num:%d size:~" ZBX_FS_SIZE_T " more:%d", __func__, *lastid, records_num, j->buffer_offset, *more); return records_num; } /****************************************************************************** * * * Purpose: get history records from memory cache * * * ******************************************************************************/ static int pb_history_get_mem(zbx_pb_t *pb, struct zbx_json *j, zbx_uint64_t *lastid, int *more) { int records_num = 0; void *ptr; *more = ZBX_PROXY_DATA_DONE; if (SUCCEED == zbx_list_peek(&pb->history, &ptr)) { zbx_pb_history_t *row; zbx_vector_pb_history_ptr_t rows; zbx_list_iterator_t li; zbx_vector_pb_history_ptr_create(&rows); zbx_list_iterator_init(&pb->history, &li); while (1) { while (SUCCEED == zbx_list_iterator_next(&li)) { (void)zbx_list_iterator_peek(&li, (void **)&row); zbx_vector_pb_history_ptr_append(&rows, row); if (ZBX_MAX_HRECORDS <= rows.values_num) break; } records_num = pb_history_export(j, records_num, &rows, lastid); if (ZBX_MAX_HRECORDS != rows.values_num) break; if (ZBX_DATA_JSON_BATCH_LIMIT <= j->buffer_offset || records_num >= ZBX_MAX_HRECORDS_TOTAL) { *more = ZBX_PROXY_DATA_MORE; break; } zbx_vector_pb_history_ptr_clear(&rows); } zbx_vector_pb_history_ptr_destroy(&rows); if (0 != records_num) zbx_json_close(j); } return records_num; } /****************************************************************************** * * * Purpose: add history row to memory cache * * * * Parameters: pb - [IN] proxy buffer * * src - [IN] row to add * * * * Return value: SUCCEED - the row was cached successfully * * FAIL - not enough memory in cache * * * ******************************************************************************/ static int pb_history_add_row_mem(zbx_pb_t *pb, zbx_pb_history_t *src) { zbx_pb_history_t *row; int ret = FAIL; zabbix_log(LOG_LEVEL_TRACE, "In %s() free:" ZBX_FS_SIZE_T " request:" ZBX_FS_SIZE_T, __func__, pb_get_free_size(), pb_history_estimate_row_size(src->value, src->source)); if (NULL == (row = (zbx_pb_history_t *)pb_malloc(sizeof(zbx_pb_history_t)))) goto out; memcpy(row, src, sizeof(zbx_pb_history_t)); if (NULL == (row->value = pb_strdup(src->value))) { row->source = NULL; goto out; } if (NULL == (row->source = pb_strdup(src->source))) goto out; ret = zbx_list_append(&pb->history, row, NULL); out: if (SUCCEED == ret) pb->history_lastid_mem = row->id; else if (NULL != row) pb_list_free_history(&pb->history, row); zabbix_log(LOG_LEVEL_TRACE, "End of %s() ret:%s free:" ZBX_FS_SIZE_T , __func__, zbx_result_string(ret), pb_get_free_size()); return ret; } /****************************************************************************** * * * Purpose: set ids to new history rows * * * ******************************************************************************/ static void pb_history_set_row_ids(zbx_list_t *rows, int rows_num) { zbx_uint64_t id; zbx_pb_history_t *row; zbx_list_iterator_t li; id = zbx_dc_get_nextid("proxy_history", rows_num); zbx_list_iterator_init(rows, &li); while (SUCCEED == zbx_list_iterator_next(&li)) { (void)zbx_list_iterator_peek(&li, (void **)&row); row->id = id++; } } /****************************************************************************** * * * Purpose: add history rows to memory cache * * * * Parameters: pb - [IN] proxy buffer * * rows - [IN] rows to add * * * * Return value: NULL if all rows were added successfully. Otherwise the list * * item of first failed row is returned * * * ******************************************************************************/ static zbx_list_item_t *pb_history_add_rows_mem(zbx_pb_t *pb, zbx_list_t *rows) { zbx_list_iterator_t li; zbx_pb_history_t *row; int rows_num = 0; size_t size = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_list_iterator_init(rows, &li); while (SUCCEED == zbx_list_iterator_next(&li)) { (void)zbx_list_iterator_peek(&li, (void **)&row); while (SUCCEED != pb_history_add_row_mem(pb, row)) { if (ZBX_PB_MODE_MEMORY != pb->mode) goto out; /* in memory mode keep discarding old records until new */ /* one can be written in proxy memory buffer */ if (0 == size) size = pb_history_estimate_row_size(row->value, row->source); if (FAIL == pb_free_space(get_pb_data(), size)) { zabbix_log(LOG_LEVEL_WARNING, "history record with size " ZBX_FS_SIZE_T " is too large for proxy memory buffer, discarding", size); break; } } rows_num++; } out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows_num:%d next:%p", __func__, rows_num, (void *)li.current); return li.current; } /****************************************************************************** * * * Purpose: add history rows to database cache * * * * Parameters: rows - [IN] rows to add * * next - [IN] next row to add * * lastid - [OUT] last inserted id * * ******************************************************************************/ static void pb_history_add_rows_db(zbx_list_t *rows, zbx_list_item_t *next, zbx_uint64_t *lastid) { zbx_list_iterator_t li; zbx_pb_history_t *row; int rows_num = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s() next:%p", __func__, (void *)next); if (SUCCEED == zbx_list_iterator_init_with(rows, next, &li)) { zbx_db_insert_t db_insert; zbx_db_insert_prepare(&db_insert, "proxy_history", "id", "itemid", "clock", "timestamp", "source", "severity", "value", "logeventid", "ns", "state", "lastlogsize", "mtime", "flags", "write_clock", (char *)NULL); do { (void)zbx_list_iterator_peek(&li, (void **)&row); zbx_db_insert_add_values(&db_insert, row->id, row->itemid, row->ts.sec, row->timestamp, row->source, row->severity, row->value, row->logeventid, row->ts.ns, row->state, row->lastlogsize, row->mtime, row->flags, (int)row->write_clock); rows_num++; *lastid = row->id; } while (SUCCEED == zbx_list_iterator_next(&li)); (void)zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows_num:%d", __func__, rows_num); } void pb_history_flush(zbx_pb_t *pb) { zbx_uint64_t lastid = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); pb_history_add_rows_db(&pb->history, NULL, &lastid); if (get_pb_data()->history_lastid_db < lastid) get_pb_data()->history_lastid_db = lastid; zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: clear sent history records * * * ******************************************************************************/ void pb_history_clear(zbx_pb_t *pb, zbx_uint64_t lastid) { zbx_pb_history_t *row; while (SUCCEED == zbx_list_peek(&pb->history, (void **)&row)) { if (row->id > lastid) break; zbx_list_pop(&pb->history, NULL); pb_list_free_history(&pb->history, row); } } static void pb_history_data_free(zbx_pb_history_data_t *data) { if (PB_MEMORY == data->state) { zbx_pb_history_t *row; while (SUCCEED == zbx_list_pop(&data->rows, (void **)&row)) pb_list_free_history(&data->rows, row); zbx_list_destroy(&data->rows); } else { zbx_db_insert_clean(&data->db_insert); } zbx_free(data); } /****************************************************************************** * * * Purpose: check if oldest record is within allowed age * * * ******************************************************************************/ int pb_history_check_age(zbx_pb_t *pb) { zbx_pb_history_t *row; int now; now = time(NULL); while (SUCCEED == zbx_list_peek(&pb->history, (void **)&row)) { if (now - row->write_clock <= (time_t)pb->offline_buffer) break; zbx_list_pop(&pb->history, NULL); pb_list_free_history(&pb->history, row); } if (0 == pb->max_age) return SUCCEED; if (SUCCEED != zbx_list_peek(&pb->history, (void **)&row) || time(NULL) - row->write_clock < pb->max_age) return SUCCEED; return FAIL; } /****************************************************************************** * * * Purpose: write history last sent record id to database * * * ******************************************************************************/ void pb_history_set_lastid(zbx_uint64_t lastid) { pb_set_lastid("proxy_history", "history_lastid", lastid); } /****************************************************************************** * * * Purpose: check if history rows are cached in memory buffer * * * ******************************************************************************/ int pb_history_has_mem_rows(zbx_pb_t *pb) { void *ptr; return zbx_list_peek(&pb->history, &ptr); } /* public api */ /****************************************************************************** * * * Purpose: open history data cache * * * * Return value: The history data cache handle * * * ******************************************************************************/ zbx_pb_history_data_t *zbx_pb_history_open(void) { zbx_pb_history_data_t *data; zbx_pb_t *pb_data = get_pb_data(); zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); data = (zbx_pb_history_data_t *)zbx_malloc(NULL, sizeof(zbx_pb_history_data_t)); pb_lock(); data->handleid = pb_register_handle(pb_data, &(pb_data->history_handleids)); if (PB_DATABASE == (data->state = get_pb_dst(pb_data->state))) pb_data->db_handles_num++; pb_unlock(); if (PB_MEMORY == data->state) { zbx_list_create(&data->rows); data->rows_num = 0; } if (PB_DATABASE == get_pb_dst(data->state)) { zbx_db_insert_prepare(&data->db_insert, "proxy_history", "id", "itemid", "clock", "timestamp", "source", "severity", "value", "logeventid", "ns", "state", "lastlogsize", "mtime", "flags", "write_clock", (char *)NULL); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return data; } /****************************************************************************** * * * Purpose: flush the cached history data and free the handle * * * **********************************************************************/ void zbx_pb_history_close(zbx_pb_history_data_t *data) { zbx_uint64_t lastid = 0; zbx_pb_t *pb_data = get_pb_data(); zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (PB_MEMORY == data->state) { zbx_list_item_t *next = NULL; pb_lock(); if (0 == data->rows_num) goto out; pb_history_set_row_ids(&data->rows, data->rows_num); if (PB_MEMORY == pb_data->state && SUCCEED != pb_history_check_age(pb_data)) { pd_fallback_to_database(pb_data, "cached records are too old"); } else if (PB_MEMORY == get_pb_dst(pb_data->state)) { if (NULL == (next = pb_history_add_rows_mem(pb_data, &data->rows))) goto out; if (PB_DATABASE_MEMORY == pb_data->state) { pd_fallback_to_database(pb_data, "not enough space to complete transition to" " memory mode"); } else { /* initiate transition to database cache */ pb_set_state(pb_data, PB_MEMORY_DATABASE, "not enough space"); } } /* not all rows were added to memory cache - flush them to database */ pb_data->db_handles_num++; pb_unlock(); do { zbx_db_begin(); pb_history_add_rows_db(&data->rows, next, &lastid); } while (ZBX_DB_DOWN == zbx_db_commit()); } else { zbx_db_insert_autoincrement(&data->db_insert, "id"); do { zbx_db_begin(); (void)zbx_db_insert_execute(&data->db_insert); } while (ZBX_DB_DOWN == zbx_db_commit()); lastid = zbx_db_insert_get_lastid(&data->db_insert); } pb_lock(); if (pb_data->history_lastid_db < lastid) pb_data->history_lastid_db = lastid; pb_data->db_handles_num--; out: pb_deregister_handle(&(pb_data->history_handleids), data->handleid); pb_unlock(); pb_history_data_free(data); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: write normal value into history data cache * * * ******************************************************************************/ void zbx_pb_history_write_value(zbx_pb_history_data_t *data, zbx_uint64_t itemid, int state, const char *value, const zbx_timespec_t *ts, int flags, time_t now) { pb_history_add_value(data, itemid, state, value, ts, flags, 0, 0, 0, 0, 0, "", now); } /****************************************************************************** * * * Purpose: write value with metadata into history data cache * * * ******************************************************************************/ void zbx_pb_history_write_meta_value(zbx_pb_history_data_t *data, zbx_uint64_t itemid, int state, const char *value, const zbx_timespec_t *ts, int flags, zbx_uint64_t lastlogsize, int mtime, int timestamp, int logeventid, int severity, const char *source, time_t now) { pb_history_add_value(data, itemid, state, value, ts, flags, lastlogsize, mtime, timestamp, logeventid, severity, source, now); } /****************************************************************************** * * * Purpose: get history data for sending to server * * * ******************************************************************************/ int zbx_pb_history_get_rows(struct zbx_json *j, zbx_uint64_t *lastid, int *more) { int state, ret; zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, *lastid); pb_lock(); if (PB_MEMORY == (state = get_pb_src(get_pb_data()->state))) ret = pb_history_get_mem(get_pb_data(), j, lastid, more); pb_unlock(); if (PB_MEMORY != state) ret = pb_history_get_db(j, lastid, more); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows:%d", __func__, ret); return ret; } /****************************************************************************** * * * Purpose: update database lastid/clear memory records * * * ******************************************************************************/ void zbx_pb_set_history_lastid(const zbx_uint64_t lastid) { int state; zbx_pb_t *pb_data = get_pb_data(); zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, lastid); pb_lock(); pb_data->history_lastid_sent = lastid; if (PB_MEMORY == (state = get_pb_src(pb_data->state))) pb_history_clear(pb_data, lastid); pb_unlock(); if (PB_DATABASE == state) pb_history_set_lastid(lastid); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: return number of unsent history rows * * * ******************************************************************************/ zbx_uint64_t zbx_pb_history_get_unsent_num(void) { zbx_uint64_t lastid_sent, lastid; zbx_pb_t *pb_data = get_pb_data(); pb_lock(); lastid_sent = pb_data->history_lastid_sent; lastid = MAX(pb_data->history_lastid_db, pb_data->history_lastid_mem); pb_unlock(); return (lastid_sent < lastid ? lastid - lastid_sent : 0); }