/* ** Zabbix ** Copyright (C) 2001-2023 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ #include "zbxcachehistory.h" #include "zbxcachevalue.h" #include "log.h" #include "zbxmutexs.h" #include "zbxserver.h" #include "events.h" #include "zbxmodules.h" #include "module.h" #include "zbxexport.h" #include "zbxnix.h" #include "zbxavailability.h" #include "zbxconnector.h" #include "zbxtrends.h" #include "zbxnum.h" #include "zbxsysinfo.h" #include "zbx_host_constants.h" #include "zbx_trigger_constants.h" #include "zbx_item_constants.h" #include "zbxpreproc.h" #include "zbxtagfilter.h" static zbx_shmem_info_t *hc_index_mem = NULL; static zbx_shmem_info_t *hc_mem = NULL; static zbx_shmem_info_t *trend_mem = NULL; #define LOCK_CACHE zbx_mutex_lock(cache_lock) #define UNLOCK_CACHE zbx_mutex_unlock(cache_lock) #define LOCK_TRENDS zbx_mutex_lock(trends_lock) #define UNLOCK_TRENDS zbx_mutex_unlock(trends_lock) #define LOCK_CACHE_IDS zbx_mutex_lock(cache_ids_lock) #define UNLOCK_CACHE_IDS zbx_mutex_unlock(cache_ids_lock) static zbx_mutex_t cache_lock = ZBX_MUTEX_NULL; static zbx_mutex_t trends_lock = ZBX_MUTEX_NULL; static zbx_mutex_t cache_ids_lock = ZBX_MUTEX_NULL; static char *sql = NULL; static size_t sql_alloc = 4 * ZBX_KIBIBYTE; extern unsigned char program_type; extern int CONFIG_DOUBLE_PRECISION; #define ZBX_IDS_SIZE 10 #define ZBX_HC_ITEMS_INIT_SIZE 1000 #define ZBX_TRENDS_CLEANUP_TIME (SEC_PER_MIN * 55) /* the maximum time spent synchronizing history */ #define ZBX_HC_SYNC_TIME_MAX 10 /* the maximum number of items in one synchronization batch */ #define ZBX_HC_SYNC_MAX 1000 #define ZBX_HC_TIMER_MAX (ZBX_HC_SYNC_MAX / 2) #define ZBX_HC_TIMER_SOFT_MAX (ZBX_HC_TIMER_MAX - 10) /* the minimum processed item percentage of item candidates to continue synchronizing */ #define ZBX_HC_SYNC_MIN_PCNT 10 /* the maximum number of characters for history cache values */ #define ZBX_HISTORY_VALUE_LEN (1024 * 64) #define ZBX_DC_FLAGS_NOT_FOR_HISTORY (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY) #define ZBX_DC_FLAGS_NOT_FOR_TRENDS (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS) #define ZBX_DC_FLAGS_NOT_FOR_MODULES (ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD) #define ZBX_DC_FLAGS_NOT_FOR_EXPORT (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF) #define ZBX_HC_PROXYQUEUE_STATE_NORMAL 0 #define ZBX_HC_PROXYQUEUE_STATE_WAIT 1 typedef struct { char table_name[ZBX_TABLENAME_LEN_MAX]; zbx_uint64_t lastid; } ZBX_DC_ID; typedef struct { ZBX_DC_ID id[ZBX_IDS_SIZE]; } ZBX_DC_IDS; static ZBX_DC_IDS *ids = NULL; typedef struct { zbx_list_t list; zbx_hashset_t index; int state; } zbx_hc_proxyqueue_t; typedef struct { zbx_hashset_t trends; ZBX_DC_STATS stats; zbx_hashset_t history_items; zbx_binary_heap_t history_queue; int history_num; int trends_num; int trends_last_cleanup_hour; int history_num_total; int history_progress_ts; unsigned char db_trigger_queue_lock; zbx_hc_proxyqueue_t proxyqueue; int proxy_history_count; } ZBX_DC_CACHE; static ZBX_DC_CACHE *cache = NULL; /* local history cache */ #define ZBX_MAX_VALUES_LOCAL 256 #define ZBX_STRUCT_REALLOC_STEP 8 #define ZBX_STRING_REALLOC_STEP ZBX_KIBIBYTE typedef struct { size_t pvalue; size_t len; } dc_value_str_t; typedef struct { double value_dbl; zbx_uint64_t value_uint; dc_value_str_t value_str; } dc_value_t; typedef struct { zbx_uint64_t itemid; dc_value_t value; zbx_timespec_t ts; dc_value_str_t source; /* for log items only */ zbx_uint64_t lastlogsize; int timestamp; /* for log items only */ int severity; /* for log items only */ int logeventid; /* for log items only */ int mtime; unsigned char item_value_type; unsigned char value_type; unsigned char state; unsigned char flags; /* see ZBX_DC_FLAG_* above */ } dc_item_value_t; static char *string_values = NULL; static size_t string_values_alloc = 0, string_values_offset = 0; static dc_item_value_t *item_values = NULL; static size_t item_values_alloc = 0, item_values_num = 0; static void hc_add_item_values(dc_item_value_t *values, int values_num); static void hc_pop_items(zbx_vector_ptr_t *history_items); static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items); static void hc_push_items(zbx_vector_ptr_t *history_items); static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num); static void hc_queue_item(zbx_hc_item_t *item); static int hc_queue_elem_compare_func(const void *d1, const void *d2); static int hc_queue_get_size(void); static int hc_get_history_compression_age(void); /****************************************************************************** * * * Purpose: retrieves all internal metrics of the database cache * * * * Parameters: stats - [OUT] write cache metrics * * * ******************************************************************************/ void DCget_stats_all(zbx_wcache_info_t *wcache_info) { LOCK_CACHE; wcache_info->stats = cache->stats; wcache_info->history_free = hc_mem->free_size; wcache_info->history_total = hc_mem->total_size; wcache_info->index_free = hc_index_mem->free_size; wcache_info->index_total = hc_index_mem->total_size; if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) { wcache_info->trend_free = trend_mem->free_size; wcache_info->trend_total = trend_mem->orig_size; } UNLOCK_CACHE; } /****************************************************************************** * * * Purpose: get statistics of the database cache * * * ******************************************************************************/ void *DCget_stats(int request) { static zbx_uint64_t value_uint; static double value_double; void *ret; LOCK_CACHE; switch (request) { case ZBX_STATS_HISTORY_COUNTER: value_uint = cache->stats.history_counter; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_FLOAT_COUNTER: value_uint = cache->stats.history_float_counter; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_UINT_COUNTER: value_uint = cache->stats.history_uint_counter; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_STR_COUNTER: value_uint = cache->stats.history_str_counter; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_LOG_COUNTER: value_uint = cache->stats.history_log_counter; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_TEXT_COUNTER: value_uint = cache->stats.history_text_counter; ret = (void *)&value_uint; break; case ZBX_STATS_NOTSUPPORTED_COUNTER: value_uint = cache->stats.notsupported_counter; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_TOTAL: value_uint = hc_mem->total_size; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_USED: value_uint = hc_mem->total_size - hc_mem->free_size; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_FREE: value_uint = hc_mem->free_size; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_PUSED: value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size; ret = (void *)&value_double; break; case ZBX_STATS_HISTORY_PFREE: value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size; ret = (void *)&value_double; break; case ZBX_STATS_TREND_TOTAL: value_uint = trend_mem->orig_size; ret = (void *)&value_uint; break; case ZBX_STATS_TREND_USED: value_uint = trend_mem->orig_size - trend_mem->free_size; ret = (void *)&value_uint; break; case ZBX_STATS_TREND_FREE: value_uint = trend_mem->free_size; ret = (void *)&value_uint; break; case ZBX_STATS_TREND_PUSED: value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) / trend_mem->orig_size; ret = (void *)&value_double; break; case ZBX_STATS_TREND_PFREE: value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size; ret = (void *)&value_double; break; case ZBX_STATS_HISTORY_INDEX_TOTAL: value_uint = hc_index_mem->total_size; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_INDEX_USED: value_uint = hc_index_mem->total_size - hc_index_mem->free_size; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_INDEX_FREE: value_uint = hc_index_mem->free_size; ret = (void *)&value_uint; break; case ZBX_STATS_HISTORY_INDEX_PUSED: value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) / hc_index_mem->total_size; ret = (void *)&value_double; break; case ZBX_STATS_HISTORY_INDEX_PFREE: value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size; ret = (void *)&value_double; break; default: ret = NULL; } UNLOCK_CACHE; return ret; } /****************************************************************************** * * * Purpose: find existing or add new structure and return pointer * * * * Return value: pointer to a trend structure * * * ******************************************************************************/ static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid) { ZBX_DC_TREND *ptr, trend; if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid))) return ptr; memset(&trend, 0, sizeof(ZBX_DC_TREND)); trend.itemid = itemid; return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND)); } /****************************************************************************** * * * Purpose: apply disable_from changes to cache * * * ******************************************************************************/ static void DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff) { int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); LOCK_TRENDS; for (i = 0; i < trends_diff->values_num; i++) { ZBX_DC_TREND *trend; if (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &trends_diff->values[i].first))) trend->disable_from = trends_diff->values[i].second; } UNLOCK_TRENDS; zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: helper function for DCflush trends * * * ******************************************************************************/ static void dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type, const char *table_name, int clock) { ZBX_DC_TREND *trend; int i; zbx_db_insert_t db_insert; zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg", "value_max", NULL); for (i = 0; i < trends_num; i++) { trend = &trends[i]; if (0 == trend->itemid) continue; if (clock != trend->clock || value_type != trend->value_type) continue; if (ITEM_VALUE_TYPE_FLOAT == value_type) { zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl); } else { zbx_uint128_t avg; /* calculate the trend average value */ zbx_udiv128_64(&avg, &trend->value_avg.ui64, trend->num); zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num, trend->value_min.ui64, avg.lo, trend->value_max.ui64); } trend->itemid = 0; } zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } /****************************************************************************** * * * Purpose: Update trends disable_until for items without trends data past or * * equal the specified clock * * * * Comments: A helper function for DCflush trends * * * ******************************************************************************/ static void dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name, int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock) { int i, j, clocks_num, now, age; ZBX_DC_TREND *trend; zbx_uint64_t itemid; size_t sql_offset; DB_RESULT result; DB_ROW row; int clocks[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX}; now = time(NULL); age = now - clock; for (clocks_num = 0; age > clocks[clocks_num]; clocks_num++) clocks[clocks_num] = now - clocks[clocks_num]; clocks[clocks_num] = clock; /* remove itemids with trends data past or equal the clock */ for (j = 0; j <= clocks_num && 0 < *itemids_num; j++) { sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select distinct itemid" " from %s" " where clock>=%d and", table_name, clocks[j]); if (0 < j) zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", clocks[j - 1]); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { ZBX_STR2UINT64(itemid, row[0]); uint64_array_remove(itemids, itemids_num, &itemid, 1); } zbx_db_free_result(result); } /* update trends disable_until for the leftover itemids */ while (0 != *itemids_num) { itemid = itemids[--*itemids_num]; for (i = 0; i < trends_num; i++) { trend = &trends[i]; if (itemid != trend->itemid) continue; if (clock != trend->clock || value_type != trend->value_type) continue; trend->disable_from = clock; break; } } } /****************************************************************************** * * * Purpose: helper function for DCflush trends * * * ******************************************************************************/ static void dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset) { zbx_history_value_t value_min, value_avg, value_max; value_min.dbl = atof(row[2]); value_avg.dbl = atof(row[3]); value_max.dbl = atof(row[4]); if (value_min.dbl < trend->value_min.dbl) trend->value_min.dbl = value_min.dbl; if (value_max.dbl > trend->value_max.dbl) trend->value_max.dbl = value_max.dbl; trend->value_avg.dbl = trend->value_avg.dbl / (trend->num + num) * trend->num + value_avg.dbl / (trend->num + num) * num; trend->num += num; zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends set" " num=%d,value_min=" ZBX_FS_DBL64_SQL ",value_avg=" ZBX_FS_DBL64_SQL ",value_max=" ZBX_FS_DBL64_SQL " where itemid=" ZBX_FS_UI64 " and clock=%d;\n", trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl, trend->itemid, trend->clock); } /****************************************************************************** * * * Purpose: helper function for DCflush trends * * * ******************************************************************************/ static void dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset) { zbx_history_value_t value_min, value_avg, value_max; zbx_uint128_t avg; ZBX_STR2UINT64(value_min.ui64, row[2]); ZBX_STR2UINT64(value_avg.ui64, row[3]); ZBX_STR2UINT64(value_max.ui64, row[4]); if (value_min.ui64 < trend->value_min.ui64) trend->value_min.ui64 = value_min.ui64; if (value_max.ui64 > trend->value_max.ui64) trend->value_max.ui64 = value_max.ui64; /* calculate the trend average value */ zbx_umul64_64(&avg, num, value_avg.ui64); zbx_uinc128_128(&trend->value_avg.ui64, &avg); zbx_udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num); trend->num += num; zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg=" ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64 " and clock=%d;\n", trend->num, trend->value_min.ui64, avg.lo, trend->value_max.ui64, trend->itemid, trend->clock); } /****************************************************************************** * * * Purpose: helper function for DCflush trends * * * ******************************************************************************/ static void dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids, int itemids_num, int *inserts_num, unsigned char value_type, const char *table_name, int clock) { int i, num; DB_RESULT result; DB_ROW row; zbx_uint64_t itemid; ZBX_DC_TREND *trend; size_t sql_offset; sql_offset = 0; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,num,value_min,value_avg,value_max" " from %s" " where clock=%d and", table_name, clock); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num); result = zbx_db_select("%s order by itemid,clock", sql); sql_offset = 0; zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); while (NULL != (row = zbx_db_fetch(result))) { ZBX_STR2UINT64(itemid, row[0]); for (i = 0; i < trends_num; i++) { trend = &trends[i]; if (itemid != trend->itemid) continue; if (clock != trend->clock || value_type != trend->value_type) continue; break; } if (i == trends_num) { THIS_SHOULD_NEVER_HAPPEN; continue; } num = atoi(row[1]); if (value_type == ITEM_VALUE_TYPE_FLOAT) dc_trends_update_float(trend, row, num, &sql_offset); else dc_trends_update_uint(trend, row, num, &sql_offset); trend->itemid = 0; --*inserts_num; zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset); } zbx_db_free_result(result); zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (sql_offset > 16) /* In ORACLE always present begin..end; */ zbx_db_execute("%s", sql); } /****************************************************************************** * * * Purpose: flush trend to the database * * * ******************************************************************************/ static void DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff) { int num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num; unsigned char value_type; zbx_uint64_t *itemids = NULL; ZBX_DC_TREND *trend = NULL; const char *table_name; zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, *trends_num); clock = trends[0].clock; value_type = trends[0].value_type; switch (value_type) { case ITEM_VALUE_TYPE_FLOAT: table_name = "trends"; break; case ITEM_VALUE_TYPE_UINT64: table_name = "trends_uint"; break; default: assert(0); } itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num); itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t)); for (i = 0; i < *trends_num; i++) { trend = &trends[i]; if (clock != trend->clock || value_type != trend->value_type) continue; inserts_num++; if (0 != trend->disable_from) continue; uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64); if (ZBX_HC_SYNC_MAX == itemids_num) { trends_to = i + 1; break; } } if (0 != itemids_num) { dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids, &itemids_num, clock); } for (i = 0; i < trends_to; i++) { trend = &trends[i]; if (clock != trend->clock || value_type != trend->value_type) continue; if (0 != trend->disable_from && clock >= trend->disable_from) continue; uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64); } if (0 != itemids_num) { dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num, &inserts_num, value_type, table_name, clock); } zbx_free(itemids); /* if 'trends' is not a primary trends buffer */ if (NULL != trends_diff) { /* we update it too */ for (i = 0; i < trends_to; i++) { zbx_uint64_pair_t pair; if (0 == trends[i].itemid) continue; if (clock != trends[i].clock || value_type != trends[i].value_type) continue; if (0 == trends[i].disable_from || trends[i].disable_from > clock) continue; pair.first = trends[i].itemid; pair.second = clock + SEC_PER_HOUR; zbx_vector_uint64_pair_append(trends_diff, pair); } } if (0 != inserts_num) dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock); /* clean trends */ for (i = 0, num = 0; i < *trends_num; i++) { if (0 == trends[i].itemid) continue; memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND)); } *trends_num = num; zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: move trend to the array of trends for flushing to DB * * * ******************************************************************************/ static void DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num) { if (*trends_num == *trends_alloc) { *trends_alloc += 256; *trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND)); } memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND)); (*trends_num)++; trend->clock = 0; trend->num = 0; memset(&trend->value_min, 0, sizeof(zbx_history_value_t)); memset(&trend->value_avg, 0, sizeof(zbx_value_avg_t)); memset(&trend->value_max, 0, sizeof(zbx_history_value_t)); } /****************************************************************************** * * * Purpose: add new value to the trends * * * ******************************************************************************/ static void DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num) { ZBX_DC_TREND *trend = NULL; int hour; hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR; trend = DCget_trend(history->itemid); if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type) && SUCCEED == zbx_history_requires_trends(trend->value_type)) { DCflush_trend(trend, trends, trends_alloc, trends_num); } trend->value_type = history->value_type; trend->clock = hour; switch (trend->value_type) { case ITEM_VALUE_TYPE_FLOAT: if (trend->num == 0 || history->value.dbl < trend->value_min.dbl) trend->value_min.dbl = history->value.dbl; if (trend->num == 0 || history->value.dbl > trend->value_max.dbl) trend->value_max.dbl = history->value.dbl; trend->value_avg.dbl += history->value.dbl / (trend->num + 1) - trend->value_avg.dbl / (trend->num + 1); break; case ITEM_VALUE_TYPE_UINT64: if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64) trend->value_min.ui64 = history->value.ui64; if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64) trend->value_max.ui64 = history->value.ui64; zbx_uinc128_64(&trend->value_avg.ui64, history->value.ui64); break; } trend->num++; } /****************************************************************************** * * * Purpose: update trends cache and get list of trends to flush into database * * * * Parameters: history - [IN] array of history data * * history_num - [IN] number of history structures * * trends - [OUT] list of trends to flush into database * * trends_num - [OUT] number of trends * * compression_age - [IN] history compression age * * * ******************************************************************************/ static void DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends, int *trends_num, int compression_age) { static int last_trend_discard = 0; zbx_timespec_t ts; int trends_alloc = 0, i, hour, seconds; zbx_vector_uint64_t del_itemids; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_uint64_create(&del_itemids); zbx_timespec(&ts); seconds = ts.sec % SEC_PER_HOUR; hour = ts.sec - seconds; LOCK_TRENDS; for (i = 0; i < history_num; i++) { const ZBX_DC_HISTORY *h = &history[i]; if (0 != (ZBX_DC_FLAGS_NOT_FOR_TRENDS & h->flags)) continue; DCadd_trend(h, trends, &trends_alloc, trends_num); } if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds) { zbx_hashset_iter_t iter; ZBX_DC_TREND *trend; zbx_hashset_iter_reset(&cache->trends, &iter); while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter))) { if (trend->clock == hour) continue; /* discard trend items that are older than compression age */ if (0 != compression_age && trend->clock < compression_age) { if (SEC_PER_HOUR < (ts.sec - last_trend_discard)) /* log once per hour */ { zabbix_log(LOG_LEVEL_TRACE, "discarding trends that are pointing to" " compressed history period"); last_trend_discard = ts.sec; } zbx_hashset_iter_remove(&iter); } else { if (SUCCEED == zbx_history_requires_trends(trend->value_type) && 0 != trend->num) DCflush_trend(trend, trends, &trends_alloc, trends_num); zbx_vector_uint64_append(&del_itemids, trend->itemid); } } cache->trends_last_cleanup_hour = hour; } UNLOCK_TRENDS; if (0 != del_itemids.values_num) { zbx_dc_config_history_sync_unset_existing_itemids(&del_itemids); if (0 != del_itemids.values_num) { LOCK_TRENDS; for (i = 0; i < del_itemids.values_num; i++) { ZBX_DC_TREND *trend; if (NULL == (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &del_itemids.values[i]))) { continue; } zbx_hashset_remove_direct(&cache->trends, trend); } UNLOCK_TRENDS; } } zbx_vector_uint64_destroy(&del_itemids); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static int zbx_trend_compare(const void *d1, const void *d2) { const ZBX_DC_TREND *p1 = (const ZBX_DC_TREND *)d1; const ZBX_DC_TREND *p2 = (const ZBX_DC_TREND *)d2; ZBX_RETURN_IF_NOT_EQUAL(p1->itemid, p2->itemid); ZBX_RETURN_IF_NOT_EQUAL(p1->clock, p2->clock); return 0; } /****************************************************************************** * * * Purpose: prepare history data using items from configuration cache * * * * Parameters: trends - [IN] trends from cache to be added to database * * trends_num - [IN] number of trends to add to database * * trends_diff - [OUT] disable_from updates * * * ******************************************************************************/ static void DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_vector_uint64_pair_t *trends_diff) { ZBX_DC_TREND *trends_tmp; if (0 != trends_num) { trends_tmp = (ZBX_DC_TREND *)zbx_malloc(NULL, trends_num * sizeof(ZBX_DC_TREND)); memcpy(trends_tmp, trends, trends_num * sizeof(ZBX_DC_TREND)); qsort(trends_tmp, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare); while (0 < trends_num) DBflush_trends(trends_tmp, &trends_num, trends_diff); zbx_free(trends_tmp); } } typedef struct { zbx_uint64_t hostid; zbx_vector_ptr_t groups; } zbx_host_info_t; /****************************************************************************** * * * Purpose: frees resources allocated to store host groups names * * * * Parameters: host_info - [IN] host information * * * ******************************************************************************/ static void zbx_host_info_clean(zbx_host_info_t *host_info) { zbx_vector_ptr_clear_ext(&host_info->groups, zbx_ptr_free); zbx_vector_ptr_destroy(&host_info->groups); } /****************************************************************************** * * * Purpose: get hosts groups names * * * * Parameters: hosts_info - [IN/OUT] output names of host groups for a host * * hostids - [IN] hosts identifiers * * * ******************************************************************************/ static void db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids) { int i; size_t sql_offset = 0; DB_RESULT result; DB_ROW row; for (i = 0; i < hostids->values_num; i++) { zbx_host_info_t host_info = {.hostid = hostids->values[i]}; zbx_vector_ptr_create(&host_info.groups); zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info)); } zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select distinct hg.hostid,g.name" " from hstgrp g,hosts_groups hg" " where g.groupid=hg.groupid" " and"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { zbx_uint64_t hostid; zbx_host_info_t *host_info; ZBX_DBROW2UINT64(hostid, row[0]); if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid))) { THIS_SHOULD_NEVER_HAPPEN; continue; } zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1])); } zbx_db_free_result(result); } typedef struct { zbx_uint64_t itemid; char *name; zbx_history_sync_item_t *item; zbx_vector_tags_t item_tags; } zbx_item_info_t; /****************************************************************************** * * * Purpose: get item names * * * * Parameters: items_info - [IN/OUT] output item names * * itemids - [IN] the item identifiers * * * ******************************************************************************/ static void db_get_item_names_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids) { size_t sql_offset = 0; DB_RESULT result; DB_ROW row; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { zbx_uint64_t itemid; zbx_item_info_t *item_info; ZBX_DBROW2UINT64(itemid, row[0]); if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid))) { THIS_SHOULD_NEVER_HAPPEN; continue; } item_info->name = zbx_strdup(item_info->name, row[1]); } zbx_db_free_result(result); } /****************************************************************************** * * * Purpose: get item tags * * * * Parameters: items_info - [IN/OUT] output item tags * * itemids - [IN] the item identifiers * * * ******************************************************************************/ static void db_get_item_tags_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids) { size_t sql_offset = 0; DB_RESULT result; DB_ROW row; zbx_item_info_t *item_info = NULL; zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,tag,value from item_tag where"); zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num); zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " order by itemid"); result = zbx_db_select("%s", sql); while (NULL != (row = zbx_db_fetch(result))) { zbx_uint64_t itemid; zbx_tag_t *item_tag; ZBX_DBROW2UINT64(itemid, row[0]); if (NULL == item_info || item_info->itemid != itemid) { if (NULL != item_info) { zbx_vector_tags_sort(&item_info->item_tags, zbx_compare_tags); } if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid))) { THIS_SHOULD_NEVER_HAPPEN; continue; } } item_tag = (zbx_tag_t *)zbx_malloc(NULL, sizeof(*item_tag)); item_tag->tag = zbx_strdup(NULL, row[1]); item_tag->value = zbx_strdup(NULL, row[2]); zbx_vector_tags_append(&item_info->item_tags, item_tag); } if (NULL != item_info) { zbx_vector_tags_sort(&item_info->item_tags, zbx_compare_tags); } zbx_db_free_result(result); } /****************************************************************************** * * * Purpose: get item names and item tags * * * * Parameters: items_info - [IN/OUT] output item name and item tags * * itemids - [IN] the item identifiers * * * ******************************************************************************/ static void db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids) { db_get_item_names_by_itemid(items_info, itemids); db_get_item_tags_by_itemid(items_info, itemids); } /****************************************************************************** * * * Purpose: frees resources allocated to store item tags and name * * * * Parameters: item_info - [IN] item information * * * ******************************************************************************/ static void zbx_item_info_clean(zbx_item_info_t *item_info) { zbx_vector_tags_clear_ext(&item_info->item_tags, zbx_free_tag); zbx_vector_tags_destroy(&item_info->item_tags); zbx_free(item_info->name); } /****************************************************************************** * * * Purpose: export trends * * * * Parameters: trends - [IN] trends from cache * * trends_num - [IN] number of trends * * hosts_info - [IN] hosts groups names * * items_info - [IN] item names and tags * * * ******************************************************************************/ static void DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info, zbx_hashset_t *items_info) { struct zbx_json json; const ZBX_DC_TREND *trend = NULL; int i, j; const zbx_history_sync_item_t *item; zbx_host_info_t *host_info; zbx_item_info_t *item_info; zbx_uint128_t avg; /* calculate the trend average value */ zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN); for (i = 0; i < trends_num; i++) { trend = &trends[i]; if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid))) continue; item = item_info->item; if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid))) { THIS_SHOULD_NEVER_HAPPEN; continue; } zbx_json_clean(&json); zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST); zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING); zbx_json_close(&json); zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS); for (j = 0; j < host_info->groups.values_num; j++) zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING); zbx_json_close(&json); zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS); for (j = 0; j < item_info->item_tags.values_num; j++) { zbx_tag_t *item_tag = item_info->item_tags.values[j]; zbx_json_addobject(&json, NULL); zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag->tag, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag->value, ZBX_JSON_TYPE_STRING); zbx_json_close(&json); } zbx_json_close(&json); zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid); if (NULL != item_info->name) zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING); zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock); zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num); switch (trend->value_type) { case ITEM_VALUE_TYPE_FLOAT: zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl); zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl); zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64); zbx_udiv128_64(&avg, &trend->value_avg.ui64, trend->num); zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo); zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64); break; default: THIS_SHOULD_NEVER_HAPPEN; } zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, trend->value_type); zbx_trends_export_write(json.buffer, json.buffer_size); } zbx_trends_export_flush(); zbx_json_free(&json); } /****************************************************************************** * * * Purpose: export history * * * * Parameters: history - [IN/OUT] array of history data * * history_num - [IN] number of history structures * * hosts_info - [IN] hosts groups names * * items_info - [IN] item names and tags * * * ******************************************************************************/ static void DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info, zbx_hashset_t *items_info, int history_export_enabled, zbx_vector_connector_filter_t *connector_filters, unsigned char **data, size_t *data_alloc, size_t *data_offset) { const ZBX_DC_HISTORY *h; const zbx_history_sync_item_t *item; int i, j; zbx_host_info_t *host_info; zbx_item_info_t *item_info; struct zbx_json json; zbx_connector_object_t connector_object; zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN); zbx_vector_uint64_create(&connector_object.ids); for (i = 0; i < history_num; i++) { h = &history[i]; if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags)) continue; if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid))) { THIS_SHOULD_NEVER_HAPPEN; continue; } item = item_info->item; if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid))) { THIS_SHOULD_NEVER_HAPPEN; continue; } if (0 != connector_filters->values_num) { int k; for (k = 0; k < connector_filters->values_num; k++) { if (SUCCEED == zbx_match_tags(connector_filters->values[k].tags_evaltype, &connector_filters->values[k].connector_tags, &item_info->item_tags)) { zbx_vector_uint64_append(&connector_object.ids, connector_filters->values[k].connectorid); } } if (0 == connector_object.ids.values_num && FAIL == history_export_enabled) continue; } zbx_json_clean(&json); zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST); zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING); zbx_json_close(&json); zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS); for (j = 0; j < host_info->groups.values_num; j++) zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING); zbx_json_close(&json); zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS); for (j = 0; j < item_info->item_tags.values_num; j++) { zbx_tag_t *item_tag = item_info->item_tags.values[j]; zbx_json_addobject(&json, NULL); zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag->tag, ZBX_JSON_TYPE_STRING); zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag->value, ZBX_JSON_TYPE_STRING); zbx_json_close(&json); } zbx_json_close(&json); zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid); if (NULL != item_info->name) zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING); zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec); zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns); switch (h->value_type) { case ITEM_VALUE_TYPE_FLOAT: zbx_json_adddouble(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64); break; case ITEM_VALUE_TYPE_STR: zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING); break; case ITEM_VALUE_TYPE_TEXT: zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING); break; case ITEM_VALUE_TYPE_LOG: zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp); zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE, ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING); zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity); zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid); zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value, ZBX_JSON_TYPE_STRING); break; default: THIS_SHOULD_NEVER_HAPPEN; } zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, h->value_type); if (0 != connector_object.ids.values_num) { connector_object.objectid = item->itemid; connector_object.ts = h->ts; connector_object.str = json.buffer; zbx_connector_serialize_object(data, data_alloc, data_offset, &connector_object); zbx_vector_uint64_clear(&connector_object.ids); } if (SUCCEED == history_export_enabled) zbx_history_export_write(json.buffer, json.buffer_size); } if (SUCCEED == history_export_enabled) zbx_history_export_flush(); zbx_vector_uint64_destroy(&connector_object.ids); zbx_json_free(&json); } /****************************************************************************** * * * Purpose: export history and trends * * * * Parameters: history - [IN/OUT] array of history data * * history_num - [IN] number of history structures * * itemids - [IN] the item identifiers * * (used for item lookup) * * items - [IN] the items * * errcodes - [IN] item error codes * * trends - [IN] trends from cache * * trends_num - [IN] number of trends * * * ******************************************************************************/ static void DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num, const zbx_vector_uint64_t *itemids, zbx_history_sync_item_t *items, const int *errcodes, const ZBX_DC_TREND *trends, int trends_num, int history_export_enabled, zbx_vector_connector_filter_t *connector_filters, unsigned char **data, size_t *data_alloc, size_t *data_offset) { int i, index; zbx_vector_uint64_t hostids, item_info_ids; zbx_hashset_t hosts_info, items_info; zbx_history_sync_item_t *item; zbx_item_info_t item_info; zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __func__, history_num, trends_num); zbx_vector_uint64_create(&hostids); zbx_vector_uint64_create(&item_info_ids); zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean, ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC); for (i = 0; i < history_num; i++) { const ZBX_DC_HISTORY *h = &history[i]; if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags)) continue; if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))) { THIS_SHOULD_NEVER_HAPPEN; continue; } if (SUCCEED != errcodes[index]) continue; item = &items[index]; zbx_vector_uint64_append(&hostids, item->host.hostid); zbx_vector_uint64_append(&item_info_ids, item->itemid); item_info.itemid = item->itemid; item_info.name = NULL; item_info.item = item; zbx_vector_tags_create(&item_info.item_tags); zbx_hashset_insert(&items_info, &item_info, sizeof(item_info)); } if (0 == history_num) { for (i = 0; i < trends_num; i++) { const ZBX_DC_TREND *trend = &trends[i]; if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))) { THIS_SHOULD_NEVER_HAPPEN; continue; } if (SUCCEED != errcodes[index]) continue; item = &items[index]; zbx_vector_uint64_append(&hostids, item->host.hostid); zbx_vector_uint64_append(&item_info_ids, item->itemid); item_info.itemid = item->itemid; item_info.name = NULL; item_info.item = item; zbx_vector_tags_create(&item_info.item_tags); zbx_hashset_insert(&items_info, &item_info, sizeof(item_info)); } } if (0 == item_info_ids.values_num) goto clean; zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean, ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC); db_get_hosts_info_by_hostid(&hosts_info, &hostids); db_get_items_info_by_itemid(&items_info, &item_info_ids); if (0 != history_num) { DCexport_history(history, history_num, &hosts_info, &items_info, history_export_enabled, connector_filters, data, data_alloc, data_offset); } if (0 != trends_num) DCexport_trends(trends, trends_num, &hosts_info, &items_info); zbx_hashset_destroy(&hosts_info); clean: zbx_hashset_destroy(&items_info); zbx_vector_uint64_destroy(&item_info_ids); zbx_vector_uint64_destroy(&hostids); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: export all trends * * * * Parameters: trends - [IN] trends from cache * * trends_num - [IN] number of trends * * * ******************************************************************************/ static void DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num) { zbx_history_sync_item_t *items; zbx_vector_uint64_t itemids; int *errcodes; size_t i, num; zabbix_log(LOG_LEVEL_WARNING, "exporting trend data..."); while (0 < trends_num) { num = (size_t)MIN(ZBX_HC_SYNC_MAX, trends_num); items = (zbx_history_sync_item_t *)zbx_malloc(NULL, sizeof(zbx_history_sync_item_t) * num); errcodes = (int *)zbx_malloc(NULL, sizeof(int) * num); zbx_vector_uint64_create(&itemids); zbx_vector_uint64_reserve(&itemids, num); for (i = 0; i < num; i++) zbx_vector_uint64_append(&itemids, trends[i].itemid); zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_dc_config_history_sync_get_items_by_itemids(items, itemids.values, errcodes, num, ZBX_ITEM_GET_SYNC_EXPORT); DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, (int)num, FAIL, NULL, NULL, 0, 0); zbx_dc_config_clean_history_sync_items(items, errcodes, num); zbx_vector_uint64_destroy(&itemids); zbx_free(items); zbx_free(errcodes); trends += num; trends_num -= (int)num; } zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done"); } /****************************************************************************** * * * Purpose: flush all trends to the database * * * ******************************************************************************/ static void DCsync_trends(void) { zbx_hashset_iter_t iter; ZBX_DC_TREND *trends = NULL, *trend; int trends_alloc = 0, trends_num = 0, compression_age; zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, cache->trends_num); compression_age = hc_get_history_compression_age(); zabbix_log(LOG_LEVEL_WARNING, "syncing trend data..."); LOCK_TRENDS; zbx_hashset_iter_reset(&cache->trends, &iter); while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter))) { if (SUCCEED == zbx_history_requires_trends(trend->value_type) && trend->clock >= compression_age && 0 != trend->num) { DCflush_trend(trend, &trends, &trends_alloc, &trends_num); } } UNLOCK_TRENDS; if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS) && 0 != trends_num) DCexport_all_trends(trends, trends_num); if (0 < trends_num) qsort(trends, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare); zbx_db_begin(); while (trends_num > 0) DBflush_trends(trends, &trends_num, NULL); zbx_db_commit(); zbx_free(trends); zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done"); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } #define ZBX_FLAGS_TRIGGER_CREATE_NOTHING 0x00 #define ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT 0x01 #define ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT 0x02 #define ZBX_FLAGS_TRIGGER_CREATE_EVENT \ (ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT | ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT) /****************************************************************************** * * * Purpose: 1) calculate changeset of trigger fields to be updated * * 2) generate events * * * * Parameters: trigger - [IN] the trigger to process * * diffs - [OUT] the vector with trigger changes * * * * Return value: SUCCEED - trigger processed successfully * * FAIL - no changes * * * * Comments: Trigger dependency checks will be done during event processing. * * * * Event generation depending on trigger value/state changes: * * * * From \ To | OK | OK(?) | PROBLEM | PROBLEM(?) | NONE * *----------------------------------------------------------------------------* * OK | . | I | E | I | . * * | | | | | * * OK(?) | I | . | E,I | - | I * * | | | | | * * PROBLEM | E | I | E(m) | I | . * * | | | | | * * PROBLEM(?) | E,I | - | E(m),I | . | I * * * * Legend: * * 'E' - trigger event * * 'I' - internal event * * '.' - nothing * * '-' - should never happen * * * ******************************************************************************/ static int zbx_process_trigger(struct _DC_TRIGGER *trigger, zbx_vector_ptr_t *diffs) { const char *new_error; int new_state, new_value, ret = FAIL; zbx_uint64_t flags = ZBX_FLAGS_TRIGGER_DIFF_UNSET, event_flags = ZBX_FLAGS_TRIGGER_CREATE_NOTHING; zabbix_log(LOG_LEVEL_DEBUG, "In %s() triggerid:" ZBX_FS_UI64 " value:%d(%d) new_value:%d", __func__, trigger->triggerid, trigger->value, trigger->state, trigger->new_value); if (TRIGGER_VALUE_UNKNOWN == trigger->new_value) { new_state = TRIGGER_STATE_UNKNOWN; new_value = trigger->value; } else { new_state = TRIGGER_STATE_NORMAL; new_value = trigger->new_value; } new_error = (NULL == trigger->new_error ? "" : trigger->new_error); if (trigger->state != new_state) { flags |= ZBX_FLAGS_TRIGGER_DIFF_UPDATE_STATE; event_flags |= ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT; } if (0 != strcmp(trigger->error, new_error)) flags |= ZBX_FLAGS_TRIGGER_DIFF_UPDATE_ERROR; if (TRIGGER_STATE_NORMAL == new_state) { if (TRIGGER_VALUE_PROBLEM == new_value) { if (TRIGGER_VALUE_OK == trigger->value || TRIGGER_TYPE_MULTIPLE_TRUE == trigger->type) event_flags |= ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT; } else if (TRIGGER_VALUE_OK == new_value) { if (TRIGGER_VALUE_PROBLEM == trigger->value || 0 == trigger->lastchange) event_flags |= ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT; } } /* check if there is something to be updated */ if (0 == (flags & ZBX_FLAGS_TRIGGER_DIFF_UPDATE) && 0 == (event_flags & ZBX_FLAGS_TRIGGER_CREATE_EVENT)) goto out; if (0 != (event_flags & ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT)) { zbx_add_event(EVENT_SOURCE_TRIGGERS, EVENT_OBJECT_TRIGGER, trigger->triggerid, &trigger->timespec, new_value, trigger->description, trigger->expression, trigger->recovery_expression, trigger->priority, trigger->type, &trigger->tags, trigger->correlation_mode, trigger->correlation_tag, trigger->value, trigger->opdata, trigger->event_name, NULL); } if (0 != (event_flags & ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT)) { zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_TRIGGER, trigger->triggerid, &trigger->timespec, new_state, NULL, trigger->expression, trigger->recovery_expression, 0, 0, &trigger->tags, 0, NULL, 0, NULL, NULL, new_error); } zbx_append_trigger_diff(diffs, trigger->triggerid, trigger->priority, flags, trigger->value, new_state, trigger->timespec.sec, new_error); ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s flags:" ZBX_FS_UI64, __func__, zbx_result_string(ret), flags); return ret; } /****************************************************************************** * * * Comments: helper function for zbx_process_triggers() * * * ******************************************************************************/ static int zbx_trigger_topoindex_compare(const void *d1, const void *d2) { const DC_TRIGGER *t1 = *(const DC_TRIGGER * const *)d1; const DC_TRIGGER *t2 = *(const DC_TRIGGER * const *)d2; ZBX_RETURN_IF_NOT_EQUAL(t1->topoindex, t2->topoindex); return 0; } /****************************************************************************** * * * Purpose: process triggers - calculates property changeset and generates * * events * * * * Parameters: triggers - [IN] the triggers to process * * trigger_diff - [OUT] the trigger changeset * * * * Comments: The trigger_diff changeset must be cleaned by the caller: * * zbx_vector_ptr_clear_ext(trigger_diff, * * (zbx_clean_func_t)zbx_trigger_diff_free); * * * ******************************************************************************/ static void zbx_process_triggers(zbx_vector_ptr_t *triggers, zbx_vector_ptr_t *trigger_diff) { int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s() values_num:%d", __func__, triggers->values_num); if (0 == triggers->values_num) goto out; zbx_vector_ptr_sort(triggers, zbx_trigger_topoindex_compare); for (i = 0; i < triggers->values_num; i++) zbx_process_trigger((struct _DC_TRIGGER *)triggers->values[i], trigger_diff); zbx_vector_ptr_sort(trigger_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: re-calculate and update values of triggers related to the items * * * * Parameters: history - [IN] array of history data * * history_num - [IN] number of history structures * * history_itemids - [IN] the item identifiers * * (used for item lookup) * * history_items - [IN] the items * * history_errcodes - [IN] item error codes * * timers - [IN] the trigger timers * * trigger_diff - [OUT] trigger updates * * itemids - [OUT] the item identifiers * * (used for item lookup) * * timespecs - [OUT] timestamp for item identifiers * * trigger_info - [OUT] triggers * * trigger_order - [OUT] pointer to the list of triggers * * * ******************************************************************************/ static void recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num, const zbx_vector_uint64_t *history_itemids, const zbx_history_sync_item_t *history_items, const int *history_errcodes, const zbx_vector_ptr_t *timers, zbx_vector_ptr_t *trigger_diff, zbx_uint64_t *itemids, zbx_timespec_t *timespecs, zbx_hashset_t *trigger_info, zbx_vector_ptr_t *trigger_order) { int i, item_num = 0, timers_num = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (0 != history_num) { for (i = 0; i < history_num; i++) { const ZBX_DC_HISTORY *h = &history[i]; if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags)) continue; itemids[item_num] = h->itemid; timespecs[item_num] = h->ts; item_num++; } } for (i = 0; i < timers->values_num; i++) { zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)timers->values[i]; if (0 != timer->lock) timers_num++; } if (0 == item_num && 0 == timers_num) goto out; if (SUCCEED != zbx_hashset_reserve(trigger_info, MAX(100, 2 * item_num + timers_num))) { THIS_SHOULD_NEVER_HAPPEN; } zbx_vector_ptr_reserve(trigger_order, trigger_info->num_slots); if (0 != item_num) { zbx_dc_config_history_sync_get_triggers_by_itemids(trigger_info, trigger_order, itemids, timespecs, item_num); zbx_prepare_triggers((DC_TRIGGER **)trigger_order->values, trigger_order->values_num); zbx_determine_items_in_expressions(trigger_order, itemids, item_num); } if (0 != timers_num) { int offset = trigger_order->values_num; zbx_dc_get_triggers_by_timers(trigger_info, trigger_order, timers); if (offset != trigger_order->values_num) { zbx_prepare_triggers((DC_TRIGGER **)trigger_order->values + offset, trigger_order->values_num - offset); } } zbx_vector_ptr_sort(trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); zbx_evaluate_expressions(trigger_order, history_itemids, history_items, history_errcodes); zbx_process_triggers(trigger_order, trigger_diff); DCfree_triggers(trigger_order); zbx_hashset_clear(trigger_info); zbx_vector_ptr_clear(trigger_order); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static void DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const zbx_history_sync_item_t *item, ZBX_DC_HISTORY *h) { char value[MAX_BUFFER_LEN]; const char *inventory_field; zbx_inventory_value_t *inventory_value; if (ITEM_STATE_NOTSUPPORTED == h->state) return; if (HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode) return; if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags) || NULL == (inventory_field = zbx_db_get_inventory_field(item->inventory_link))) { return; } switch (h->value_type) { case ITEM_VALUE_TYPE_FLOAT: zbx_print_double(value, sizeof(value), h->value.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_snprintf(value, sizeof(value), ZBX_FS_UI64, h->value.ui64); break; case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: zbx_strscpy(value, h->value.str); break; default: return; } zbx_format_value(value, sizeof(value), item->valuemapid, ZBX_NULL2EMPTY_STR(item->units), h->value_type); inventory_value = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t)); inventory_value->hostid = item->host.hostid; inventory_value->idx = item->inventory_link - 1; inventory_value->field_name = inventory_field; inventory_value->value = zbx_strdup(NULL, value); zbx_vector_ptr_append(inventory_values, inventory_value); } static void DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values) { char *value_esc; int i; for (i = 0; i < inventory_values->values_num; i++) { const zbx_inventory_value_t *inventory_value = (zbx_inventory_value_t *)inventory_values->values[i]; value_esc = zbx_db_dyn_escape_field("host_inventory", inventory_value->field_name, inventory_value->value); zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n", inventory_value->field_name, value_esc, inventory_value->hostid); zbx_db_execute_overflowed_sql(&sql, &sql_alloc, sql_offset); zbx_free(value_esc); } } static void DCinventory_value_free(zbx_inventory_value_t *inventory_value) { zbx_free(inventory_value->value); zbx_free(inventory_value); } /****************************************************************************** * * * Purpose: frees resources allocated to store str/text/log value * * * * Parameters: history - [IN] the history data * * history_num - [IN] the number of values in history data * * * ******************************************************************************/ static void dc_history_clean_value(ZBX_DC_HISTORY *history) { if (ITEM_STATE_NOTSUPPORTED == history->state) { zbx_free(history->value.err); return; } if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags)) return; switch (history->value_type) { case ITEM_VALUE_TYPE_LOG: zbx_free(history->value.log->value); zbx_free(history->value.log->source); zbx_free(history->value.log); break; case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: zbx_free(history->value.str); break; } } /****************************************************************************** * * * Purpose: frees resources allocated to store str/text/log values * * * * Parameters: history - [IN] the history data * * history_num - [IN] the number of values in history data * * * ******************************************************************************/ static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num) { int i; for (i = 0; i < history_num; i++) dc_history_clean_value(&history[i]); } /****************************************************************************** * * * Purpose: sets history data to notsupported * * * * Parameters: history - [IN] the history data * * errmsg - [IN] the error message * * * * Comments: The error message is stored directly and freed with when history * * data is cleaned. * * * ******************************************************************************/ static void dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg) { dc_history_clean_value(hdata); hdata->value.err = errmsg; hdata->state = ITEM_STATE_NOTSUPPORTED; hdata->flags |= ZBX_DC_FLAG_UNDEF; } /****************************************************************************** * * * Purpose: sets history data value * * * * Parameters: hdata - [IN/OUT] the history data * * value_type - [IN] the item value type * * value - [IN] the value to set * * * ******************************************************************************/ static void dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value) { char *errmsg = NULL; if (FAIL == zbx_variant_to_value_type(value, value_type, CONFIG_DOUBLE_PRECISION, &errmsg)) { dc_history_set_error(hdata, errmsg); return; } switch (value_type) { case ITEM_VALUE_TYPE_FLOAT: dc_history_clean_value(hdata); hdata->value.dbl = value->data.dbl; break; case ITEM_VALUE_TYPE_UINT64: dc_history_clean_value(hdata); hdata->value.ui64 = value->data.ui64; break; case ITEM_VALUE_TYPE_STR: dc_history_clean_value(hdata); hdata->value.str = value->data.str; hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_STR_VALUE_LEN)] = '\0'; break; case ITEM_VALUE_TYPE_TEXT: dc_history_clean_value(hdata); hdata->value.str = value->data.str; hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_TEXT_VALUE_LEN)] = '\0'; break; case ITEM_VALUE_TYPE_LOG: if (ITEM_VALUE_TYPE_LOG != hdata->value_type) { dc_history_clean_value(hdata); hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t)); memset(hdata->value.log, 0, sizeof(zbx_log_value_t)); } hdata->value.log->value = value->data.str; hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_LOG_VALUE_LEN)] = '\0'; } hdata->value_type = value_type; zbx_variant_set_none(value); } /****************************************************************************** * * * Purpose: normalize item value by performing truncation of long text * * values and changes value format according to the item value type * * * * Parameters: item - [IN] the item * * hdata - [IN/OUT] the historical data to process * * * ******************************************************************************/ static void normalize_item_value(const zbx_history_sync_item_t *item, ZBX_DC_HISTORY *hdata) { char *logvalue; zbx_variant_t value_var; if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE)) return; if (ITEM_STATE_NOTSUPPORTED == hdata->state) return; if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY)) hdata->ttl = item->history_sec; if (item->value_type == hdata->value_type) { /* truncate text based values if necessary */ switch (hdata->value_type) { case ITEM_VALUE_TYPE_STR: hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_STR_VALUE_LEN)] = '\0'; break; case ITEM_VALUE_TYPE_TEXT: hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_TEXT_VALUE_LEN)] = '\0'; break; case ITEM_VALUE_TYPE_LOG: logvalue = hdata->value.log->value; logvalue[zbx_db_strlen_n(logvalue, ZBX_HISTORY_LOG_VALUE_LEN)] = '\0'; break; case ITEM_VALUE_TYPE_FLOAT: if (FAIL == zbx_validate_value_dbl(hdata->value.dbl, CONFIG_DOUBLE_PRECISION)) { char buffer[ZBX_MAX_DOUBLE_LEN + 1]; dc_history_set_error(hdata, zbx_dsprintf(NULL, "Value %s is too small or too large.", zbx_print_double(buffer, sizeof(buffer), hdata->value.dbl))); } break; } return; } switch (hdata->value_type) { case ITEM_VALUE_TYPE_FLOAT: zbx_variant_set_dbl(&value_var, hdata->value.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_variant_set_ui64(&value_var, hdata->value.ui64); break; case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: zbx_variant_set_str(&value_var, hdata->value.str); hdata->value.str = NULL; break; case ITEM_VALUE_TYPE_LOG: zbx_variant_set_str(&value_var, hdata->value.log->value); hdata->value.log->value = NULL; break; default: THIS_SHOULD_NEVER_HAPPEN; return; } dc_history_set_value(hdata, item->value_type, &value_var); zbx_variant_clear(&value_var); } /****************************************************************************** * * * Purpose: calculates what item fields must be updated * * * * Parameters: item - [IN] the item * * h - [IN] the historical data to process * * * * Return value: The update data. This data must be freed by the caller. * * * * Comments: Will generate internal events when item state switches. * * * ******************************************************************************/ static zbx_item_diff_t *calculate_item_update(zbx_history_sync_item_t *item, const ZBX_DC_HISTORY *h) { zbx_uint64_t flags = 0; const char *item_error = NULL; zbx_item_diff_t *diff; if (0 != (ZBX_DC_FLAG_META & h->flags)) { if (item->lastlogsize != h->lastlogsize) flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE; if (item->mtime != h->mtime) flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME; } if (h->state != item->state) { flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE; if (ITEM_STATE_NOTSUPPORTED == h->state) { zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s", item->host.host, item->key_orig, h->value.str); zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, h->value.err); if (0 != strcmp(ZBX_NULL2EMPTY_STR(item->error), h->value.err)) item_error = h->value.err; } else { zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported", item->host.host, item->key_orig); /* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */ /* supported is handled in lld_process_discovery_rule() */ zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, NULL); item_error = ""; } } else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(ZBX_NULL2EMPTY_STR(item->error), h->value.err)) { zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host, item->key_orig, h->value.err); item_error = h->value.err; } if (NULL != item_error) flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR; if (0 == flags) return NULL; diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t)); diff->itemid = item->itemid; diff->flags = flags; if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags)) diff->lastlogsize = h->lastlogsize; if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags)) diff->mtime = h->mtime; if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags)) { diff->state = h->state; item->state = h->state; } if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags)) diff->error = item_error; return diff; } /****************************************************************************** * * * Purpose: update item data and inventory in database * * * * Parameters: item_diff - item changes * * inventory_values - inventory values * * * ******************************************************************************/ static void DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values) { size_t sql_offset = 0; int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); for (i = 0; i < item_diff->values_num; i++) { zbx_item_diff_t *diff; diff = (zbx_item_diff_t *)item_diff->values[i]; if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags)) break; } if (i != item_diff->values_num || 0 != inventory_values->values_num) { zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); if (i != item_diff->values_num) { zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff, ZBX_FLAGS_ITEM_DIFF_UPDATE_DB); } if (0 != inventory_values->values_num) DCadd_update_inventory_sql(&sql_offset, inventory_values); zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (sql_offset > 16) /* In ORACLE always present begin..end; */ zbx_db_execute("%s", sql); DCconfig_update_inventory_values(inventory_values); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: prepare itemdiff after receiving new values * * * * Parameters: history - array of history data * * history_num - number of history structures * * item_diff - vector to store prepared diff * * * ******************************************************************************/ static void DCmass_proxy_prepare_itemdiff(ZBX_DC_HISTORY *history, int history_num, zbx_vector_ptr_t *item_diff) { int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_ptr_reserve(item_diff, history_num); for (i = 0; i < history_num; i++) { zbx_item_diff_t *diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t)); diff->itemid = history[i].itemid; diff->state = history[i].state; diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE; if (0 != (ZBX_DC_FLAG_META & history[i].flags)) { diff->lastlogsize = history[i].lastlogsize; diff->mtime = history[i].mtime; diff->flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME; } zbx_vector_ptr_append(item_diff, diff); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: update items info after new value is received * * * * Parameters: item_diff - diff of items to be updated * * * ******************************************************************************/ static void DBmass_proxy_update_items(zbx_vector_ptr_t *item_diff) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (0 != item_diff->values_num) { size_t sql_offset = 0; zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset); zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff, ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME); zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset); if (sql_offset > 16) /* In ORACLE always present begin..end; */ zbx_db_execute("%s", sql); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } typedef struct { char *table_name; char *sql; size_t sql_alloc, sql_offset; } zbx_history_dupl_select_t; static int history_value_compare_func(const void *d1, const void *d2) { const ZBX_DC_HISTORY *i1 = *(const ZBX_DC_HISTORY * const *)d1; const ZBX_DC_HISTORY *i2 = *(const ZBX_DC_HISTORY * const *)d2; ZBX_RETURN_IF_NOT_EQUAL(i1->itemid, i2->itemid); ZBX_RETURN_IF_NOT_EQUAL(i1->value_type, i2->value_type); ZBX_RETURN_IF_NOT_EQUAL(i1->ts.sec, i2->ts.sec); ZBX_RETURN_IF_NOT_EQUAL(i1->ts.ns, i2->ts.ns); return 0; } static void vc_flag_duplicates(zbx_vector_ptr_t *history_index, zbx_vector_ptr_t *duplicates) { int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); for (i = 0; i < duplicates->values_num; i++) { int idx_cached; if (FAIL != (idx_cached = zbx_vector_ptr_bsearch(history_index, duplicates->values[i], history_value_compare_func))) { ZBX_DC_HISTORY *cached_value = (ZBX_DC_HISTORY *)history_index->values[idx_cached]; dc_history_clean_value(cached_value); cached_value->flags |= ZBX_DC_FLAGS_NOT_FOR_HISTORY; } } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static void db_fetch_duplicates(zbx_history_dupl_select_t *query, unsigned char value_type, zbx_vector_ptr_t *duplicates) { DB_RESULT result; DB_ROW row; if (NULL == query->sql) return; result = zbx_db_select("%s", query->sql); while (NULL != (row = zbx_db_fetch(result))) { ZBX_DC_HISTORY *d = (ZBX_DC_HISTORY *)zbx_malloc(NULL, sizeof(ZBX_DC_HISTORY)); ZBX_STR2UINT64(d->itemid, row[0]); d->ts.sec = atoi(row[1]); d->ts.ns = atoi(row[2]); d->value_type = value_type; zbx_vector_ptr_append(duplicates, d); } zbx_db_free_result(result); zbx_free(query->sql); } static void remove_history_duplicates(zbx_vector_ptr_t *history) { int i; zbx_history_dupl_select_t select_flt = {.table_name = "history"}, select_uint = {.table_name = "history_uint"}, select_str = {.table_name = "history_str"}, select_log = {.table_name = "history_log"}, select_text = {.table_name = "history_text"}; zbx_vector_ptr_t duplicates, history_index; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_ptr_create(&duplicates); zbx_vector_ptr_create(&history_index); zbx_vector_ptr_append_array(&history_index, history->values, history->values_num); zbx_vector_ptr_sort(&history_index, history_value_compare_func); for (i = 0; i < history_index.values_num; i++) { ZBX_DC_HISTORY *h = history_index.values[i]; zbx_history_dupl_select_t *select_ptr; char *separator = " or"; if (h->value_type == ITEM_VALUE_TYPE_FLOAT) select_ptr = &select_flt; else if (h->value_type == ITEM_VALUE_TYPE_UINT64) select_ptr = &select_uint; else if (h->value_type == ITEM_VALUE_TYPE_STR) select_ptr = &select_str; else if (h->value_type == ITEM_VALUE_TYPE_LOG) select_ptr = &select_log; else if (h->value_type == ITEM_VALUE_TYPE_TEXT) select_ptr = &select_text; else continue; if (NULL == select_ptr->sql) { zbx_snprintf_alloc(&select_ptr->sql, &select_ptr->sql_alloc, &select_ptr->sql_offset, "select itemid,clock,ns" " from %s" " where", select_ptr->table_name); separator = ""; } zbx_snprintf_alloc(&select_ptr->sql, &select_ptr->sql_alloc, &select_ptr->sql_offset, "%s (itemid=" ZBX_FS_UI64 " and clock=%d and ns=%d)", separator , h->itemid, h->ts.sec, h->ts.ns); } db_fetch_duplicates(&select_flt, ITEM_VALUE_TYPE_FLOAT, &duplicates); db_fetch_duplicates(&select_uint, ITEM_VALUE_TYPE_UINT64, &duplicates); db_fetch_duplicates(&select_str, ITEM_VALUE_TYPE_STR, &duplicates); db_fetch_duplicates(&select_log, ITEM_VALUE_TYPE_LOG, &duplicates); db_fetch_duplicates(&select_text, ITEM_VALUE_TYPE_TEXT, &duplicates); vc_flag_duplicates(&history_index, &duplicates); zbx_vector_ptr_clear_ext(&duplicates, (zbx_clean_func_t)zbx_ptr_free); zbx_vector_ptr_destroy(&duplicates); zbx_vector_ptr_destroy(&history_index); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static int add_history(ZBX_DC_HISTORY *history, int history_num, zbx_vector_ptr_t *history_values, int *ret_flush) { int i, ret = SUCCEED; for (i = 0; i < history_num; i++) { ZBX_DC_HISTORY *h = &history[i]; if (0 != (ZBX_DC_FLAGS_NOT_FOR_HISTORY & h->flags)) continue; zbx_vector_ptr_append(history_values, h); } if (0 != history_values->values_num) ret = zbx_vc_add_values(history_values, ret_flush); return ret; } /****************************************************************************** * * * Purpose: inserting new history data after new value is received * * * * Parameters: history - array of history data * * history_num - number of history structures * * * ******************************************************************************/ static int DBmass_add_history(ZBX_DC_HISTORY *history, int history_num) { int ret, ret_flush = FLUSH_SUCCEED, num; zbx_vector_ptr_t history_values; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_ptr_create(&history_values); zbx_vector_ptr_reserve(&history_values, history_num); if (FAIL == (ret = add_history(history, history_num, &history_values, &ret_flush)) && FLUSH_DUPL_REJECTED == ret_flush) { num = history_values.values_num; remove_history_duplicates(&history_values); zbx_vector_ptr_clear(&history_values); if (SUCCEED == (ret = add_history(history, history_num, &history_values, &ret_flush))) zabbix_log(LOG_LEVEL_WARNING, "skipped %d duplicates", num - history_values.values_num); } zbx_vector_ptr_destroy(&history_values); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: helper function for DCmass_proxy_add_history() * * * * Comment: this function is meant for items with value_type other than * * ITEM_VALUE_TYPE_LOG not containing meta information in result * * * ******************************************************************************/ static void dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num) { int i, now, history_count = 0; unsigned int flags; char buffer[64], *pvalue; zbx_db_insert_t db_insert; now = (int)time(NULL); zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "flags", "write_clock", NULL); for (i = 0; i < history_num; i++) { const ZBX_DC_HISTORY *h = &history[i]; if (0 != (h->flags & ZBX_DC_FLAG_UNDEF)) continue; if (0 != (h->flags & ZBX_DC_FLAG_META)) continue; if (ITEM_STATE_NOTSUPPORTED == h->state) continue; if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE)) { switch (h->value_type) { case ITEM_VALUE_TYPE_FLOAT: zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64); break; case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: pvalue = h->value.str; break; case ITEM_VALUE_TYPE_LOG: continue; default: THIS_SHOULD_NEVER_HAPPEN; continue; } flags = 0; } else { flags = PROXY_HISTORY_FLAG_NOVALUE; pvalue = (char *)""; } history_count++; zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, flags, now); } change_proxy_history_count(history_count); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } /****************************************************************************** * * * Purpose: helper function for DCmass_proxy_add_history() * * * * Comment: this function is meant for items with value_type other than * * ITEM_VALUE_TYPE_LOG containing meta information in result * * * ******************************************************************************/ static void dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num) { int i, now, history_count = 0; char buffer[64], *pvalue; zbx_db_insert_t db_insert; now = (int)time(NULL); zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime", "flags", "write_clock", NULL); for (i = 0; i < history_num; i++) { unsigned int flags = PROXY_HISTORY_FLAG_META; const ZBX_DC_HISTORY *h = &history[i]; if (ITEM_STATE_NOTSUPPORTED == h->state) continue; if (0 != (h->flags & ZBX_DC_FLAG_UNDEF)) continue; if (0 == (h->flags & ZBX_DC_FLAG_META)) continue; if (ITEM_VALUE_TYPE_LOG == h->value_type) continue; if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE)) { switch (h->value_type) { case ITEM_VALUE_TYPE_FLOAT: zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl); break; case ITEM_VALUE_TYPE_UINT64: zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64); break; case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: pvalue = h->value.str; break; default: THIS_SHOULD_NEVER_HAPPEN; continue; } } else { flags |= PROXY_HISTORY_FLAG_NOVALUE; pvalue = (char *)""; } history_count++; zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime, flags, now); } change_proxy_history_count(history_count); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } /****************************************************************************** * * * Purpose: helper function for DCmass_proxy_add_history() * * * * Comment: this function is meant for items with value_type * * ITEM_VALUE_TYPE_LOG * * * ******************************************************************************/ static void dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num) { int i, now, history_count = 0; zbx_db_insert_t db_insert; now = (int)time(NULL); /* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */ zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity", "value", "logeventid", "lastlogsize", "mtime", "flags", "write_clock", NULL); for (i = 0; i < history_num; i++) { unsigned int flags; zbx_uint64_t lastlogsize; int mtime; const ZBX_DC_HISTORY *h = &history[i]; if (ITEM_STATE_NOTSUPPORTED == h->state) continue; if (ITEM_VALUE_TYPE_LOG != h->value_type) continue; if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE)) { zbx_log_value_t *log = h->value.log; if (0 != (h->flags & ZBX_DC_FLAG_META)) { flags = PROXY_HISTORY_FLAG_META; lastlogsize = h->lastlogsize; mtime = h->mtime; } else { flags = 0; lastlogsize = 0; mtime = 0; } zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, log->timestamp, ZBX_NULL2EMPTY_STR(log->source), log->severity, log->value, log->logeventid, lastlogsize, mtime, flags, now); } else { /* sent to server only if not 0, see proxy_get_history_data() */ const int unset_if_novalue = 0; flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE; zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, "", unset_if_novalue, "", unset_if_novalue, h->lastlogsize, h->mtime, flags, now); } history_count++; } change_proxy_history_count(history_count); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } /****************************************************************************** * * * Purpose: helper function for DCmass_proxy_add_history() * * * ******************************************************************************/ static void dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num) { int i, now, history_count = 0; zbx_db_insert_t db_insert; now = (int)time(NULL); zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", "write_clock", NULL); for (i = 0; i < history_num; i++) { const ZBX_DC_HISTORY *h = &history[i]; if (ITEM_STATE_NOTSUPPORTED != h->state) continue; history_count++; zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, ZBX_NULL2EMPTY_STR(h->value.err), (int)h->state, now); } change_proxy_history_count(history_count); zbx_db_insert_execute(&db_insert); zbx_db_insert_clean(&db_insert); } /****************************************************************************** * * * Purpose: inserting new history data after new value is received * * * * Parameters: history - array of history data * * history_num - number of history structures * * * ******************************************************************************/ static void DBmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num) { int i, h_num = 0, h_meta_num = 0, hlog_num = 0, notsupported_num = 0; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); for (i = 0; i < history_num; i++) { const ZBX_DC_HISTORY *h = &history[i]; if (ITEM_STATE_NOTSUPPORTED == h->state) { notsupported_num++; continue; } switch (h->value_type) { case ITEM_VALUE_TYPE_LOG: hlog_num++; break; case ITEM_VALUE_TYPE_FLOAT: case ITEM_VALUE_TYPE_UINT64: case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: if (0 != (h->flags & ZBX_DC_FLAG_META)) h_meta_num++; else h_num++; break; case ITEM_VALUE_TYPE_NONE: h_num++; break; default: THIS_SHOULD_NEVER_HAPPEN; } } if (0 != h_num) dc_add_proxy_history(history, history_num); if (0 != h_meta_num) dc_add_proxy_history_meta(history, history_num); if (0 != hlog_num) dc_add_proxy_history_log(history, history_num); if (0 != notsupported_num) dc_add_proxy_history_notsupported(history, history_num); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: prepare history data using items from configuration cache and * * generate item changes to be applied and host inventory values to * * be added * * * * Parameters: history - [IN/OUT] array of history data * * itemids - [IN] the item identifiers * * (used for item lookup) * * items - [IN] the items * * errcodes - [IN] item error codes * * history_num - [IN] number of history structures * * item_diff - [OUT] the changes in item data * * inventory_values - [OUT] the inventory values to add * * compression_age - [IN] history compression age * * proxy_subscribtions - [IN] history compression age * * * ******************************************************************************/ static void DCmass_prepare_history(ZBX_DC_HISTORY *history, zbx_history_sync_item_t *items, const int *errcodes, int history_num, zbx_vector_ptr_t *item_diff, zbx_vector_ptr_t *inventory_values, int compression_age, zbx_vector_uint64_pair_t *proxy_subscribtions) { static time_t last_history_discard = 0; time_t now; int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, history_num); now = time(NULL); for (i = 0; i < history_num; i++) { ZBX_DC_HISTORY *h = &history[i]; zbx_history_sync_item_t *item; zbx_item_diff_t *diff; /* discard history items that are older than compression age */ if (0 != compression_age && h->ts.sec < compression_age) { if (SEC_PER_HOUR < (now - last_history_discard)) /* log once per hour */ { zabbix_log(LOG_LEVEL_TRACE, "discarding history that is pointing to" " compressed history period"); last_history_discard = now; } h->flags |= ZBX_DC_FLAG_UNDEF; continue; } if (SUCCEED != errcodes[i]) { h->flags |= ZBX_DC_FLAG_UNDEF; continue; } item = &items[i]; if (ITEM_STATUS_ACTIVE != item->status || HOST_STATUS_MONITORED != item->host.status) { h->flags |= ZBX_DC_FLAG_UNDEF; continue; } if (0 == item->history) { h->flags |= ZBX_DC_FLAG_NOHISTORY; } else if (now - h->ts.sec > item->history_sec) { h->flags |= ZBX_DC_FLAG_NOHISTORY; zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history " "storage period", item->host.host, item->key_orig, zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL)); } if (ITEM_VALUE_TYPE_FLOAT == item->value_type || ITEM_VALUE_TYPE_UINT64 == item->value_type) { if (0 == item->trends) { h->flags |= ZBX_DC_FLAG_NOTRENDS; } else if (now - h->ts.sec > item->trends_sec) { h->flags |= ZBX_DC_FLAG_NOTRENDS; zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside " "trends storage period", item->host.host, item->key_orig, zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL)); } } else h->flags |= ZBX_DC_FLAG_NOTRENDS; normalize_item_value(item, h); /* calculate item update and update already retrieved item status for trigger calculation */ if (NULL != (diff = calculate_item_update(item, h))) zbx_vector_ptr_append(item_diff, diff); DCinventory_value_add(inventory_values, item, h); if (0 != item->host.proxy_hostid && FAIL == is_item_processed_by_server(item->type, item->key_orig)) { zbx_uint64_pair_t p = {item->host.proxy_hostid, h->ts.sec}; zbx_vector_uint64_pair_append(proxy_subscribtions, p); } } zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: prepare history data to share them with loadable modules, sort * * data by type skipping low-level discovery data, meta information * * updates and notsupported items * * * * Parameters: history - [IN] array of history data * * history_num - [IN] number of history structures * * history_<type> - [OUT] array of historical data of a * * specific data type * * history_<type>_num - [OUT] number of values of a specific * * data type * * * ******************************************************************************/ static void DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float, int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num, ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text, int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num) { ZBX_DC_HISTORY *h; ZBX_HISTORY_FLOAT *h_float; ZBX_HISTORY_INTEGER *h_integer; ZBX_HISTORY_STRING *h_string; ZBX_HISTORY_TEXT *h_text; ZBX_HISTORY_LOG *h_log; int i; const zbx_log_value_t *log; *history_float_num = 0; *history_integer_num = 0; *history_string_num = 0; *history_text_num = 0; *history_log_num = 0; for (i = 0; i < history_num; i++) { h = &history[i]; if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags)) continue; switch (h->value_type) { case ITEM_VALUE_TYPE_FLOAT: if (NULL == history_float_cbs) continue; h_float = &history_float[(*history_float_num)++]; h_float->itemid = h->itemid; h_float->clock = h->ts.sec; h_float->ns = h->ts.ns; h_float->value = h->value.dbl; break; case ITEM_VALUE_TYPE_UINT64: if (NULL == history_integer_cbs) continue; h_integer = &history_integer[(*history_integer_num)++]; h_integer->itemid = h->itemid; h_integer->clock = h->ts.sec; h_integer->ns = h->ts.ns; h_integer->value = h->value.ui64; break; case ITEM_VALUE_TYPE_STR: if (NULL == history_string_cbs) continue; h_string = &history_string[(*history_string_num)++]; h_string->itemid = h->itemid; h_string->clock = h->ts.sec; h_string->ns = h->ts.ns; h_string->value = h->value.str; break; case ITEM_VALUE_TYPE_TEXT: if (NULL == history_text_cbs) continue; h_text = &history_text[(*history_text_num)++]; h_text->itemid = h->itemid; h_text->clock = h->ts.sec; h_text->ns = h->ts.ns; h_text->value = h->value.str; break; case ITEM_VALUE_TYPE_LOG: if (NULL == history_log_cbs) continue; log = h->value.log; h_log = &history_log[(*history_log_num)++]; h_log->itemid = h->itemid; h_log->clock = h->ts.sec; h_log->ns = h->ts.ns; h_log->value = log->value; h_log->source = ZBX_NULL2EMPTY_STR(log->source); h_log->timestamp = log->timestamp; h_log->logeventid = log->logeventid; h_log->severity = log->severity; break; default: THIS_SHOULD_NEVER_HAPPEN; } } } static void DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num, int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float, ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string, ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log) { if (0 != history_float_num) { int i; zabbix_log(LOG_LEVEL_DEBUG, "syncing float history data with modules..."); for (i = 0; NULL != history_float_cbs[i].module; i++) { zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_float_cbs[i].module->name); history_float_cbs[i].history_float_cb(history_float, history_float_num); } zabbix_log(LOG_LEVEL_DEBUG, "synced %d float values with modules", history_float_num); } if (0 != history_integer_num) { int i; zabbix_log(LOG_LEVEL_DEBUG, "syncing integer history data with modules..."); for (i = 0; NULL != history_integer_cbs[i].module; i++) { zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_integer_cbs[i].module->name); history_integer_cbs[i].history_integer_cb(history_integer, history_integer_num); } zabbix_log(LOG_LEVEL_DEBUG, "synced %d integer values with modules", history_integer_num); } if (0 != history_string_num) { int i; zabbix_log(LOG_LEVEL_DEBUG, "syncing string history data with modules..."); for (i = 0; NULL != history_string_cbs[i].module; i++) { zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_string_cbs[i].module->name); history_string_cbs[i].history_string_cb(history_string, history_string_num); } zabbix_log(LOG_LEVEL_DEBUG, "synced %d string values with modules", history_string_num); } if (0 != history_text_num) { int i; zabbix_log(LOG_LEVEL_DEBUG, "syncing text history data with modules..."); for (i = 0; NULL != history_text_cbs[i].module; i++) { zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_text_cbs[i].module->name); history_text_cbs[i].history_text_cb(history_text, history_text_num); } zabbix_log(LOG_LEVEL_DEBUG, "synced %d text values with modules", history_text_num); } if (0 != history_log_num) { int i; zabbix_log(LOG_LEVEL_DEBUG, "syncing log history data with modules..."); for (i = 0; NULL != history_log_cbs[i].module; i++) { zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_log_cbs[i].module->name); history_log_cbs[i].history_log_cb(history_log, history_log_num); } zabbix_log(LOG_LEVEL_DEBUG, "synced %d log values with modules", history_log_num); } } /****************************************************************************** * * * Purpose: prepares history update by checking which values must be stored * * * * Parameters: history - [IN/OUT] the history values * * history_num - [IN] the number of history values * * * ******************************************************************************/ static void proxy_prepare_history(ZBX_DC_HISTORY *history, int history_num) { int i, *errcodes; zbx_history_sync_item_t *items; zbx_vector_uint64_t itemids; zbx_vector_uint64_create(&itemids); zbx_vector_uint64_reserve(&itemids, history_num); for (i = 0; i < history_num; i++) zbx_vector_uint64_append(&itemids, history[i].itemid); items = (zbx_history_sync_item_t *)zbx_malloc(NULL, sizeof(zbx_history_sync_item_t) * (size_t)history_num); errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num); zbx_dc_config_history_sync_get_items_by_itemids(items, itemids.values, errcodes, (size_t)itemids.values_num, ZBX_ITEM_GET_SYNC); for (i = 0; i < history_num; i++) { if (SUCCEED != errcodes[i]) continue; /* store items with enabled history */ if (0 != items[i].history) continue; /* store numeric items to handle data conversion errors on server and trends */ if (ITEM_VALUE_TYPE_FLOAT == items[i].value_type || ITEM_VALUE_TYPE_UINT64 == items[i].value_type) continue; /* store discovery rules */ if (0 != (items[i].flags & ZBX_FLAG_DISCOVERY_RULE)) continue; /* store errors or first value after an error */ if (ITEM_STATE_NOTSUPPORTED == history[i].state || ITEM_STATE_NOTSUPPORTED == items[i].state) continue; /* store items linked to host inventory */ if (0 != items[i].inventory_link) continue; dc_history_clean_value(history + i); /* all checks passed, item value must not be stored in proxy history/sent to server */ history[i].flags |= ZBX_DC_FLAG_NOVALUE; } zbx_dc_config_clean_history_sync_items(items, errcodes, (size_t)history_num); zbx_free(items); zbx_free(errcodes); zbx_vector_uint64_destroy(&itemids); } static void sync_proxy_history(int *total_num, int *more) { int history_num, txn_rc; time_t sync_start; zbx_vector_ptr_t history_items; zbx_vector_ptr_t item_diff; ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX]; zbx_vector_ptr_create(&history_items); zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX); zbx_vector_ptr_create(&item_diff); sync_start = time(NULL); do { *more = ZBX_SYNC_DONE; LOCK_CACHE; hc_pop_items(&history_items); /* select and take items out of history cache */ history_num = history_items.values_num; UNLOCK_CACHE; if (0 == history_num) break; hc_get_item_values(history, &history_items); /* copy item data from history cache */ proxy_prepare_history(history, history_items.values_num); DCmass_proxy_prepare_itemdiff(history, history_num, &item_diff); do { zbx_db_begin(); DBmass_proxy_add_history(history, history_num); DBmass_proxy_update_items(&item_diff); } while (ZBX_DB_DOWN == (txn_rc = zbx_db_commit())); LOCK_CACHE; hc_push_items(&history_items); /* return items to history cache */ if (ZBX_DB_FAIL != txn_rc) { if (0 != item_diff.values_num) DCconfig_items_apply_changes(&item_diff); cache->history_num -= history_num; if (0 != hc_queue_get_size()) *more = ZBX_SYNC_MORE; UNLOCK_CACHE; *total_num += history_num; hc_free_item_values(history, history_num); } else { *more = ZBX_SYNC_MORE; UNLOCK_CACHE; } zbx_vector_ptr_clear(&history_items); zbx_vector_ptr_clear_ext(&item_diff, zbx_default_mem_free_func); /* Exit from sync loop if we have spent too much time here */ /* unless we are doing full sync. This is done to allow */ /* syncer process to update their statistics. */ } while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start); zbx_vector_ptr_destroy(&item_diff); zbx_vector_ptr_destroy(&history_items); } /****************************************************************************** * * * Purpose: flush history cache to database, process triggers of flushed * * and timer triggers from timer queue * * * * Parameters: sync_timeout - [IN] the timeout in seconds * * values_num - [IN/OUT] the number of synced values * * triggers_num - [IN/OUT] the number of processed timers * * more - [OUT] a flag indicating the cache emptiness: * * ZBX_SYNC_DONE - nothing to sync, go idle * * ZBX_SYNC_MORE - more data to sync * * * * Comments: This function loops syncing history values by 1k batches and * * processing timer triggers by batches of 500 triggers. * * Unless full sync is being done the loop is aborted if either * * timeout has passed or there are no more data to process. * * The last is assumed when the following is true: * * a) history cache is empty or less than 10% of batch values were * * processed (the other items were locked by triggers) * * b) less than 500 (full batch) timer triggers were processed * * * ******************************************************************************/ static void sync_server_history(int *values_num, int *triggers_num, int *more) { static ZBX_HISTORY_FLOAT *history_float; static ZBX_HISTORY_INTEGER *history_integer; static ZBX_HISTORY_STRING *history_string; static ZBX_HISTORY_TEXT *history_text; static ZBX_HISTORY_LOG *history_log; static int module_enabled = FAIL; int i, history_num, history_float_num, history_integer_num, history_string_num, history_text_num, history_log_num, txn_error, compression_age, connectors_retrieved = FAIL; unsigned int item_retrieve_mode; time_t sync_start; zbx_vector_uint64_t triggerids ; zbx_vector_ptr_t history_items, trigger_diff, item_diff, inventory_values, trigger_timers, trigger_order; zbx_vector_uint64_pair_t trends_diff, proxy_subscribtions; ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX]; zbx_uint64_t trigger_itemids[ZBX_HC_SYNC_MAX]; zbx_timespec_t trigger_timespecs[ZBX_HC_SYNC_MAX]; zbx_history_sync_item_t *items = NULL; int *errcodes = NULL; zbx_vector_uint64_t itemids; zbx_hashset_t trigger_info; unsigned char *data = NULL; size_t data_alloc = 0, data_offset; zbx_vector_connector_filter_t connector_filters_history, connector_filters_events; if (NULL == history_float && NULL != history_float_cbs) { module_enabled = SUCCEED; history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float, ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT)); } if (NULL == history_integer && NULL != history_integer_cbs) { module_enabled = SUCCEED; history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer, ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER)); } if (NULL == history_string && NULL != history_string_cbs) { module_enabled = SUCCEED; history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string, ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING)); } if (NULL == history_text && NULL != history_text_cbs) { module_enabled = SUCCEED; history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text, ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT)); } if (NULL == history_log && NULL != history_log_cbs) { module_enabled = SUCCEED; history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log, ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG)); } compression_age = hc_get_history_compression_age(); zbx_vector_connector_filter_create(&connector_filters_history); zbx_vector_connector_filter_create(&connector_filters_events); zbx_vector_ptr_create(&inventory_values); zbx_vector_ptr_create(&item_diff); zbx_vector_ptr_create(&trigger_diff); zbx_vector_uint64_pair_create(&trends_diff); zbx_vector_uint64_pair_create(&proxy_subscribtions); zbx_vector_uint64_create(&triggerids); zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX); zbx_vector_ptr_create(&trigger_timers); zbx_vector_ptr_reserve(&trigger_timers, ZBX_HC_TIMER_MAX); zbx_vector_ptr_create(&history_items); zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX); zbx_vector_ptr_create(&trigger_order); zbx_hashset_create(&trigger_info, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_create(&itemids); sync_start = time(NULL); item_retrieve_mode = 0 == zbx_has_export_dir() ? ZBX_ITEM_GET_SYNC : ZBX_ITEM_GET_SYNC_EXPORT; do { int trends_num = 0, timers_num = 0, ret = SUCCEED; ZBX_DC_TREND *trends = NULL; *more = ZBX_SYNC_DONE; LOCK_CACHE; hc_pop_items(&history_items); /* select and take items out of history cache */ UNLOCK_CACHE; if (0 != history_items.values_num) { if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids))) { LOCK_CACHE; hc_push_items(&history_items); UNLOCK_CACHE; zbx_vector_ptr_clear(&history_items); } } else history_num = 0; if (0 != history_num) { zbx_dc_um_handle_t *um_handle; if (FAIL == connectors_retrieved) { zbx_dc_config_history_sync_get_connector_filters(&connector_filters_history, &connector_filters_events); connectors_retrieved = SUCCEED; if (0 != connector_filters_history.values_num) item_retrieve_mode = ZBX_ITEM_GET_SYNC_EXPORT; } zbx_vector_ptr_sort(&history_items, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); hc_get_item_values(history, &history_items); /* copy item data from history cache */ if (NULL == items) { items = (zbx_history_sync_item_t *)zbx_malloc(NULL, sizeof(zbx_history_sync_item_t) * (size_t)ZBX_HC_SYNC_MAX); } if (NULL == errcodes) errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)ZBX_HC_SYNC_MAX); zbx_vector_uint64_reserve(&itemids, history_num); for (i = 0; i < history_num; i++) zbx_vector_uint64_append(&itemids, history[i].itemid); zbx_dc_config_history_sync_get_items_by_itemids(items, itemids.values, errcodes, (size_t)history_num, item_retrieve_mode); um_handle = zbx_dc_open_user_macros(); DCmass_prepare_history(history, items, errcodes, history_num, &item_diff, &inventory_values, compression_age, &proxy_subscribtions); if (FAIL != (ret = DBmass_add_history(history, history_num))) { DCconfig_items_apply_changes(&item_diff); DCmass_update_trends(history, history_num, &trends, &trends_num, compression_age); if (0 != trends_num) zbx_tfc_invalidate_trends(trends, trends_num); do { zbx_db_begin(); DBmass_update_items(&item_diff, &inventory_values); DBmass_update_trends(trends, trends_num, &trends_diff); /* process internal events generated by DCmass_prepare_history() */ zbx_process_events(NULL, NULL); if (ZBX_DB_OK == (txn_error = zbx_db_commit())) DCupdate_trends(&trends_diff); else zbx_reset_event_recovery(); zbx_vector_uint64_pair_clear(&trends_diff); } while (ZBX_DB_DOWN == txn_error); } zbx_dc_close_user_macros(um_handle); zbx_clean_events(); zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free); zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free); } if (FAIL != ret) { /* don't process trigger timers when server is shutting down */ if (ZBX_IS_RUNNING()) { zbx_dc_get_trigger_timers(&trigger_timers, time(NULL), ZBX_HC_TIMER_SOFT_MAX, ZBX_HC_TIMER_MAX); } timers_num = trigger_timers.values_num; if (ZBX_HC_TIMER_SOFT_MAX <= timers_num) *more = ZBX_SYNC_MORE; if (0 != history_num || 0 != timers_num) { for (i = 0; i < trigger_timers.values_num; i++) { zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)trigger_timers.values[i]; if (0 != timer->lock) zbx_vector_uint64_append(&triggerids, timer->triggerid); } do { zbx_db_begin(); recalculate_triggers(history, history_num, &itemids, items, errcodes, &trigger_timers, &trigger_diff, trigger_itemids, trigger_timespecs, &trigger_info, &trigger_order); /* process trigger events generated by recalculate_triggers() */ zbx_process_events(&trigger_diff, &triggerids); if (0 != trigger_diff.values_num) zbx_db_save_trigger_changes(&trigger_diff); if (ZBX_DB_OK == (txn_error = zbx_db_commit())) DCconfig_triggers_apply_changes(&trigger_diff); else zbx_clean_events(); zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free); } while (ZBX_DB_DOWN == txn_error); if (ZBX_DB_OK == txn_error) zbx_events_update_itservices(); } } if (0 != triggerids.values_num) { *triggers_num += triggerids.values_num; DCconfig_unlock_triggers(&triggerids); zbx_vector_uint64_clear(&triggerids); } if (0 != trigger_timers.values_num) { zbx_dc_reschedule_trigger_timers(&trigger_timers, time(NULL)); zbx_vector_ptr_clear(&trigger_timers); } if (0 != proxy_subscribtions.values_num) { zbx_vector_uint64_pair_sort(&proxy_subscribtions, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_dc_proxy_update_nodata(&proxy_subscribtions); zbx_vector_uint64_pair_clear(&proxy_subscribtions); } if (0 != history_num) { LOCK_CACHE; hc_push_items(&history_items); /* return items to history cache */ cache->history_num -= history_num; if (0 != hc_queue_get_size()) { /* Continue sync if enough of sync candidates were processed */ /* (meaning most of sync candidates are not locked by triggers). */ /* Otherwise better to wait a bit for other syncers to unlock */ /* items rather than trying and failing to sync locked items over */ /* and over again. */ if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num) *more = ZBX_SYNC_MORE; } UNLOCK_CACHE; *values_num += history_num; } if (FAIL != ret) { int event_export_enabled = FAIL; if (0 != history_num) { const ZBX_DC_HISTORY *phistory = NULL; const ZBX_DC_TREND *ptrends = NULL; int history_num_loc = 0, trends_num_loc = 0; int history_export_enabled = FAIL; if (SUCCEED == module_enabled) { DCmodule_prepare_history(history, history_num, history_float, &history_float_num, history_integer, &history_integer_num, history_string, &history_string_num, history_text, &history_text_num, history_log, &history_log_num); DCmodule_sync_history(history_float_num, history_integer_num, history_string_num, history_text_num, history_log_num, history_float, history_integer, history_string, history_text, history_log); } if (SUCCEED == (history_export_enabled = zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY)) || 0 != connector_filters_history.values_num) { phistory = history; history_num_loc = history_num; } if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS)) { ptrends = trends; trends_num_loc = trends_num; } if (NULL != phistory || NULL != ptrends) { data_offset = 0; DCexport_history_and_trends(phistory, history_num_loc, &itemids, items, errcodes, ptrends, trends_num_loc, history_export_enabled, &connector_filters_history, &data, &data_alloc, &data_offset); if (0 != data_offset) { zbx_connector_send(ZBX_IPC_CONNECTOR_REQUEST, data, (zbx_uint32_t)data_offset); } } } else if (0 != timers_num) { if (FAIL == connectors_retrieved) { zbx_dc_config_history_sync_get_connector_filters(&connector_filters_history, &connector_filters_events); connectors_retrieved = SUCCEED; if (0 != connector_filters_history.values_num) item_retrieve_mode = ZBX_ITEM_GET_SYNC_EXPORT; } } if (SUCCEED == (event_export_enabled = zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS)) || 0 != connector_filters_events.values_num) { data_offset = 0; zbx_export_events(event_export_enabled, &connector_filters_events, &data, &data_alloc, &data_offset); if (0 != data_offset) { zbx_connector_send(ZBX_IPC_CONNECTOR_REQUEST, data, (zbx_uint32_t)data_offset); } } } if (0 != history_num || 0 != timers_num) zbx_clean_events(); if (0 != history_num) { zbx_free(trends); zbx_dc_config_clean_history_sync_items(items, errcodes, (size_t)history_num); zbx_vector_ptr_clear(&history_items); hc_free_item_values(history, history_num); } zbx_vector_uint64_clear(&itemids); /* Exit from sync loop if we have spent too much time here. */ /* This is done to allow syncer process to update its statistics. */ } while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start); zbx_free(items); zbx_free(errcodes); zbx_free(data); zbx_vector_connector_filter_clear_ext(&connector_filters_events, zbx_connector_filter_free); zbx_vector_connector_filter_clear_ext(&connector_filters_history, zbx_connector_filter_free); zbx_vector_connector_filter_destroy(&connector_filters_events); zbx_vector_connector_filter_destroy(&connector_filters_history); zbx_vector_ptr_destroy(&trigger_order); zbx_hashset_destroy(&trigger_info); zbx_vector_uint64_destroy(&itemids); zbx_vector_ptr_destroy(&history_items); zbx_vector_ptr_destroy(&inventory_values); zbx_vector_ptr_destroy(&item_diff); zbx_vector_ptr_destroy(&trigger_diff); zbx_vector_uint64_pair_destroy(&trends_diff); zbx_vector_uint64_pair_destroy(&proxy_subscribtions); zbx_vector_ptr_destroy(&trigger_timers); zbx_vector_uint64_destroy(&triggerids); } /****************************************************************************** * * * Purpose: writes updates and new data from history cache to database * * * * Comments: This function is used to flush history cache at server/proxy * * exit. * * Other processes are already terminated, so cache locking is * * unnecessary. * * * ******************************************************************************/ static void sync_history_cache_full(void) { int values_num = 0, triggers_num = 0, more; zbx_hashset_iter_t iter; zbx_hc_item_t *item; zbx_binary_heap_t tmp_history_queue; zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num); /* History index cache might be full without any space left for queueing items from history index to */ /* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add */ /* all items from history index to the new history queue. */ /* */ /* Assertions that must be true. */ /* * This is the main server or proxy process, */ /* * There are no other users of history index cache stored in shared memory. Other processes */ /* should have quit by this point. */ /* * other parts of the program do not hold pointers to the elements of history queue that is */ /* stored in the shared memory. */ if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) { /* unlock all triggers before full sync so no items are locked by triggers */ DCconfig_unlock_all_triggers(); } tmp_history_queue = cache->history_queue; zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY); zbx_hashset_iter_reset(&cache->history_items, &iter); /* add all items from history index to the new history queue */ while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter))) { if (NULL != item->tail) { item->status = ZBX_HC_ITEM_STATUS_NORMAL; hc_queue_item(item); } } if (0 != hc_queue_get_size()) { zabbix_log(LOG_LEVEL_WARNING, "syncing history data..."); do { if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) sync_server_history(&values_num, &triggers_num, &more); else sync_proxy_history(&values_num, &more); zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", (double)values_num / (cache->history_num + values_num) * 100); } while (0 != hc_queue_get_size()); zabbix_log(LOG_LEVEL_WARNING, "syncing history data done"); } zbx_binary_heap_destroy(&cache->history_queue); cache->history_queue = tmp_history_queue; zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: log progress of syncing history data * * * ******************************************************************************/ void zbx_log_sync_history_cache_progress(void) { double pcnt = -1.0; int ts_last, ts_next, sec; LOCK_CACHE; if (INT_MAX == cache->history_progress_ts) { UNLOCK_CACHE; return; } ts_last = cache->history_progress_ts; sec = time(NULL); if (0 == cache->history_progress_ts) { cache->history_num_total = cache->history_num; cache->history_progress_ts = sec; } if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num) { if (0 != cache->history_num_total) pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total; cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec); } ts_next = cache->history_progress_ts; UNLOCK_CACHE; if (0 == ts_last) zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... "); if (-1.0 != pcnt) zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt); if (INT_MAX == ts_next) zabbix_log(LOG_LEVEL_WARNING, "syncing history data done"); } /****************************************************************************** * * * Purpose: writes updates and new data from history cache to database * * * * Parameters: values_num - [OUT] the number of synced values * * more - [OUT] a flag indicating the cache emptiness: * * ZBX_SYNC_DONE - nothing to sync, go idle * * ZBX_SYNC_MORE - more data to sync * * * ******************************************************************************/ void zbx_sync_history_cache(int *values_num, int *triggers_num, int *more) { zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num); *values_num = 0; *triggers_num = 0; if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) sync_server_history(values_num, triggers_num, more); else sync_proxy_history(values_num, more); } /****************************************************************************** * * * local history cache * * * ******************************************************************************/ static void dc_string_buffer_realloc(size_t len) { if (string_values_alloc >= string_values_offset + len) return; do { string_values_alloc += ZBX_STRING_REALLOC_STEP; } while (string_values_alloc < string_values_offset + len); string_values = (char *)zbx_realloc(string_values, string_values_alloc); } static dc_item_value_t *dc_local_get_history_slot(void) { if (ZBX_MAX_VALUES_LOCAL == item_values_num) dc_flush_history(); if (item_values_alloc == item_values_num) { item_values_alloc += ZBX_STRUCT_REALLOC_STEP; item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t)); } return &item_values[item_values_num++]; } static void dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->item_value_type = item_value_type; item_value->value_type = ITEM_VALUE_TYPE_FLOAT; item_value->state = ITEM_STATE_NORMAL; item_value->flags = flags; if (0 != (item_value->flags & ZBX_DC_FLAG_META)) { item_value->lastlogsize = lastlogsize; item_value->mtime = mtime; } if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) item_value->value.value_dbl = value_orig; } static void dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->item_value_type = item_value_type; item_value->value_type = ITEM_VALUE_TYPE_UINT64; item_value->state = ITEM_STATE_NORMAL; item_value->flags = flags; if (0 != (item_value->flags & ZBX_DC_FLAG_META)) { item_value->lastlogsize = lastlogsize; item_value->mtime = mtime; } if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) item_value->value.value_uint = value_orig; } static void dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->item_value_type = item_value_type; item_value->value_type = ITEM_VALUE_TYPE_TEXT; item_value->state = ITEM_STATE_NORMAL; item_value->flags = flags; if (0 != (item_value->flags & ZBX_DC_FLAG_META)) { item_value->lastlogsize = lastlogsize; item_value->mtime = mtime; } if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) { item_value->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1; dc_string_buffer_realloc(item_value->value.value_str.len); item_value->value.value_str.pvalue = string_values_offset; memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len); string_values_offset += item_value->value.value_str.len; } else item_value->value.value_str.len = 0; } static void dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->item_value_type = item_value_type; item_value->value_type = ITEM_VALUE_TYPE_LOG; item_value->state = ITEM_STATE_NORMAL; item_value->flags = flags; if (0 != (item_value->flags & ZBX_DC_FLAG_META)) { item_value->lastlogsize = lastlogsize; item_value->mtime = mtime; } if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) { item_value->severity = log->severity; item_value->logeventid = log->logeventid; item_value->timestamp = log->timestamp; item_value->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1; if (NULL != log->source && '\0' != *log->source) item_value->source.len = zbx_db_strlen_n(log->source, ZBX_HISTORY_LOG_SOURCE_LEN) + 1; else item_value->source.len = 0; } else { item_value->value.value_str.len = 0; item_value->source.len = 0; } if (0 != item_value->value.value_str.len + item_value->source.len) { dc_string_buffer_realloc(item_value->value.value_str.len + item_value->source.len); if (0 != item_value->value.value_str.len) { item_value->value.value_str.pvalue = string_values_offset; memcpy(&string_values[string_values_offset], log->value, item_value->value.value_str.len); string_values_offset += item_value->value.value_str.len; } if (0 != item_value->source.len) { item_value->source.pvalue = string_values_offset; memcpy(&string_values[string_values_offset], log->source, item_value->source.len); string_values_offset += item_value->source.len; } } } static void dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->state = ITEM_STATE_NOTSUPPORTED; item_value->flags = flags; if (0 != (item_value->flags & ZBX_DC_FLAG_META)) { item_value->lastlogsize = lastlogsize; item_value->mtime = mtime; } item_value->value.value_str.len = zbx_db_strlen_n(error, ZBX_ITEM_ERROR_LEN) + 1; dc_string_buffer_realloc(item_value->value.value_str.len); item_value->value.value_str.pvalue = string_values_offset; memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len); string_values_offset += item_value->value.value_str.len; } static void dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->state = ITEM_STATE_NORMAL; item_value->flags = ZBX_DC_FLAG_LLD; item_value->value.value_str.len = strlen(value_orig) + 1; dc_string_buffer_realloc(item_value->value.value_str.len); item_value->value.value_str.pvalue = string_values_offset; memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len); string_values_offset += item_value->value.value_str.len; } static void dc_local_add_history_empty(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, unsigned char flags) { dc_item_value_t *item_value; item_value = dc_local_get_history_slot(); item_value->itemid = itemid; item_value->ts = *ts; item_value->item_value_type = item_value_type; item_value->value_type = ITEM_VALUE_TYPE_NONE; item_value->state = ITEM_STATE_NORMAL; item_value->flags = flags; } /****************************************************************************** * * * Purpose: add new value to the cache * * * * Parameters: itemid - [IN] the itemid * * item_value_type - [IN] the item value type * * item_flags - [IN] the item flags (e. g. lld rule) * * result - [IN] agent result containing the value * * to add * * ts - [IN] the value timestamp * * state - [IN] the item state * * error - [IN] the error message in case item state * * is ITEM_STATE_NOTSUPPORTED * * * ******************************************************************************/ void dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags, AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error) { unsigned char value_flags; if (ITEM_STATE_NOTSUPPORTED == state) { zbx_uint64_t lastlogsize; int mtime; if (NULL != result && 0 != ZBX_ISSET_META(result)) { value_flags = ZBX_DC_FLAG_META; lastlogsize = result->lastlogsize; mtime = result->mtime; } else { value_flags = 0; lastlogsize = 0; mtime = 0; } dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags); return; } if (NULL == result) return; /* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */ if (!ZBX_ISSET_VALUE(result) && !ZBX_ISSET_META(result) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) return; value_flags = 0; if (!ZBX_ISSET_VALUE(result)) value_flags |= ZBX_DC_FLAG_NOVALUE; if (ZBX_ISSET_META(result)) value_flags |= ZBX_DC_FLAG_META; /* Add data to the local history cache if: */ /* 1) the NOVALUE flag is set (data contains either meta information or timestamp) */ /* 2) the NOVALUE flag is not set and value conversion succeeded */ if (0 == (value_flags & ZBX_DC_FLAG_NOVALUE)) { if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags)) { if (NULL == ZBX_GET_TEXT_RESULT(result)) return; /* proxy stores low-level discovery (lld) values in db */ if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type)) dc_local_add_history_lld(itemid, ts, result->text); return; } if (ZBX_ISSET_LOG(result)) { dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize, result->mtime, value_flags); } else if (ZBX_ISSET_UI64(result)) { dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize, result->mtime, value_flags); } else if (ZBX_ISSET_DBL(result)) { dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize, result->mtime, value_flags); } else if (ZBX_ISSET_STR(result)) { dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize, result->mtime, value_flags); } else if (ZBX_ISSET_TEXT(result)) { dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize, result->mtime, value_flags); } else { THIS_SHOULD_NEVER_HAPPEN; } } else { if (0 != (value_flags & ZBX_DC_FLAG_META)) { dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime, value_flags); } else dc_local_add_history_empty(itemid, item_value_type, ts, value_flags); } } /****************************************************************************** * * * Purpose: add new variant value to the cache * * * * Parameters: itemid - [IN] the itemid * * item_value_type - [IN] the item value type * * item_flags - [IN] the item flags (e. g. lld rule) * * result - [IN] agent result containing the value * * to add * * ts - [IN] the value timestamp * * state - [IN] the item state * * error - [IN] the error message in case item state * * is ITEM_STATE_NOTSUPPORTED * * * ******************************************************************************/ void dc_add_history_variant(zbx_uint64_t itemid, unsigned char value_type, unsigned char item_flags, zbx_variant_t *value, zbx_timespec_t ts, const zbx_pp_value_opt_t *value_opt) { unsigned char value_flags = 0; zbx_uint64_t lastlogsize; int mtime; if (0 != (value_opt->flags & ZBX_PP_VALUE_OPT_META)) { value_flags = ZBX_DC_FLAG_META; lastlogsize = value_opt->lastlogsize; mtime = value_opt->mtime; value_flags |= ZBX_DC_FLAG_META; } else { value_flags = 0; lastlogsize = 0; mtime = 0; } if (ZBX_VARIANT_ERR == value->type) { dc_local_add_history_notsupported(itemid, &ts, value->data.err, lastlogsize, mtime, value_flags); return; } if (ZBX_VARIANT_NONE == value->type) value_flags |= ZBX_DC_FLAG_NOVALUE; /* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */ if (ZBX_DC_FLAG_NOVALUE == (value_flags & (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_META)) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) { return; } /* Add data to the local history cache if: */ /* 1) the NOVALUE flag is set (data contains either meta information or timestamp) */ /* 2) the NOVALUE flag is not set and value conversion succeeded */ if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE)) { if (0 != (value_flags & ZBX_DC_FLAG_META)) dc_local_add_history_log(itemid, value_type, &ts, NULL, lastlogsize, mtime, value_flags); else dc_local_add_history_empty(itemid, value_type, &ts, value_flags); return; } if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags)) { if (ZBX_VARIANT_STR != value->type) return; /* proxy stores low-level discovery (lld) values in db */ if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type)) dc_local_add_history_lld(itemid, &ts, value->data.str); return; } if (0 != (value_opt->flags & ZBX_PP_VALUE_OPT_LOG)) { zbx_log_t log; zbx_variant_convert(value, ZBX_VARIANT_STR); log.logeventid = value_opt->logeventid; log.severity = value_opt->severity; log.timestamp = value_opt->timestamp; log.source = value_opt->source; log.value = value->data.str; dc_local_add_history_log(itemid, value_type, &ts, &log, lastlogsize, mtime, value_flags); return; } switch (value->type) { case ZBX_VARIANT_UI64: dc_local_add_history_uint(itemid, value_type, &ts, value->data.ui64, lastlogsize, mtime, value_flags); break; case ZBX_VARIANT_DBL: dc_local_add_history_dbl(itemid, value_type, &ts, value->data.dbl, lastlogsize, mtime, value_flags); break; case ZBX_VARIANT_STR: dc_local_add_history_text(itemid, value_type, &ts, value->data.str, lastlogsize, mtime, value_flags); break; default: THIS_SHOULD_NEVER_HAPPEN; } } void dc_flush_history(void) { if (0 == item_values_num) return; LOCK_CACHE; hc_add_item_values(item_values, item_values_num); cache->history_num += item_values_num; UNLOCK_CACHE; item_values_num = 0; string_values_offset = 0; } /****************************************************************************** * * * history cache storage * * * ******************************************************************************/ ZBX_SHMEM_FUNC_IMPL(__hc_index, hc_index_mem) ZBX_SHMEM_FUNC_IMPL(__hc, hc_mem) /****************************************************************************** * * * Purpose: compares history queue elements * * * ******************************************************************************/ static int hc_queue_elem_compare_func(const void *d1, const void *d2) { const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1; const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2; const zbx_hc_item_t *item1 = (const zbx_hc_item_t *)e1->data; const zbx_hc_item_t *item2 = (const zbx_hc_item_t *)e2->data; /* compare by timestamp of the oldest value */ return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts); } /****************************************************************************** * * * Purpose: free history item data allocated in history cache * * * * Parameters: data - [IN] history item data * * * ******************************************************************************/ static void hc_free_data(zbx_hc_data_t *data) { if (ITEM_STATE_NOTSUPPORTED == data->state) { __hc_shmem_free_func(data->value.str); } else { if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE)) { switch (data->value_type) { case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: __hc_shmem_free_func(data->value.str); break; case ITEM_VALUE_TYPE_LOG: __hc_shmem_free_func(data->value.log->value); if (NULL != data->value.log->source) __hc_shmem_free_func(data->value.log->source); __hc_shmem_free_func(data->value.log); break; } } } __hc_shmem_free_func(data); } /****************************************************************************** * * * Purpose: put back item into history queue * * * * Parameters: data - [IN] history item data * * * ******************************************************************************/ static void hc_queue_item(zbx_hc_item_t *item) { zbx_binary_heap_elem_t elem = {item->itemid, (const void *)item}; zbx_binary_heap_insert(&cache->history_queue, &elem); } /****************************************************************************** * * * Purpose: returns history item by itemid * * * * Parameters: itemid - [IN] the item id * * * * Return value: the history item or NULL if the requested item is not in * * history cache * * * ******************************************************************************/ static zbx_hc_item_t *hc_get_item(zbx_uint64_t itemid) { return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid); } /****************************************************************************** * * * Purpose: adds a new item to history cache * * * * Parameters: itemid - [IN] the item id * * [IN] the item data * * * * Return value: the added history item * * * ******************************************************************************/ static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data) { zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, 0, data, data}; return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local)); } /****************************************************************************** * * * Purpose: copies string value to history cache * * * * Parameters: str - [IN] the string value * * * * Return value: the copied string or NULL if there was not enough memory * * * ******************************************************************************/ static char *hc_mem_value_str_dup(const dc_value_str_t *str) { char *ptr; if (NULL == (ptr = (char *)__hc_shmem_malloc_func(NULL, str->len))) return NULL; memcpy(ptr, &string_values[str->pvalue], str->len - 1); ptr[str->len - 1] = '\0'; return ptr; } /****************************************************************************** * * * Purpose: clones string value into history data memory * * * * Parameters: dst - [IN/OUT] a reference to the cloned value * * str - [IN] the string value to clone * * * * Return value: SUCCESS - either there was no need to clone the string * * (it was empty or already cloned) or the string was * * cloned successfully * * FAIL - not enough memory * * * * Comments: This function can be called in loop with the same dst value * * until it finishes cloning string value. * * * ******************************************************************************/ static int hc_clone_history_str_data(char **dst, const dc_value_str_t *str) { if (0 == str->len) return SUCCEED; if (NULL != *dst) return SUCCEED; if (NULL != (*dst = hc_mem_value_str_dup(str))) return SUCCEED; return FAIL; } /****************************************************************************** * * * Purpose: clones log value into history data memory * * * * Parameters: dst - [IN/OUT] a reference to the cloned value * * item_value - [IN] the log value to clone * * * * Return value: SUCCESS - the log value was cloned successfully * * FAIL - not enough memory * * * * Comments: This function can be called in loop with the same dst value * * until it finishes cloning log value. * * * ******************************************************************************/ static int hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value) { if (NULL == *dst) { /* using realloc instead of malloc just to suppress 'not used' warning for realloc */ if (NULL == (*dst = (zbx_log_value_t *)__hc_shmem_realloc_func(NULL, sizeof(zbx_log_value_t)))) return FAIL; memset(*dst, 0, sizeof(zbx_log_value_t)); } if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str)) return FAIL; if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source)) return FAIL; (*dst)->logeventid = item_value->logeventid; (*dst)->severity = item_value->severity; (*dst)->timestamp = item_value->timestamp; return SUCCEED; } /****************************************************************************** * * * Purpose: clones item value from local cache into history cache * * * * Parameters: data - [IN/OUT] a reference to the cloned value * * item_value - [IN] the item value * * * * Return value: SUCCESS - the item value was cloned successfully * * FAIL - not enough memory * * * * Comments: This function can be called in loop with the same data value * * until it finishes cloning item value. * * * ******************************************************************************/ static int hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value) { if (NULL == *data) { if (NULL == (*data = (zbx_hc_data_t *)__hc_shmem_malloc_func(NULL, sizeof(zbx_hc_data_t)))) return FAIL; memset(*data, 0, sizeof(zbx_hc_data_t)); (*data)->state = item_value->state; (*data)->ts = item_value->ts; (*data)->flags = item_value->flags; } if (0 != (ZBX_DC_FLAG_META & item_value->flags)) { (*data)->lastlogsize = item_value->lastlogsize; (*data)->mtime = item_value->mtime; } if (ITEM_STATE_NOTSUPPORTED == item_value->state) { if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str))) return FAIL; (*data)->value_type = item_value->value_type; cache->stats.notsupported_counter++; return SUCCEED; } if (0 != (ZBX_DC_FLAG_LLD & item_value->flags)) { if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str))) return FAIL; (*data)->value_type = ITEM_VALUE_TYPE_TEXT; cache->stats.history_text_counter++; cache->stats.history_counter++; return SUCCEED; } if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags)) { switch (item_value->value_type) { case ITEM_VALUE_TYPE_FLOAT: (*data)->value.dbl = item_value->value.value_dbl; break; case ITEM_VALUE_TYPE_UINT64: (*data)->value.ui64 = item_value->value.value_uint; break; case ITEM_VALUE_TYPE_STR: if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str, &item_value->value.value_str)) { return FAIL; } break; case ITEM_VALUE_TYPE_TEXT: if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str, &item_value->value.value_str)) { return FAIL; } break; case ITEM_VALUE_TYPE_LOG: if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value)) return FAIL; break; } switch (item_value->item_value_type) { case ITEM_VALUE_TYPE_FLOAT: cache->stats.history_float_counter++; break; case ITEM_VALUE_TYPE_UINT64: cache->stats.history_uint_counter++; break; case ITEM_VALUE_TYPE_STR: cache->stats.history_str_counter++; break; case ITEM_VALUE_TYPE_TEXT: cache->stats.history_text_counter++; break; case ITEM_VALUE_TYPE_LOG: cache->stats.history_log_counter++; break; } cache->stats.history_counter++; } (*data)->value_type = item_value->value_type; return SUCCEED; } /****************************************************************************** * * * Purpose: adds item values to the history cache * * * * Parameters: values - [IN] the item values to add * * values_num - [IN] the number of item values to add * * * * Comments: If the history cache is full this function will wait until * * history syncers processes values freeing enough space to store * * the new value. * * * ******************************************************************************/ static void hc_add_item_values(dc_item_value_t *values, int values_num) { dc_item_value_t *item_value; int i; zbx_hc_item_t *item; for (i = 0; i < values_num; i++) { zbx_hc_data_t *data = NULL; item_value = &values[i]; /* a record with metadata and no value can be dropped if */ /* the metadata update is copied to the last queued value */ if (NULL != (item = hc_get_item(item_value->itemid)) && 0 != (item_value->flags & ZBX_DC_FLAG_NOVALUE) && 0 != (item_value->flags & ZBX_DC_FLAG_META)) { /* skip metadata updates when only one value is queued, */ /* because the item might be already being processed */ if (item->head != item->tail) { item->head->lastlogsize = item_value->lastlogsize; item->head->mtime = item_value->mtime; item->head->flags |= ZBX_DC_FLAG_META; continue; } } if (SUCCEED != hc_clone_history_data(&data, item_value)) { do { UNLOCK_CACHE; zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second."); sleep(1); LOCK_CACHE; } while (SUCCEED != hc_clone_history_data(&data, item_value)); item = hc_get_item(item_value->itemid); } if (NULL == item) { item = hc_add_item(item_value->itemid, data); hc_queue_item(item); } else { item->head->next = data; item->head = data; } item->values_num++; } } /****************************************************************************** * * * Purpose: copies item value from history cache into the specified history * * value * * * * Parameters: history - [OUT] the history value * * itemid - [IN] the item identifier * * data - [IN] the history data to copy * * * * Comments: handling of uninitialized fields in dc_add_proxy_history_log() * * * ******************************************************************************/ static void hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data) { history->itemid = itemid; history->ts = data->ts; history->state = data->state; history->flags = data->flags; history->lastlogsize = data->lastlogsize; history->mtime = data->mtime; if (ITEM_STATE_NOTSUPPORTED == data->state) { history->value.err = zbx_strdup(NULL, data->value.str); history->flags |= ZBX_DC_FLAG_UNDEF; return; } history->value_type = data->value_type; if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags)) { switch (data->value_type) { case ITEM_VALUE_TYPE_FLOAT: history->value.dbl = data->value.dbl; break; case ITEM_VALUE_TYPE_UINT64: history->value.ui64 = data->value.ui64; break; case ITEM_VALUE_TYPE_STR: case ITEM_VALUE_TYPE_TEXT: history->value.str = zbx_strdup(NULL, data->value.str); break; case ITEM_VALUE_TYPE_LOG: history->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t)); history->value.log->value = zbx_strdup(NULL, data->value.log->value); if (NULL != data->value.log->source) history->value.log->source = zbx_strdup(NULL, data->value.log->source); else history->value.log->source = NULL; history->value.log->timestamp = data->value.log->timestamp; history->value.log->severity = data->value.log->severity; history->value.log->logeventid = data->value.log->logeventid; break; } } } /****************************************************************************** * * * Purpose: pops the next batch of history items from cache for processing * * * * Parameters: history_items - [OUT] the locked history items * * * * Comments: The history_items must be returned back to history cache with * * hc_push_items() function after they have been processed. * * * ******************************************************************************/ static void hc_pop_items(zbx_vector_ptr_t *history_items) { zbx_binary_heap_elem_t *elem; zbx_hc_item_t *item; while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue)) { elem = zbx_binary_heap_find_min(&cache->history_queue); item = (zbx_hc_item_t *)elem->data; zbx_vector_ptr_append(history_items, item); zbx_binary_heap_remove_min(&cache->history_queue); } } /****************************************************************************** * * * Purpose: gets item history values * * * * Parameters: history - [OUT] the history values * * history_items - [IN] the history items * * * ******************************************************************************/ static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items) { int i, history_num = 0; zbx_hc_item_t *item; /* we don't need to lock history cache because no other processes can */ /* change item's history data until it is pushed back to history queue */ for (i = 0; i < history_items->values_num; i++) { item = (zbx_hc_item_t *)history_items->values[i]; if (ZBX_HC_ITEM_STATUS_BUSY == item->status) continue; hc_copy_history_data(&history[history_num++], item->itemid, item->tail); } } /****************************************************************************** * * * Purpose: push back the processed history items into history cache * * * * Parameters: history_items - [IN] the history items containing processed * * (available) and busy items * * * * Comments: This function removes processed value from history cache. * * If there is no more data for this item, then the item itself is * * removed from history index. * * * ******************************************************************************/ void hc_push_items(zbx_vector_ptr_t *history_items) { int i; zbx_hc_item_t *item; zbx_hc_data_t *data_free; for (i = 0; i < history_items->values_num; i++) { item = (zbx_hc_item_t *)history_items->values[i]; switch (item->status) { case ZBX_HC_ITEM_STATUS_BUSY: /* reset item status before returning it to queue */ item->status = ZBX_HC_ITEM_STATUS_NORMAL; hc_queue_item(item); break; case ZBX_HC_ITEM_STATUS_NORMAL: item->values_num--; data_free = item->tail; item->tail = item->tail->next; hc_free_data(data_free); if (NULL == item->tail) zbx_hashset_remove(&cache->history_items, item); else hc_queue_item(item); break; } } } /****************************************************************************** * * * Purpose: retrieve the size of history queue * * * ******************************************************************************/ int hc_queue_get_size(void) { return cache->history_queue.elems_num; } int hc_get_history_compression_age(void) { #if defined(HAVE_POSTGRESQL) zbx_config_t cfg; int compression_age = 0; zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION); if (ON == cfg.db.history_compression_status && 0 != cfg.db.history_compress_older) { compression_age = (int)time(NULL) - cfg.db.history_compress_older; } zbx_config_clean(&cfg); return compression_age; #else return 0; #endif } /****************************************************************************** * * * Purpose: Allocate shared memory for trend cache (part of database cache) * * * * Comments: Is optionally called from init_database_cache() * * * ******************************************************************************/ ZBX_SHMEM_FUNC_IMPL(__trend, trend_mem) static int init_trend_cache(char **error) { size_t sz; int ret; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error))) goto out; sz = zbx_shmem_required_size(1, "trend cache", "TrendCacheSize"); if (SUCCEED != (ret = zbx_shmem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0, error))) { goto out; } CONFIG_TRENDS_CACHE_SIZE -= sz; cache->trends_num = 0; cache->trends_last_cleanup_hour = 0; #define INIT_HASHSET_SIZE 100 /* Should be calculated dynamically based on trends size? */ /* Still does not make sense to have it more than initial */ /* item hashset size in configuration cache. */ zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL, __trend_shmem_malloc_func, __trend_shmem_realloc_func, __trend_shmem_free_func); #undef INIT_HASHSET_SIZE out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: Allocate shared memory for database cache * * * ******************************************************************************/ int init_database_cache(char **error) { int ret; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (NULL != cache) { ret = SUCCEED; goto out; } if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error))) goto out; if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error))) goto out; if (SUCCEED != (ret = zbx_shmem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache", "HistoryCacheSize", 1, error))) { goto out; } if (SUCCEED != (ret = zbx_shmem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache", "HistoryIndexCacheSize", 0, error))) { goto out; } cache = (ZBX_DC_CACHE *)__hc_index_shmem_malloc_func(NULL, sizeof(ZBX_DC_CACHE)); memset(cache, 0, sizeof(ZBX_DC_CACHE)); ids = (ZBX_DC_IDS *)__hc_index_shmem_malloc_func(NULL, sizeof(ZBX_DC_IDS)); memset(ids, 0, sizeof(ZBX_DC_IDS)); zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL, __hc_index_shmem_malloc_func, __hc_index_shmem_realloc_func, __hc_index_shmem_free_func); zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY, __hc_index_shmem_malloc_func, __hc_index_shmem_realloc_func, __hc_index_shmem_free_func); if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) { zbx_hashset_create_ext(&(cache->proxyqueue.index), ZBX_HC_SYNC_MAX, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL, __hc_index_shmem_malloc_func, __hc_index_shmem_realloc_func, __hc_index_shmem_free_func); zbx_list_create_ext(&(cache->proxyqueue.list), __hc_index_shmem_malloc_func, __hc_index_shmem_free_func); cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL; if (SUCCEED != (ret = init_trend_cache(error))) goto out; } cache->history_num_total = 0; cache->history_progress_ts = 0; cache->db_trigger_queue_lock = 1; cache->proxy_history_count = 0; if (NULL == sql) sql = (char *)zbx_malloc(sql, sql_alloc); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: change proxy_history_count by count * * * ******************************************************************************/ void change_proxy_history_count(int change_count) { LOCK_CACHE; cache->proxy_history_count += change_count; UNLOCK_CACHE; } /****************************************************************************** * * * Purpose: change proxy_history_count by count * * * ******************************************************************************/ void reset_proxy_history_count(int reset) { LOCK_CACHE; cache->proxy_history_count = reset; UNLOCK_CACHE; } /****************************************************************************** * * * Purpose: get proxy_history_count value * * * ******************************************************************************/ int get_proxy_history_count(void) { int proxy_history_count; LOCK_CACHE; proxy_history_count = cache->proxy_history_count; UNLOCK_CACHE; return proxy_history_count; } /****************************************************************************** * * * Purpose: writes updates and new data from pool and cache data to database * * * ******************************************************************************/ static void DCsync_all(void) { zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()"); sync_history_cache_full(); if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) DCsync_trends(); zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()"); } /****************************************************************************** * * * Purpose: Free memory allocated for database cache * * * ******************************************************************************/ void free_database_cache(int sync) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (ZBX_SYNC_ALL == sync) DCsync_all(); cache = NULL; zbx_shmem_destroy(hc_mem); hc_mem = NULL; zbx_shmem_destroy(hc_index_mem); hc_index_mem = NULL; zbx_mutex_destroy(&cache_lock); zbx_mutex_destroy(&cache_ids_lock); if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) { zbx_shmem_destroy(trend_mem); trend_mem = NULL; zbx_mutex_destroy(&trends_lock); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: Return next id for requested table * * * ******************************************************************************/ zbx_uint64_t DCget_nextid(const char *table_name, int num) { int i; DB_RESULT result; DB_ROW row; const ZBX_TABLE *table; ZBX_DC_ID *id; zbx_uint64_t min = 0, max = ZBX_DB_MAX_ID, nextid, lastid; zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d", __func__, table_name, num); LOCK_CACHE_IDS; for (i = 0; i < ZBX_IDS_SIZE; i++) { id = &ids->id[i]; if ('\0' == *id->table_name) break; if (0 == strcmp(id->table_name, table_name)) { nextid = id->lastid + 1; id->lastid += num; lastid = id->lastid; UNLOCK_CACHE_IDS; zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]", __func__, table_name, nextid, lastid); return nextid; } } if (i == ZBX_IDS_SIZE) { zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids"); exit(EXIT_FAILURE); } table = zbx_db_get_table(table_name); result = zbx_db_select("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64, table->recid, table_name, table->recid, min, max); if (NULL != result) { zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name)); if (NULL == (row = zbx_db_fetch(result)) || SUCCEED == zbx_db_is_null(row[0])) id->lastid = min; else ZBX_STR2UINT64(id->lastid, row[0]); nextid = id->lastid + 1; id->lastid += num; lastid = id->lastid; } else nextid = lastid = 0; UNLOCK_CACHE_IDS; zbx_db_free_result(result); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]", __func__, table_name, nextid, lastid); return nextid; } /****************************************************************************** * * * Purpose: performs interface availability reset for hosts with * * availability set on interfaces without enabled items * * * ******************************************************************************/ void DCupdate_interfaces_availability(void) { zbx_vector_availability_ptr_t interfaces; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_availability_ptr_create(&interfaces); if (SUCCEED != DCreset_interfaces_availability(&interfaces)) goto out; zbx_availabilities_flush(&interfaces); out: zbx_vector_availability_ptr_clear_ext(&interfaces, zbx_interface_availability_free); zbx_vector_availability_ptr_destroy(&interfaces); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: get history cache diagnostics statistics * * * ******************************************************************************/ void zbx_hc_get_diag_stats(zbx_uint64_t *items_num, zbx_uint64_t *values_num) { LOCK_CACHE; *values_num = cache->history_num; *items_num = cache->history_items.num_data; UNLOCK_CACHE; } /****************************************************************************** * * * Purpose: get shared memory allocator statistics * * * ******************************************************************************/ void zbx_hc_get_mem_stats(zbx_shmem_stats_t *data, zbx_shmem_stats_t *index) { LOCK_CACHE; if (NULL != data) zbx_shmem_get_stats(hc_mem, data); if (NULL != index) zbx_shmem_get_stats(hc_index_mem, index); UNLOCK_CACHE; } /****************************************************************************** * * * Purpose: get statistics of cached items * * * ******************************************************************************/ void zbx_hc_get_items(zbx_vector_uint64_pair_t *items) { zbx_hashset_iter_t iter; zbx_hc_item_t *item; LOCK_CACHE; zbx_vector_uint64_pair_reserve(items, cache->history_items.num_data); zbx_hashset_iter_reset(&cache->history_items, &iter); while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter))) { zbx_uint64_pair_t pair = {item->itemid, item->values_num}; zbx_vector_uint64_pair_append_ptr(items, &pair); } UNLOCK_CACHE; } /****************************************************************************** * * * Purpose: checks if database trigger queue table is locked * * * ******************************************************************************/ int zbx_db_trigger_queue_locked(void) { return 0 == cache->db_trigger_queue_lock ? FAIL : SUCCEED; } /****************************************************************************** * * * Purpose: unlocks database trigger queue table * * * ******************************************************************************/ void zbx_db_trigger_queue_unlock(void) { cache->db_trigger_queue_lock = 0; } /****************************************************************************** * * * Purpose: return first proxy in a queue, function assumes that a queue is * * not empty * * * * Return value: proxyid at the top a queue * * * ******************************************************************************/ static zbx_uint64_t zbx_hc_proxyqueue_peek(void) { zbx_uint64_t *p_val; if (NULL == cache->proxyqueue.list.head) return 0; p_val = (zbx_uint64_t *)(cache->proxyqueue.list.head->data); return *p_val; } /****************************************************************************** * * * Purpose: add new proxyid to a queue * * * * Parameters: proxyid - [IN] the proxy id * * * ******************************************************************************/ static void zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid) { if (NULL == zbx_hashset_search(&cache->proxyqueue.index, &proxyid)) { zbx_uint64_t *ptr; ptr = zbx_hashset_insert(&cache->proxyqueue.index, &proxyid, sizeof(proxyid)); zbx_list_append(&cache->proxyqueue.list, ptr, NULL); } } /****************************************************************************** * * * Purpose: try to dequeue proxyid from a proxy queue * * * * Parameters: chk_proxyid - [IN] the proxyid * * * * Return value: SUCCEED - retrieval successful * * FAIL - otherwise * * * ******************************************************************************/ static int zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid) { zbx_uint64_t top_val; void *rem_val = 0; top_val = zbx_hc_proxyqueue_peek(); if (proxyid != top_val) return FAIL; if (FAIL == zbx_list_pop(&cache->proxyqueue.list, &rem_val)) return FAIL; zbx_hashset_remove_direct(&cache->proxyqueue.index, rem_val); return SUCCEED; } /****************************************************************************** * * * Purpose: remove all proxies from proxy priority queue * * * ******************************************************************************/ static void zbx_hc_proxyqueue_clear(void) { zbx_list_destroy(&cache->proxyqueue.list); zbx_hashset_clear(&cache->proxyqueue.index); } /****************************************************************************** * * * Purpose: check status of a history cache usage, enqueue/dequeue proxy * * from priority list and accordingly enable or disable wait mode * * * * Parameters: proxyid - [IN] the proxyid * * * * Return value: SUCCEED - proxy can be processed now * * FAIL - proxy cannot be processed now, it got enqueued * * * ******************************************************************************/ int zbx_hc_check_proxy(zbx_uint64_t proxyid) { double hc_pused; int ret; zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxyid:"ZBX_FS_UI64, __func__, proxyid); LOCK_CACHE; hc_pused = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size; if (20 >= hc_pused) { cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL; zbx_hc_proxyqueue_clear(); ret = SUCCEED; goto out; } if (ZBX_HC_PROXYQUEUE_STATE_WAIT == cache->proxyqueue.state) { zbx_hc_proxyqueue_enqueue(proxyid); if (60 < hc_pused) { ret = FAIL; goto out; } cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL; } else { if (80 <= hc_pused) { cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_WAIT; zbx_hc_proxyqueue_enqueue(proxyid); ret = FAIL; goto out; } } if (0 == zbx_hc_proxyqueue_peek()) { ret = SUCCEED; goto out; } ret = zbx_hc_proxyqueue_dequeue(proxyid); out: UNLOCK_CACHE; zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; }