/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "preproc_manager.h"

#include "zbxpreproc.h"
#include "preprocessing.h"
#include "zbxnix.h"
#include "zbxself.h"
#include "log.h"
#include "zbxlld.h"
#include "zbxtime.h"
#include "zbxsysinfo.h"
#include "zbx_item_constants.h"
#include "zbxcachehistory.h"
#include "zbx_rtc_constants.h"
#include "zbxrtc.h"

extern unsigned char	program_type;

#define PP_MANAGER_DELAY_SEC 	1
#define PP_MANAGER_DELAY_NS	0

/******************************************************************************
 *                                                                            *
 * Purpose: synchronize preprocessing manager with configuration cache data   *
 *                                                                            *
 * Parameters: manager - [IN] the manager to be synchronized                  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_sync_configuration(zbx_pp_manager_t *manager)
{
	zbx_uint64_t	old_revision, revision;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	old_revision = revision = zbx_pp_manager_get_revision(manager);
	DCconfig_get_preprocessable_items(zbx_pp_manager_items(manager), &revision);
	zbx_pp_manager_set_revision(manager, revision);

	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && revision != old_revision)
		zbx_pp_manager_dump_items(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size:%d revision:" ZBX_FS_UI64 "->" ZBX_FS_UI64, __func__,
			zbx_pp_manager_items(manager)->num_data, old_revision, revision);
}

/******************************************************************************
 *                                                                            *
 * Purpose: frees resources allocated by preprocessor item value              *
 *                                                                            *
 * Parameters: value - [IN] value to be freed                                 *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_value_clear(zbx_preproc_item_value_t *value)
{
	zbx_free(value->error);
	zbx_free(value->ts);

	if (NULL != value->result)
	{
		zbx_free_agent_result(value->result);
		zbx_free(value->result);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: extract value, timestamp and optional data from preprocessing     *
 *          item value.                                                       *
 *                                                                            *
 * Parameters: value - [IN] preprocessing item value                          *
 *             var   - [OUT] the extracted value (including error message)    *
 *             ts    - [OUT] the extracted timestamp                          *
 *             opt   - [OUT] the extracted optional data                      *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_value_extract_data(zbx_preproc_item_value_t *value, zbx_variant_t *var, zbx_timespec_t *ts,
		zbx_pp_value_opt_t *opt)
{
	opt->flags = ZBX_PP_VALUE_OPT_NONE;

	if (NULL != value->ts)
	{
		*ts = *value->ts;
	}
	else
	{
		ts->sec = 0;
		ts->ns = 0;
	}

	if (ITEM_STATE_NOTSUPPORTED == value->state)
	{
		if (NULL != value->error)
		{
			zbx_variant_set_error(var, value->error);
			value->error = NULL;
		}
		else if (NULL != value->result && ZBX_ISSET_MSG(value->result))
			zbx_variant_set_error(var, zbx_strdup(NULL, value->result->msg));
		else
			zbx_variant_set_error(var, zbx_strdup(NULL, "Unknown error."));

		return;
	}

	if (NULL == value->result)
	{
		zbx_variant_set_none(var);
		return;
	}

	if (ZBX_ISSET_LOG(value->result))
	{
		zbx_variant_set_str(var, value->result->log->value);
		value->result->log->value = NULL;

		opt->source = value->result->log->source;
		value->result->log->source = NULL;

		opt->logeventid = value->result->log->logeventid;
		opt->severity = value->result->log->severity;
		opt->timestamp = value->result->log->timestamp;

		opt->flags |= ZBX_PP_VALUE_OPT_LOG;
	}
	else if (ZBX_ISSET_UI64(value->result))
	{
		zbx_variant_set_ui64(var, value->result->ui64);
	}
	else if (ZBX_ISSET_DBL(value->result))
	{
		zbx_variant_set_dbl(var, value->result->dbl);
	}
	else if (ZBX_ISSET_STR(value->result))
	{
		zbx_variant_set_str(var, value->result->str);
		value->result->str = NULL;
	}
	else if (ZBX_ISSET_TEXT(value->result))
	{
		zbx_variant_set_str(var, value->result->text);
		value->result->text = NULL;
	}
	else
		zbx_variant_set_none(var);

	if (ZBX_ISSET_META(value->result))
	{
		opt->lastlogsize = value->result->lastlogsize;
		opt->mtime = value->result->mtime;

		opt->flags |= ZBX_PP_VALUE_OPT_META;
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush preprocessed value                                          *
 *                                                                            *
 * Parameters: manager    - [IN] the preprocessing manager                    *
 *             itemid     - [IN] the item identifier                          *
 *             value_type - [IN] the item value type                          *
 *             flags      - [IN] the item flags                               *
 *             value      - [IN] preprocessed item value                      *
 *             ts         - [IN] the value timestamp                          *
 *             value_opt  - [IN] the optional value data                      *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_flush_value(zbx_pp_manager_t *manager, zbx_uint64_t itemid, unsigned char value_type,
		unsigned char flags, zbx_variant_t *value, zbx_timespec_t ts, zbx_pp_value_opt_t *value_opt)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (0 == (flags & ZBX_FLAG_DISCOVERY_RULE) || 0 == (program_type & ZBX_PROGRAM_TYPE_SERVER))
	{
		dc_add_history_variant(itemid, value_type, flags, value, ts, value_opt);
	}
	else
	{
		zbx_pp_item_t	*item;

		if (NULL != (item = (zbx_pp_item_t *)zbx_hashset_search(zbx_pp_manager_items(manager), &itemid)))
		{
			const char	*value_lld = NULL, *error_lld = NULL;
			unsigned char	meta = 0;
			zbx_uint64_t	lastlogsize = 0;
			int		mtime = 0;

			if (ZBX_VARIANT_ERR == value->type)
			{
				error_lld = value->data.err;
			}
			else
			{
				if (SUCCEED == zbx_variant_convert(value, ZBX_VARIANT_STR))
					value_lld = value->data.str;
			}

			if (0 != (value_opt->flags & ZBX_PP_VALUE_OPT_META))
			{
				meta = 1;
				lastlogsize = value_opt->lastlogsize;
				mtime = value_opt->mtime;
			}

			if (NULL != value_lld || NULL != error_lld || 0 != meta)
			{
				zbx_lld_process_value(itemid, item->hostid, value_lld, &ts, meta, lastlogsize, mtime,
						error_lld);
			}
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle new preprocessing request                                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 *  Return value: The number of requests queued for preprocessing             *
 *                                                                            *
 ******************************************************************************/
static zbx_uint64_t	preprocessor_add_request(zbx_pp_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_uint32_t			offset = 0;
	zbx_preproc_item_value_t	value;
	zbx_uint64_t			queued_num = 0;
	zbx_vector_pp_task_ptr_t	tasks;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_pp_task_ptr_create(&tasks);
	zbx_vector_pp_task_ptr_reserve(&tasks, ZBX_PREPROCESSING_BATCH_SIZE);

	preprocessor_sync_configuration(manager);

	while (offset < message->size)
	{
		zbx_variant_t		var;
		zbx_pp_value_opt_t	var_opt;
		zbx_timespec_t		ts;
		zbx_pp_task_t		*task;

		offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
		preproc_item_value_extract_data(&value, &var, &ts, &var_opt);

		if (NULL == (task = zbx_pp_manager_create_task(manager, value.itemid, &var, ts, &var_opt)))
		{
			preprocessing_flush_value(manager, value.itemid, value.item_value_type, value.item_flags,
					&var, ts, &var_opt);

			zbx_variant_clear(&var);
			zbx_pp_value_opt_clear(&var_opt);
		}
		else
			zbx_vector_pp_task_ptr_append(&tasks, task);

		preproc_item_value_clear(&value);
	}

	if (0 != tasks.values_num)
		zbx_pp_manager_queue_value_preproc(manager, &tasks);

	queued_num = tasks.values_num;
	zbx_vector_pp_task_ptr_destroy(&tasks);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return queued_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle new preprocessing test request                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_test_request(zbx_pp_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_pp_item_preproc_t	*preproc;
	zbx_variant_t		value;
	zbx_timespec_t		ts;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	preproc = zbx_pp_item_preproc_create(0, 0, 0);
	zbx_preprocessor_unpack_test_request(preproc, &value, &ts, message->data);
	zbx_pp_manager_queue_test(manager, preproc, &value, ts, client);
	zbx_pp_item_preproc_release(preproc);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

static void	preprocessor_reply_queue_size(zbx_pp_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_uint64_t	pending_num = zbx_pp_manager_get_pending_num(manager);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_QUEUE, (unsigned char *)&pending_num, sizeof(pending_num));
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush processed value task                                        *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             tasks   - [IN] the processed tasks                             *
 *                                                                            *
 ******************************************************************************/
static void	prpeprocessor_flush_value_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_variant_t		*value;
	unsigned char		value_type, flags;
	zbx_timespec_t		ts;
	zbx_pp_value_opt_t	*value_opt;

	zbx_pp_value_task_get_data(task, &value_type, &flags, &value, &ts, &value_opt);
	preprocessing_flush_value(manager, task->itemid, value_type, flags, value, ts, value_opt);
}

/******************************************************************************
 *                                                                            *
 * Purpose: send back result of processed test task                           *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             tasks   - [IN] the processed tasks                             *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_test_result(zbx_pp_task_t *task)
{
	unsigned char		*data;
	zbx_uint32_t		len;
	zbx_ipc_client_t	*client;
	zbx_variant_t		*result;
	zbx_pp_result_t		*results;
	int			results_num;
	zbx_pp_history_t	*history;

	zbx_pp_test_task_get_data(task, &client, &result, &results, &results_num, &history);

	len = zbx_preprocessor_pack_test_result(&data, results, results_num, history);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TEST_RESULT, data, len);
	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush processed tasks                                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             tasks   - [IN] the processed tasks                             *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_tasks(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks)
{
	for (int i = 0; i < tasks->values_num; i++)
	{
		switch (tasks->values[i]->type)
		{
			case ZBX_PP_TASK_VALUE:
			case ZBX_PP_TASK_VALUE_SEQ:	/* value and value_seq task contents are identical */
				prpeprocessor_flush_value_result(manager, tasks->values[i]);
				break;
			case ZBX_PP_TASK_TEST:
				preprocessor_reply_test_result(tasks->values[i]);
				break;
			default:
				/* the internal tasks (dependent/sequence) shouldn't get here */
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: respond to diagnostic information request                         *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] the request source                              *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_diag_info(zbx_pp_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_uint64_t	preproc_num, pending_num, finished_num, sequences_num;
	unsigned char	*data;
	zbx_uint32_t	data_len;

	zbx_pp_manager_get_diag_stats(manager, &preproc_num, &pending_num, &finished_num, &sequences_num);
	data_len = zbx_preprocessor_pack_diag_stats(&data, preproc_num, pending_num, finished_num, sequences_num);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);

	zbx_free(data);
}

static int	preprocessor_compare_sequence_stats(const void *d1, const void *d2)
{
	const zbx_pp_sequence_stats_t *s1 = *(const zbx_pp_sequence_stats_t * const *)d1;
	const zbx_pp_sequence_stats_t *s2 = *(const zbx_pp_sequence_stats_t * const *)d2;

	return s2->tasks_num - s1->tasks_num;
}


/******************************************************************************
 *                                                                            *
 * Purpose: respond to top sequences request                                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] the request source                              *
 *             message - [IN] the request message                             *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_top_sequences(zbx_pp_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	int					limit;
	zbx_vector_pp_sequence_stats_ptr_t	sequences;
	unsigned char				*data;
	zbx_uint32_t				data_len;

	zbx_vector_pp_sequence_stats_ptr_create(&sequences);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_pp_manager_get_sequence_stats(manager, &sequences);

	if (limit > sequences.values_num)
		limit = sequences.values_num;

	zbx_vector_pp_sequence_stats_ptr_sort(&sequences, preprocessor_compare_sequence_stats);

	data_len = zbx_preprocessor_pack_top_sequences_result(&data, &sequences, limit);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES_RESULT, data, data_len);

	zbx_free(data);
	zbx_vector_pp_sequence_stats_ptr_clear_ext(&sequences, (zbx_pp_sequence_stats_ptr_free_func_t)zbx_ptr_free);
	zbx_vector_pp_sequence_stats_ptr_destroy(&sequences);
}

/******************************************************************************
 *                                                                            *
 * Purpose: respond to worker usage statistics request                        *
 *                                                                            *
 * Parameters: manager     - [IN] preprocessing manager                       *
 *             workers_num - [IN] number of preprocessing workers             *
 *             client      - [IN] the request source                          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_usage_stats(zbx_pp_manager_t *manager, int workers_num, zbx_ipc_client_t *client)
{
	zbx_vector_dbl_t	usage;
	unsigned char		*data;
	zbx_uint32_t		data_len;

	zbx_vector_dbl_create(&usage);
	zbx_pp_manager_get_worker_usage(manager, &usage);

	data_len = zbx_preprocessor_pack_usage_stats(&data, &usage,  workers_num);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);

	zbx_free(data);
	zbx_vector_dbl_destroy(&usage);
}

static void	preprocessor_finished_task_cb(void *data)
{
	zbx_ipc_service_alert((zbx_ipc_service_t *)data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: change log level for the specified worker(s)                      *
 *                                                                            *
 * Parameters: manager   - [IN] preprocessing manager                         *
 *             direction - [IN] 1) increase, -1) decrease                     *
 *             data      - [IN] rtc data in json format                       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_change_loglevel(zbx_pp_manager_t *manager, int direction, const char *data)
{
	char	*error = NULL;
	pid_t	pid;
	int	proc_type, proc_num;

	if (SUCCEED != zbx_rtc_get_command_target(data, &pid, &proc_type, &proc_num, NULL, &error))
	{
		zabbix_log(LOG_LEVEL_WARNING, "Cannot change log level: %s", error);
		zbx_free(error);
		return;
	}

	if (0 != pid)
	{
		zabbix_log(LOG_LEVEL_WARNING, "Cannot change log level for preprocessing worker by pid");
		return;
	}

	zbx_pp_manager_change_worker_loglevel(manager, proc_num, direction);
}

ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
{
	zbx_ipc_service_t		service;
	char				*error = NULL;
	zbx_ipc_client_t		*client;
	zbx_ipc_message_t		*message;
	int				ret;
	double				time_stat, time_idle = 0, time_now, time_flush, sec;
	zbx_timespec_t			timeout = {PP_MANAGER_DELAY_SEC, PP_MANAGER_DELAY_NS};
	const zbx_thread_info_t		*info = &((zbx_thread_args_t *)args)->info;
	int				server_num = ((zbx_thread_args_t *)args)->info.server_num;
	int				process_num = ((zbx_thread_args_t *)args)->info.process_num;
	unsigned char			process_type = ((zbx_thread_args_t *)args)->info.process_type;
	const zbx_thread_preprocessing_manager_args	*pp_args = ((zbx_thread_args_t *)args)->args;
	zbx_pp_manager_t		*manager;
	zbx_vector_pp_task_ptr_t	tasks;
	zbx_uint32_t			rtc_msgs[] = {ZBX_RTC_LOG_LEVEL_INCREASE, ZBX_RTC_LOG_LEVEL_DECREASE};
	zbx_uint64_t			pending_num, finished_num, processed_num = 0, queued_num = 0,
					processing_num = 0;

#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
			server_num, get_process_type_string(process_type), process_num);

	zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

	if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	if (NULL == (manager = zbx_pp_manager_create(pp_args->workers_num, preprocessor_finished_task_cb,
			(void *)&service, &error)))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot initialize preprocessing manager: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	zbx_rtc_subscribe_service(ZBX_PROCESS_TYPE_PREPROCESSOR, 0, rtc_msgs, ARRSIZE(rtc_msgs),
			pp_args->config_timeout, ZBX_IPC_SERVICE_PREPROCESSING);

	zbx_vector_pp_task_ptr_create(&tasks);

	/* initialize statistics */
	time_stat = zbx_time();
	time_flush = time_stat;

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();

		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
					ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
					get_process_type_string(process_type), process_num,
					queued_num, processed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			processed_num = 0;
			queued_num = 0;
		}

		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);
		ret = zbx_ipc_service_recv(&service, &timeout, &client, &message);
		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);
		sec = zbx_time();
		zbx_update_env(get_process_type_string(process_type), sec);

		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_PREPROCESSOR_REQUEST:
					queued_num += preprocessor_add_request(manager, message);
					break;
				case ZBX_IPC_PREPROCESSOR_QUEUE:
					preprocessor_reply_queue_size(manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
					preprocessor_add_test_request(manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
					preprocessor_reply_diag_info(manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES:
					preprocessor_reply_top_sequences(manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_USAGE_STATS:
					preprocessor_reply_usage_stats(manager, pp_args->workers_num, client);
					break;
				case ZBX_RTC_LOG_LEVEL_INCREASE:
					preprocessor_change_loglevel(manager, 1, (const char *)message->data);
					break;
				case ZBX_RTC_LOG_LEVEL_DECREASE:
					preprocessor_change_loglevel(manager, -1, (const char *)message->data);
					break;
				case ZBX_RTC_SHUTDOWN:
					zabbix_log(LOG_LEVEL_DEBUG, "shutdown message received, terminating...");
					goto out;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);

		zbx_pp_manager_process_finished(manager, &tasks, &pending_num, &processing_num, &finished_num);

		if (0 < tasks.values_num)
		{
			processed_num += (unsigned int)tasks.values_num;
			preprocessor_flush_tasks(manager, &tasks);
			zbx_pp_tasks_clear(&tasks);
		}

		if (0 != finished_num)
		{
			timeout.sec = 0;
			timeout.ns = 0;
		}
		else
		{
			timeout.sec = PP_MANAGER_DELAY_SEC;
			timeout.ns = PP_MANAGER_DELAY_NS;
		}

		/* flush local history cache when there is nothing more to process or one second after last flush */
		if (0 == pending_num + processing_num + finished_num || 1 < sec - time_flush)
		{
			dc_flush_history();
			time_flush = sec;
		}
	}
out:
	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	zbx_vector_pp_task_ptr_destroy(&tasks);
	zbx_pp_manager_free(manager);

	zbx_ipc_service_close(&service);

	exit(EXIT_SUCCESS);
#undef STAT_INTERVAL
}