/*
** Zabbix
** Copyright (C) 2001-2022 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "common.h"
#include "zbxdb.h"
#include "dbcache.h"
#include "log.h"
#include "history_compress.h"

#if defined(HAVE_POSTGRESQL)

#define ZBX_TS_SEGMENT_BY	"itemid"
#define ZBX_TS_UNIX_NOW		"zbx_ts_unix_now"
#define ZBX_TS_UNIX_NOW_CREATE	"create or replace function "ZBX_TS_UNIX_NOW"() returns integer language sql" \
				" stable as $$ select extract(epoch from now())::integer $$"

#define COMPRESSION_TOLERANCE		(SEC_PER_HOUR * 2)
#define COMPRESSION_POLICY_REMOVE	(0 == ZBX_DB_TSDB_V1 ? "remove_compression_policy" : \
					"remove_compress_chunks_policy")
#define COMPRESSION_POLICY_ADD		(0 == ZBX_DB_TSDB_V1 ? "add_compression_policy" : \
					"add_compress_chunks_policy")

typedef enum
{
	ZBX_COMPRESS_TABLE_HISTORY = 0,
	ZBX_COMPRESS_TABLE_TRENDS
} zbx_compress_table_t;

/* description of one hypertable handled by the compression code */
typedef struct
{
	/* hypertable name as used in the SQL statements */
	const char		*name;
	/* history or trends table */
	zbx_compress_table_t	type;
} zbx_history_table_compression_options_t;

/* hypertables whose compression is managed by this file; the table is only ever read, */
/* so it is declared const to keep it from being modified by accident                  */
static const zbx_history_table_compression_options_t	compression_tables[] = {
	{"history",		ZBX_COMPRESS_TABLE_HISTORY},
	{"history_uint",	ZBX_COMPRESS_TABLE_HISTORY},
	{"history_str",		ZBX_COMPRESS_TABLE_HISTORY},
	{"history_text",	ZBX_COMPRESS_TABLE_HISTORY},
	{"history_log",		ZBX_COMPRESS_TABLE_HISTORY},
	{"trends",		ZBX_COMPRESS_TABLE_TRENDS},
	{"trends_uint",		ZBX_COMPRESS_TABLE_TRENDS}
};

/* compression status and "compress older" values seen by the last call of             */
/* hk_history_compression_init() or hk_history_compression_update(); the update         */
/* function compares the new configuration against them to detect changes              */
static unsigned char	compression_status_cache = 0;
static int		compress_older_cache = 0;

/******************************************************************************
 *                                                                            *
 * Function: hk_check_table_segmentation                                      *
 *                                                                            *
 * Purpose: verify that a hypertable is segmented by itemid only and          *
 *          reconfigure its compression settings when it is not               *
 *                                                                            *
 * Parameters: table_name - [IN] hypertable name                              *
 *             type       - [IN] history or trends; selects the order-by      *
 *                               columns used when reconfiguring              *
 *                                                                            *
 ******************************************************************************/
static void	hk_check_table_segmentation(const char *table_name, zbx_compress_table_t type)
{
	DB_RESULT	result;
	DB_ROW		row;
	int		segment_columns = 0, unexpected_columns = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s(): table: %s", __func__, table_name);

	/* the source of the segment-by column names depends on ZBX_DB_TSDB_V1 */
	if (1 != ZBX_DB_TSDB_V1)
	{
		result = DBselect("select attname from timescaledb_information.compression_settings"
				" where hypertable_schema='%s' and hypertable_name='%s'"
				" and segmentby_column_index is not null", zbx_db_get_schema_esc(), table_name);
	}
	else
	{
		result = DBselect("select c.attname from _timescaledb_catalog.hypertable_compression c"
				" inner join _timescaledb_catalog.hypertable h on (h.id=c.hypertable_id)"
				" where c.segmentby_column_index<>0 and h.table_name='%s'", table_name);
	}

	/* count all segment-by columns and, separately, those that are not itemid */
	while (NULL != (row = DBfetch(result)))
	{
		segment_columns++;

		if (0 != strcmp(row[0], ZBX_TS_SEGMENT_BY))
			unexpected_columns++;
	}

	/* the only accepted layout is exactly one segment-by column and it must be itemid */
	if (1 != segment_columns || 0 != unexpected_columns)
	{
		const char	*order_by;

		if (ZBX_COMPRESS_TABLE_HISTORY == type)
			order_by = "clock,ns";
		else
			order_by = "clock";

		DBexecute("alter table %s set (timescaledb.compress,timescaledb.compress_segmentby='%s',"
				"timescaledb.compress_orderby='%s')", table_name, ZBX_TS_SEGMENT_BY, order_by);
	}

	DBfree_result(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: hk_get_table_compression_age                                     *
 *                                                                            *
 * Purpose: read the compression age currently configured for a hypertable    *
 *                                                                            *
 * Parameters: table_name - [IN] hypertable name                              *
 *                                                                            *
 * Return value: data compression age in seconds, 0 when no compression       *
 *               policy is found or the stored value is NULL                  *
 *                                                                            *
 ******************************************************************************/
static int	hk_get_table_compression_age(const char *table_name)
{
	int		age = 0;
	DB_RESULT	result;
	DB_ROW		row;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s(): table: %s", __func__, table_name);

	/* the location of the compression policy settings depends on ZBX_DB_TSDB_V1 */
	if (1 == ZBX_DB_TSDB_V1)
	{
		result = DBselect("select (p.older_than).integer_interval"
				" from _timescaledb_config.bgw_policy_compress_chunks p"
				" inner join _timescaledb_catalog.hypertable h on (h.id=p.hypertable_id)"
				" where h.table_name='%s'", table_name);
	}
	else
	{
		result = DBselect("select extract(epoch from (config::json->>'compress_after')::interval) from"
				" timescaledb_information.jobs where application_name like 'Compression%%' and"
				" hypertable_schema='%s' and hypertable_name='%s'", zbx_db_get_schema_esc(), table_name);
	}

	/* Both queries can yield a row whose only column is SQL NULL (no integer interval or no */
	/* 'compress_after' key). Passing a null pointer to atoi() is undefined behaviour, so    */
	/* such a row is treated the same way as a missing policy.                                */
	/* atoi() is kept on purpose: it converts the leading integer part of the returned text.  */
	if (NULL != (row = DBfetch(result)) && NULL != row[0])
		age = atoi(row[0]);

	DBfree_result(result);
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() age: %d", __func__, age);

	return age;
}

/******************************************************************************
 *                                                                            *
 * Function: hk_check_table_compression_age                                   *
 *                                                                            *
 * Purpose: make the hypertable compression policy match the requested age,   *
 *          replacing an existing policy when its age differs                 *
 *                                                                            *
 * Parameters: table_name - [IN] hypertable name                              *
 *             age        - [IN] compression age                              *
 *                                                                            *
 ******************************************************************************/
static void	hk_check_table_compression_age(const char *table_name, int age)
{
	int	current_age;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s(): table: %s age %d", __func__, table_name, age);

	current_age = hk_get_table_compression_age(table_name);

	if (age != current_age)
	{
		DB_RESULT	added;

		/* a non-zero current age means a policy is already installed, drop it first */
		if (0 != current_age)
		{
			DB_RESULT	removed;

			removed = DBselect("select %s('%s')", COMPRESSION_POLICY_REMOVE, table_name);
			DBfree_result(removed);
		}

		zabbix_log(LOG_LEVEL_DEBUG, "adding compression policy to table: %s age %d", table_name, age);

		added = DBselect("select %s('%s', integer '%d')", COMPRESSION_POLICY_ADD, table_name, age);

		if (NULL != added)
			DBfree_result(added);
		else
			zabbix_log(LOG_LEVEL_ERR, "failed to add compression policy to table '%s'", table_name);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: hk_history_enable_compression                                    *
 *                                                                            *
 * Purpose: turns table compression on for items older than specified age     *
 *                                                                            *
 * Parameters: age - [IN] compression age                                     *
 *                                                                            *
 ******************************************************************************/
static void	hk_history_enable_compression(int age)
{
	int	i;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	for (i = 0; i < (int)ARRSIZE(compression_tables); i++)
	{
		DBfree_result(DBselect("select set_integer_now_func('%s', '"ZBX_TS_UNIX_NOW"', true)",
				compression_tables[i].name));
		hk_check_table_segmentation(compression_tables[i].name, compression_tables[i].type);
		hk_check_table_compression_age(compression_tables[i].name, age);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: hk_history_disable_compression                                   *
 *                                                                            *
 * Purpose: turns table compression off                                       *
 *                                                                            *
 ******************************************************************************/
static void	hk_history_disable_compression(void)
{
	int	i;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	for (i = 0; i < (int)ARRSIZE(compression_tables); i++)
	{
		if (0 == hk_get_table_compression_age(compression_tables[i].name))
			continue;

		DBfree_result(DBselect("select %s('%s')", COMPRESSION_POLICY_REMOVE, compression_tables[i].name));
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

#endif

/******************************************************************************
 *                                                                            *
 * Function: hk_history_compression_init                                      *
 *                                                                            *
 * Purpose: initialize compression of the history/trends tables according     *
 *          to the configuration read with zbx_config_get()                   *
 *                                                                            *
 ******************************************************************************/
void	hk_history_compression_init(void)
{
#if defined(HAVE_POSTGRESQL)
	zbx_config_t	cfg;
	char		*saved_log_level = NULL;
	int		reset_status = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	DBconnect(ZBX_DB_CONNECT_NORMAL);
	zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION);

	/* remember the initial settings so that later updates can detect changes */
	compression_status_cache = cfg.db.history_compression_status;
	compress_older_cache = cfg.db.history_compress_older;

	if (ON != cfg.db.history_compression_availability)
	{
		/* compression cannot stay enabled in configuration while it is not available */
		if (ON == cfg.db.history_compression_status)
			reset_status = 1;
	}
	else
	{
		DB_RESULT	result;
		DB_ROW		row;

		/* suppress notice logs during DB initialization; the previous level is restored below */
		result = DBselect("show client_min_messages");

		if (NULL != (row = DBfetch(result)))
		{
			saved_log_level = zbx_strdup(saved_log_level, row[0]);
			DBexecute("set client_min_messages to warning");
		}
		DBfree_result(result);

		if (ON != cfg.db.history_compression_status)
		{
			hk_history_disable_compression();
		}
		else if (0 == cfg.db.history_compress_older)
		{
			/* enabled status with a zero "compress older" value is handled as disabled */
			reset_status = 1;
			hk_history_disable_compression();
		}
		else
		{
			DBexecute(ZBX_TS_UNIX_NOW_CREATE);
			hk_history_enable_compression(cfg.db.history_compress_older + COMPRESSION_TOLERANCE);
		}
	}

	if (0 != reset_status)
	{
		if (ZBX_DB_OK > DBexecute("update config set compression_status=0"))
			zabbix_log(LOG_LEVEL_ERR, "failed to set database compression status");
	}

	zbx_config_clean(&cfg);

	/* restore the message level only if it was changed above */
	if (NULL != saved_log_level)
	{
		DBexecute("set client_min_messages to %s", saved_log_level);
		zbx_free(saved_log_level);
	}

	DBclose();

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
#endif
}

/******************************************************************************
 *                                                                            *
 * Function: hk_history_compression_update                                    *
 *                                                                            *
 * Purpose: periodic update that applies changed compression settings         *
 *                                                                            *
 * Parameters: cfg - [IN] database extension history compression settings     *
 *                                                                            *
 ******************************************************************************/
void	hk_history_compression_update(zbx_config_db_t *cfg)
{
#if defined(HAVE_POSTGRESQL)
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* while compression is unavailable nothing is done and the cached values are kept as is */
	if (OFF != cfg->history_compression_availability)
	{
		int	status_changed, older_changed;

		status_changed = (cfg->history_compression_status != compression_status_cache);
		older_changed = (cfg->history_compress_older != compress_older_cache);

		if (ON != cfg->history_compression_status)
		{
			/* compression has just been switched off */
			if (0 != status_changed)
				hk_history_disable_compression();
		}
		else if (0 != status_changed || 0 != older_changed)
		{
			/* compression has just been switched on or its age setting was modified */
			DBexecute(ZBX_TS_UNIX_NOW_CREATE);
			hk_history_enable_compression(cfg->history_compress_older + COMPRESSION_TOLERANCE);
		}

		compression_status_cache = cfg->history_compression_status;
		compress_older_cache = cfg->history_compress_older;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
#else
	ZBX_UNUSED(cfg);
#endif
}