/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "pb_autoreg.h"
#include "proxybuffer.h"
#include "zbxcachehistory.h"
#include "zbxcommon.h"
#include "zbxdb.h"
#include "zbxdbhigh.h"
#include "zbxjson.h"
#include "zbxproxybuffer.h"
#include "zbxshmem.h"
#include "zbxdbschema.h"

static zbx_history_table_t	areg = {
	"proxy_autoreg_host", "autoreg_host_lastid",
		{
		{"clock",		ZBX_PROTO_TAG_CLOCK,		ZBX_JSON_TYPE_INT,	NULL},
		{"host",		ZBX_PROTO_TAG_HOST,		ZBX_JSON_TYPE_STRING,	NULL},
		{"listen_ip",		ZBX_PROTO_TAG_IP,		ZBX_JSON_TYPE_STRING,	""},
		{"listen_dns",		ZBX_PROTO_TAG_DNS,		ZBX_JSON_TYPE_STRING,	""},
		{"listen_port",		ZBX_PROTO_TAG_PORT,		ZBX_JSON_TYPE_INT,	"0"},
		{"host_metadata",	ZBX_PROTO_TAG_HOST_METADATA,	ZBX_JSON_TYPE_STRING,	""},
		{"flags",		ZBX_PROTO_TAG_FLAGS,		ZBX_JSON_TYPE_INT,	"0"},
		{"tls_accepted",	ZBX_PROTO_TAG_TLS_ACCEPTED,	ZBX_JSON_TYPE_INT,	"0"},
		{0}
		}
};

static void	pb_autoreg_write_host_db(zbx_uint64_t id, const char *host, const char *ip, const char *dns,
		unsigned short port, unsigned int connection_type, const char *host_metadata, int flags, int clock)
{
	zbx_db_insert_t	db_insert;

	zbx_db_insert_prepare(&db_insert, "proxy_autoreg_host", "id", "host", "listen_ip", "listen_dns", "listen_port",
			"tls_accepted", "host_metadata", "flags", "clock", (char *)NULL);

	zbx_db_insert_add_values(&db_insert, id, host, ip, dns, (int)port, (int)connection_type, host_metadata,
			flags, clock);

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}

/******************************************************************************
 *                                                                            *
 * Purpose: write host data into autoregistraion data cache                   *
 *                                                                            *
 ******************************************************************************/
static int	pb_autoreg_get_db(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
{
	int		records_num = 0;
	zbx_uint64_t	id;

	id = pb_get_lastid(areg.table, areg.lastidfield);

	/* get history data in batches by ZBX_MAX_HRECORDS records and stop if: */
	/*   1) there are no more data to read                                  */
	/*   2) we have retrieved more than the total maximum number of records */
	/*   3) we have gathered more than half of the maximum packet size      */
	while (ZBX_DATA_JSON_BATCH_LIMIT > j->buffer_offset)
	{
		pb_get_rows_db(j, ZBX_PROTO_TAG_AUTOREGISTRATION, &areg, lastid, &id, &records_num, more);

		if (ZBX_PROXY_DATA_DONE == *more || ZBX_MAX_HRECORDS_TOTAL <= records_num)
			break;
	}

	if (0 != records_num)
		zbx_json_close(j);

	return records_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: free autoregistration record                                      *
 *                                                                            *
 ******************************************************************************/
void	pb_list_free_autoreg(zbx_list_t *list, zbx_pb_autoreg_t *row)
{
	if (NULL != row->host)
		list->mem_free_func(row->host);

	if (NULL != row->listen_ip)
		list->mem_free_func(row->listen_ip);

	if (NULL != row->listen_dns)
		list->mem_free_func(row->listen_dns);

	if (NULL != row->host_metadata)
		list->mem_free_func(row->host_metadata);

	list->mem_free_func(row);
}

/******************************************************************************
 *                                                                            *
 * Purpose: estimate approximate autoregistration row size in cache           *
 *                                                                            *
 ******************************************************************************/
size_t	pb_autoreg_estimate_row_size(const char *host, const char *host_metadata, const char *ip, const char *dns)
{
	size_t	size = 0;

	size += zbx_shmem_required_chunk_size(sizeof(zbx_pb_autoreg_t));
	size += zbx_shmem_required_chunk_size(sizeof(zbx_list_item_t));
	size += zbx_shmem_required_chunk_size(strlen(host) + 1);
	size += zbx_shmem_required_chunk_size(strlen(host_metadata) + 1);
	size += zbx_shmem_required_chunk_size(strlen(ip) + 1);
	size += zbx_shmem_required_chunk_size(strlen(dns) + 1);

	return size;
}

/******************************************************************************
 *                                                                            *
 * Purpose: add auto registration record to memory cache                      *
 *                                                                            *
 ******************************************************************************/
static int	pb_autoreg_add_row_mem(zbx_pb_t *pb, const char *host, const char *ip, const char *dns,
		unsigned short port, unsigned int connection_type, const char *host_metadata, int flags, int clock)
{
	zbx_pb_autoreg_t	*row;
	int			ret = FAIL;

	zabbix_log(LOG_LEVEL_TRACE, "In %s() free:" ZBX_FS_SIZE_T " request:" ZBX_FS_SIZE_T, __func__,
			pb_get_free_size(), pb_autoreg_estimate_row_size(host, host_metadata, ip, dns));

	if (NULL == (row = (zbx_pb_autoreg_t *)pb_malloc(sizeof(zbx_pb_autoreg_t))))
			goto out;

	memset(row, 0, sizeof(zbx_pb_autoreg_t));

	if (NULL == (row->host = pb_strdup(host)))
		goto out;

	if (NULL == (row->listen_ip = pb_strdup(ip)))
		goto out;

	if (NULL == (row->listen_dns = pb_strdup(dns)))
		goto out;

	if (NULL == (row->host_metadata = pb_strdup(host_metadata)))
		goto out;

	row->listen_port = (int)port;
	row->tls_accepted = (int)connection_type;
	row->flags = flags;
	row->clock = clock;

	ret = zbx_list_append(&pb->autoreg, row, NULL);
out:
	if (SUCCEED == ret)
		row->id = zbx_dc_get_nextid("proxy_autoreg_host", 1);
	else if (NULL != row)
		pb_list_free_autoreg(&get_pb_data()->autoreg, row);

	zabbix_log(LOG_LEVEL_TRACE, "End of %s() ret:%s free:" ZBX_FS_SIZE_T , __func__, zbx_result_string(ret),
			pb_get_free_size());

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: write auto registration record in memory cache, discarding old    *
 *          records in memory mode to free space if necessary                 *
 *                                                                            *
 ******************************************************************************/
static int	pb_autoreg_write_host_mem(zbx_pb_t *pb, const char *host, const char *ip, const char *dns,
		unsigned short port, unsigned int connection_type, const char *host_metadata, int flags, int clock)
{
	size_t	size = 0;

	while (FAIL == pb_autoreg_add_row_mem(pb, host, ip, dns, port, connection_type, host_metadata, flags, clock))
	{
		if (ZBX_PB_MODE_MEMORY != pb->mode)
			return FAIL;

		/* in memory mode keep discarding old records until new */
		/* one can be written in proxy memory buffer            */

		if (0 == size)
			size = pb_autoreg_estimate_row_size(host, host_metadata, ip, dns);

		if (FAIL == pb_free_space(get_pb_data(), size))
		{
			zabbix_log(LOG_LEVEL_WARNING, "auto registration record with size " ZBX_FS_SIZE_T
					" is too large for proxy memory buffer, discarding", size);
			break;
		}
	}

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get autoregistration records from memory cache                    *
 *                                                                            *
 ******************************************************************************/
static int	pb_autoreg_get_mem(zbx_pb_t *pb, struct zbx_json *j, zbx_uint64_t *lastid, int *more)
{
#define	pb_add_json_field_helper(fld_name, type)						\
	pb_add_json_field(j, &areg, ZBX_STR(fld_name), &row->fld_name, type)

	int			records_num = 0;
	zbx_list_iterator_t	li;
	zbx_pb_autoreg_t	*row;
	void			*ptr;

	*more = ZBX_PROXY_DATA_DONE;

	if (SUCCEED == zbx_list_peek(&pb->autoreg, &ptr))
	{
		zbx_json_addarray(j, ZBX_PROTO_TAG_AUTOREGISTRATION);
		zbx_list_iterator_init(&pb->autoreg, &li);

		while (SUCCEED == zbx_list_iterator_next(&li))
		{
			if (ZBX_DATA_JSON_BATCH_LIMIT <= j->buffer_offset || records_num >= ZBX_MAX_HRECORDS_TOTAL)
			{
				*more = ZBX_PROXY_DATA_MORE;
				break;
			}

			(void)zbx_list_iterator_peek(&li, (void **)&row);

			zbx_json_addobject(j, NULL);
			pb_add_json_field_helper(clock, ZBX_TYPE_INT);
			pb_add_json_field_helper(host, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(listen_ip, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(listen_dns, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(listen_port, ZBX_TYPE_INT);
			pb_add_json_field_helper(host_metadata, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(flags, ZBX_TYPE_INT);
			pb_add_json_field_helper(tls_accepted, ZBX_TYPE_INT);
			zbx_json_close(j);

			records_num++;
			*lastid = row->id;
		}

		zbx_json_close(j);
	}

	return records_num;

#	undef	pb_add_json_field_helper
}

/******************************************************************************
 *                                                                            *
 * Purpose: clear sent autoregistration records                               *
 *                                                                            *
 ******************************************************************************/
void	pb_autoreg_clear(zbx_pb_t *pb, zbx_uint64_t lastid)
{
	zbx_pb_autoreg_t	*row;

	while (SUCCEED == zbx_list_peek(&pb->autoreg, (void **)&row))
	{
		if (row->id > lastid)
			break;

		zbx_list_pop(&pb->autoreg, NULL);
		pb_list_free_autoreg(&pb->autoreg, row);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush cached autoregistration data to database                    *
 *                                                                            *
 ******************************************************************************/
void	pb_autoreg_flush(zbx_pb_t *pb)
{
	zbx_pb_autoreg_t	*row;
	zbx_db_insert_t		db_insert;
	void			*ptr;
	int			rows_num = 0;
	zbx_uint64_t		lastid = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED == zbx_list_peek(&pb->autoreg, &ptr))
	{
		zbx_list_iterator_t	li;

		zbx_db_insert_prepare(&db_insert, "proxy_autoreg_host", "id", "host", "listen_ip", "listen_dns",
				"listen_port", "tls_accepted", "host_metadata", "flags", "clock", (char *)NULL);

		zbx_list_iterator_init(&pb->autoreg, &li);

		while (SUCCEED == zbx_list_iterator_next(&li))
		{
			(void)zbx_list_iterator_peek(&li, (void **)&row);
			zbx_db_insert_add_values(&db_insert, row->id, row->host, row->listen_ip, row->listen_dns,
					row->listen_port, row->tls_accepted, row->host_metadata, row->flags, row->clock);
			rows_num++;
			lastid = row->id;
		}

		zbx_db_insert_execute(&db_insert);
		zbx_db_insert_clean(&db_insert);
	}

	if (get_pb_data()->autoreg_lastid_db < lastid)
		get_pb_data()->autoreg_lastid_db = lastid;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows_num:%d", __func__, rows_num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if oldest record is within allowed age                      *
 *                                                                            *
 ******************************************************************************/
int	pb_autoreg_check_age(zbx_pb_t *pb)
{
	zbx_pb_autoreg_t	*row;
	int			now;

	now = (int)time(NULL);

	while (SUCCEED == zbx_list_peek(&pb->autoreg, (void **)&row))
	{
		if (now - row->clock <= pb->offline_buffer)
			break;

		zbx_list_pop(&pb->autoreg, NULL);
		pb_list_free_autoreg(&pb->autoreg, row);
	}

	if (0 == pb->max_age)
		return SUCCEED;

	if (SUCCEED != zbx_list_peek(&pb->autoreg, (void **)&row) || time(NULL) - row->clock < pb->max_age)
		return SUCCEED;

	return FAIL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: write last sent auto registration record id to database           *
 *                                                                            *
 ******************************************************************************/
void	pb_autoreg_set_lastid(zbx_uint64_t lastid)
{
	pb_set_lastid(areg.table, areg.lastidfield, lastid);
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if auto registration rows are cached in memory buffer       *
 *                                                                            *
 ******************************************************************************/
int	pb_autoreg_has_mem_rows(zbx_pb_t *pb)
{
	void	*ptr;

	return zbx_list_peek(&pb->autoreg, &ptr);
}

/* public api */

/******************************************************************************
 *                                                                            *
 * Purpose: write host data into autoregistration data cache                  *
 *                                                                            *
 ******************************************************************************/
void	zbx_pb_autoreg_write_host(const char *host, const char *ip, const char *dns, unsigned short port,
		unsigned int connection_type, const char *host_metadata, int flags, int clock)
{
	zbx_uint64_t	id;
	zbx_pb_t	*pb_data = get_pb_data();

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	pb_lock();

	if (PB_MEMORY == get_pb_dst(pb_data->state))
	{
		if (PB_MEMORY == pb_data->state && SUCCEED != pb_autoreg_check_age(pb_data))
		{
			pd_fallback_to_database(pb_data, "cached records are too old");
		}
		else
		{
			if (FAIL != pb_autoreg_write_host_mem(pb_data, host, ip, dns, port, connection_type,
					host_metadata, flags, clock))
			{
				pb_unlock();
				goto out;
			}

			if (PB_DATABASE_MEMORY == pb_data->state)
			{
				pd_fallback_to_database(pb_data, "not enough space to complete transition to"
						" memory mode");
			}
			else
			{
				/* initiate transition to database cache */
				pb_set_state(pb_data, PB_MEMORY_DATABASE, "not enough space");
			}
		}
	}

	pb_data->db_handles_num++;
	pb_unlock();

	id = zbx_db_get_maxid("proxy_autoreg_host");

	do
	{
		zbx_db_begin();
		pb_autoreg_write_host_db(id, host, ip, dns, port, connection_type, host_metadata, flags, clock);
	}
	while (ZBX_DB_DOWN == zbx_db_commit());

	pb_lock();

	if (pb_data->autoreg_lastid_db < id)
		pb_data->autoreg_lastid_db = id;

	pb_data->db_handles_num--;

	pb_unlock();
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get autoregistration rows for sending to server                   *
 *                                                                            *
 ******************************************************************************/
int	zbx_pb_autoreg_get_rows(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
{
	int	ret, state;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, *lastid);

	pb_lock();

	if (PB_MEMORY == (state = get_pb_src(get_pb_data()->state)))
		ret = pb_autoreg_get_mem(get_pb_data(), j, lastid, more);

	pb_unlock();

	if (PB_MEMORY != state)
		ret = pb_autoreg_get_db(j, lastid, more);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows:%d", __func__, ret);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: update last sent autoregistration record id                       *
 *                                                                            *
 ******************************************************************************/
void	zbx_pb_autoreg_set_lastid(const zbx_uint64_t lastid)
{
	int		state;
	zbx_pb_t	*pb_data = get_pb_data();

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, lastid);

	pb_lock();

	pb_data->autoreg_lastid_sent = lastid;

	if (PB_MEMORY == (state = get_pb_src(pb_data->state)))
		pb_autoreg_clear(pb_data, lastid);

	pb_unlock();

	if (PB_DATABASE == state)
		pb_autoreg_set_lastid(lastid);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}