/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "async_queue.h"

#include "async_manager.h"

#include "zbxalgo.h"
#include "zbxcomms.h"

#define ASYNC_TASK_QUEUE_INIT_NONE	0x00
#define ASYNC_TASK_QUEUE_INIT_LOCK	0x01
#define ASYNC_TASK_QUEUE_INIT_EVENT	0x02

void	async_task_queue_destroy(zbx_async_queue_t *queue)
{
	if (0 != (queue->init_flags & ASYNC_TASK_QUEUE_INIT_LOCK))
		pthread_mutex_destroy(&queue->lock);

	if (0 != (queue->init_flags & ASYNC_TASK_QUEUE_INIT_EVENT))
		pthread_cond_destroy(&queue->event);

	zbx_vector_uint64_destroy(&queue->itemids);
	zbx_vector_int32_destroy(&queue->errcodes);
	zbx_vector_int32_destroy(&queue->lastclocks);

	zbx_vector_poller_item_clear_ext(&queue->poller_items, zbx_poller_item_free);
	zbx_vector_poller_item_destroy(&queue->poller_items);
	zbx_vector_interface_status_clear_ext(&queue->interfaces, zbx_interface_status_free);
	zbx_vector_interface_status_destroy(&queue->interfaces);

	queue->init_flags = ASYNC_TASK_QUEUE_INIT_NONE;
}

int	async_task_queue_init(zbx_async_queue_t *queue, zbx_thread_poller_args *poller_args_in, char **error)
{
	int	err, ret = FAIL;

	queue->workers_num = 0;
	queue->processing_num = 0;
	queue->processing_limit = poller_args_in->config_max_concurrent_checks_per_poller;
	queue->poller_type = poller_args_in->poller_type;
	queue->config_timeout = poller_args_in->config_comms->config_timeout;
	queue->config_unavailable_delay = poller_args_in->config_unavailable_delay;
	queue->config_unreachable_delay = poller_args_in->config_unreachable_delay;
	queue->config_unreachable_period = poller_args_in-> config_unreachable_period;

	zbx_vector_uint64_create(&queue->itemids);
	zbx_vector_int32_create(&queue->errcodes);
	zbx_vector_int32_create(&queue->lastclocks);
	zbx_vector_poller_item_create(&queue->poller_items);
	zbx_vector_interface_status_create(&queue->interfaces);

	if (0 != (err = pthread_mutex_init(&queue->lock, NULL)))
	{
		*error = zbx_dsprintf(NULL, "cannot initialize task queue mutex: %s", zbx_strerror(err));
		goto out;
	}
	queue->init_flags |= ASYNC_TASK_QUEUE_INIT_LOCK;

	if (0 != (err = pthread_cond_init(&queue->event, NULL)))
	{
		*error = zbx_dsprintf(NULL, "cannot initialize task queue conditional variable: %s", zbx_strerror(err));
		goto out;
	}
	queue->init_flags |= ASYNC_TASK_QUEUE_INIT_EVENT;

	ret = SUCCEED;
out:
	if (FAIL == ret)
		async_task_queue_destroy(queue);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: locks task queue                                                  *
 *                                                                            *
 ******************************************************************************/
void	async_task_queue_lock(zbx_async_queue_t *queue)
{
	pthread_mutex_lock(&queue->lock);
}

/******************************************************************************
 *                                                                            *
 * Purpose: unlocks task queue                                                *
 *                                                                            *
 ******************************************************************************/
void	async_task_queue_unlock(zbx_async_queue_t *queue)
{
	pthread_mutex_unlock(&queue->lock);
}

/******************************************************************************
 *                                                                            *
 * Purpose: registers new worker                                              *
 *                                                                            *
 ******************************************************************************/
void	async_task_queue_register_worker(zbx_async_queue_t *queue)
{
	queue->workers_num++;
}

/******************************************************************************
 *                                                                            *
 * Purpose: deregisters worker                                                *
 *                                                                            *
 ******************************************************************************/
void	async_task_queue_deregister_worker(zbx_async_queue_t *queue)
{
	queue->workers_num--;
}

int	async_task_queue_wait(zbx_async_queue_t *queue, char **error)
{
	int	err;

	if (0 != (err = pthread_cond_wait(&queue->event, &queue->lock)))
	{
		*error = zbx_dsprintf(NULL, "cannot wait for conditional variable: %s", zbx_strerror(err));
		return FAIL;
	}

	return SUCCEED;
}

void	async_task_queue_notify(zbx_async_queue_t *queue)
{
	int	err;

	if (0 != (err = pthread_cond_signal(&queue->event)))
		zabbix_log(LOG_LEVEL_WARNING, "cannot signal conditional variable: %s", zbx_strerror(err));
}