/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "history_compress.h" #if defined(HAVE_POSTGRESQL) #include "zbxdbhigh.h" #include "zbxstr.h" #include "zbxdb.h" #define ZBX_TS_UNIX_NOW "zbx_ts_unix_now" #define ZBX_TS_UNIX_NOW_CREATE "create or replace function "ZBX_TS_UNIX_NOW"() returns integer language sql" \ " stable as $$ select extract(epoch from now())::integer $$" #define COMPRESSION_TOLERANCE (SEC_PER_HOUR * 2) #define COMPRESSION_POLICY_REMOVE "remove_compression_policy" #define COMPRESSION_POLICY_ADD "add_compression_policy" /* Compression policy: chunks containing data older than provided data are compressed. */ #define POLICY_COMPRESS_AFTER 0 /* Compression policy: chunks with creation time older than this cut-off point are compressed. */ #define POLICY_COMPRESS_CREATED_BEFORE 1 typedef struct { const char *name; const char *segmentby; const char *orderby; int compression_policy; } zbx_history_table_compression_options_t; static zbx_history_table_compression_options_t compression_tables[] = { {"history", "itemid", "clock,ns", POLICY_COMPRESS_AFTER}, {"history_uint", "itemid", "clock,ns", POLICY_COMPRESS_AFTER}, {"history_str", "itemid", "clock,ns", POLICY_COMPRESS_AFTER}, {"history_text", "itemid", "clock,ns", POLICY_COMPRESS_AFTER}, {"history_log", "itemid", "clock,ns", POLICY_COMPRESS_AFTER}, {"history_bin", "itemid", "clock,ns", POLICY_COMPRESS_AFTER}, {"trends", "itemid", "clock", POLICY_COMPRESS_AFTER}, {"trends_uint", "itemid", "clock", POLICY_COMPRESS_AFTER}, /* Since auditlog table uses CUID from auditid field to partition table into chunks we need to use different */ /* compression policy due to internal TimescaleDB bug. */ {"auditlog", "auditid", "clock", POLICY_COMPRESS_CREATED_BEFORE} }; static unsigned char compression_status_cache = 0; static int compress_older_cache = 0; /****************************************************************************** * * * Purpose: check that hypertables are segmented * * * * Parameters: table_name - [IN] hypertable name * * segmentby - [IN] field to segment by * * orderby - [IN] field to order by * * * ******************************************************************************/ static void hk_check_table_segmentation(const char *table_name, const char *segmentby, const char *orderby) { zbx_db_result_t result; zbx_db_row_t row; int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s(): table: %s", __func__, table_name); /* get hypertable's 'segmentby' attribute name */ result = zbx_db_select("select attname from timescaledb_information.compression_settings" " where hypertable_schema='%s' and hypertable_name='%s'" " and segmentby_column_index is not null", zbx_db_get_schema_esc(), table_name); for (i = 0; NULL != (row = zbx_db_fetch(result)); i++) { if (0 != strcmp(row[0], segmentby)) i++; } if (1 != i) { zbx_db_execute("alter table %s set (timescaledb.compress,timescaledb.compress_segmentby='%s'," "timescaledb.compress_orderby='%s')", table_name, segmentby, orderby); } zbx_db_free_result(result); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: returns data compression age configured for hypertable * * * * Parameters: table_name - [IN] hypertable name * * compression_policy - [IN] * * * * Return value: >=0 - data compression age in seconds * * -1 - hypertable has different compression policy * * * ******************************************************************************/ static int hk_get_compression_age(const char *table_name, int compression_policy) { int age = 0; zbx_db_result_t result; zbx_db_row_t row; const char *field = compression_policy == POLICY_COMPRESS_AFTER ? "compress_after" : "compress_created_before"; zabbix_log(LOG_LEVEL_DEBUG, "In %s(): table: %s", __func__, table_name); result = zbx_db_select("select extract(epoch from (config::json->>'%s')::interval) from" " timescaledb_information.jobs where application_name like 'Compression%%' and" " hypertable_schema='%s' and hypertable_name='%s'", field, zbx_db_get_schema_esc(), table_name); if (NULL != (row = zbx_db_fetch(result))) { /* extraction from JSON may return empty field when JSON exists but field doesn't */ if (NULL == row[0]) { zabbix_log(LOG_LEVEL_ERR, "Unexpected TimescaleDB configuration: the %s table does not have %s " "compression policy", table_name, field); age = -1; } else { age = atoi(row[0]); } } zbx_db_free_result(result); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() age: %d", __func__, age); return age; } /****************************************************************************** * * * Purpose: ensures that table compression is configured to specified age * * * * Parameters: table_name - [IN] hypertable name * * age - [IN] compression age * * compression_policy - [IN] * * * ******************************************************************************/ static void hk_set_table_compression_age(const char *table_name, int age, int compression_policy) { int compress_after; zabbix_log(LOG_LEVEL_DEBUG, "In %s(): table: %s age %d, compression_policy %d", __func__, table_name, age, compression_policy); if (age != (compress_after = hk_get_compression_age(table_name, compression_policy)) && -1 != compress_after) { zbx_db_result_t res = NULL; if (0 != compress_after) zbx_db_free_result(zbx_db_select("select %s('%s')", COMPRESSION_POLICY_REMOVE, table_name)); zabbix_log(LOG_LEVEL_DEBUG, "adding compression policy to table: %s age %d", table_name, age); switch (compression_policy) { case POLICY_COMPRESS_AFTER: res = zbx_db_select("select %s('%s', integer '%d')", COMPRESSION_POLICY_ADD, table_name, age); break; case POLICY_COMPRESS_CREATED_BEFORE: res = zbx_db_select("select %s('%s', compress_created_before => interval '%d')", COMPRESSION_POLICY_ADD, table_name, age); break; default: THIS_SHOULD_NEVER_HAPPEN; break; } if (NULL == res) zabbix_log(LOG_LEVEL_ERR, "failed to add compression policy to table '%s'", table_name); else zbx_db_free_result(res); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: turns on table compression for items or chunks older than * * specified age * * * * Parameters: age - [IN] compression age * * * ******************************************************************************/ static void hk_history_enable_compression(int age) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); for (size_t i = 0; i < ARRSIZE(compression_tables); i++) { const zbx_history_table_compression_options_t *table = &compression_tables[i]; if (POLICY_COMPRESS_AFTER == table->compression_policy) { zbx_db_result_t res; res = zbx_db_select("select set_integer_now_func('%s', '"ZBX_TS_UNIX_NOW"', true)", table->name); if (NULL == res) { zabbix_log(LOG_LEVEL_WARNING, "Table \"%s\" is not a hypertable. Execute TimescaleDB" " configuration step as described in Zabbix documentation to upgrade" " schema.", table->name); continue; } zbx_db_free_result(res); } hk_check_table_segmentation(table->name, table->segmentby, table->orderby); hk_set_table_compression_age(table->name, age, table->compression_policy); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: turns off table compression * * * ******************************************************************************/ static void hk_history_disable_compression(void) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); for (size_t i = 0; i < ARRSIZE(compression_tables); i++) { const zbx_history_table_compression_options_t *table = &compression_tables[i]; if (0 >= hk_get_compression_age(table->name, table->compression_policy)) continue; zbx_db_free_result(zbx_db_select("select %s('%s')", COMPRESSION_POLICY_REMOVE, table->name)); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } #endif /****************************************************************************** * * * Purpose: initialize compression for history/trends tables * * * ******************************************************************************/ void hk_history_compression_init(void) { #if defined(HAVE_POSTGRESQL) int disable_compression = 0; char *db_log_level = NULL; zbx_config_t cfg; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_db_connect(ZBX_DB_CONNECT_NORMAL); zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION); compression_status_cache = cfg.db.history_compression_status; compress_older_cache = cfg.db.history_compress_older; if (0 == zbx_strcmp_null(cfg.db.extension, ZBX_DB_EXTENSION_TIMESCALEDB)) { /* suppress notice logs during DB initialization */ zbx_db_result_t result = zbx_db_select("show client_min_messages"); zbx_db_row_t row; if (NULL != (row = zbx_db_fetch(result))) { db_log_level = zbx_strdup(db_log_level, row[0]); zbx_db_execute("set client_min_messages to warning"); } zbx_db_free_result(result); if (ON == zbx_tsdb_get_compression_availability() && ON == cfg.db.history_compression_status) { if (0 == cfg.db.history_compress_older) { disable_compression = 1; hk_history_disable_compression(); } else { zbx_db_execute(ZBX_TS_UNIX_NOW_CREATE); hk_history_enable_compression(cfg.db.history_compress_older + COMPRESSION_TOLERANCE); } } else hk_history_disable_compression(); } else if (ON == cfg.db.history_compression_status) disable_compression = 1; if (0 != disable_compression && ZBX_DB_OK > zbx_db_execute("update config set compression_status=0")) zabbix_log(LOG_LEVEL_ERR, "failed to set database compression status"); zbx_config_clean(&cfg); if (NULL != db_log_level) { zbx_db_execute("set client_min_messages to %s", db_log_level); zbx_free(db_log_level); } zbx_db_close(); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); #endif } /****************************************************************************** * * * Purpose: updates history compression period settings * * * * Parameters: cfg - [IN] database extension history compression settings * * * ******************************************************************************/ void hk_history_compression_update(zbx_config_db_t *cfg) { #if defined(HAVE_POSTGRESQL) zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (ON == zbx_tsdb_get_compression_availability() && ON == cfg->history_compression_status) { if (cfg->history_compression_status != compression_status_cache || cfg->history_compress_older != compress_older_cache) { zbx_db_execute(ZBX_TS_UNIX_NOW_CREATE); hk_history_enable_compression(cfg->history_compress_older + COMPRESSION_TOLERANCE); } } else if (cfg->history_compression_status != compression_status_cache) hk_history_disable_compression(); compression_status_cache = cfg->history_compression_status; compress_older_cache = cfg->history_compress_older; zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); #else ZBX_UNUSED(cfg); #endif }