/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "zbxhistory.h" #include "history.h" #include "zbxalgo.h" #include "zbxdbhigh.h" #include "zbxcacheconfig.h" typedef struct { unsigned char initialized; zbx_vector_ptr_t dbinserts; } zbx_sql_writer_t; static zbx_sql_writer_t writer; typedef void (*vc_str2value_func_t)(zbx_history_value_t *value, zbx_db_row_t row); /* history table data */ typedef struct { /* table name */ const char *name; /* field list */ const char *fields; /* string to value converter function, used to convert string value of DB row */ /* to the value of appropriate type */ vc_str2value_func_t rtov; } zbx_vc_history_table_t; /* row to value converters for all value types */ static void row2value_str(zbx_history_value_t *value, zbx_db_row_t row) { value->str = zbx_strdup(NULL, row[0]); } static void row2value_dbl(zbx_history_value_t *value, zbx_db_row_t row) { value->dbl = atof(row[0]); } static void row2value_ui64(zbx_history_value_t *value, zbx_db_row_t row) { ZBX_STR2UINT64(value->ui64, row[0]); } /* timestamp, logeventid, severity, source, value */ static void row2value_log(zbx_history_value_t *value, zbx_db_row_t row) { value->log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t)); value->log->timestamp = atoi(row[0]); value->log->logeventid = atoi(row[1]); value->log->severity = atoi(row[2]); value->log->source = '\0' == *row[3] ? NULL : zbx_strdup(NULL, row[3]); value->log->value = zbx_strdup(NULL, row[4]); } /* value_type - history table data mapping */ static zbx_vc_history_table_t vc_history_tables[] = { {"history", "value", row2value_dbl}, {"history_str", "value", row2value_str}, {"history_log", "timestamp,logeventid,severity,source,value", row2value_log}, {"history_uint", "value", row2value_ui64}, {"history_text", "value", row2value_str}, {"history_bin", "value", row2value_str} }; /****************************************************************************************************************** * * * common sql service support * * * ******************************************************************************************************************/ /************************************************************************************ * * * Purpose: initializes sql writer for a new batch of history values * * * ************************************************************************************/ static void sql_writer_init(void) { if (0 != writer.initialized) return; zbx_vector_ptr_create(&writer.dbinserts); writer.initialized = 1; } /************************************************************************************ * * * Purpose: releases initialized sql writer by freeing allocated resources and * * setting its state to uninitialized. * * * ************************************************************************************/ static void sql_writer_release(void) { int i; for (i = 0; i < writer.dbinserts.values_num; i++) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)writer.dbinserts.values[i]; zbx_db_insert_clean(db_insert); zbx_free(db_insert); } zbx_vector_ptr_clear(&writer.dbinserts); zbx_vector_ptr_destroy(&writer.dbinserts); writer.initialized = 0; } /************************************************************************************ * * * Purpose: adds bulk insert data to be flushed later * * * * Parameters: db_insert - [IN] bulk insert data * * * ************************************************************************************/ static void sql_writer_add_dbinsert(zbx_db_insert_t *db_insert) { sql_writer_init(); zbx_vector_ptr_append(&writer.dbinserts, db_insert); } /************************************************************************************ * * * Purpose: flushes bulk insert data into database * * * ************************************************************************************/ static int sql_writer_flush(void) { int i, txn_error; /* The writer might be uninitialized only if the history */ /* was already flushed. In that case, return SUCCEED */ if (0 == writer.initialized) return SUCCEED; do { zbx_db_begin(); for (i = 0; i < writer.dbinserts.values_num; i++) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)writer.dbinserts.values[i]; zbx_db_insert_execute(db_insert); } } while (ZBX_DB_DOWN == (txn_error = zbx_db_commit())); sql_writer_release(); if (ZBX_DB_OK == txn_error) { return FLUSH_SUCCEED; } else { if (ZBX_DB_FAIL == txn_error && ERR_Z3008 == zbx_db_last_errcode()) return FLUSH_DUPL_REJECTED; return FLUSH_FAIL; } } /****************************************************************************************************************** * * * database writing support * * * ******************************************************************************************************************/ static void add_history_dbl(const zbx_vector_dc_history_ptr_t *history) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)zbx_malloc(NULL, sizeof(zbx_db_insert_t)); zbx_db_insert_prepare(db_insert, "history", "itemid", "clock", "ns", "value", (char *)NULL); for (int i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; if (ITEM_VALUE_TYPE_FLOAT != h->value_type) continue; zbx_db_insert_add_values(db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.dbl); } sql_writer_add_dbinsert(db_insert); } static void add_history_uint(const zbx_vector_dc_history_ptr_t *history) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)zbx_malloc(NULL, sizeof(zbx_db_insert_t)); zbx_db_insert_prepare(db_insert, "history_uint", "itemid", "clock", "ns", "value", (char *)NULL); for (int i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; if (ITEM_VALUE_TYPE_UINT64 != h->value_type) continue; zbx_db_insert_add_values(db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.ui64); } sql_writer_add_dbinsert(db_insert); } static void add_history_str(const zbx_vector_dc_history_ptr_t *history) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)zbx_malloc(NULL, sizeof(zbx_db_insert_t)); zbx_db_insert_prepare(db_insert, "history_str", "itemid", "clock", "ns", "value", (char *)NULL); for (int i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; if (ITEM_VALUE_TYPE_STR != h->value_type) continue; zbx_db_insert_add_values(db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.str); } sql_writer_add_dbinsert(db_insert); } static void add_history_text(const zbx_vector_dc_history_ptr_t *history) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)zbx_malloc(NULL, sizeof(zbx_db_insert_t)); zbx_db_insert_prepare(db_insert, "history_text", "itemid", "clock", "ns", "value", (char *)NULL); for (int i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; if (ITEM_VALUE_TYPE_TEXT != h->value_type) continue; zbx_db_insert_add_values(db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.str); } sql_writer_add_dbinsert(db_insert); } static void add_history_log(const zbx_vector_dc_history_ptr_t *history) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)zbx_malloc(NULL, sizeof(zbx_db_insert_t)); zbx_db_insert_prepare(db_insert, "history_log", "itemid", "clock", "ns", "timestamp", "source", "severity", "value", "logeventid", (char *)NULL); for (int i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; const zbx_log_value_t *log; if (ITEM_VALUE_TYPE_LOG != h->value_type) continue; log = h->value.log; zbx_db_insert_add_values(db_insert, h->itemid, h->ts.sec, h->ts.ns, log->timestamp, ZBX_NULL2EMPTY_STR(log->source), log->severity, log->value, log->logeventid); } sql_writer_add_dbinsert(db_insert); } static void add_history_bin(const zbx_vector_dc_history_ptr_t *history) { zbx_db_insert_t *db_insert = (zbx_db_insert_t *)zbx_malloc(NULL, sizeof(zbx_db_insert_t)); zbx_db_insert_prepare(db_insert, "history_bin", "itemid", "clock", "ns", "value", (char *)NULL); for (int i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; if (ITEM_VALUE_TYPE_BIN != h->value_type) continue; zbx_db_insert_add_values(db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.str); } sql_writer_add_dbinsert(db_insert); } /****************************************************************************************************************** * * * database reading support * * * ******************************************************************************************************************/ /********************************************************************************* * * * Purpose: reads item history data from database * * * * Parameters: itemid - [IN] the itemid * * value_type - [IN] the value type (see ITEM_VALUE_TYPE_* defs) * * values - [OUT] the item history data values * * seconds - [IN] the time period to read * * end_timestamp - [IN] the value timestamp to start reading with * * * * Return value: SUCCEED - the history data were read successfully * * FAIL - otherwise * * * * Comments: This function reads all values with timestamps in range: * * end_timestamp - seconds < <value timestamp> <= end_timestamp * * * *********************************************************************************/ static int db_read_values_by_time(zbx_uint64_t itemid, int value_type, zbx_vector_history_record_t *values, int seconds, int end_timestamp) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; zbx_db_result_t result; zbx_db_row_t row; zbx_vc_history_table_t *table = &vc_history_tables[value_type]; time_t time_from; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select clock,ns,%s" " from %s" " where itemid=" ZBX_FS_UI64, table->fields, table->name, itemid); time_from = end_timestamp - seconds; zbx_recalc_time_period(&time_from, ZBX_RECALC_TIME_PERIOD_HISTORY); if (ZBX_JAN_2038 == end_timestamp) { zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " and clock>" ZBX_FS_I64, time_from); } else if (1 == seconds) { if (time_from != end_timestamp - seconds) { zbx_free(sql); goto out; } zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " and clock=%d", end_timestamp); } else { zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " and clock>" ZBX_FS_I64 " and clock<=%d", time_from, end_timestamp); } result = zbx_db_select("%s", sql); zbx_free(sql); if (NULL == result) goto out; while (NULL != (row = zbx_db_fetch(result))) { zbx_history_record_t value; value.timestamp.sec = atoi(row[0]); value.timestamp.ns = atoi(row[1]); table->rtov(&value.value, row + 2); zbx_vector_history_record_append_ptr(values, &value); } zbx_db_free_result(result); out: return SUCCEED; } /************************************************************************************ * * * Purpose: reads item history data from database * * * * Parameters: itemid - [IN] the itemid * * value_type - [IN] the value type (see ITEM_VALUE_TYPE_* defs) * * values - [OUT] the item history data values * * count - [IN] the number of values to read + 1 * * end_timestamp - [IN] the value timestamp to start reading with * * * * Return value: SUCCEED - the history data were read successfully * * FAIL - otherwise * * * * Comments: this function reads <count> values before <count_timestamp> (including)* * plus all values in range: * * count_timestamp < <value timestamp> <= read_timestamp * * * * To speed up the reading time with huge data loads, data is read by * * smaller time segments (hours, day, week, month) and the next (larger) * * time segment is read only if the requested number of values (<count>) * * is not yet retrieved. * * * ************************************************************************************/ static int db_read_values_by_count(zbx_uint64_t itemid, int value_type, zbx_vector_history_record_t *values, int count, int end_timestamp) { time_t clock_from; char *sql = NULL; size_t sql_alloc = 0, sql_offset; int clock_to, step = 0, ret = FAIL; zbx_db_result_t result; zbx_db_row_t row; zbx_vc_history_table_t *table = &vc_history_tables[value_type]; const int periods[] = {SEC_PER_HOUR, 12 * SEC_PER_HOUR, SEC_PER_DAY, SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, 0, -1}; clock_to = end_timestamp; while (-1 != periods[step] && 1 < count) { if (0 > (clock_from = clock_to - periods[step])) { clock_from = clock_to; step = ARRSIZE(periods) - 1; } sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select clock,ns,%s" " from %s" " where itemid=" ZBX_FS_UI64 " and clock<=%d", table->fields, table->name, itemid, clock_to); if (clock_from != clock_to) { zbx_recalc_time_period(&clock_from, ZBX_RECALC_TIME_PERIOD_HISTORY); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " and clock>" ZBX_FS_I64, clock_from); } zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " order by clock desc"); result = zbx_db_select_n(sql, count); if (NULL == result) goto out; while (NULL != (row = zbx_db_fetch(result))) { zbx_history_record_t value; value.timestamp.sec = atoi(row[0]); value.timestamp.ns = atoi(row[1]); table->rtov(&value.value, row + 2); zbx_vector_history_record_append_ptr(values, &value); count--; } zbx_db_free_result(result); clock_to -= periods[step]; step++; } if (0 < count) { /* no more data in database, return success */ ret = SUCCEED; goto out; } /* drop data from the last second and read the whole second again */ /* to ensure that data is cached by seconds */ end_timestamp = values->values[values->values_num - 1].timestamp.sec; while (0 < values->values_num && values->values[values->values_num - 1].timestamp.sec == end_timestamp) { values->values_num--; zbx_history_record_clear(&values->values[values->values_num], value_type); } ret = db_read_values_by_time(itemid, value_type, values, 1, end_timestamp); out: zbx_free(sql); return ret; } /************************************************************************************ * * * Purpose: reads item history data from database * * * * Parameters: itemid - [IN] the itemid * * value_type - [IN] the value type (see ITEM_VALUE_TYPE_* defs) * * values - [OUT] the item history data values * * seconds - [IN] the time period to read * * count - [IN] the number of values to read * * end_timestamp - [IN] the value timestamp to start reading with * * * * Return value: SUCCEED - the history data were read successfully * * FAIL - otherwise * * * * Comments: this function reads <count> values from <seconds> period before * * <count_timestamp> (including) plus all values in range: * * count_timestamp < <value timestamp> <= read_timestamp * * * ************************************************************************************/ static int db_read_values_by_time_and_count(zbx_uint64_t itemid, int value_type, zbx_vector_history_record_t *values, int seconds, int count, int end_timestamp) { int ret = FAIL; char *sql = NULL; size_t sql_alloc = 0, sql_offset; zbx_db_result_t result; zbx_db_row_t row; zbx_vc_history_table_t *table = &vc_history_tables[value_type]; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select clock,ns,%s" " from %s" " where itemid=" ZBX_FS_UI64, table->fields, table->name, itemid); if (1 == seconds) { zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " and clock=%d", end_timestamp); } else { zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " and clock>%d and clock<=%d order by clock desc", end_timestamp - seconds, end_timestamp); } result = zbx_db_select_n(sql, count); zbx_free(sql); if (NULL == result) goto out; while (NULL != (row = zbx_db_fetch(result)) && 0 < count--) { zbx_history_record_t value; value.timestamp.sec = atoi(row[0]); value.timestamp.ns = atoi(row[1]); table->rtov(&value.value, row + 2); zbx_vector_history_record_append_ptr(values, &value); } zbx_db_free_result(result); if (0 < count) { /* no more data in the specified time period, return success */ ret = SUCCEED; goto out; } /* Drop data from the last second and read the whole second again */ /* to ensure that data is cached by seconds. */ /* Because the initial select has limit option (zbx_db_select_n()) */ /* we have to perform another select to read the last second data. */ end_timestamp = values->values[values->values_num - 1].timestamp.sec; while (0 < values->values_num && values->values[values->values_num - 1].timestamp.sec == end_timestamp) { values->values_num--; zbx_history_record_clear(&values->values[values->values_num], value_type); } ret = db_read_values_by_time(itemid, value_type, values, 1, end_timestamp); out: zbx_free(sql); return ret; } /****************************************************************************************************************** * * * history interface support * * * ******************************************************************************************************************/ /************************************************************************************ * * * Purpose: destroys history storage interface * * * * Parameters: hist - [IN] the history storage interface * * * ************************************************************************************/ static void sql_destroy(zbx_history_iface_t *hist) { ZBX_UNUSED(hist); } /************************************************************************************ * * * Purpose: gets item history data from history storage * * * * Parameters: hist - [IN] the history storage interface * * itemid - [IN] the itemid * * start - [IN] the period start timestamp * * count - [IN] the number of values to read * * end - [IN] the period end timestamp * * values - [OUT] the item history data values * * * * Return value: SUCCEED - the history data were read successfully * * FAIL - otherwise * * * * Comments: This function reads <count> values from ]<start>,<end>] interval or * * all values from the specified interval if count is zero. * * * ************************************************************************************/ static int sql_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end, zbx_vector_history_record_t *values) { if (0 == count) return db_read_values_by_time(itemid, hist->value_type, values, end - start, end); if (0 == start) return db_read_values_by_count(itemid, hist->value_type, values, count, end); return db_read_values_by_time_and_count(itemid, hist->value_type, values, end - start, count, end); } /********************************************************************************************** * * * Purpose: sends history data to storage * * * * Parameters: * * hist - [IN] history storage interface * * history - [IN] history data vector (may have mixed value types) * * config_history_storage_pipelines - [IN] is unused, but signature must contain it to be * * compatible with elastic version of _add_values * * * *********************************************************************************************/ static int sql_add_values(zbx_history_iface_t *hist, const zbx_vector_dc_history_ptr_t *history, int config_history_storage_pipelines) { int i, h_num = 0; ZBX_UNUSED(config_history_storage_pipelines); for (i = 0; i < history->values_num; i++) { const zbx_dc_history_t *h = history->values[i]; if (h->value_type == hist->value_type) h_num++; } if (0 != h_num) hist->data.sql_history_func(history); return h_num; } /************************************************************************************ * * * Purpose: flushes the history data to storage * * * * Parameters: hist - [IN] the history storage interface * * * * Comments: This function will try to flush the data until it succeeds or * * unrecoverable error occurs * * * ************************************************************************************/ static int sql_flush(zbx_history_iface_t *hist) { ZBX_UNUSED(hist); return sql_writer_flush(); } /************************************************************************************ * * * Purpose: initializes history storage interface * * * * Parameters: hist - [IN] history storage interface * * value_type - [IN] target value type * * * ************************************************************************************/ void zbx_history_sql_init(zbx_history_iface_t *hist, unsigned char value_type) { hist->value_type = value_type; hist->destroy = sql_destroy; hist->add_values = sql_add_values; hist->flush = sql_flush; hist->get_values = sql_get_values; switch (value_type) { case ITEM_VALUE_TYPE_FLOAT: hist->data.sql_history_func = add_history_dbl; break; case ITEM_VALUE_TYPE_UINT64: hist->data.sql_history_func = add_history_uint; break; case ITEM_VALUE_TYPE_STR: hist->data.sql_history_func = add_history_str; break; case ITEM_VALUE_TYPE_TEXT: hist->data.sql_history_func = add_history_text; break; case ITEM_VALUE_TYPE_LOG: hist->data.sql_history_func = add_history_log; break; case ITEM_VALUE_TYPE_BIN: hist->data.sql_history_func = add_history_bin; break; case ITEM_VALUE_TYPE_NONE: default: THIS_SHOULD_NEVER_HAPPEN; exit(EXIT_FAILURE); } hist->requires_trends = 1; }