/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "zbxcacheconfig.h" #include "dbsync.h" #include "user_macro.h" #include "zbx_host_constants.h" #include "zbx_trigger_constants.h" #include "zbxcrypto.h" #include "zbxeval.h" #include "zbxnum.h" #include "zbxdbhigh.h" #include "zbxexpr.h" #include "zbxstr.h" #include "zbxinterface.h" #include "zbxip.h" #include <stddef.h> /* global correlation constants */ #define ZBX_CORRELATION_ENABLED 0 /*#define ZBX_CORRELATION_DISABLED 1*/ #define ZBX_DBSYNC_OBJ_HOST 1 #define ZBX_DBSYNC_OBJ_HOST_TAG 2 #define ZBX_DBSYNC_OBJ_ITEM 3 #define ZBX_DBSYNC_OBJ_ITEM_TAG 4 #define ZBX_DBSYNC_OBJ_TRIGGER 5 #define ZBX_DBSYNC_OBJ_TRIGGER_TAG 6 #define ZBX_DBSYNC_OBJ_FUNCTION 7 #define ZBX_DBSYNC_OBJ_ITEM_PREPROC 8 #define ZBX_DBSYNC_OBJ_DRULE 9 #define ZBX_DBSYNC_OBJ_DCHECK 10 #define ZBX_DBSYNC_OBJ_HTTPTEST 11 #define ZBX_DBSYNC_OBJ_HTTPTEST_FIELD 12 #define ZBX_DBSYNC_OBJ_HTTPTEST_ITEM 13 #define ZBX_DBSYNC_OBJ_HTTPSTEP 14 #define ZBX_DBSYNC_OBJ_HTTPSTEP_FIELD 15 #define ZBX_DBSYNC_OBJ_HTTPSTEP_ITEM 16 #define ZBX_DBSYNC_OBJ_CONNECTOR 17 #define ZBX_DBSYNC_OBJ_CONNECTOR_TAG 18 #define ZBX_DBSYNC_OBJ_PROXY 19 #define ZBX_DBSYNC_OBJ_PROXY_GROUP 20 #define ZBX_DBSYNC_OBJ_HOST_PROXY 21 /* number of dbsync objects - keep in sync with above defines */ #define ZBX_DBSYNC_OBJ_COUNT 21 #define ZBX_DBSYNC_JOURNAL(X) (X - 1) #define ZBX_DBSYNC_CHANGELOG_PRUNE_INTERVAL SEC_PER_MIN * 10 #define ZBX_DBSYNC_CHANGELOG_MAX_AGE SEC_PER_HOUR #define ZBX_DBSYNC_BATCH_SIZE 1000 typedef struct { zbx_uint64_t changelogid; int clock; } zbx_dbsync_changelog_t; ZBX_VECTOR_DECL(dbsync_changelog, zbx_dbsync_changelog_t) ZBX_VECTOR_IMPL(dbsync_changelog, zbx_dbsync_changelog_t) typedef struct { zbx_uint64_t objectid; zbx_dbsync_changelog_t changelog; } zbx_dbsync_obj_changelog_t; ZBX_VECTOR_DECL(dbsync_obj_changelog, zbx_dbsync_obj_changelog_t) ZBX_VECTOR_IMPL(dbsync_obj_changelog, zbx_dbsync_obj_changelog_t) ZBX_PTR_VECTOR_DECL(dbsync, zbx_dbsync_t *) ZBX_PTR_VECTOR_IMPL(dbsync, zbx_dbsync_t *) typedef struct { zbx_vector_uint64_t inserts; zbx_vector_uint64_t updates; zbx_vector_uint64_t deletes; zbx_vector_dbsync_t syncs; zbx_vector_dbsync_obj_changelog_t changelog; } zbx_dbsync_journal_t; typedef struct { zbx_hashset_t strpool; zbx_dc_config_t *cache; zbx_hashset_t changelog; zbx_dbsync_journal_t journals[ZBX_DBSYNC_OBJ_COUNT]; zbx_vector_dbsync_t changelog_dbsyncs; zbx_vector_dbsync_t dbsyncs; } zbx_dbsync_env_t; static zbx_dbsync_env_t dbsync_env; /* string pool support */ #define REFCOUNT_FIELD_SIZE sizeof(zbx_uint32_t) static zbx_hash_t dbsync_strpool_hash_func(const void *data) { return ZBX_DEFAULT_STRING_HASH_FUNC((char *)data + REFCOUNT_FIELD_SIZE); } static int dbsync_strpool_compare_func(const void *d1, const void *d2) { return strcmp((char *)d1 + REFCOUNT_FIELD_SIZE, (char *)d2 + REFCOUNT_FIELD_SIZE); } static char *dbsync_strdup(const char *str) { void *ptr; size_t size = REFCOUNT_FIELD_SIZE + strlen(str) + 1; ptr = zbx_hashset_insert_ext(&dbsync_env.strpool, str - REFCOUNT_FIELD_SIZE, size, REFCOUNT_FIELD_SIZE, size, ZBX_HASHSET_UNIQ_FALSE); (*(zbx_uint32_t *)ptr)++; return (char *)ptr + REFCOUNT_FIELD_SIZE; } static void dbsync_strfree(char *str) { if (NULL != str) { void *ptr = str - REFCOUNT_FIELD_SIZE; if (0 == --(*(zbx_uint32_t *)ptr)) zbx_hashset_remove_direct(&dbsync_env.strpool, ptr); } } /* macro value validators */ /****************************************************************************** * * * Purpose: compares 64 bit unsigned integer with a raw database value * * * ******************************************************************************/ static int dbsync_compare_uint64(const char *value_raw, zbx_uint64_t value) { zbx_uint64_t value_ui64; ZBX_DBROW2UINT64(value_ui64, value_raw); return (value_ui64 == value ? SUCCEED : FAIL); } /****************************************************************************** * * * Purpose: compares 32 bit signed integer with a raw database value * * * ******************************************************************************/ static int dbsync_compare_int(const char *value_raw, int value) { return (atoi(value_raw) == value ? SUCCEED : FAIL); } /****************************************************************************** * * * Purpose: compares unsigned character with a raw database value * * * ******************************************************************************/ static int dbsync_compare_uchar(const char *value_raw, unsigned char value) { unsigned char value_uchar; ZBX_STR2UCHAR(value_uchar, value_raw); return (value_uchar == value ? SUCCEED : FAIL); } /****************************************************************************** * * * Purpose: compares string with a raw database value * * * ******************************************************************************/ static int dbsync_compare_str(const char *value_raw, const char *value) { return (0 == strcmp(value_raw, value) ? SUCCEED : FAIL); } /****************************************************************************** * * * Purpose: adds a new row to the changeset * * * * Parameter: sync - [IN] the changeset * * rowid - [IN] the row identifier * * tag - [IN] the row tag (see ZBX_DBSYNC_ROW_ defines) * * dbrow - [IN] the row contents (depending on configuration cache * * removal logic for the specific object it can be * * NULL when used with ZBX_DBSYNC_ROW_REMOVE tag) * * * ******************************************************************************/ static void dbsync_add_row(zbx_dbsync_t *sync, zbx_uint64_t rowid, unsigned char tag, const zbx_db_row_t dbrow) { int i; zbx_dbsync_row_t *row; row = (zbx_dbsync_row_t *)zbx_malloc(NULL, sizeof(zbx_dbsync_row_t)); row->rowid = rowid; row->tag = tag; if (NULL != dbrow) { row->row = (char **)zbx_malloc(NULL, sizeof(char *) * (size_t)sync->columns_num); for (i = 0; i < sync->columns_num; i++) row->row[i] = (NULL == dbrow[i] ? NULL : dbsync_strdup(dbrow[i])); } else row->row = NULL; zbx_vector_ptr_append(&sync->rows, row); switch (tag) { case ZBX_DBSYNC_ROW_ADD: sync->add_num++; break; case ZBX_DBSYNC_ROW_UPDATE: sync->update_num++; break; case ZBX_DBSYNC_ROW_REMOVE: sync->remove_num++; break; } } /****************************************************************************** * * * Purpose: prepares changeset * * * * Parameter: sync - [IN] the changeset * * columns_num - [IN] the number of columns in the changeset * * preproc_row_func - [IN] the callback function used to retrieve * * associated hostids (can be NULL if * * user macros are not resolved during * * synchronization process) * * * ******************************************************************************/ static void dbsync_prepare(zbx_dbsync_t *sync, int columns_num, zbx_dbsync_preproc_row_func_t preproc_row_func) { sync->columns_num = columns_num; sync->preproc_row_func = preproc_row_func; sync->row = (char **)zbx_malloc(NULL, sizeof(char *) * (size_t)columns_num); memset(sync->row, 0, sizeof(char *) * (size_t)columns_num); } /****************************************************************************** * * * Purpose: applies necessary pre-processing before row is compared/used * * * * Parameter: sync - [IN] the changeset * * row - [IN/OUT] the data row * * * * Return value: the resulting row * * * ******************************************************************************/ static char **dbsync_preproc_row(zbx_dbsync_t *sync, char **row) { int i; if (NULL == sync->preproc_row_func) return row; /* row is unchanged, preprocessing was not performed */ if (row == sync->preproc_row_func(sync, row)) return row; /* free the resources allocated by last preprocessing call */ zbx_vector_ptr_clear_ext(&sync->columns, zbx_ptr_free); for (i = 0; i < sync->columns_num; i++) { if (sync->row[i] != row[i]) zbx_vector_ptr_append(&sync->columns, sync->row[i]); } return sync->row; } static void dbsync_journal_init(zbx_dbsync_journal_t *journal) { zbx_vector_uint64_create(&journal->inserts); zbx_vector_uint64_create(&journal->updates); zbx_vector_uint64_create(&journal->deletes); zbx_vector_dbsync_create(&journal->syncs); zbx_vector_dbsync_obj_changelog_create(&journal->changelog); } static void dbsync_journal_destroy(zbx_dbsync_journal_t *journal) { zbx_vector_uint64_destroy(&journal->inserts); zbx_vector_uint64_destroy(&journal->updates); zbx_vector_uint64_destroy(&journal->deletes); zbx_vector_dbsync_destroy(&journal->syncs); zbx_vector_dbsync_obj_changelog_destroy(&journal->changelog); } void zbx_dbsync_env_init(zbx_dc_config_t *cache) { dbsync_env.cache = cache; zbx_hashset_create(&dbsync_env.changelog, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); } /****************************************************************************** * * * Purpose: remove old (1h+) changelog records from database and cache using * * database time * * * ******************************************************************************/ static void dbsync_prune_changelog(void) { static int last_prune_time; int now; zbx_dbsync_changelog_t *changelog; zbx_db_row_t row; zbx_db_result_t result; now = time(NULL); if (0 == last_prune_time) { last_prune_time = now; return; } if (now - last_prune_time < ZBX_DBSYNC_CHANGELOG_PRUNE_INTERVAL) return; last_prune_time = now; #ifndef HAVE_ORACLE result = zbx_db_select("select %s", ZBX_DB_TIMESTAMP()); #else result = zbx_db_select("select %s from dual", ZBX_DB_TIMESTAMP()); #endif if (NULL != (row = zbx_db_fetch(result))) { int changelog_num; changelog_num = dbsync_env.changelog.num_data; now = atoi(row[0]); if (ZBX_DB_OK <= zbx_db_execute("delete from changelog where clock<%d", now - ZBX_DBSYNC_CHANGELOG_MAX_AGE)) { zbx_hashset_iter_t iter; zbx_hashset_iter_reset(&dbsync_env.changelog, &iter); while (NULL != (changelog = (zbx_dbsync_changelog_t *)zbx_hashset_iter_next(&iter))) { if (now - changelog->clock > ZBX_DBSYNC_CHANGELOG_MAX_AGE) zbx_hashset_iter_remove(&iter); } zabbix_log(LOG_LEVEL_DEBUG, "removed %d old changelog records", changelog_num - dbsync_env.changelog.num_data); } } zbx_db_free_result(result); } /****************************************************************************** * * * Purpose: remove from first vector all ids found in second vector * * * * Comments: Both vectors must be sorted in ascending order * * * ******************************************************************************/ static void dbsync_remove_duplicate_ids(zbx_vector_uint64_t *dst, const zbx_vector_uint64_t *src) { int i, j, k; for (i = 0, j = 0, k = 0; j < src->values_num && i < dst->values_num;) { if (dst->values[i] == src->values[j]) { i++; j++; continue; } if (dst->values[i] < src->values[j]) { while (i < dst->values_num && dst->values[i] < src->values[j]) dst->values[k++] = dst->values[i++]; } else { while (j < src->values_num && dst->values[i] > src->values[j]) j++; } } if (j == src->values_num && i < dst->values_num) { if (k != i) memmove(dst->values + k, dst->values + i, (size_t)(dst->values_num - i) * sizeof(zbx_uint64_t)); k += dst->values_num - i; } dst->values_num = k; } /****************************************************************************** * * * Purpose: read changelog and prepare lists of modified objects since last * * sync * * * ******************************************************************************/ int zbx_dbsync_env_prepare(unsigned char mode) { zbx_db_result_t result; zbx_db_row_t row; zbx_dbsync_changelog_t changelog_local; int changelog_num = 0; size_t i; zbx_hashset_create(&dbsync_env.strpool, 100, dbsync_strpool_hash_func, dbsync_strpool_compare_func); for (i = 0; i < ARRSIZE(dbsync_env.journals); i++) dbsync_journal_init(&dbsync_env.journals[i]); if (ZBX_DBSYNC_INIT == mode) { result = zbx_db_select("select changelogid,clock from changelog"); while (NULL != (row = zbx_db_fetch(result))) { ZBX_DBROW2UINT64(changelog_local.changelogid, row[0]); changelog_local.clock = atoi(row[1]); zbx_hashset_insert(&dbsync_env.changelog, &changelog_local, sizeof(changelog_local)); changelog_num++; } } else { result = zbx_db_select("select changelogid,object,objectid,operation,clock from changelog"); while (NULL != (row = zbx_db_fetch(result))) { int operation; zbx_dbsync_journal_t *journal; zbx_dbsync_obj_changelog_t obj; ZBX_DBROW2UINT64(obj.changelog.changelogid, row[0]); if (NULL != zbx_hashset_search(&dbsync_env.changelog, &obj.changelog)) continue; obj.changelog.clock = atoi(row[4]); ZBX_DBROW2UINT64(obj.objectid, row[2]); operation = atoi(row[3]); journal = &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(atoi(row[1]))]; zbx_vector_dbsync_obj_changelog_append(&journal->changelog, obj); switch (operation) { case ZBX_DBSYNC_ROW_ADD: zbx_vector_uint64_append(&journal->inserts, obj.objectid); break; case ZBX_DBSYNC_ROW_UPDATE: zbx_vector_uint64_append(&journal->updates, obj.objectid); break; case ZBX_DBSYNC_ROW_REMOVE: zbx_vector_uint64_append(&journal->deletes, obj.objectid); break; } changelog_num++; } } zbx_db_free_result(result); for (i = 0; i < ARRSIZE(dbsync_env.journals); i++) { zbx_vector_uint64_sort(&dbsync_env.journals[i].inserts, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_sort(&dbsync_env.journals[i].updates, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_uniq(&dbsync_env.journals[i].updates, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_uint64_sort(&dbsync_env.journals[i].deletes, ZBX_DEFAULT_UINT64_COMPARE_FUNC); /* in the case multiple changelog records are registered for the same object */ /* the operation priority is delete, insert, update: */ /* delete - if object is removed any prior changes to it does not matter */ /* insert - if object was added and then updated, only the insert operation matters */ dbsync_remove_duplicate_ids(&dbsync_env.journals[i].inserts, &dbsync_env.journals[i].deletes); dbsync_remove_duplicate_ids(&dbsync_env.journals[i].updates, &dbsync_env.journals[i].deletes); dbsync_remove_duplicate_ids(&dbsync_env.journals[i].updates, &dbsync_env.journals[i].inserts); } zbx_vector_dbsync_create(&dbsync_env.changelog_dbsyncs); zbx_vector_dbsync_create(&dbsync_env.dbsyncs); return changelog_num; } static void dbsync_env_flush_journal(zbx_dbsync_journal_t *journal) { zbx_hashset_t objectids; int i, j, objects_num; if (0 == journal->changelog.values_num) return; objects_num = journal->inserts.values_num + journal->updates.values_num; for (i = 0; i < journal->syncs.values_num; i++) objects_num += journal->syncs.values[i]->rows.values_num; zbx_hashset_create(&objectids, (size_t)objects_num, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); for (j = 0; j < journal->syncs.values_num; j++) { for (i = 0; i < journal->syncs.values[j]->rows.values_num; i++) { zbx_dbsync_row_t *row = (zbx_dbsync_row_t *)journal->syncs.values[j]->rows.values[i]; zbx_hashset_insert(&objectids, &row->rowid, sizeof(row->rowid)); } } for (i = 0; i < journal->inserts.values_num; i++) zbx_hashset_insert(&objectids, &journal->inserts.values[i], sizeof(journal->inserts.values[i])); for (i = 0; i < journal->updates.values_num; i++) zbx_hashset_insert(&objectids, &journal->updates.values[i], sizeof(journal->updates.values[i])); for (i = 0; i < journal->changelog.values_num; i++) { if (NULL != zbx_hashset_search(&objectids, &journal->changelog.values[i].objectid)) { zbx_hashset_insert(&dbsync_env.changelog, &journal->changelog.values[i].changelog, sizeof(zbx_dbsync_changelog_t)); } } zbx_hashset_destroy(&objectids); } void zbx_dbsync_env_flush_changelog(void) { size_t i; for (i = 0; i < (int)ARRSIZE(dbsync_env.journals); i++) dbsync_env_flush_journal(&dbsync_env.journals[i]); zabbix_log(LOG_LEVEL_DEBUG, "%s() changelog : %d (%d slots)", __func__, dbsync_env.changelog.num_data, dbsync_env.changelog.num_slots); } void zbx_dbsync_env_clear(void) { size_t i; zbx_vector_dbsync_destroy(&dbsync_env.dbsyncs); zbx_vector_dbsync_destroy(&dbsync_env.changelog_dbsyncs); dbsync_prune_changelog(); zbx_hashset_destroy(&dbsync_env.strpool); for (i = 0; i < ARRSIZE(dbsync_env.journals); i++) dbsync_journal_destroy(&dbsync_env.journals[i]); } int zbx_dbsync_env_changelog_num(void) { return dbsync_env.changelog.num_data; } /****************************************************************************** * * * Purpose: check if any new records were synced from tables supporting * * changelog * * * * Return value: SUCCEED - there were new records synced * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_env_changelog_dbsyncs_new_records(void) { int i; for (i = 0; i < dbsync_env.changelog_dbsyncs.values_num; i++) { zbx_dbsync_t *sync = dbsync_env.changelog_dbsyncs.values[i]; if (0 != sync->add_num) return SUCCEED; } return FAIL; } /****************************************************************************** * * * Purpose: get rows changed since last sync * * * ******************************************************************************/ static int dbsync_get_rows(zbx_dbsync_t *sync, char **sql, size_t *sql_alloc, size_t *sql_offset, const char *field, const char *order_field, zbx_vector_uint64_t *ids, unsigned char tag) { zbx_db_row_t dbrow; zbx_db_result_t result; char **row; size_t sql_offset_reset = *sql_offset; zbx_uint64_t rowid, *batch; int batch_size; zbx_vector_uint64_t read_ids; zbx_vector_uint64_create(&read_ids); zbx_vector_uint64_reserve(&read_ids, (size_t)ids->values_num); for (batch = ids->values; batch < ids->values + ids->values_num; batch += ZBX_DBSYNC_BATCH_SIZE) { batch_size = MIN(ZBX_DBSYNC_BATCH_SIZE, ids->values + ids->values_num - batch); zbx_db_add_condition_alloc(sql, sql_alloc, sql_offset, field, batch, batch_size); if (NULL != order_field) zbx_snprintf_alloc(sql, sql_alloc, sql_offset, " order by %s", order_field); if (NULL == (result = zbx_db_select("%s", *sql))) return FAIL; *sql_offset = sql_offset_reset; while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(rowid, dbrow[0]); if (NULL != (row = dbsync_preproc_row(sync, dbrow))) dbsync_add_row(sync, rowid, tag, row); zbx_vector_uint64_append(&read_ids, rowid); } zbx_db_free_result(result); } zbx_vector_uint64_sort(&read_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); dbsync_remove_duplicate_ids(ids, &read_ids); zbx_vector_uint64_destroy(&read_ids); return SUCCEED; } /****************************************************************************** * * * Purpose: read query data based on changelog journal * * * ******************************************************************************/ static int dbsync_read_journal(zbx_dbsync_t *sync, char **sql, size_t *sql_alloc, size_t *sql_offset, const char *field, const char *keyword, const char *order_field, zbx_dbsync_journal_t *journal) { int i, inserts_num, updates_num; if (ZBX_DBSYNC_TYPE_CHANGELOG != sync->type) { /* sync objects using changelog must be initialized with zbx_dbsync_init_changelog() */ THIS_SHOULD_NEVER_HAPPEN; exit(EXIT_FAILURE); } zbx_vector_dbsync_append(&journal->syncs, sync); inserts_num = journal->inserts.values_num; updates_num = journal->updates.values_num; if (0 != journal->inserts.values_num || 0 != journal->updates.values_num) { zbx_chrcpy_alloc(sql, sql_alloc, sql_offset, ' '); zbx_strcpy_alloc(sql, sql_alloc, sql_offset, keyword); if (0 != journal->inserts.values_num) { if (FAIL == dbsync_get_rows(sync, sql, sql_alloc, sql_offset, field, order_field, &journal->inserts, ZBX_DBSYNC_ROW_ADD)) { return FAIL; } } if (0 != journal->updates.values_num) { if (FAIL == dbsync_get_rows(sync, sql, sql_alloc, sql_offset, field, order_field, &journal->updates, ZBX_DBSYNC_ROW_UPDATE)) { return FAIL; } } } for (i = 0; i < journal->deletes.values_num; i++) dbsync_add_row(sync, journal->deletes.values[i], ZBX_DBSYNC_ROW_REMOVE, NULL); /* the obtained object identifiers are removed from journal */ sync->add_num = (zbx_uint64_t)(inserts_num - journal->inserts.values_num); sync->update_num = (zbx_uint64_t)(updates_num - journal->updates.values_num); sync->remove_num = (zbx_uint64_t)journal->deletes.values_num; return SUCCEED; } /****************************************************************************** * * * Purpose: initializes changeset * * * ******************************************************************************/ static void dbsync_init(zbx_dbsync_t *sync, unsigned char mode) { sync->columns_num = 0; sync->mode = mode; sync->add_num = 0; sync->update_num = 0; sync->remove_num = 0; sync->sql_time = 0; sync->sync_time = 0; sync->sync_size = 0; sync->row = NULL; sync->preproc_row_func = NULL; zbx_vector_ptr_create(&sync->columns); if (ZBX_DBSYNC_UPDATE == sync->mode) { zbx_vector_ptr_create(&sync->rows); sync->row_index = -1; } else sync->dbresult = NULL; } /****************************************************************************** * * * Purpose: initializes changeset * * * ******************************************************************************/ void zbx_dbsync_init(zbx_dbsync_t *sync, const char *name, unsigned char mode) { dbsync_init(sync, mode); if (NULL != name) { sync->from = name; zbx_vector_dbsync_append(&dbsync_env.dbsyncs, sync); } sync->type = ZBX_DBSYNC_TYPE_DIFF; } /****************************************************************************** * * * Purpose: initializes changeset for tables using changelog * * * ******************************************************************************/ void zbx_dbsync_init_changelog(zbx_dbsync_t *sync, const char *name, unsigned char mode) { dbsync_init(sync, mode); sync->from = name; sync->type = ZBX_DBSYNC_TYPE_CHANGELOG; zbx_vector_dbsync_append(&dbsync_env.changelog_dbsyncs, sync); } /****************************************************************************** * * * Purpose: frees resources allocated by changeset * * * ******************************************************************************/ void zbx_dbsync_clear(zbx_dbsync_t *sync) { /* free the resources allocated by row pre-processing */ zbx_vector_ptr_clear_ext(&sync->columns, zbx_ptr_free); zbx_vector_ptr_destroy(&sync->columns); zbx_free(sync->row); if (ZBX_DBSYNC_UPDATE == sync->mode) { int i, j; zbx_dbsync_row_t *row; for (i = 0; i < sync->rows.values_num; i++) { row = (zbx_dbsync_row_t *)sync->rows.values[i]; if (NULL != row->row) { for (j = 0; j < sync->columns_num; j++) dbsync_strfree(row->row[j]); zbx_free(row->row); } zbx_free(row); } zbx_vector_ptr_destroy(&sync->rows); } else { zbx_db_free_result(sync->dbresult); sync->dbresult = NULL; } } /****************************************************************************** * * * Purpose: gets the number of rows * * * * Parameters: sync - [IN] changeset * * * * Return value: number of rows to sync * * * ******************************************************************************/ int zbx_dbsync_get_row_num(const zbx_dbsync_t *sync) { if (ZBX_DBSYNC_UPDATE == sync->mode) return sync->rows.values_num; return zbx_db_get_row_num(sync->dbresult); } /****************************************************************************** * * * Purpose: gets the next row from the changeset * * * * Parameters: sync - [IN] the changeset * * rowid - [OUT] the row identifier (required for row removal, * * optional for new/updated rows) * * row - [OUT] the row data * * tag - [OUT] the row tag, identifying changes * * (see ZBX_DBSYNC_ROW_* defines) * * * * Return value: SUCCEED - the next row was successfully retrieved * * FAIL - no more data to retrieve * * * ******************************************************************************/ int zbx_dbsync_next(zbx_dbsync_t *sync, zbx_uint64_t *rowid, char ***row, unsigned char *tag) { if (ZBX_DBSYNC_UPDATE == sync->mode) { zbx_dbsync_row_t *sync_row; if (++sync->row_index == sync->rows.values_num) return FAIL; sync_row = (zbx_dbsync_row_t *)sync->rows.values[sync->row_index]; *rowid = sync_row->rowid; *row = sync_row->row; *tag = sync_row->tag; } else { char **dbrow; if (NULL == (dbrow = zbx_db_fetch(sync->dbresult))) { *row = NULL; return FAIL; } *row = dbsync_preproc_row(sync, dbrow); *rowid = 0; *tag = ZBX_DBSYNC_ROW_ADD; sync->add_num++; } return SUCCEED; } /****************************************************************************** * * * Purpose: encode serialized expression to be returned as db field * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ static char *encode_expression(const zbx_eval_context_t *ctx) { unsigned char *data; size_t len; char *str = NULL; len = zbx_eval_serialize(ctx, NULL, &data); zbx_base64_encode_dyn((const char *)data, &str, len); zbx_free(data); return str; } /****************************************************************************** * * * Purpose: compares config table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_config(zbx_dbsync_t *sync) { zbx_db_result_t result; zbx_dcsync_sql_start(sync); #define SELECTED_CONFIG_FIELD_COUNT 44 /* number of columns in the following zbx_db_select() */ if (NULL == (result = zbx_db_select("select discovery_groupid,snmptrap_logging," "severity_name_0,severity_name_1,severity_name_2," "severity_name_3,severity_name_4,severity_name_5," "hk_events_mode,hk_events_trigger,hk_events_internal," "hk_events_discovery,hk_events_autoreg,hk_services_mode," "hk_services,hk_audit_mode,hk_audit,hk_sessions_mode,hk_sessions," "hk_history_mode,hk_history_global,hk_history,hk_trends_mode," "hk_trends_global,hk_trends,default_inventory_mode,db_extension,autoreg_tls_accept," "compression_status,compress_older,instanceid,default_timezone,hk_events_service," "auditlog_enabled,timeout_zabbix_agent,timeout_simple_check,timeout_snmp_agent," "timeout_external_check,timeout_db_monitor,timeout_http_agent,timeout_ssh_agent," "timeout_telnet_agent,timeout_script,auditlog_mode,timeout_browser" " from config" " order by configid"))) /* if you change number of columns in zbx_db_select(), */ /* adjust SELECTED_CONFIG_FIELD_COUNT */ { return FAIL; } dbsync_prepare(sync, SELECTED_CONFIG_FIELD_COUNT, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_db_free_result(result); /* global configuration will be always synchronized directly with database */ THIS_SHOULD_NEVER_HAPPEN; zbx_dcsync_sql_end(sync); return FAIL; #undef SELECTED_CONFIG_FIELD_COUNT } /****************************************************************************** * * * Purpose: compares 'config_autoreg_tls' table with cached configuration * * data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * * Comments: * * On success this function produces a changeset with 0 or 1 record * * because 'config_autoreg_tls' table can have no more than 1 record. * * If in future you want to support multiple autoregistration PSKs and/or * * select more columns in zbx_db_select() then do not forget to sync * * changes with DCsync_autoreg_config() !!! * * * ******************************************************************************/ int zbx_dbsync_compare_autoreg_psk(zbx_dbsync_t *sync) { zbx_db_result_t result; zbx_db_row_t dbrow; int num_records = 0; zbx_dcsync_sql_start(sync); #define CONFIG_AUTOREG_TLS_FIELD_COUNT 2 /* number of columns in the following zbx_db_select() */ if (NULL == (result = zbx_db_select("select tls_psk_identity,tls_psk" " from config_autoreg_tls" " order by autoreg_tlsid"))) /* if you change number of columns in zbx_db_select(), */ /* adjust CONFIG_AUTOREG_TLS_FIELD_COUNT */ { return FAIL; } dbsync_prepare(sync, CONFIG_AUTOREG_TLS_FIELD_COUNT, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } /* 0 or 1 records are expected */ if (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; if (FAIL == dbsync_compare_str(dbrow[0], dbsync_env.cache->autoreg_psk_identity) || FAIL == dbsync_compare_str(dbrow[1], dbsync_env.cache->autoreg_psk)) { tag = ZBX_DBSYNC_ROW_UPDATE; } if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, 0, tag, dbrow); /* fictitious rowid 0 is used, there is only 1 record */ num_records = 1; } else if ('\0' != dbsync_env.cache->autoreg_psk_identity[0]) dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_REMOVE, NULL); if (1 == num_records && NULL != zbx_db_fetch(result)) zabbix_log(LOG_LEVEL_ERR, "table 'config_autoreg_tls' has multiple records"); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; #undef CONFIG_AUTOREG_TLS_FIELD_COUNT } /****************************************************************************** * * * Purpose: reads autoreg hosts table in order to cache autoreg_host entries * * for hosts monitored directly by Zabbix server * * * * Parameter: sync - [OUT] result of select, only during initialization * * * * Return value: SUCCEED - entries retrieved or already synced * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_autoreg_host(zbx_dbsync_t *sync) { if (ZBX_DBSYNC_INIT != sync->mode) return SUCCEED; zbx_dcsync_sql_start(sync); if (NULL == (sync->dbresult = zbx_db_select( "select host,listen_ip,listen_dns,host_metadata,flags,listen_port" " from autoreg_host" " where proxyid is null"))) { return FAIL; } zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares hosts table with cached configuration data * * and populates the changeset * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_hosts(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select hostid,proxyid,host,ipmi_authtype,ipmi_privilege,ipmi_username,ipmi_password," "maintenance_status,maintenance_type,maintenance_from,status,name,tls_connect," "tls_accept,tls_issuer,tls_subject,tls_psk_identity,tls_psk,maintenanceid," "proxy_groupid,monitored_by" " from hosts" " where status in (%d,%d) and flags<>%d", HOST_STATUS_MONITORED, HOST_STATUS_NOT_MONITORED, ZBX_FLAG_DISCOVERY_PROTOTYPE); dbsync_prepare(sync, 21, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "hostid", "and", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HOST)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares host inventory table row with cached configuration data * * * * Parameter: hi - [IN] the cached host inventory data * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_host_inventory(const ZBX_DC_HOST_INVENTORY *hi, const zbx_db_row_t dbrow) { int i; if (SUCCEED != dbsync_compare_uchar(dbrow[1], hi->inventory_mode)) return FAIL; for (i = 0; i < HOST_INVENTORY_FIELD_COUNT; i++) { if (FAIL == dbsync_compare_str(dbrow[i + 2], hi->values[i])) return FAIL; } return SUCCEED; } /****************************************************************************** * * * Purpose: compares host_inventory table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_host_inventory(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; ZBX_DC_HOST_INVENTORY *hi; const char *sql; zbx_dcsync_sql_start(sync); sql = "select hostid,inventory_mode,type,type_full,name,alias,os,os_full,os_short,serialno_a," "serialno_b,tag,asset_tag,macaddress_a,macaddress_b,hardware,hardware_full,software," "software_full,software_app_a,software_app_b,software_app_c,software_app_d," "software_app_e,contact,location,location_lat,location_lon,notes,chassis,model," "hw_arch,vendor,contract_number,installer_name,deployment_status,url_a,url_b," "url_c,host_networks,host_netmask,host_router,oob_ip,oob_netmask,oob_router," "date_hw_purchase,date_hw_install,date_hw_expiry,date_hw_decomm,site_address_a," "site_address_b,site_address_c,site_city,site_state,site_country,site_zip,site_rack," "site_notes,poc_1_name,poc_1_email,poc_1_phone_a,poc_1_phone_b,poc_1_cell," "poc_1_screen,poc_1_notes,poc_2_name,poc_2_email,poc_2_phone_a,poc_2_phone_b," "poc_2_cell,poc_2_screen,poc_2_notes" " from host_inventory"; if (NULL == (result = zbx_db_select("%s", sql))) { zbx_dcsync_sql_end(sync); return FAIL; } dbsync_prepare(sync, 72, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->host_inventories.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (hi = (ZBX_DC_HOST_INVENTORY *)zbx_hashset_search(&dbsync_env.cache->host_inventories, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_host_inventory(hi, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->host_inventories, &iter); while (NULL != (hi = (ZBX_DC_HOST_INVENTORY *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &hi->hostid)) dbsync_add_row(sync, hi->hostid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares hosts_templates table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_host_templates(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_iter_t iter; zbx_um_host_t **phost; zbx_hashset_t htmpls; int i; zbx_uint64_pair_t ht_local, *ht; char hostid_s[MAX_ID_LEN + 1], templateid_s[MAX_ID_LEN + 1]; char *del_row[2] = {hostid_s, templateid_s}; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select hostid,templateid" " from hosts_templates" " order by hostid"))) { return FAIL; } dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&htmpls, 100, ZBX_DEFAULT_UINT64_PAIR_HASH_FUNC, ZBX_DEFAULT_UINT64_PAIR_COMPARE_FUNC); /* index all host->template links */ zbx_hashset_iter_reset(&dbsync_env.cache->um_cache->hosts, &iter); while (NULL != (phost = (zbx_um_host_t **)zbx_hashset_iter_next(&iter))) { ht_local.first = (*phost)->hostid; for (i = 0; i < (*phost)->templateids.values_num; i++) { ht_local.second = (*phost)->templateids.values[i]; zbx_hashset_insert(&htmpls, &ht_local, sizeof(ht_local)); } } /* add new rows, remove existing rows from index */ while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(ht_local.first, dbrow[0]); ZBX_STR2UINT64(ht_local.second, dbrow[1]); if (NULL == (ht = (zbx_uint64_pair_t *)zbx_hashset_search(&htmpls, &ht_local))) dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_ADD, dbrow); else zbx_hashset_remove_direct(&htmpls, ht); } /* add removed rows */ zbx_hashset_iter_reset(&htmpls, &iter); while (NULL != (ht = (zbx_uint64_pair_t *)zbx_hashset_iter_next(&iter))) { zbx_snprintf(hostid_s, sizeof(hostid_s), ZBX_FS_UI64, ht->first); zbx_snprintf(templateid_s, sizeof(templateid_s), ZBX_FS_UI64, ht->second); dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_REMOVE, del_row); } zbx_db_free_result(result); zbx_hashset_destroy(&htmpls); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares global macro table row with cached configuration data * * * * Parameter: gmacro - [IN] the cached global macro data * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_global_macro(const zbx_um_macro_t *gmacro, const zbx_db_row_t dbrow) { char *macro = NULL, *context = NULL; int ret = FAIL; unsigned char context_op; if (FAIL == dbsync_compare_uchar(dbrow[3], gmacro->type)) return FAIL; if (ZBX_MACRO_VALUE_VAULT == atoi(dbrow[3])) { if (FAIL == um_macro_check_vault_location(gmacro, dbrow[2])) return FAIL; } else { if (FAIL == dbsync_compare_str(dbrow[2], gmacro->value)) return FAIL; } if (SUCCEED != zbx_user_macro_parse_dyn(dbrow[1], ¯o, &context, NULL, &context_op)) return FAIL; if (0 != strcmp(gmacro->name, macro)) goto out; if (NULL == context) { if (NULL != gmacro->context) goto out; ret = SUCCEED; goto out; } if (NULL == gmacro->context) goto out; if (gmacro->context_op != context_op) goto out; if (0 == strcmp(gmacro->context, context)) ret = SUCCEED; out: zbx_free(macro); zbx_free(context); return ret; } /****************************************************************************** * * * Purpose: compares global macros table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_global_macros(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid, *prowid = &rowid; zbx_um_macro_t **pmacro; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select globalmacroid,macro,value,type" " from globalmacro"))) { return FAIL; } dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->gmacros.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (pmacro = (zbx_um_macro_t **)zbx_hashset_search(&dbsync_env.cache->gmacros, &prowid))) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_global_macro(*pmacro, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->gmacros, &iter); while (NULL != (pmacro = (zbx_um_macro_t **)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &(*pmacro)->macroid)) dbsync_add_row(sync, (*pmacro)->macroid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares host macro table row with cached configuration data * * * * Parameter: hmacro - [IN] the cached host macro data * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_host_macro(const zbx_um_macro_t *hmacro, const zbx_db_row_t dbrow) { char *macro = NULL, *context = NULL; int ret = FAIL; unsigned char context_op; if (FAIL == dbsync_compare_uchar(dbrow[4], hmacro->type)) return FAIL; if (ZBX_MACRO_VALUE_VAULT == atoi(dbrow[4])) { if (FAIL == um_macro_check_vault_location(hmacro, dbrow[3])) return FAIL; } else { if (FAIL == dbsync_compare_str(dbrow[3], hmacro->value)) return FAIL; } if (FAIL == dbsync_compare_uint64(dbrow[1], hmacro->hostid)) return FAIL; if (SUCCEED != zbx_user_macro_parse_dyn(dbrow[2], ¯o, &context, NULL, &context_op)) return FAIL; if (0 != strcmp(hmacro->name, macro)) goto out; if (NULL == context) { if (NULL != hmacro->context) goto out; ret = SUCCEED; goto out; } if (NULL == hmacro->context) goto out; if (context_op != hmacro->context_op) goto out; if (0 == strcmp(hmacro->context, context)) ret = SUCCEED; out: zbx_free(macro); zbx_free(context); return ret; } /****************************************************************************** * * * Purpose: compares global macros table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_host_macros(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid, *prowid = &rowid; zbx_um_macro_t **pmacro; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select hostmacroid,hostid,macro,value,type from hostmacro"))) return FAIL; dbsync_prepare(sync, 5, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->hmacros.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (pmacro = (zbx_um_macro_t **)zbx_hashset_search(&dbsync_env.cache->hmacros, &prowid))) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_host_macro(*pmacro, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->hmacros, &iter); while (NULL != (pmacro = (zbx_um_macro_t **)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &(*pmacro)->macroid)) dbsync_add_row(sync, (*pmacro)->macroid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares interface table row with cached configuration data * * * * Parameter: interface - [IN] the cached interface data * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * * Comments: User macros used in ip, dns fields will always make compare to * * fail. * * * ******************************************************************************/ static int dbsync_compare_interface(const ZBX_DC_INTERFACE *interface, const zbx_db_row_t dbrow) { ZBX_DC_SNMPINTERFACE *snmp; if (FAIL == dbsync_compare_uint64(dbrow[1], interface->hostid)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[2], interface->type)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[3], interface->main)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[4], interface->useip)) return FAIL; if (NULL != strstr(dbrow[5], "{$")) return FAIL; if (FAIL == dbsync_compare_str(dbrow[5], interface->ip)) return FAIL; if (NULL != strstr(dbrow[6], "{$")) return FAIL; if (FAIL == dbsync_compare_str(dbrow[6], interface->dns)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[7], interface->port)) return FAIL; /* reset_availability, items_num and availability_ts are excluded from the comparison */ snmp = (ZBX_DC_SNMPINTERFACE *)zbx_hashset_search(&dbsync_env.cache->interfaces_snmp, &interface->interfaceid); if (INTERFACE_TYPE_SNMP == interface->type) { if (NULL == snmp || SUCCEED == zbx_db_is_null(dbrow[12])) /* should never happen */ return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[12], snmp->version)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[13], snmp->bulk)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[14], snmp->community)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[15], snmp->securityname)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[16], snmp->securitylevel)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[17], snmp->authpassphrase)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[18], snmp->privpassphrase)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[19], snmp->authprotocol)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[20], snmp->privprotocol)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[21], snmp->contextname)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[22], snmp->max_repetitions)) return FAIL; } else if (NULL != snmp) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: applies necessary preprocessing before row is compared/used * * * * Parameter: sync - [IN] changeset * * row - [IN] row to preprocess * * * * Return value: the preprocessed row * * * * Comments: The row preprocessing can be used to expand user macros in * * some columns. * * * ******************************************************************************/ static char **dbsync_interface_preproc_row(zbx_dbsync_t *sync, char **row) { zbx_uint64_t hostid; /* get associated host identifier */ ZBX_STR2UINT64(hostid, row[1]); memcpy(sync->row, row, sizeof(char *) * (size_t)sync->columns_num); row = sync->row; /* expand user macros */ if (NULL != strstr(row[5], "{$")) { char *addr; addr = dc_expand_user_and_func_macros_dyn(row[5], &hostid, 1, ZBX_MACRO_ENV_NONSECURE); if (SUCCEED == zbx_is_ip(addr)) row[5] = addr; else zbx_free(addr); } if (NULL != strstr(row[6], "{$")) { char *addr; addr = dc_expand_user_and_func_macros_dyn(row[6], &hostid, 1, ZBX_MACRO_ENV_NONSECURE); if (SUCCEED == zbx_is_ip(addr) || SUCCEED == zbx_validate_hostname(addr)) row[6] = addr; else zbx_free(addr); } return sync->row; } /****************************************************************************** * * * Purpose: compares interfaces table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_interfaces(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; ZBX_DC_INTERFACE *interface; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select i.interfaceid,i.hostid,i.type,i.main,i.useip,i.ip,i.dns,i.port," "i.available,i.disable_until,i.error,i.errors_from," "s.version,s.bulk,s.community,s.securityname,s.securitylevel,s.authpassphrase,s.privpassphrase," "s.authprotocol,s.privprotocol,s.contextname,s.max_repetitions" " from interface i" " left join interface_snmp s on i.interfaceid=s.interfaceid"))) { return FAIL; } dbsync_prepare(sync, 23, dbsync_interface_preproc_row); if (ZBX_DBSYNC_INIT == sync->mode) { zbx_dcsync_sql_end(sync); sync->dbresult = result; return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->interfaces.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; char **row; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); row = dbsync_preproc_row(sync, dbrow); if (NULL == (interface = (ZBX_DC_INTERFACE *)zbx_hashset_search(&dbsync_env.cache->interfaces, &rowid))) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_interface(interface, row)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, row); } zbx_hashset_iter_reset(&dbsync_env.cache->interfaces, &iter); while (NULL != (interface = (ZBX_DC_INTERFACE *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &interface->interfaceid)) dbsync_add_row(sync, interface->interfaceid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: applies necessary preprocessing before row is compared/used * * * * Parameter: sync - [IN] the changeset * * row - [IN] the row to preprocess * * * * Return value: the preprocessed row * * * * Comments: The row preprocessing can be used to expand user macros in * * some columns. * * * ******************************************************************************/ static char **dbsync_item_preproc_row(zbx_dbsync_t *sync, char **row) { unsigned char type; ZBX_STR2UCHAR(type, row[3]); /* expand user macros */ if (ITEM_TYPE_CALCULATED == type) { zbx_eval_context_t ctx; char *error = NULL; /* copy the original data */ memcpy(sync->row, row, sizeof(char *) * (size_t)sync->columns_num); row = sync->row; if (FAIL == zbx_eval_parse_expression(&ctx, row[11], ZBX_EVAL_PARSE_CALC_EXPRESSION, &error)) { zbx_eval_set_exception(&ctx, zbx_dsprintf(NULL, "Cannot parse formula: %s", error)); zbx_free(error); } row[49] = encode_expression(&ctx); zbx_eval_clear(&ctx); return sync->row; } return row; } /****************************************************************************** * * * Purpose: compares items table with cached configuration data * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_items(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select i.itemid,i.hostid,i.status,i.type,i.value_type,i.key_,i.snmp_oid,i.ipmi_sensor,i.delay," "i.trapper_hosts,i.logtimefmt,i.params,ir.state,i.authtype,i.username,i.password," "i.publickey,i.privatekey,i.flags,i.interfaceid,ir.lastlogsize,ir.mtime," "i.history,i.trends,i.inventory_link,i.valuemapid,i.units,ir.error,i.jmx_endpoint," "i.master_itemid,i.timeout,i.url,i.query_fields,i.posts,i.status_codes," "i.follow_redirects,i.post_type,i.http_proxy,i.headers,i.retrieve_mode," "i.request_method,i.output_format,i.ssl_cert_file,i.ssl_key_file,i.ssl_key_password," "i.verify_peer,i.verify_host,i.allow_traps,i.templateid,null" " from items i" " left join item_rtdata ir on i.itemid=ir.itemid"); dbsync_prepare(sync, 50, dbsync_item_preproc_row); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "i.itemid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_ITEM)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } static int dbsync_compare_item_discovery(const ZBX_DC_ITEM_DISCOVERY *item_discovery, const zbx_db_row_t dbrow) { return dbsync_compare_uint64(dbrow[1], item_discovery->parent_itemid); } /****************************************************************************** * * * Purpose: compares mapping between items, prototypes and rules with * * configuration cache * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_item_discovery(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; ZBX_DC_ITEM_DISCOVERY *item_discovery; char **row; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select itemid,parent_itemid from item_discovery"))) return FAIL; dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { zbx_dcsync_sql_end(sync); sync->dbresult = result; return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->item_discovery.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); row = dbsync_preproc_row(sync, dbrow); if (NULL == (item_discovery = (ZBX_DC_ITEM_DISCOVERY *)zbx_hashset_search( &dbsync_env.cache->item_discovery, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_item_discovery(item_discovery, row)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, row); } zbx_hashset_iter_reset(&dbsync_env.cache->item_discovery, &iter); while (NULL != (item_discovery = (ZBX_DC_ITEM_DISCOVERY *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &item_discovery->itemid)) dbsync_add_row(sync, item_discovery->itemid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: applies necessary preprocessing before row is compared/used * * * * Parameter: sync - [IN] the changeset * * row - [IN] the row to preprocess * * * * Return value: the preprocessed row * * * * Comments: The row preprocessing can be used to expand user macros in * * some columns. * * During preprocessing trigger expression/recovery expression are * * parsed, serialized and stored as base64 strings into 16,17 * * columns. * * * ******************************************************************************/ static char **dbsync_trigger_preproc_row(zbx_dbsync_t *sync, char **row) { zbx_eval_context_t ctx, ctx_r; char *error = NULL; unsigned char mode, timer = ZBX_TRIGGER_TIMER_DEFAULT, flags; ZBX_STR2UCHAR(flags, row[19]); if (ZBX_FLAG_DISCOVERY_PROTOTYPE == flags) return row; memcpy(sync->row, row, sizeof(char *) * (size_t)sync->columns_num); row = sync->row; if (FAIL == zbx_eval_parse_expression(&ctx, row[2], ZBX_EVAL_TRIGGER_EXPRESSION, &error)) { zbx_eval_set_exception(&ctx, zbx_dsprintf(NULL, "cannot parse trigger expression: %s", error)); zbx_free(error); } else { if (SUCCEED == zbx_eval_check_timer_functions(&ctx)) timer |= ZBX_TRIGGER_TIMER_EXPRESSION; } ZBX_STR2UCHAR(mode, row[10]); if (TRIGGER_RECOVERY_MODE_RECOVERY_EXPRESSION == mode) { if (FAIL == zbx_eval_parse_expression(&ctx_r, row[11], ZBX_EVAL_TRIGGER_EXPRESSION, &error)) { zbx_eval_set_exception(&ctx_r, zbx_dsprintf(NULL, "cannot parse trigger recovery" " expression: %s", error)); zbx_free(error); } else { if (SUCCEED == zbx_eval_check_timer_functions(&ctx_r)) timer |= ZBX_TRIGGER_TIMER_RECOVERY_EXPRESSION; } } row[16] = encode_expression(&ctx); zbx_eval_clear(&ctx); if (TRIGGER_RECOVERY_MODE_RECOVERY_EXPRESSION == mode) { row[17] = encode_expression(&ctx_r); zbx_eval_clear(&ctx_r); } row[18] = zbx_dsprintf(NULL, "%d", timer); return sync->row; } /****************************************************************************** * * * Purpose: compares triggers table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * * Comment: The 16th and 17th fields (starting with 0) are placeholders for * * serialized expression/recovery expression. * * The 18th field is placeholder for trigger timer flag (set if * * expression/recovery expression contains timer functions). * * * ******************************************************************************/ int zbx_dbsync_compare_triggers(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select triggerid,description,expression,error,priority,type,value,state,lastchange,status," "recovery_mode,recovery_expression,correlation_mode,correlation_tag,opdata,event_name,null," "null,null,flags" " from triggers"); dbsync_prepare(sync, 20, dbsync_trigger_preproc_row); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "triggerid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_TRIGGER)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares trigger_depends table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_trigger_dependency(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t deps; zbx_hashset_iter_t iter; ZBX_DC_TRIGGER_DEPLIST *dep_down, *dep_up; zbx_uint64_pair_t *dep, dep_local; char down_s[MAX_ID_LEN + 1], up_s[MAX_ID_LEN + 1]; char *del_row[2] = {down_s, up_s}; int i; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select triggerid_down,triggerid_up from trigger_depends"))) return FAIL; dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&deps, 100, ZBX_DEFAULT_UINT64_PAIR_HASH_FUNC, ZBX_DEFAULT_UINT64_PAIR_COMPARE_FUNC); /* index all host->template links */ zbx_hashset_iter_reset(&dbsync_env.cache->trigdeps, &iter); while (NULL != (dep_down = (ZBX_DC_TRIGGER_DEPLIST *)zbx_hashset_iter_next(&iter))) { dep_local.first = dep_down->triggerid; for (i = 0; i < dep_down->dependencies.values_num; i++) { dep_up = (ZBX_DC_TRIGGER_DEPLIST *)dep_down->dependencies.values[i]; dep_local.second = dep_up->triggerid; zbx_hashset_insert(&deps, &dep_local, sizeof(dep_local)); } } /* add new rows, remove existing rows from index */ while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(dep_local.first, dbrow[0]); ZBX_STR2UINT64(dep_local.second, dbrow[1]); if (NULL == (dep = (zbx_uint64_pair_t *)zbx_hashset_search(&deps, &dep_local))) dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_ADD, dbrow); else zbx_hashset_remove_direct(&deps, dep); } /* add removed rows */ zbx_hashset_iter_reset(&deps, &iter); while (NULL != (dep = (zbx_uint64_pair_t *)zbx_hashset_iter_next(&iter))) { zbx_snprintf(down_s, sizeof(down_s), ZBX_FS_UI64, dep->first); zbx_snprintf(up_s, sizeof(up_s), ZBX_FS_UI64, dep->second); dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_REMOVE, del_row); } zbx_db_free_result(result); zbx_hashset_destroy(&deps); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: applies necessary preprocessing before row is compared/used * * * * Parameter: sync - [IN] the changeset * * row - [IN] the row to preprocess * * * * Return value: the preprocessed row * * * * Comments: The row preprocessing can be used to expand user macros in * * some columns. * * * ******************************************************************************/ static char **dbsync_function_preproc_row(zbx_dbsync_t *sync, char **row) { const char *row3; memcpy(sync->row, row, sizeof(char *) * (size_t)sync->columns_num); row = sync->row; /* first parameter is /host/key placeholder $, don't cache it */ if (NULL == (row3 = strchr(row[3], ','))) row3 = ""; else row3++; row[3] = zbx_strdup(NULL, row3); return sync->row; } /****************************************************************************** * * * Purpose: compares functions table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_functions(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select functionid,itemid,name,parameter,triggerid from functions"); dbsync_prepare(sync, 5, dbsync_function_preproc_row); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "functionid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_FUNCTION)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares expressions table row with cached configuration data * * * * Parameter: expression - [IN] the cached expression * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_expression(const ZBX_DC_EXPRESSION *expression, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_str(dbrow[0], expression->regexp)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[2], expression->expression)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[3], expression->type)) return FAIL; if (*dbrow[4] != expression->delimiter) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[5], expression->case_sensitive)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares expressions, regexps tables with cached configuration * * data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_expressions(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; ZBX_DC_EXPRESSION *expression; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select r.name,e.expressionid,e.expression,e.expression_type,e.exp_delimiter,e.case_sensitive" " from regexps r,expressions e" " where r.regexpid=e.regexpid"))) { return FAIL; } dbsync_prepare(sync, 6, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->expressions.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[1]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (expression = (ZBX_DC_EXPRESSION *)zbx_hashset_search(&dbsync_env.cache->expressions, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_expression(expression, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->expressions, &iter); while (NULL != (expression = (ZBX_DC_EXPRESSION *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &expression->expressionid)) dbsync_add_row(sync, expression->expressionid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares actions table row with cached configuration data * * * * Parameter: action - [IN] the cached action * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_action(const zbx_dc_action_t *action, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uchar(dbrow[1], action->eventsource)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[2], action->evaltype)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[3], action->formula)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares actions table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_actions(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_action_t *action; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select actionid,eventsource,evaltype,formula" " from actions" " where eventsource<>%d" " and status=%d", EVENT_SOURCE_SERVICE, ZBX_ACTION_STATUS_ACTIVE))) { return FAIL; } dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->actions.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (action = (zbx_dc_action_t *)zbx_hashset_search(&dbsync_env.cache->actions, &rowid))) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_action(action, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->actions, &iter); while (NULL != (action = (zbx_dc_action_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &action->actionid)) dbsync_add_row(sync, action->actionid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares action operation class and flushes update row if * * necessary * * * * Parameter: sync - [OUT] the changeset * * actionid - [IN] the action identifier * * opflags - [IN] the action operation class flags * * * ******************************************************************************/ static void dbsync_compare_action_op(zbx_dbsync_t *sync, zbx_uint64_t actionid, unsigned char opflags) { zbx_dc_action_t *action; if (0 == actionid) return; if (NULL == (action = (zbx_dc_action_t *)zbx_hashset_search(&dbsync_env.cache->actions, &actionid)) || opflags != action->opflags) { char actionid_s[MAX_ID_LEN], opflags_s[MAX_ID_LEN]; char *row[] = {actionid_s, opflags_s}; zbx_snprintf(actionid_s, sizeof(actionid_s), ZBX_FS_UI64, actionid); zbx_snprintf(opflags_s, sizeof(opflags_s), "%d", opflags); dbsync_add_row(sync, actionid, ZBX_DBSYNC_ROW_UPDATE, (zbx_db_row_t)row); } } /****************************************************************************** * * * Purpose: compares actions by operation class * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_action_ops(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_uint64_t rowid, actionid = 0; unsigned char opflags = ZBX_ACTION_OPCLASS_NONE; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select a.actionid,o.recovery" " from actions a" " left join operations o" " on a.actionid=o.actionid" " where a.status=%d" " group by a.actionid,o.recovery" " order by a.actionid", ZBX_ACTION_STATUS_ACTIVE))) { return FAIL; } dbsync_prepare(sync, 2, NULL); while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(rowid, dbrow[0]); if (actionid != rowid) { dbsync_compare_action_op(sync, actionid, opflags); actionid = rowid; opflags = ZBX_ACTION_OPCLASS_NONE; } if (SUCCEED == zbx_db_is_null(dbrow[1])) continue; switch (atoi(dbrow[1])) { case 0: opflags |= ZBX_ACTION_OPCLASS_NORMAL; break; case 1: opflags |= ZBX_ACTION_OPCLASS_RECOVERY; break; case 2: opflags |= ZBX_ACTION_OPCLASS_ACKNOWLEDGE; break; } } dbsync_compare_action_op(sync, actionid, opflags); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares conditions table row with cached configuration data * * * * Parameter: condition - [IN] the cached action condition * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_action_condition(const zbx_dc_action_condition_t *condition, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uchar(dbrow[2], condition->conditiontype)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[3], condition->op)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[4], condition->value)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[5], condition->value2)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares conditions table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_action_conditions(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_action_condition_t *condition; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select c.conditionid,c.actionid,c.conditiontype,c.operator,c.value,c.value2" " from conditions c,actions a" " where c.actionid=a.actionid" " and a.status=%d", ZBX_ACTION_STATUS_ACTIVE))) { return FAIL; } dbsync_prepare(sync, 6, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->action_conditions.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (condition = (zbx_dc_action_condition_t *)zbx_hashset_search( &dbsync_env.cache->action_conditions, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_action_condition(condition, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->action_conditions, &iter); while (NULL != (condition = (zbx_dc_action_condition_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &condition->conditionid)) dbsync_add_row(sync, condition->conditionid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares trigger tags table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_trigger_tags(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select triggertagid,triggerid,tag,value from trigger_tag"); dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "triggertagid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_TRIGGER_TAG)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares item tags table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_item_tags(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemtagid,itemid,tag,value from item_tag"); dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "itemtagid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_ITEM_TAG)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares host tags table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_host_tags(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select hosttagid,hostid,tag,value from host_tag"); dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "hosttagid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HOST_TAG)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares correlation table row with cached configuration data * * * * Parameter: correlation - [IN] the cached correlation rule * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_correlation(const zbx_dc_correlation_t *correlation, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_str(dbrow[1], correlation->name)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[2], correlation->evaltype)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[3], correlation->formula)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares correlation table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_correlations(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_correlation_t *correlation; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select correlationid,name,evaltype,formula" " from correlation" " where status=%d", ZBX_CORRELATION_ENABLED))) { return FAIL; } dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->correlations.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (correlation = (zbx_dc_correlation_t *)zbx_hashset_search(&dbsync_env.cache->correlations, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_correlation(correlation, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->correlations, &iter); while (NULL != (correlation = (zbx_dc_correlation_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &correlation->correlationid)) dbsync_add_row(sync, correlation->correlationid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares correlation condition tables dbrow with cached * * configuration data * * * * Parameter: corr_condition - [IN] the cached correlation condition * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_corr_condition(const zbx_dc_corr_condition_t *corr_condition, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uint64(dbrow[1], corr_condition->correlationid)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[2], corr_condition->type)) return FAIL; switch (corr_condition->type) { case ZBX_CORR_CONDITION_OLD_EVENT_TAG: /* break; is not missing here */ case ZBX_CORR_CONDITION_NEW_EVENT_TAG: if (FAIL == dbsync_compare_str(dbrow[3], corr_condition->data.tag.tag)) return FAIL; break; case ZBX_CORR_CONDITION_OLD_EVENT_TAG_VALUE: /* break; is not missing here */ case ZBX_CORR_CONDITION_NEW_EVENT_TAG_VALUE: if (FAIL == dbsync_compare_str(dbrow[4], corr_condition->data.tag_value.tag)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[5], corr_condition->data.tag_value.value)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[6], corr_condition->data.tag_value.op)) return FAIL; break; case ZBX_CORR_CONDITION_NEW_EVENT_HOSTGROUP: if (FAIL == dbsync_compare_uint64(dbrow[7], corr_condition->data.group.groupid)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[8], corr_condition->data.group.op)) return FAIL; break; case ZBX_CORR_CONDITION_EVENT_TAG_PAIR: if (FAIL == dbsync_compare_str(dbrow[9], corr_condition->data.tag_pair.oldtag)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[10], corr_condition->data.tag_pair.newtag)) return FAIL; break; } return SUCCEED; } /****************************************************************************** * * * Purpose: compares correlation condition tables with cached configuration * * data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_corr_conditions(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_corr_condition_t *corr_condition; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select cc.corr_conditionid,cc.correlationid,cc.type,cct.tag,cctv.tag,cctv.value,cctv.operator," " ccg.groupid,ccg.operator,cctp.oldtag,cctp.newtag" " from correlation c,corr_condition cc" " left join corr_condition_tag cct" " on cct.corr_conditionid=cc.corr_conditionid" " left join corr_condition_tagvalue cctv" " on cctv.corr_conditionid=cc.corr_conditionid" " left join corr_condition_group ccg" " on ccg.corr_conditionid=cc.corr_conditionid" " left join corr_condition_tagpair cctp" " on cctp.corr_conditionid=cc.corr_conditionid" " where c.correlationid=cc.correlationid" " and c.status=%d", ZBX_CORRELATION_ENABLED))) { return FAIL; } dbsync_prepare(sync, 11, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->corr_conditions.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (corr_condition = (zbx_dc_corr_condition_t *)zbx_hashset_search( &dbsync_env.cache->corr_conditions, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_corr_condition(corr_condition, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->corr_conditions, &iter); while (NULL != (corr_condition = (zbx_dc_corr_condition_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &corr_condition->corr_conditionid)) dbsync_add_row(sync, corr_condition->corr_conditionid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares correlation operation tables dbrow with cached * * configuration data * * * * Parameter: corr_operation - [IN] the cached correlation operation * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_corr_operation(const zbx_dc_corr_operation_t *corr_operation, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uint64(dbrow[1], corr_operation->correlationid)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[2], corr_operation->type)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares correlation operation tables with cached configuration * * data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_corr_operations(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_corr_operation_t *corr_operation; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select co.corr_operationid,co.correlationid,co.type" " from correlation c,corr_operation co" " where c.correlationid=co.correlationid" " and c.status=%d", ZBX_CORRELATION_ENABLED))) { return FAIL; } dbsync_prepare(sync, 3, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->corr_operations.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (corr_operation = (zbx_dc_corr_operation_t *)zbx_hashset_search( &dbsync_env.cache->corr_operations, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_corr_operation(corr_operation, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->corr_operations, &iter); while (NULL != (corr_operation = (zbx_dc_corr_operation_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &corr_operation->corr_operationid)) dbsync_add_row(sync, corr_operation->corr_operationid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares host group table row with cached configuration data * * * * Parameter: group - [IN] the cached host group * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_host_group(const zbx_dc_hostgroup_t *group, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_str(dbrow[1], group->name)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares host groups table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_host_groups(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_hostgroup_t *group; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select groupid,name from hstgrp where type=%d", HOSTGROUP_TYPE_HOST))) return FAIL; dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->hostgroups.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); if (NULL == (group = (zbx_dc_hostgroup_t *)zbx_hashset_search(&dbsync_env.cache->hostgroups, &rowid))) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_host_group(group, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->hostgroups, &iter); while (NULL != (group = (zbx_dc_hostgroup_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &group->groupid)) dbsync_add_row(sync, group->groupid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares item preproc tables with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_item_preprocs(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select item_preprocid,itemid,type,params,step,error_handler,error_handler_params" " from item_preproc"); dbsync_prepare(sync, 7, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "item_preprocid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_ITEM_PREPROC)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: compares maintenance table row with cached configuration data * * * * Parameter: maintenance - [IN] the cached maintenance data * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_maintenance(const zbx_dc_maintenance_t *maintenance, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uchar(dbrow[1], maintenance->type)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[2], maintenance->active_since)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[3], maintenance->active_until)) return FAIL; if (FAIL == dbsync_compare_uchar(dbrow[4], maintenance->tags_evaltype)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares item params table row with cached configuration data * * * * Parameter: script - [IN] the cached item script * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_items_param(const zbx_dc_item_param_t *item_param, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uint64(dbrow[1], item_param->itemid)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[2], item_param->name)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[3], item_param->value)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares item_parameter table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_item_script_param(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_item_param_t *items_params; char **row; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select p.item_parameterid,p.itemid,p.name,p.value,i.hostid" " from item_parameter p,items i,hosts h" " where p.itemid=i.itemid" " and i.hostid=h.hostid" " and h.status in (%d,%d)" " and i.flags<>%d" " order by p.itemid", HOST_STATUS_MONITORED, HOST_STATUS_NOT_MONITORED, ZBX_FLAG_DISCOVERY_PROTOTYPE))) { return FAIL; } dbsync_prepare(sync, 5, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->items_params.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); row = dbsync_preproc_row(sync, dbrow); if (NULL == (items_params = (zbx_dc_item_param_t *) zbx_hashset_search(&dbsync_env.cache->items_params, &rowid))) { tag = ZBX_DBSYNC_ROW_ADD; } else if (FAIL == dbsync_compare_items_param(items_params, row)) { tag = ZBX_DBSYNC_ROW_UPDATE; } if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, row); } zbx_hashset_iter_reset(&dbsync_env.cache->items_params, &iter); while (NULL != (items_params = (zbx_dc_item_param_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &items_params->item_script_paramid)) dbsync_add_row(sync, items_params->item_script_paramid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares maintenances table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_maintenances(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_maintenance_t *maintenance; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select maintenanceid,maintenance_type,active_since,active_till,tags_evaltype" " from maintenances"))) { return FAIL; } dbsync_prepare(sync, 5, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->maintenances.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); maintenance = (zbx_dc_maintenance_t *)zbx_hashset_search(&dbsync_env.cache->maintenances, &rowid); if (NULL == maintenance) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_maintenance(maintenance, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->maintenances, &iter); while (NULL != (maintenance = (zbx_dc_maintenance_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &maintenance->maintenanceid)) dbsync_add_row(sync, maintenance->maintenanceid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares maintenance_tag table row with cached configuration data * * * * Parameter: maintenance_tag - [IN] the cached maintenance tag * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_maintenance_tag(const zbx_dc_maintenance_tag_t *maintenance_tag, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_int(dbrow[2], maintenance_tag->op)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[3], maintenance_tag->tag)) return FAIL; if (FAIL == dbsync_compare_str(dbrow[4], maintenance_tag->value)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares maintenances table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_maintenance_tags(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_maintenance_tag_t *maintenance_tag; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select maintenancetagid,maintenanceid,operator,tag,value" " from maintenance_tag"))) { return FAIL; } dbsync_prepare(sync, 5, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->maintenance_tags.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); maintenance_tag = (zbx_dc_maintenance_tag_t *)zbx_hashset_search(&dbsync_env.cache->maintenance_tags, &rowid); if (NULL == maintenance_tag) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_maintenance_tag(maintenance_tag, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->maintenance_tags, &iter); while (NULL != (maintenance_tag = (zbx_dc_maintenance_tag_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &maintenance_tag->maintenancetagid)) dbsync_add_row(sync, maintenance_tag->maintenancetagid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares maintenance_period table row with cached configuration * * dat * * * * Parameter: period - [IN] the cached maintenance period * * dbrow - [IN] the database row * * * * Return value: SUCCEED - the row matches configuration data * * FAIL - otherwise * * * ******************************************************************************/ static int dbsync_compare_maintenance_period(const zbx_dc_maintenance_period_t *period, const zbx_db_row_t dbrow) { if (FAIL == dbsync_compare_uchar(dbrow[1], period->type)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[2], period->every)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[3], period->month)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[4], period->dayofweek)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[5], period->day)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[6], period->start_time)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[7], period->period)) return FAIL; if (FAIL == dbsync_compare_int(dbrow[8], period->start_date)) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: compares timeperiods table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_maintenance_periods(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_t ids; zbx_hashset_iter_t iter; zbx_uint64_t rowid; zbx_dc_maintenance_period_t *period; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select t.timeperiodid,t.timeperiod_type,t.every,t.month,t.dayofweek,t.day," "t.start_time,t.period,t.start_date,m.maintenanceid" " from maintenances_windows m,timeperiods t" " where t.timeperiodid=m.timeperiodid"))) { return FAIL; } dbsync_prepare(sync, 10, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&ids, (size_t)dbsync_env.cache->maintenance_periods.num_data, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); while (NULL != (dbrow = zbx_db_fetch(result))) { unsigned char tag = ZBX_DBSYNC_ROW_NONE; ZBX_STR2UINT64(rowid, dbrow[0]); zbx_hashset_insert(&ids, &rowid, sizeof(rowid)); period = (zbx_dc_maintenance_period_t *)zbx_hashset_search(&dbsync_env.cache->maintenance_periods, &rowid); if (NULL == period) tag = ZBX_DBSYNC_ROW_ADD; else if (FAIL == dbsync_compare_maintenance_period(period, dbrow)) tag = ZBX_DBSYNC_ROW_UPDATE; if (ZBX_DBSYNC_ROW_NONE != tag) dbsync_add_row(sync, rowid, tag, dbrow); } zbx_hashset_iter_reset(&dbsync_env.cache->maintenance_periods, &iter); while (NULL != (period = (zbx_dc_maintenance_period_t *)zbx_hashset_iter_next(&iter))) { if (NULL == zbx_hashset_search(&ids, &period->timeperiodid)) dbsync_add_row(sync, period->timeperiodid, ZBX_DBSYNC_ROW_REMOVE, NULL); } zbx_hashset_destroy(&ids); zbx_db_free_result(result); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares maintenances_groups table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_maintenance_groups(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_iter_t iter; zbx_dc_maintenance_t *maintenance; zbx_hashset_t mgroups; int i; zbx_uint64_pair_t mg_local, *mg; char maintenanceid_s[MAX_ID_LEN + 1], groupid_s[MAX_ID_LEN + 1]; char *del_row[2] = {maintenanceid_s, groupid_s}; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select maintenanceid,groupid from maintenances_groups order by maintenanceid"))) return FAIL; dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&mgroups, 100, ZBX_DEFAULT_UINT64_PAIR_HASH_FUNC, ZBX_DEFAULT_UINT64_PAIR_COMPARE_FUNC); /* index all maintenance->group links */ zbx_hashset_iter_reset(&dbsync_env.cache->maintenances, &iter); while (NULL != (maintenance = (zbx_dc_maintenance_t *)zbx_hashset_iter_next(&iter))) { mg_local.first = maintenance->maintenanceid; for (i = 0; i < maintenance->groupids.values_num; i++) { mg_local.second = maintenance->groupids.values[i]; zbx_hashset_insert(&mgroups, &mg_local, sizeof(mg_local)); } } /* add new rows, remove existing rows from index */ while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(mg_local.first, dbrow[0]); ZBX_STR2UINT64(mg_local.second, dbrow[1]); if (NULL == (mg = (zbx_uint64_pair_t *)zbx_hashset_search(&mgroups, &mg_local))) dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_ADD, dbrow); else zbx_hashset_remove_direct(&mgroups, mg); } /* add removed rows */ zbx_hashset_iter_reset(&mgroups, &iter); while (NULL != (mg = (zbx_uint64_pair_t *)zbx_hashset_iter_next(&iter))) { zbx_snprintf(maintenanceid_s, sizeof(maintenanceid_s), ZBX_FS_UI64, mg->first); zbx_snprintf(groupid_s, sizeof(groupid_s), ZBX_FS_UI64, mg->second); dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_REMOVE, del_row); } zbx_db_free_result(result); zbx_hashset_destroy(&mgroups); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares maintenances_hosts table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_maintenance_hosts(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_iter_t iter; zbx_dc_maintenance_t *maintenance; zbx_hashset_t mhosts; int i; zbx_uint64_pair_t mh_local, *mh; char maintenanceid_s[MAX_ID_LEN + 1], hostid_s[MAX_ID_LEN + 1]; char *del_row[2] = {maintenanceid_s, hostid_s}; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select("select maintenanceid,hostid from maintenances_hosts order by maintenanceid"))) { return FAIL; } dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&mhosts, 100, ZBX_DEFAULT_UINT64_PAIR_HASH_FUNC, ZBX_DEFAULT_UINT64_PAIR_COMPARE_FUNC); /* index all maintenance->host links */ zbx_hashset_iter_reset(&dbsync_env.cache->maintenances, &iter); while (NULL != (maintenance = (zbx_dc_maintenance_t *)zbx_hashset_iter_next(&iter))) { mh_local.first = maintenance->maintenanceid; for (i = 0; i < maintenance->hostids.values_num; i++) { mh_local.second = maintenance->hostids.values[i]; zbx_hashset_insert(&mhosts, &mh_local, sizeof(mh_local)); } } /* add new rows, remove existing rows from index */ while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(mh_local.first, dbrow[0]); ZBX_STR2UINT64(mh_local.second, dbrow[1]); if (NULL == (mh = (zbx_uint64_pair_t *)zbx_hashset_search(&mhosts, &mh_local))) dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_ADD, dbrow); else zbx_hashset_remove_direct(&mhosts, mh); } /* add removed rows */ zbx_hashset_iter_reset(&mhosts, &iter); while (NULL != (mh = (zbx_uint64_pair_t *)zbx_hashset_iter_next(&iter))) { zbx_snprintf(maintenanceid_s, sizeof(maintenanceid_s), ZBX_FS_UI64, mh->first); zbx_snprintf(hostid_s, sizeof(hostid_s), ZBX_FS_UI64, mh->second); dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_REMOVE, del_row); } zbx_db_free_result(result); zbx_hashset_destroy(&mhosts); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: compares hosts_groups table with cached configuration data * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_compare_host_group_hosts(zbx_dbsync_t *sync) { zbx_db_row_t dbrow; zbx_db_result_t result; zbx_hashset_iter_t iter, iter_hosts; zbx_dc_hostgroup_t *group; zbx_hashset_t groups; zbx_uint64_t *phostid; zbx_uint64_pair_t gh_local, *gh; char groupid_s[MAX_ID_LEN + 1], hostid_s[MAX_ID_LEN + 1]; char *del_row[2] = {groupid_s, hostid_s}; zbx_dcsync_sql_start(sync); if (NULL == (result = zbx_db_select( "select hg.groupid,hg.hostid" " from hosts_groups hg,hosts h" " where hg.hostid=h.hostid" " and h.status in (%d,%d)" " and h.flags<>%d" " order by hg.groupid", HOST_STATUS_MONITORED, HOST_STATUS_NOT_MONITORED, ZBX_FLAG_DISCOVERY_PROTOTYPE))) { return FAIL; } dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { sync->dbresult = result; zbx_dcsync_sql_end(sync); return SUCCEED; } zbx_hashset_create(&groups, 100, ZBX_DEFAULT_UINT64_PAIR_HASH_FUNC, ZBX_DEFAULT_UINT64_PAIR_COMPARE_FUNC); /* index all group->host links */ zbx_hashset_iter_reset(&dbsync_env.cache->hostgroups, &iter); while (NULL != (group = (zbx_dc_hostgroup_t *)zbx_hashset_iter_next(&iter))) { gh_local.first = group->groupid; zbx_hashset_iter_reset(&group->hostids, &iter_hosts); while (NULL != (phostid = (zbx_uint64_t *)zbx_hashset_iter_next(&iter_hosts))) { gh_local.second = *phostid; zbx_hashset_insert(&groups, &gh_local, sizeof(gh_local)); } } /* add new rows, remove existing rows from index */ while (NULL != (dbrow = zbx_db_fetch(result))) { ZBX_STR2UINT64(gh_local.first, dbrow[0]); ZBX_STR2UINT64(gh_local.second, dbrow[1]); if (NULL == (gh = (zbx_uint64_pair_t *)zbx_hashset_search(&groups, &gh_local))) dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_ADD, dbrow); else zbx_hashset_remove_direct(&groups, gh); } /* add removed rows */ zbx_hashset_iter_reset(&groups, &iter); while (NULL != (gh = (zbx_uint64_pair_t *)zbx_hashset_iter_next(&iter))) { zbx_snprintf(groupid_s, sizeof(groupid_s), ZBX_FS_UI64, gh->first); zbx_snprintf(hostid_s, sizeof(hostid_s), ZBX_FS_UI64, gh->second); dbsync_add_row(sync, 0, ZBX_DBSYNC_ROW_REMOVE, del_row); } zbx_db_free_result(result); zbx_hashset_destroy(&groups); zbx_dcsync_sql_end(sync); return SUCCEED; } /****************************************************************************** * * * Purpose: prepare dbsync object for drules table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_drules(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select druleid,proxyid,delay,name,iprange,status,concurrency_max from drules"); dbsync_prepare(sync, 7, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "druleid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_DRULE)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for dchecks tabkle * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_dchecks(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select dcheckid,druleid,type,key_,snmp_community,ports,snmpv3_securityname," "snmpv3_securitylevel,snmpv3_authpassphrase,snmpv3_privpassphrase,uniq," "snmpv3_authprotocol,snmpv3_privprotocol,snmpv3_contextname,allow_redirect" " from dchecks"); dbsync_prepare(sync, 15, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "dcheckid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_DCHECK)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for httptest table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_httptests(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select httptestid,hostid,delay,status from httptest"); dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "httptestid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HTTPTEST)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for httptest_field table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_httptest_fields(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select httptest_fieldid,httptestid from httptest_field"); dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "httptest_fieldid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HTTPTEST_FIELD)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for httpstep table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_httpsteps(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select httpstepid,httptestid from httpstep"); dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "httpstepid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HTTPSTEP)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for httpstep_field table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_httpstep_fields(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select httpstep_fieldid,httpstepid from httpstep_field"); dbsync_prepare(sync, 2, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "httpstep_fieldid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HTTPSTEP_FIELD)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: remove deleted hosts/templates from user macro cache * * * ******************************************************************************/ void zbx_dbsync_clear_user_macros(void) { um_cache_remove_hosts(get_dc_config()->um_cache, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HOST)].deletes); } int zbx_dbsync_compare_connectors(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select connectorid,protocol,data_type,url,max_records," "max_senders,timeout,max_attempts,token,http_proxy,authtype,username,password,verify_peer," "verify_host,ssl_cert_file,ssl_key_file,ssl_key_password,status," "tags_evaltype,item_value_type,attempt_interval" " from connector"); dbsync_prepare(sync, 22, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "connectorid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_CONNECTOR)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } int zbx_dbsync_compare_connector_tags(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select connector_tagid,connectorid,operator,tag,value" " from connector_tag"); dbsync_prepare(sync, 5, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "connector_tagid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_CONNECTOR_TAG)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } int zbx_dbsync_compare_proxies(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select p.proxyid,p.name,p.operating_mode,p.tls_connect,p.tls_accept,p.tls_issuer,p.tls_subject," "p.tls_psk_identity,p.tls_psk,p.allowed_addresses,p.address,p.port,pr.lastaccess," "p.timeout_zabbix_agent,p.timeout_simple_check,p.timeout_snmp_agent," "p.timeout_external_check,p.timeout_db_monitor,p.timeout_http_agent," "p.timeout_ssh_agent,p.timeout_telnet_agent,p.timeout_script,p.custom_timeouts," "p.proxy_groupid,p.local_address,p.local_port,p.timeout_browser" " from proxy p" " left join proxy_rtdata pr" " on p.proxyid=pr.proxyid"); dbsync_prepare(sync, 27, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "p.proxyid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_PROXY)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for proxy_group table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_proxy_group(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select proxy_groupid,failover_delay,min_online,name from proxy_group"); dbsync_prepare(sync, 4, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "proxy_groupid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_PROXY_GROUP)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } /****************************************************************************** * * * Purpose: prepare dbsync object for host_proxy table * * * * Parameter: sync - [OUT] the changeset * * * * Return value: SUCCEED - the changeset was successfully calculated * * FAIL - otherwise * * * ******************************************************************************/ int zbx_dbsync_prepare_host_proxy(zbx_dbsync_t *sync) { char *sql = NULL; size_t sql_alloc = 0, sql_offset = 0; int ret = SUCCEED; zbx_dcsync_sql_start(sync); zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select hp.hostproxyid,hp.hostid,hp.host,hp.proxyid,hp.revision,h.host,hp.tls_accept," "hp.tls_issuer,hp.tls_subject,hp.tls_psk_identity,hp.tls_psk" " from host_proxy hp" " left join hosts h" " on hp.hostid=h.hostid"); dbsync_prepare(sync, 11, NULL); if (ZBX_DBSYNC_INIT == sync->mode) { if (NULL == (sync->dbresult = zbx_db_select("%s", sql))) ret = FAIL; goto out; } ret = dbsync_read_journal(sync, &sql, &sql_alloc, &sql_offset, "hostproxyid", "where", NULL, &dbsync_env.journals[ZBX_DBSYNC_JOURNAL(ZBX_DBSYNC_OBJ_HOST_PROXY)]); out: zbx_free(sql); zbx_dcsync_sql_end(sync); return ret; } void zbx_dcsync_sql_start(zbx_dbsync_t *sync) { sync->start = zbx_time(); } void zbx_dcsync_sql_end(zbx_dbsync_t *sync) { sync->sql_time = zbx_time() - sync->start; } void zbx_dcsync_sync_start(zbx_dbsync_t *sync, zbx_uint64_t used_size) { sync->start = zbx_time(); sync->used = used_size; } void zbx_dcsync_sync_end(zbx_dbsync_t *sync, zbx_uint64_t used_size) { sync->sync_time += zbx_time() - sync->start; sync->sync_size += used_size - sync->used; } static void dcsync_log_stats(const char *function_name, const zbx_dbsync_t *sync) { zabbix_log(LOG_LEVEL_DEBUG, "%s() %16s: sql:" ZBX_FS_DBL " sync:" ZBX_FS_DBL " sec " ZBX_FS_I64 " bytes (" ZBX_FS_UI64 "/" ZBX_FS_UI64 "/" ZBX_FS_UI64 ").", function_name, sync->from, sync->sql_time, sync->sync_time, sync->sync_size, sync->add_num, sync->update_num, sync->remove_num); } void zbx_dcsync_stats_dump(const char *function_name) { double sync_time_total = 0, sql_time_total = 0; zbx_int64_t total_used = 0; for (int i = 0; i < dbsync_env.changelog_dbsyncs.values_num; i++) { const zbx_dbsync_t *sync = dbsync_env.changelog_dbsyncs.values[i]; dcsync_log_stats(function_name, sync); sql_time_total += sync->sql_time; sync_time_total += sync->sync_time; total_used += sync->sync_size; } for (int i = 0; i < dbsync_env.dbsyncs.values_num; i++) { const zbx_dbsync_t *sync = dbsync_env.dbsyncs.values[i]; dcsync_log_stats(function_name, sync); sql_time_total += sync->sql_time; sync_time_total += sync->sync_time; total_used += sync->sync_size; } zabbix_log(LOG_LEVEL_DEBUG, "%s() total sql : " ZBX_FS_DBL " sec.", function_name, sql_time_total); zabbix_log(LOG_LEVEL_DEBUG, "%s() total sync : " ZBX_FS_DBL " sec.", function_name, sync_time_total); zabbix_log(LOG_LEVEL_DEBUG, "%s() total memory difference: " ZBX_FS_I64 " bytes.", function_name, total_used); }