/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "pb_discovery.h"
#include "zbxcachehistory.h"
#include "zbxcommon.h"
#include "zbxdb.h"
#include "zbxdbhigh.h"
#include "zbxjson.h"
#include "zbxproxybuffer.h"
#include "zbxshmem.h"
#include "zbxdbschema.h"

static zbx_history_table_t	dht = {
	"proxy_dhistory", "dhistory_lastid",
		{
		{"clock",		ZBX_PROTO_TAG_CLOCK,		ZBX_JSON_TYPE_INT,	NULL},
		{"druleid",		ZBX_PROTO_TAG_DRULE,		ZBX_JSON_TYPE_INT,	NULL},
		{"dcheckid",		ZBX_PROTO_TAG_DCHECK,		ZBX_JSON_TYPE_INT,	NULL},
		{"ip",			ZBX_PROTO_TAG_IP,		ZBX_JSON_TYPE_STRING,	NULL},
		{"dns",			ZBX_PROTO_TAG_DNS,		ZBX_JSON_TYPE_STRING,	""},
		{"port",		ZBX_PROTO_TAG_PORT,		ZBX_JSON_TYPE_INT,	"0"},
		{"value",		ZBX_PROTO_TAG_VALUE,		ZBX_JSON_TYPE_STRING,	""},
		{"status",		ZBX_PROTO_TAG_STATUS,		ZBX_JSON_TYPE_INT,	"0"},
		{"error",		ZBX_PROTO_TAG_ERROR,		ZBX_JSON_TYPE_STRING,	""},
		{0}
		}
};

static void	pb_discovery_add_rows_db(zbx_list_t *rows, zbx_list_item_t *next, zbx_uint64_t *lastid);

struct zbx_pb_discovery_data
{
	zbx_pb_state_t	state;
	zbx_list_t	rows;
	int		rows_num;
	zbx_db_insert_t	db_insert;
	zbx_uint64_t	handleid;
};

void	pb_list_free_discovery(zbx_list_t *list, zbx_pb_discovery_t *row)
{
	if (NULL != row->ip)
		list->mem_free_func(row->ip);
	if (NULL != row->dns)
		list->mem_free_func(row->dns);
	if (NULL != row->value)
		list->mem_free_func(row->value);
	if (NULL != row->error)
		list->mem_free_func(row->error);
	list->mem_free_func(row);
}

/******************************************************************************
 *                                                                            *
 * Purpose: estimate approximate discovery row size in cache                  *
 *                                                                            *
 ******************************************************************************/
size_t	pb_discovery_estimate_row_size(const char *value, const char *ip, const char *dns, const char *error)
{
	size_t	size = 0;

	size += zbx_shmem_required_chunk_size(sizeof(zbx_pb_discovery_t));
	size += zbx_shmem_required_chunk_size(sizeof(zbx_list_item_t));
	size += zbx_shmem_required_chunk_size(strlen(value) + 1);
	size += zbx_shmem_required_chunk_size(strlen(ip) + 1);
	size += zbx_shmem_required_chunk_size(strlen(dns) + 1);
	size += zbx_shmem_required_chunk_size(strlen(error) + 1);

	return size;
}

static int	pb_get_discovery_db(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
{
	int		records_num = 0;
	zbx_uint64_t	id;

	id = pb_get_lastid(dht.table, dht.lastidfield);

	/* get history data in batches by ZBX_MAX_HRECORDS records and stop if: */
	/*   1) there are no more data to read                                  */
	/*   2) we have retrieved more than the total maximum number of records */
	/*   3) we have gathered more than half of the maximum packet size      */
	while (ZBX_DATA_JSON_BATCH_LIMIT > j->buffer_offset)
	{
		pb_get_rows_db(j, ZBX_PROTO_TAG_DISCOVERY_DATA, &dht, lastid, &id, &records_num, more);

		if (ZBX_PROXY_DATA_DONE == *more || ZBX_MAX_HRECORDS_TOTAL <= records_num)
			break;
	}

	if (0 != records_num)
		zbx_json_close(j);

	return records_num;
}

static void	pb_discovery_write_row(zbx_pb_discovery_data_t *data, zbx_uint64_t druleid, zbx_uint64_t dcheckid,
		const char *ip, const char *dns, int port, int status, const char *value, int clock, const char *error)
{
	if (PB_MEMORY == data->state)
	{
		zbx_pb_discovery_t	*row;

		row = (zbx_pb_discovery_t *)zbx_malloc(NULL, sizeof(zbx_pb_discovery_t));
		row->id = 0;
		row->druleid = druleid;
		row->dcheckid = dcheckid;
		row->ip = zbx_strdup(NULL, ip);
		row->dns = zbx_strdup(NULL, dns);
		row->port = port;
		row->status = status;
		row->value = zbx_strdup(NULL, value);
		row->clock = clock;
		row->error = zbx_strdup(NULL, ZBX_NULL2EMPTY_STR(error));

		zbx_list_append(&data->rows, row, NULL);
		data->rows_num++;
	}
	else
	{
		zbx_db_insert_add_values(&data->db_insert, __UINT64_C(0), clock, druleid, ip, port, value, status,
				dcheckid, dns, ZBX_NULL2EMPTY_STR(error));
	}
}

void	pb_discovery_flush(zbx_pb_t *pb)
{
	zbx_uint64_t	lastid = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	pb_discovery_add_rows_db(&pb->discovery, NULL, &lastid);

	if (get_pb_data()->discovery_lastid_db < lastid)
		get_pb_data()->discovery_lastid_db = lastid;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: add discovery row to memory cache                                 *
 *                                                                            *
 * Parameters: pb  - [IN] proxy buffer                                        *
 *             src - [IN] row to add                                          *
 *                                                                            *
 * Return value: SUCCEED - the row was cached successfully                    *
 *               FAIL    - not enough memory in cache                         *
 *                                                                            *
 ******************************************************************************/
static int	pb_discovery_add_row_mem(zbx_pb_t *pb, zbx_pb_discovery_t *src)
{
	zbx_pb_discovery_t	*row;
	int			ret = FAIL;

	zabbix_log(LOG_LEVEL_TRACE, "In %s() free:" ZBX_FS_SIZE_T " request:" ZBX_FS_SIZE_T, __func__,
			pb_get_free_size(), pb_discovery_estimate_row_size(src->value, src->ip, src->dns, src->error));

	if (NULL == (row = (zbx_pb_discovery_t *)pb_malloc(sizeof(zbx_pb_discovery_t))))
		goto out;

	memcpy(row, src, sizeof(zbx_pb_discovery_t));

	if (NULL == (row->ip = pb_strdup(src->ip)))
	{
		row->dns = NULL;
		row->value = NULL;
		goto out;
	}

	if (NULL == (row->dns = pb_strdup(src->dns)))
	{
		row->value = NULL;
		goto out;
	}

	if (NULL == (row->value = pb_strdup(src->value)))
		goto out;

	if (NULL == (row->error = pb_strdup(src->error)))
		goto out;

	ret = zbx_list_append(&pb->discovery, row, NULL);
out:
	if (SUCCEED != ret && NULL != row)
		pb_list_free_discovery(&pb->discovery, row);

	zabbix_log(LOG_LEVEL_TRACE, "End of %s() ret:%s free:" ZBX_FS_SIZE_T, __func__, zbx_result_string(ret),
			pb_get_free_size());

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: set ids to new discovery rows                                     *
 *                                                                            *
 ******************************************************************************/
static void	pb_discovery_set_row_ids(zbx_list_t *rows, int rows_num, zbx_uint64_t handleid)
{
	zbx_uint64_t		id;
	zbx_pb_discovery_t	*row;
	zbx_list_iterator_t	li;

	id = zbx_dc_get_nextid("proxy_dhistory", rows_num);
	zbx_list_iterator_init(rows, &li);

	while (SUCCEED == zbx_list_iterator_next(&li))
	{
		(void)zbx_list_iterator_peek(&li, (void **)&row);
		row->id = id++;
		row->handleid = handleid;
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: add discovery rows to memory cache                                *
 *                                                                            *
 * Parameters: pb   - [IN] proxy buffer                                       *
 *             rows - [IN] rows to add                                        *
 *                                                                            *
 * Return value: NULL if all rows were added successfully. Otherwise the list *
 *               item of first failed row is returned                         *
 *                                                                            *
 ******************************************************************************/
static zbx_list_item_t	*pb_discovery_add_rows_mem(zbx_pb_t *pb, zbx_list_t *rows)
{
	zbx_list_iterator_t	li;
	zbx_pb_discovery_t	*row;
	int			rows_num = 0;
	size_t			size = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_list_iterator_init(rows, &li);

	while (SUCCEED == zbx_list_iterator_next(&li))
	{
		(void)zbx_list_iterator_peek(&li, (void **)&row);

		while (SUCCEED != pb_discovery_add_row_mem(pb, row))
		{
			if (ZBX_PB_MODE_MEMORY != pb->mode)
				goto out;

			/* in memory mode keep discarding old records until new */
			/* one can be written in proxy memory buffer            */

			if (0 == size)
				size = pb_discovery_estimate_row_size(row->value, row->ip, row->dns, row->error);

			if (FAIL == pb_free_space(get_pb_data(), size))
			{
				zabbix_log(LOG_LEVEL_WARNING, "discovery record with size " ZBX_FS_SIZE_T
						" is too large for proxy memory buffer, discarding", size);
				break;
			}
		}

		rows_num++;
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows_num:%d next:%p", __func__, rows_num, li.current);

	return li.current;
}

/******************************************************************************
 *                                                                            *
 * Purpose: add discovery rows to database cache                              *
 *                                                                            *
 * Parameters: rows   - [IN] rows to add                                      *
 *             next   - [IN] next row to add                                  *
 *             lastid - [OUT] last inserted id                                *
 *                                                                            *
 ******************************************************************************/
static void	pb_discovery_add_rows_db(zbx_list_t *rows, zbx_list_item_t *next, zbx_uint64_t *lastid)
{
	zbx_list_iterator_t	li;
	zbx_pb_discovery_t	*row;
	zbx_db_insert_t		db_insert;
	int			rows_num = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() next:%p", __func__, next);

	if (SUCCEED == zbx_list_iterator_init_with(rows, next, &li))
	{
		zbx_db_insert_prepare(&db_insert, "proxy_dhistory", "id", "clock", "druleid", "ip", "port", "value",
				"status", "dcheckid", "dns", "error", (char *)NULL);

		do
		{
			(void)zbx_list_iterator_peek(&li, (void **)&row);
			zbx_db_insert_add_values(&db_insert, row->id, row->clock, row->druleid, row->ip, row->port,
					row->value, row->status, row->dcheckid, row->dns, row->error);
			rows_num++;
			*lastid = row->id;
		}
		while (SUCCEED == zbx_list_iterator_next(&li));

		(void)zbx_db_insert_execute(&db_insert);
		zbx_db_insert_clean(&db_insert);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows_num:%d", __func__, rows_num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get discovery records from memory cache                           *
 *                                                                            *
 ******************************************************************************/
static int	pb_discovery_get_mem(zbx_pb_t *pb, struct zbx_json *j, zbx_uint64_t *lastid, int *more)
{
#define	pb_add_json_field_helper(fld_name, type)						\
	pb_add_json_field(j, &dht, ZBX_STR(fld_name), &row->fld_name, type)

	int			records_num = 0;
	zbx_list_iterator_t	li;
	zbx_pb_discovery_t	*row;
	void			*ptr;

	*more = ZBX_PROXY_DATA_DONE;

	if (SUCCEED == zbx_list_peek(&pb->discovery, &ptr))
	{
		zbx_json_addarray(j, ZBX_PROTO_TAG_DISCOVERY_DATA);
		zbx_list_iterator_init(&pb->discovery, &li);

		while (SUCCEED == zbx_list_iterator_next(&li))
		{
			if (ZBX_DATA_JSON_BATCH_LIMIT <= j->buffer_offset || records_num >= ZBX_MAX_HRECORDS_TOTAL)
			{
				*more = ZBX_PROXY_DATA_MORE;
				break;
			}

			(void)zbx_list_iterator_peek(&li, (void **)&row);

			zbx_json_addobject(j, NULL);
			pb_add_json_field_helper(clock, ZBX_TYPE_INT);
			pb_add_json_field_helper(druleid, ZBX_TYPE_UINT);
			pb_add_json_field_helper(dcheckid, ZBX_TYPE_UINT);
			pb_add_json_field_helper(ip, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(dns, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(port, ZBX_TYPE_INT);
			pb_add_json_field_helper(value, ZBX_TYPE_CHAR);
			pb_add_json_field_helper(status, ZBX_TYPE_INT);
			pb_add_json_field_helper(error, ZBX_TYPE_CHAR);
			zbx_json_close(j);

			records_num++;
			*lastid = row->id;
		}

		zbx_json_close(j);
	}

	return records_num;

#	undef	pb_add_json_field_helper
}

/******************************************************************************
 *                                                                            *
 * Purpose: clear sent discovery records                                      *
 *                                                                            *
 ******************************************************************************/
void	pb_discovery_clear(zbx_pb_t *pb, zbx_uint64_t lastid)
{
	zbx_pb_discovery_t	*row;

	while (SUCCEED == zbx_list_peek(&pb->discovery, (void **)&row))
	{
		if (row->id > lastid)
			break;

		zbx_list_pop(&pb->discovery, NULL);
		pb_list_free_discovery(&pb->discovery, row);
	}
}

static void	pb_discovery_data_free(zbx_pb_discovery_data_t *data)
{
	if (PB_MEMORY == data->state)
	{
		zbx_pb_discovery_t	*row;

		while (SUCCEED == zbx_list_pop(&data->rows, (void **)&row))
			pb_list_free_discovery(&data->rows, row);

		zbx_list_destroy(&data->rows);
	}
	else
	{
		zbx_db_insert_clean(&data->db_insert);
	}

	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if oldest record is within allowed age                      *
 *                                                                            *
 ******************************************************************************/
int	pb_discovery_check_age(zbx_pb_t *pb)
{
	zbx_pb_discovery_t	*row;
	int			now;

	now = (int)time(NULL);

	while (SUCCEED == zbx_list_peek(&pb->discovery, (void **)&row))
	{
		if (now - row->clock <= pb->offline_buffer)
			break;

		zbx_list_pop(&pb->discovery, NULL);
		pb_list_free_discovery(&pb->discovery, row);
	}

	if (0 == pb->max_age)
		return SUCCEED;

	if (SUCCEED != zbx_list_peek(&pb->discovery, (void **)&row) || time(NULL) - row->clock < pb->max_age)
		return SUCCEED;

	return FAIL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: write discovery last sent record id to database                   *
 *                                                                            *
 ******************************************************************************/
void	pb_discovery_set_lastid(zbx_uint64_t lastid)
{
	pb_set_lastid(dht.table, dht.lastidfield, lastid);
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if discovery rows are cached in memory buffer               *
 *                                                                            *
 ******************************************************************************/
int	pb_discovery_has_mem_rows(zbx_pb_t *pb)
{
	void	*ptr;

	return zbx_list_peek(&pb->discovery, &ptr);
}

/* public api */

/******************************************************************************
 *                                                                            *
 * Purpose: open discovery data cache                                         *
 *                                                                            *
 * Return value: The discovery data cache handle                              *
 *                                                                            *
 ******************************************************************************/
zbx_pb_discovery_data_t	*zbx_pb_discovery_open(void)
{
	zbx_pb_discovery_data_t	*data;
	zbx_pb_t		*pb_data = get_pb_data();

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	data = (zbx_pb_discovery_data_t *)zbx_malloc(NULL, sizeof(zbx_pb_discovery_data_t));

	pb_lock();

	data->handleid = pb_get_next_handleid(pb_data);

	if (PB_DATABASE == (data->state = get_pb_dst(pb_data->state)))
		pb_data->db_handles_num++;

	pb_unlock();

	if (PB_MEMORY == data->state)
	{
		zbx_list_create(&data->rows);
		data->rows_num = 0;
	}
	else
	{
		zbx_db_insert_prepare(&data->db_insert, "proxy_dhistory", "id", "clock", "druleid", "ip", "port",
				"value", "status", "dcheckid", "dns", "error", (char *)NULL);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return data;
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush the cached discovery data and free the handle               *
 *                                                                            *
 ******************************************************************************/
void	zbx_pb_discovery_close(zbx_pb_discovery_data_t *data)
{
	zbx_uint64_t	lastid = 0;
	zbx_pb_t	*pb_data = get_pb_data();

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (PB_MEMORY == data->state)
	{
		zbx_list_item_t	*next = NULL;

		if (0 == data->rows_num)
			goto out;

		pb_lock();

		pb_discovery_set_row_ids(&data->rows, data->rows_num, data->handleid);

		if (PB_MEMORY == pb_data->state && SUCCEED != pb_discovery_check_age(pb_data))
		{
			pd_fallback_to_database(pb_data, "cached records are too old");
		}
		else if (PB_MEMORY == get_pb_dst(pb_data->state))
		{
			if (NULL == (next = pb_discovery_add_rows_mem(pb_data, &data->rows)))
			{
				pb_unlock();
				goto out;
			}

			if (PB_DATABASE_MEMORY == pb_data->state)
			{
				pd_fallback_to_database(pb_data, "not enough space to complete transition to memory"
						" mode");
			}
			else
			{
				/* initiate transition to database cache */
				pb_set_state(pb_data, PB_MEMORY_DATABASE, "not enough space");
			}
		}

		/* not all rows were added to memory cache - flush them to database */
		pb_data->db_handles_num++;
		pb_unlock();

		do
		{
			zbx_db_begin();
			pb_discovery_add_rows_db(&data->rows, next, &lastid);
		}
		while (ZBX_DB_DOWN == zbx_db_commit());
	}
	else
	{
		zbx_db_insert_autoincrement(&data->db_insert, "id");

		do
		{
			zbx_db_begin();
			(void)zbx_db_insert_execute(&data->db_insert);
		}
		while (ZBX_DB_DOWN == zbx_db_commit());

		lastid = zbx_db_insert_get_lastid(&data->db_insert);
	}

	pb_lock();

	if (pb_data->discovery_lastid_db < lastid)
		pb_data->discovery_lastid_db = lastid;

	pb_data->db_handles_num--;

	pb_unlock();
out:
	pb_discovery_data_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: write service data into discovery data cache                      *
 *                                                                            *
 ******************************************************************************/
void	zbx_pb_discovery_write_service(zbx_pb_discovery_data_t *data, zbx_uint64_t druleid, zbx_uint64_t dcheckid,
		const char *ip, const char *dns, int port, int status, const char *value, int clock)
{
	pb_discovery_write_row(data, druleid, dcheckid, ip, dns, port, status, value, clock, "");
}

/******************************************************************************
 *                                                                            *
 * Purpose: write host data into discovery data cache                         *
 *                                                                            *
 ******************************************************************************/
void	zbx_pb_discovery_write_host(zbx_pb_discovery_data_t *data, zbx_uint64_t druleid, const char *ip,
		const char *dns, int status, int clock, const char *error)
{
	pb_discovery_write_row(data, druleid, 0, ip, dns, 0, status, "", clock, error);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get discovery rows for sending to server                          *
 *                                                                            *
 ******************************************************************************/
int	zbx_pb_discovery_get_rows(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
{
	int	state, ret;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, *lastid);

	pb_lock();

	if (PB_MEMORY == (state = get_pb_src(get_pb_data()->state)))
		ret = pb_discovery_get_mem(get_pb_data(), j, lastid, more);

	pb_unlock();

	if (PB_MEMORY != state)
		ret = pb_get_discovery_db(j, lastid, more);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rows:%d", __func__, ret);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: update database lastid/clear memory records                       *
 *                                                                            *
 ******************************************************************************/
void	zbx_pb_discovery_set_lastid(const zbx_uint64_t lastid)
{
	int		state;
	zbx_pb_t	*pb_data = get_pb_data();

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, lastid);

	pb_lock();

	pb_data->discovery_lastid_sent = lastid;

	if (PB_MEMORY == (state = get_pb_src(pb_data->state)))
		pb_discovery_clear(pb_data, lastid);

	pb_unlock();

	if (PB_DATABASE == state)
		pb_discovery_set_lastid(lastid);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}