/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "lld_worker.h"
#include "lld.h"
#include "lld_protocol.h"

#include "../events/events.h"

#include "zbxnix.h"
#include "zbxlog.h"
#include "zbxipcservice.h"
#include "zbxself.h"
#include "zbxtime.h"
#include "zbx_item_constants.h"
#include "zbxstr.h"
#include "zbxalgo.h"
#include "zbxcacheconfig.h"
#include "zbxdbhigh.h"

/******************************************************************************
 *                                                                            *
 * Purpose: announces this LLD worker process to the LLD manager              *
 *          by sending it the parent process id                               *
 *                                                                            *
 * Parameters: socket - [IN] IPC socket used to reach the LLD manager         *
 *                                                                            *
 ******************************************************************************/
static void	lld_register_worker(zbx_ipc_socket_t *socket)
{
	/* the registration payload is the raw pid_t of the parent process */
	pid_t	parent_pid = getppid();

	zbx_ipc_socket_write(socket, ZBX_IPC_LLD_REGISTER, (unsigned char *)&parent_pid, sizeof(parent_pid));
}

/******************************************************************************
 *                                                                            *
 * Purpose: Processes LLD task and updates rule state/error in configuration  *
 *          cache and database.                                               *
 *                                                                            *
 * Parameters: message - [IN] message with LLD request                        *
 *                                                                            *
 ******************************************************************************/
static void	lld_process_task(const zbx_ipc_message_t *message)
{
	zbx_uint64_t		itemid, hostid, lastlogsize;
	char			*value, *error;
	zbx_timespec_t		ts;
	zbx_item_diff_t		diff;
	zbx_dc_item_t		item;
	int			errcode, mtime;
	unsigned char		state, meta;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_lld_deserialize_item_value(message->data, &itemid, &hostid, &value, &ts, &meta, &lastlogsize, &mtime,
			&error);

	zbx_dc_config_get_items_by_itemids(&item, &itemid, &errcode, 1);

	if (SUCCEED != errcode)
		goto out;

	zabbix_log(LOG_LEVEL_DEBUG, "processing discovery rule:" ZBX_FS_UI64, itemid);

	diff.flags = ZBX_FLAGS_ITEM_DIFF_UNSET;

	if (NULL != error || NULL != value)
	{
		if (NULL == error && SUCCEED == lld_process_discovery_rule(itemid, value, &error))
			state = ITEM_STATE_NORMAL;
		else
			state = ITEM_STATE_NOTSUPPORTED;

		if (state != item.state)
		{
			diff.state = state;
			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;

			if (ITEM_STATE_NORMAL == state)
			{
				zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became supported",
						item.host.host, item.key_orig);

				zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts,
						ITEM_STATE_NORMAL, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL,
						NULL, NULL);
			}
			else
			{
				zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became not supported: %s",
						item.host.host, item.key_orig, error);

				zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts,
						ITEM_STATE_NOTSUPPORTED, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0,
						NULL, NULL, error);
			}

			zbx_db_begin();
			zbx_process_events(NULL, NULL, NULL);
			zbx_db_commit();

			zbx_clean_events();
		}

		/* with successful LLD processing LLD error will be set to empty string */
		if (NULL != error && 0 != strcmp(error, ZBX_NULL2EMPTY_STR(item.error)))
		{
			diff.error = error;
			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
		}
	}

	if (0 != meta)
	{
		if (item.lastlogsize != lastlogsize)
		{
			diff.lastlogsize = lastlogsize;
			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
		}
		if (item.mtime != mtime)
		{
			diff.mtime = mtime;
			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
		}
	}

	if (ZBX_FLAGS_ITEM_DIFF_UNSET != diff.flags)
	{
		zbx_vector_item_diff_ptr_t	diffs;
		char				*sql = NULL;
		size_t				sql_alloc = 0, sql_offset = 0;

		zbx_vector_item_diff_ptr_create(&diffs);
		diff.itemid = itemid;
		zbx_vector_item_diff_ptr_append(&diffs, &diff);

		zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset);
		zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, &diffs, ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
		zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset);

		if (16 < sql_offset)
			zbx_db_execute("%s", sql);

		zbx_dc_config_items_apply_changes(&diffs);

		zbx_vector_item_diff_ptr_destroy(&diffs);
		zbx_free(sql);
	}

	zbx_dc_config_clean_items(&item, &errcode, 1);
out:
	zbx_free(value);
	zbx_free(error);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

ZBX_THREAD_ENTRY(lld_worker_thread, args)
{
	char			*error = NULL;
	zbx_ipc_socket_t	lld_socket;
	zbx_ipc_message_t	message;
	double			time_stat, time_idle = 0, time_now, time_read;
	zbx_uint64_t		processed_num = 0;
	zbx_thread_info_t	*info = &((zbx_thread_args_t *)args)->info;
	int			server_num = ((zbx_thread_args_t *)args)->info.server_num,
				process_num = ((zbx_thread_args_t *)args)->info.process_num;
	unsigned char		process_type = ((zbx_thread_args_t *)args)->info.process_type;

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
			server_num, get_process_type_string(process_type), process_num);

	zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type));

	zbx_ipc_message_init(&message);

	if (FAIL == zbx_ipc_socket_open(&lld_socket, ZBX_IPC_SERVICE_LLD, SEC_PER_MIN, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot connect to lld manager service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	lld_register_worker(&lld_socket);

	time_stat = zbx_time();

	zbx_db_connect(ZBX_DB_CONNECT_NORMAL);

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();
#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */
		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL " sec during "
					ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num,
					processed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			processed_num = 0;
		}
#undef STAT_INTERVAL
		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);
		if (SUCCEED != zbx_ipc_socket_read(&lld_socket, &message))
		{
			if (ZBX_IS_RUNNING())
				zabbix_log(LOG_LEVEL_CRIT, "cannot read LLD manager service request");
			exit(EXIT_FAILURE);
		}
		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

		time_read = zbx_time();
		time_idle += time_read - time_now;
		zbx_update_env(get_process_type_string(process_type), time_read);

		switch (message.code)
		{
			case ZBX_IPC_LLD_TASK:
				lld_process_task(&message);
				zbx_ipc_socket_write(&lld_socket, ZBX_IPC_LLD_DONE, NULL, 0);
				processed_num++;
				break;
		}

		zbx_ipc_message_clean(&message);
	}

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	while (1)
		zbx_sleep(SEC_PER_MIN);
}