/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "pg_service.h"
#include "zbxcommon.h"
#include "pg_cache.h"
#include "zbxalgo.h"
#include "zbxcacheconfig.h"
#include "zbxnix.h"
#include "zbxpgservice.h"
#include "zbxserialize.h"
#include "zbxthreads.h"
#include "zbxtime.h"
#include "zbxtypes.h"
#include "zbxversion.h"

/******************************************************************************
 *                                                                            *
 * Purpose: move hosts between proxy groups in cache                          *
 *                                                                            *
 * Parameter: pgs     - [IN] proxy group service                              *
 *            message - [IN] IPC message with host relocation data            *
 *                                                                            *
 ******************************************************************************/
static void	pg_update_host_pgroup(zbx_pg_service_t *pgs, zbx_ipc_message_t *message)
{
	unsigned char	*ptr = message->data;
	zbx_uint64_t	hostid, srcid, dstid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	pg_cache_lock(pgs->cache);

	(void)pg_cache_update_groups(pgs->cache);

	while (ptr - message->data < message->size)
	{
		zbx_pg_group_t	*group;

		ptr += zbx_deserialize_value(ptr, &hostid);
		ptr += zbx_deserialize_value(ptr, &srcid);
		ptr += zbx_deserialize_value(ptr, &dstid);

		if (srcid == dstid)
		{
			/* handle host name change by re-assigning host to the same proxy */
			/* causing its hostmap revision to change                         */
			zbx_pg_host_t	*host;
			zbx_pg_proxy_t	*proxy;

			if (NULL == (host = (zbx_pg_host_t *)zbx_hashset_search(&pgs->cache->hostmap, &hostid)))
				continue;

			pg_cache_set_host_proxy(pgs->cache, hostid, host->proxyid);

			if (NULL != (proxy = (zbx_pg_proxy_t *)zbx_hashset_search(&pgs->cache->proxies,
					&host->proxyid)) && NULL != proxy->group)
			{
				pg_cache_queue_group_update(pgs->cache, proxy->group);
			}
		}
		else
		{
			if (0 != srcid)
			{
				if (NULL != (group = (zbx_pg_group_t *)zbx_hashset_search(&pgs->cache->groups, &srcid)))
					pg_cache_group_remove_host(pgs->cache, group, hostid);
			}

			if (0 != dstid)
			{
				if (NULL != (group = (zbx_pg_group_t *)zbx_hashset_search(&pgs->cache->groups, &dstid)))
					pg_cache_group_add_host(pgs->cache, group, hostid);
			}
		}
	}

	pg_cache_unlock(pgs->cache);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: update proxy lastaccess                                           *
 *                                                                            *
 * Parameter: pgs     - [IN] proxy group service                              *
 *            message - [IN] IPC message with proxy last access               *
 *                                                                            *
 ******************************************************************************/
static void	pg_update_proxy_rtdata(zbx_pg_service_t *pgs, zbx_ipc_message_t *message)
{
	unsigned char	*ptr = message->data;
	zbx_uint64_t	proxyid;
	int		lastaccess, version;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	ptr += zbx_deserialize_value(ptr, &proxyid);
	ptr += zbx_deserialize_value(ptr, &lastaccess);
	ptr += zbx_deserialize_value(ptr, &version);

	pg_cache_lock(pgs->cache);

	zbx_pg_proxy_t 	*proxy;

	if (NULL != (proxy = (zbx_pg_proxy_t *)zbx_hashset_search(&pgs->cache->proxies, &proxyid)))
	{
		if (0 != lastaccess)
			proxy->lastaccess = lastaccess;

		if (0 != version)
			proxy->version = ZBX_COMPONENT_VERSION_WITHOUT_PATCH(version);
	}

	pg_cache_unlock(pgs->cache);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get proxy configuration sync data                                 *
 *                                                                            *
 * Parameter: pgs     - [IN] proxy group service                              *
 *            message - [IN] IPC message with host relocation data            *
 *                                                                            *
 ******************************************************************************/
static void	pg_get_proxy_sync_data(zbx_pg_service_t *pgs, zbx_ipc_client_t *client, zbx_ipc_message_t *message)
{
	unsigned char	*ptr = message->data, *data, mode = ZBX_PROXY_SYNC_NONE;
	zbx_uint64_t	proxyid, proxy_hostmap_revision, hostmap_revision = 0;
	time_t		now;
	zbx_uint32_t	data_len, failover_delay_len;
	zbx_pg_proxy_t	*proxy;
	char		*failover_delay = ZBX_PG_DEFAULT_FAILOVER_DELAY_STR;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	ptr += zbx_deserialize_value(ptr, &proxyid);
	(void)zbx_deserialize_value(ptr, &proxy_hostmap_revision);

	now = time(NULL);

	pg_cache_lock(pgs->cache);

	/* if proxy is not cached or registered to proxy group return 'no sync' mode */
	/* with 0 hostmap_revision, forcing full sync next time                      */
	/* If hostmap revision is 0 it indicates that hostmap is not defined for the */
	/* proxy group. Return 'no sync' to prevent unneeded sync of proxy list and  */
	/* empty host map each sync period.                                          */
	if (NULL != (proxy = (zbx_pg_proxy_t *)zbx_hashset_search(&pgs->cache->proxies, &proxyid)) &&
			NULL != proxy->group && 0 != (hostmap_revision = proxy->group->hostmap_revision))
	{
		failover_delay = proxy->group->failover_delay;

		if (0 == proxy_hostmap_revision || proxy_hostmap_revision > hostmap_revision ||
				SEC_PER_DAY <= now - proxy->sync_time)
		{
			/* either proxy or server has been restarted or too much time has passed - */
			/* process with full sync                                                  */
			mode = ZBX_PROXY_SYNC_FULL;
		}
		else if (proxy_hostmap_revision < hostmap_revision)
		{
			for (int i = 0; i < proxy->deleted_group_hosts.values_num;)
			{
				if (proxy->deleted_group_hosts.values[i].revision <= proxy_hostmap_revision)
					zbx_vector_pg_host_remove_noorder(&proxy->deleted_group_hosts, i);
				else
					i++;
			}

			mode = ZBX_PROXY_SYNC_PARTIAL;
		}

		proxy->sync_time = now;
	}

	data_len = sizeof(unsigned char) + sizeof(zbx_uint64_t);
	zbx_serialize_prepare_str_len(data_len, failover_delay, failover_delay_len);

	if (ZBX_PROXY_SYNC_PARTIAL == mode)
	{
		data_len += (zbx_uint32_t)(sizeof(int) + (size_t)proxy->deleted_group_hosts.values_num *
				sizeof(zbx_uint64_t));
	}

	ptr = data = (unsigned char *)zbx_malloc(NULL, data_len);
	ptr += zbx_serialize_value(ptr, mode);
	ptr += zbx_serialize_value(ptr, hostmap_revision);
	ptr += zbx_serialize_str(ptr, failover_delay, failover_delay_len);

	if (ZBX_PROXY_SYNC_PARTIAL == mode)
	{
		ptr += zbx_serialize_value(ptr, proxy->deleted_group_hosts.values_num);

		for (int i = 0; i < proxy->deleted_group_hosts.values_num; i++)
			ptr += zbx_serialize_value(ptr, proxy->deleted_group_hosts.values[i].hostproxyid);
	}

	pg_cache_unlock(pgs->cache);

	zbx_ipc_client_send(client, ZBX_IPC_PGM_PROXY_SYNC_DATA, data, data_len);
	zbx_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get proxy group statistics                                        *
 *                                                                            *
 * Parameter: pgs     - [IN] proxy group service                              *
 *            message - [IN] IPC message with host relocation data            *
 *                                                                            *
 ******************************************************************************/
static void	pg_get_proxy_group_stats(zbx_pg_service_t *pgs, zbx_ipc_client_t *client, zbx_ipc_message_t *message)
{
	unsigned char	*ptr, *data;
	int		state, proxies_online_num = 0;
	zbx_uint32_t	data_len;
	zbx_pg_group_t	*pg, *proxy_group = NULL;
	const char	*name = (const char *)message->data;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	pg_cache_lock(pgs->cache);

	zbx_hashset_iter_t	iter;

	zbx_hashset_iter_reset(&pgs->cache->groups, &iter);
	while (NULL != (pg = (zbx_pg_group_t *)zbx_hashset_iter_next(&iter)))
	{
		if (0 == strcmp(pg->name, name))
		{
			proxy_group = pg;
			break;
		}
	}

	if (NULL != proxy_group)
	{
		data_len = (zbx_uint32_t)((size_t)proxy_group->proxies.values_num * sizeof(zbx_uint64_t) +
				3 * sizeof(int));
		ptr = data = (unsigned char *)zbx_malloc(NULL, data_len);

		for (int i = 0; i < proxy_group->proxies.values_num; i++)
		{
			if (ZBX_PG_PROXY_STATE_ONLINE == proxy_group->proxies.values[i]->state)
				proxies_online_num++;
		}

		ptr += zbx_serialize_value(ptr, proxy_group->state);
		ptr += zbx_serialize_value(ptr, proxies_online_num);
		ptr += zbx_serialize_value(ptr, proxy_group->proxies.values_num);

		for (int i = 0; i < proxy_group->proxies.values_num; i++)
			ptr += zbx_serialize_value(ptr, proxy_group->proxies.values[i]->proxyid);

		zbx_ipc_client_send(client, ZBX_IPC_PGM_STATS, data, data_len);

		zbx_free(data);
	}
	else
	{
		state = -1;
		zbx_ipc_client_send(client, ZBX_IPC_PGM_STATS, (const unsigned char *)&state, sizeof(state));
	}

	pg_cache_unlock(pgs->cache);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: gets all proxy group real-time data                               *
 *                                                                            *
 * Parameter: pgs     - [IN] proxy group service                              *
 *            client  - [IN] IPC client                                       *
 *                                                                            *
 * Comments: skips standalone proxies                                         *
 *                                                                            *
 ******************************************************************************/
static void	pg_get_all_pgroup_rtdata(zbx_pg_service_t *pgs, zbx_ipc_client_t *client)
{
#define PROXY_GROUP_LEN	(sizeof(zbx_uint64_t) + 3 * sizeof(int))
	int			num_pgroups = 0;
	zbx_uint32_t		data_len;
	unsigned char		*ptr, *data;
	zbx_hashset_iter_t	iter;
	zbx_pg_group_t		*pg;
#define HEADER_LEN	sizeof(num_pgroups)
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	pg_cache_lock(pgs->cache);

	data_len = (zbx_uint32_t)(HEADER_LEN + (size_t)pgs->cache->groups.num_data * PROXY_GROUP_LEN);
	ptr = data = (unsigned char *)zbx_malloc(NULL, data_len);

	ptr += HEADER_LEN;

	zbx_hashset_iter_reset(&pgs->cache->groups, &iter);
	while (NULL != (pg = (zbx_pg_group_t *)zbx_hashset_iter_next(&iter)))
	{
		/* skip standalone proxies */
		if (ZBX_PG_GROUP_STATE_DISABLED == pg->state)
			continue;

		int	proxies_online_num = 0;

		for (int i = 0; i < pg->proxies.values_num; i++)
		{
			if (ZBX_PG_PROXY_STATE_ONLINE == pg->proxies.values[i]->state)
				proxies_online_num++;
		}

		ptr += zbx_serialize_value(ptr, pg->proxy_groupid);
		ptr += zbx_serialize_value(ptr, pg->state);
		ptr += zbx_serialize_value(ptr, proxies_online_num);
		ptr += zbx_serialize_value(ptr, pg->proxies.values_num);

		num_pgroups++;
	}

	(void)zbx_serialize_value(data, num_pgroups);

	data_len = (zbx_uint32_t)(HEADER_LEN + (size_t)num_pgroups * PROXY_GROUP_LEN);
	zbx_ipc_client_send(client, ZBX_IPC_PGM_ALL_PGROUP_RTDATA, data, data_len);

	zbx_free(data);

	pg_cache_unlock(pgs->cache);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
#undef HEADER_LEN
#undef PROXY_GROUP_LEN
}

/******************************************************************************
 *                                                                            *
 * Purpose: proxy group service thread entry                                  *
 *                                                                            *
 ******************************************************************************/
static void	*pg_service_entry(void *data)
{
	zbx_pg_service_t	*pgs = (zbx_pg_service_t *)data;
	zbx_timespec_t		timeout = {1, 0};
	zbx_ipc_client_t	*client;
	zbx_ipc_message_t	*message;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	while (ZBX_IS_RUNNING())
	{
		(void)zbx_ipc_service_recv(&pgs->service, &timeout, &client, &message);

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_PGM_HOST_PGROUP_UPDATE:
					pg_update_host_pgroup(pgs, message);
					break;
				case ZBX_IPC_PGM_GET_PROXY_SYNC_DATA:
					pg_get_proxy_sync_data(pgs, client, message);
					break;
				case ZBX_IPC_PGM_GET_STATS:
					pg_get_proxy_group_stats(pgs, client, message);
					break;
				case ZBX_IPC_PGM_PROXY_RTDATA:
					pg_update_proxy_rtdata(pgs, message);
					break;
				case ZBX_IPC_PGM_GET_ALL_PGROUP_RTDATA:
					pg_get_all_pgroup_rtdata(pgs, client);
					break;
				case ZBX_IPC_PGM_STOP:
					goto out;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return NULL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: initialize proxy group service                                    *
 *                                                                            *
 ******************************************************************************/
int	pg_service_init(zbx_pg_service_t *pgs, zbx_pg_cache_t *cache, char **error)
{
	int	ret = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (FAIL == zbx_ipc_service_start(&pgs->service, ZBX_IPC_SERVICE_PGSERVICE, error))
		goto out;

	pgs->cache = cache;

	pthread_attr_t	attr;
	int		err;

	zbx_pthread_init_attr(&attr);
	if (0 != (err = pthread_create(&pgs->thread, &attr, pg_service_entry, (void *)pgs)))
	{
		*error = zbx_dsprintf(NULL, "cannot create thread: %s", zbx_strerror(err));
		goto out;
	}

	ret = SUCCEED;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: destroy proxy group service                                       *
 *                                                                            *
 ******************************************************************************/
void	pg_service_destroy(zbx_pg_service_t *pgs)
{
	zbx_ipc_socket_t	sock;
	char			*error = NULL;

	if (FAIL == zbx_ipc_socket_open(&sock, ZBX_IPC_SERVICE_PGSERVICE, ZBX_PG_SERVICE_TIMEOUT, &error))
	{
		zabbix_log(LOG_LEVEL_ERR, "cannot connect to to proxy group manager service: %s", error);
		zbx_free(error);
		return;
	}

	zbx_ipc_socket_write(&sock, ZBX_IPC_PGM_STOP, NULL, 0);
	zbx_ipc_socket_close(&sock);

	void	*retval;

	pthread_join(pgs->thread, &retval);
}