/*
** Zabbix
** Copyright (C) 2001-2022 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "common.h"
#include "daemon.h"

#include "zbxself.h"
#include "log.h"
#include "zbxipcservice.h"
#include "lld_manager.h"
#include "lld_protocol.h"

extern unsigned char	process_type, program_type;
extern int		server_num, process_num;

extern int	CONFIG_LLDWORKER_FORKS;

/*
 * The LLD queue is organized as a queue (rule_queue binary heap) of LLD rules,
 * sorted by their oldest value timestamps. The values are stored in linked lists,
 * each rule having its own list of values. Values inside list are not sorted, so
 * in the case a LLD rule received a value with past timestamp, it will be processed
 * in queuing order, not the value chronological order.
 *
 * During processing the rule with oldest value is popped from queue and sent
 * to a free worker. After processing the rule worker sends done response and
 * manager removes the oldest value from rule's value list. If there are no more
 * values in the list the rule is removed from the index (rule_index hashset),
 * otherwise the rule is enqueued back in LLD queue.
 *
 */

typedef struct
{
	/* workers vector, created during manager initialization */
	zbx_vector_ptr_t	workers;

	/* free workers */
	zbx_queue_ptr_t		free_workers;

	/* workers indexed by IPC service clients */
	zbx_hashset_t		workers_client;

	/* the next worker index to be assigned to new IPC service clients */
	int			next_worker_index;

	/* index of queued LLD rules */
	zbx_hashset_t		rule_index;

	/* LLD rule queue, ordered by the oldest values */
	zbx_binary_heap_t	rule_queue;

	/* the number of queued LLD rules */
	zbx_uint64_t		queued_num;

}
zbx_lld_manager_t;

typedef struct
{
	zbx_ipc_client_t	*client;
	zbx_lld_rule_t		*rule;
}
zbx_lld_worker_t;

/* workers_client hashset support */
static zbx_hash_t	worker_hash_func(const void *d)
{
	const zbx_lld_worker_t	*worker = *(const zbx_lld_worker_t **)d;

	zbx_hash_t hash =  ZBX_DEFAULT_PTR_HASH_FUNC(&worker->client);

	return hash;
}

static int	worker_compare_func(const void *d1, const void *d2)
{
	const zbx_lld_worker_t	*p1 = *(const zbx_lld_worker_t **)d1;
	const zbx_lld_worker_t	*p2 = *(const zbx_lld_worker_t **)d2;

	ZBX_RETURN_IF_NOT_EQUAL(p1->client, p2->client);
	return 0;
}

/* rule_queue binary heap support */
static int	rule_elem_compare_func(const void *d1, const void *d2)
{
	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;

	const zbx_lld_rule_t	*rule1 = (const zbx_lld_rule_t *)e1->data;
	const zbx_lld_rule_t	*rule2 = (const zbx_lld_rule_t *)e2->data;

	/* compare by timestamp of the oldest value */
	return zbx_timespec_compare(&rule1->head->ts, &rule2->head->ts);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_data_free                                                    *
 *                                                                            *
 * Purpose: frees LLD data                                                    *
 *                                                                            *
 ******************************************************************************/
static void	lld_data_free(zbx_lld_data_t *data)
{
	zbx_free(data->value);
	zbx_free(data->error);
	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_rule_clear                                                   *
 *                                                                            *
 * Purpose: clears LLD rule                                                   *
 *                                                                            *
 ******************************************************************************/
static void	lld_rule_clear(zbx_lld_rule_t *rule)
{
	zbx_lld_data_t	*data;

	while (NULL != rule->head)
	{
		data = rule->head;
		rule->head = data->next;
		lld_data_free(data);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: lld_worker_free                                                  *
 *                                                                            *
 * Purpose: frees LLD worker                                                  *
 *                                                                            *
 ******************************************************************************/
static void	lld_worker_free(zbx_lld_worker_t *worker)
{
	zbx_free(worker);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_manager_init                                                 *
 *                                                                            *
 * Purpose: initializes LLD manager                                           *
 *                                                                            *
 * Parameters: manager - [IN] the manager to initialize                       *
 *                                                                            *
 ******************************************************************************/
static void	lld_manager_init(zbx_lld_manager_t *manager)
{
	int			i;
	zbx_lld_worker_t	*worker;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, CONFIG_LLDWORKER_FORKS);

	zbx_vector_ptr_create(&manager->workers);
	zbx_queue_ptr_create(&manager->free_workers);
	zbx_hashset_create(&manager->workers_client, 0, worker_hash_func, worker_compare_func);

	zbx_hashset_create_ext(&manager->rule_index, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
			(zbx_clean_func_t)lld_rule_clear,
			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);

	zbx_binary_heap_create(&manager->rule_queue, rule_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);

	manager->next_worker_index = 0;

	for (i = 0; i < CONFIG_LLDWORKER_FORKS; i++)
	{
		worker = (zbx_lld_worker_t *)zbx_malloc(NULL, sizeof(zbx_lld_worker_t));

		worker->client = NULL;

		zbx_vector_ptr_append(&manager->workers, worker);
	}

	manager->queued_num = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_manager_destroy                                              *
 *                                                                            *
 * Purpose: destroys LLD manager                                              *
 *                                                                            *
 * Parameters: manager - [IN] the manager to destroy                          *
 *                                                                            *
 ******************************************************************************/
static void	lld_manager_destroy(zbx_lld_manager_t *manager)
{
	zbx_binary_heap_destroy(&manager->rule_queue);
	zbx_hashset_destroy(&manager->rule_index);
	zbx_queue_ptr_destroy(&manager->free_workers);
	zbx_hashset_destroy(&manager->workers_client);
	zbx_vector_ptr_clear_ext(&manager->workers, (zbx_clean_func_t)lld_worker_free);
	zbx_vector_ptr_destroy(&manager->workers);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_get_worker_by_client                                         *
 *                                                                            *
 * Purpose: returns worker by connected IPC client data                       *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker                            *
 *                                                                            *
 * Return value: The LLD worker                                               *
 *                                                                            *
 ******************************************************************************/
static zbx_lld_worker_t	*lld_get_worker_by_client(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_lld_worker_t	**worker, worker_local, *plocal = &worker_local;

	plocal->client = client;
	worker = (zbx_lld_worker_t **)zbx_hashset_search(&manager->workers_client, &plocal);

	if (NULL == worker)
	{
		THIS_SHOULD_NEVER_HAPPEN;
		exit(EXIT_FAILURE);
	}

	return *worker;
}

/******************************************************************************
 *                                                                            *
 * Function: lld_register_worker                                              *
 *                                                                            *
 * Purpose: registers worker                                                  *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker IPC client data            *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	lld_register_worker(zbx_lld_manager_t *manager, zbx_ipc_client_t *client,
		const zbx_ipc_message_t *message)
{
	zbx_lld_worker_t	*worker;
	pid_t			ppid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	memcpy(&ppid, message->data, sizeof(ppid));

	if (ppid != getppid())
	{
		zbx_ipc_client_close(client);
		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
	}
	else
	{
		if (manager->next_worker_index == manager->workers.values_num)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			exit(EXIT_FAILURE);
		}

		worker = (zbx_lld_worker_t *)manager->workers.values[manager->next_worker_index++];
		worker->client = client;

		zbx_hashset_insert(&manager->workers_client, &worker, sizeof(zbx_lld_worker_t *));
		zbx_queue_ptr_push(&manager->free_workers, worker);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_queue_rule                                                   *
 *                                                                            *
 * Purpose: queues LLD rule                                                   *
 *                                                                            *
 * Parameters: manager - [IN] the LLD manager                                 *
 *             rule    - [IN] the LLD rule                                    *
 *                                                                            *
 ******************************************************************************/
static void	lld_queue_rule(zbx_lld_manager_t *manager, zbx_lld_rule_t *rule)
{
	zbx_binary_heap_elem_t	elem = {rule->hostid, rule};

	zbx_binary_heap_insert(&manager->rule_queue, &elem);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_queue_request                                                *
 *                                                                            *
 * Purpose: queues low level discovery request                                *
 *                                                                            *
 * Parameters: manager - [IN] the LLD manager                                 *
 *             message - [IN] the message with LLD request                    *
 *                                                                            *
 ******************************************************************************/
static void	lld_queue_request(zbx_lld_manager_t *manager, const zbx_ipc_message_t *message)
{
	zbx_uint64_t	hostid;
	zbx_lld_rule_t	*rule;
	zbx_lld_data_t	*data;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	data = (zbx_lld_data_t *)zbx_malloc(NULL, sizeof(zbx_lld_data_t));
	data->next = NULL;

	zbx_lld_deserialize_item_value(message->data, &data->itemid, &hostid, &data->value, &data->ts, &data->meta,
			&data->lastlogsize, &data->mtime, &data->error);

	if (NULL == (rule = zbx_hashset_search(&manager->rule_index, &hostid)))
	{
		zbx_lld_rule_t	rule_local = {.hostid = hostid, .values_num = 0, .tail = data, .head = data};

		data->prev = NULL;

		rule = zbx_hashset_insert(&manager->rule_index, &rule_local, sizeof(rule_local));
		lld_queue_rule(manager, rule);
	}
	else
	{
		if (0 == data->meta)
		{
			zbx_lld_data_t	*data_ptr;

			for (data_ptr = rule->tail; NULL != data_ptr; data_ptr = data_ptr->prev)
			{
				/* if there are multiple values then they should be different, check only last one */
				if (data_ptr->itemid == data->itemid)
					break;
			}

			if (NULL != data_ptr && 0 == zbx_strcmp_null(data->error, data_ptr->error) &&
					0 == zbx_strcmp_null(data->value, data_ptr->value))
			{
				zabbix_log(LOG_LEVEL_DEBUG, "skip repeating values for discovery rule:" ZBX_FS_UI64,
						data->itemid);

				lld_data_free(data);
				goto out;
			}
		}

		data->prev = rule->tail;
		rule->tail->next = data;
		rule->tail = data;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "queuing discovery rule:" ZBX_FS_UI64, data->itemid);

	rule->values_num++;
	manager->queued_num++;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_process_next_request                                         *
 *                                                                            *
 * Purpose: processes next LLD request from queue                             *
 *                                                                            *
 * Parameters: manager - [IN] the LLD manager                                 *
 *             worker  - [IN] the target worker                               *
 *                                                                            *
 ******************************************************************************/
static void	lld_process_next_request(zbx_lld_manager_t *manager, zbx_lld_worker_t *worker)
{
	zbx_binary_heap_elem_t	*elem;
	unsigned char		*buf;
	zbx_uint32_t		buf_len;
	zbx_lld_data_t		*data;

	elem = zbx_binary_heap_find_min(&manager->rule_queue);
	worker->rule = (zbx_lld_rule_t *)elem->data;
	zbx_binary_heap_remove_min(&manager->rule_queue);

	data = worker->rule->head;
	buf_len = zbx_lld_serialize_item_value(&buf, data->itemid, 0, data->value, &data->ts, data->meta,
			data->lastlogsize, data->mtime, data->error);
	zbx_ipc_client_send(worker->client, ZBX_IPC_LLD_TASK, buf, buf_len);
	zbx_free(buf);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_process_queue                                                *
 *                                                                            *
 * Purpose: sends queued LLD rules to free workers                            *
 *                                                                            *
 * Parameters: manager - [IN] the LLD manager                                 *
 *                                                                            *
 ******************************************************************************/
static void	lld_process_queue(zbx_lld_manager_t *manager)
{
	zbx_lld_worker_t	*worker;

	while (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue))
	{
		if (NULL == (worker = zbx_queue_ptr_pop(&manager->free_workers)))
			break;

		lld_process_next_request(manager, worker);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: lld_process_result                                               *
 *                                                                            *
 * Purpose: processes LLD worker 'done' response                              *
 *                                                                            *
 * Parameters: manager - [IN] the LLD manager                                 *
 * Parameters: client  - [IN] the worker's IPC client connection              *
 *                                                                            *
 ******************************************************************************/
static void	lld_process_result(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_lld_worker_t	*worker;
	zbx_lld_rule_t		*rule;
	zbx_lld_data_t		*data;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	worker = lld_get_worker_by_client(manager, client);

	zabbix_log(LOG_LEVEL_DEBUG, "discovery rule:" ZBX_FS_UI64 " has been processed", worker->rule->head->itemid);

	rule = worker->rule;
	worker->rule = NULL;

	data = rule->head;
	rule->head = rule->head->next;

	if (NULL == rule->head)
	{
		zbx_hashset_remove_direct(&manager->rule_index, rule);
	}
	else
	{
		rule->head->prev = NULL;
		rule->values_num--;
		lld_queue_rule(manager, rule);
	}

	lld_data_free(data);

	if (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue))
		lld_process_next_request(manager, worker);
	else
		zbx_queue_ptr_push(&manager->free_workers, worker);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_process_diag_stats                                           *
 *                                                                            *
 * Purpose: processes external diagnostic statistics request                  *
 *                                                                            *
 * Parameters: manager - [IN] the LLD manager                                 *
 * Parameters: client  - [IN] the external IPC connection                     *
 *                                                                            *
 ******************************************************************************/
static void	lld_process_diag_stats(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
{
	unsigned char	*data;
	zbx_uint32_t	data_len;

	data_len = zbx_lld_serialize_diag_stats(&data, manager->rule_index.num_data, manager->queued_num);
	zbx_ipc_client_send(client, ZBX_IPC_LLD_DIAG_STATS_RESULT, data, data_len);
	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_diag_item_compare_values_desc                                *
 *                                                                            *
 * Purpose: sort lld manager cache item view by second value                  *
 *          (number of values) in descending order                            *
 *                                                                            *
 ******************************************************************************/
static int	lld_diag_item_compare_values_desc(const void *d1, const void *d2)
{
	zbx_lld_rule_info_t	*r1 = *(zbx_lld_rule_info_t **)d1;
	zbx_lld_rule_info_t	*r2 = *(zbx_lld_rule_info_t **)d2;

	return r2->values_num - r1->values_num;
}

/******************************************************************************
 *                                                                            *
 * Function: lld_process_diag_top                                             *
 *                                                                            *
 * Purpose: processes external top items request                              *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker IPC client data            *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	lld_process_top_items(zbx_lld_manager_t *manager, zbx_ipc_client_t *client,
		const zbx_ipc_message_t *message)
{
	int			limit;
	unsigned char		*data;
	zbx_uint32_t		data_len;
	zbx_vector_ptr_t	view;
	zbx_hashset_iter_t	iter;
	zbx_hashset_t		rule_infos;
	zbx_lld_rule_t		*rule;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_lld_deserialize_top_items_request(message->data, &limit);

	zbx_hashset_create(&rule_infos, MAX(1000, (size_t)manager->rule_index.num_data), ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&view);

	zbx_hashset_iter_reset(&manager->rule_index, &iter);
	while (NULL != (rule = (zbx_lld_rule_t *)zbx_hashset_iter_next(&iter)))
	{
		zbx_lld_data_t	*data_ptr;

		for (data_ptr = rule->head; NULL != data_ptr; data_ptr = data_ptr->next)
		{
			zbx_lld_rule_info_t	*rule_info, rule_info_local = {.itemid = data_ptr->itemid};

			rule_info = (zbx_lld_rule_info_t *)zbx_hashset_search(&rule_infos, &rule_info_local);

			if (NULL == rule_info)
			{
				rule_info = (zbx_lld_rule_info_t *)zbx_hashset_insert(&rule_infos, &rule_info_local,
						sizeof(zbx_lld_rule_info_t));
				zbx_vector_ptr_append(&view, rule_info);
			}

			rule_info->values_num++;
		}
	}

	zbx_vector_ptr_sort(&view, lld_diag_item_compare_values_desc);

	data_len = zbx_lld_serialize_top_items_result(&data, (const zbx_lld_rule_info_t **)view.values,
			MIN(limit, view.values_num));
	zbx_ipc_client_send(client, ZBX_IPC_LLD_TOP_ITEMS_RESULT, data, data_len);

	zbx_free(data);
	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&rule_infos);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: lld_manager_thread                                               *
 *                                                                            *
 * Purpose: main processing loop                                              *
 *                                                                            *
 ******************************************************************************/
ZBX_THREAD_ENTRY(lld_manager_thread, args)
{
#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */

	zbx_ipc_service_t	lld_service;
	char			*error = NULL;
	zbx_ipc_client_t	*client;
	zbx_ipc_message_t	*message;
	double			time_stat, time_now, sec, time_idle = 0;
	zbx_lld_manager_t	manager;
	zbx_uint64_t		processed_num = 0;
	int			ret;

	process_type = ((zbx_thread_args_t *)args)->process_type;
	server_num = ((zbx_thread_args_t *)args)->server_num;
	process_num = ((zbx_thread_args_t *)args)->process_num;

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
			server_num, get_process_type_string(process_type), process_num);

	if (FAIL == zbx_ipc_service_start(&lld_service, ZBX_IPC_SERVICE_LLD, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start LLD manager service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	lld_manager_init(&manager);

	/* initialize statistics */
	time_stat = zbx_time();

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();

		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL
					"sec during " ZBX_FS_DBL " sec]",
					get_process_type_string(process_type), process_num, processed_num,
					time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			processed_num = 0;
		}

		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
		ret = zbx_ipc_service_recv(&lld_service, 1, &client, &message);
		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);

		sec = zbx_time();
		zbx_update_env(sec);

		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_LLD_REGISTER:
					lld_register_worker(&manager, client, message);
					break;
				case ZBX_IPC_LLD_REQUEST:
					lld_queue_request(&manager, message);
					lld_process_queue(&manager);
					break;
				case ZBX_IPC_LLD_DONE:
					lld_process_result(&manager, client);
					processed_num++;
					manager.queued_num--;
					break;
				case ZBX_IPC_LLD_QUEUE:
					zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
							sizeof(zbx_uint64_t));
					break;
				case ZBX_IPC_LLD_DIAG_STATS:
					lld_process_diag_stats(&manager, client);
					break;
				case ZBX_IPC_LLD_TOP_ITEMS:
					lld_process_top_items(&manager, client, message);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);
	}

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	while (1)
		zbx_sleep(SEC_PER_MIN);

	zbx_ipc_service_close(&lld_service);
	lld_manager_destroy(&manager);
}