/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "pp_manager.h"
#include "pp_worker.h"
#include "pp_queue.h"
#include "pp_task.h"
#include "preproc_snmp.h"
#include "zbxpreproc.h"
#include "zbxcommon.h"
#include "zbxalgo.h"
#include "zbxtimekeeper.h"
#include "zbxself.h"
#include "zbxstr.h"
#include "zbxcachehistory.h"
#include "zbxprof.h"
#include "pp_protocol.h"
#include "zbx_item_constants.h"
#include "zbxnix.h"
#include "zbxvariant.h"
#include "zbxlog.h"
#include "pp_cache.h"
#include "zbxcacheconfig.h"
#include "zbxipcservice.h"
#include "zbxthreads.h"
#include "zbxtime.h"
#include "zbxrtc.h"
#include "zbx_rtc_constants.h"

#ifdef HAVE_LIBXML2
#	include <libxml/xpath.h>
#	include <libxml/parser.h>
#endif

static zbx_prepare_value_func_t	prepare_value_func_cb = NULL;
static zbx_flush_value_func_t	flush_value_func_cb = NULL;
static zbx_get_progname_f	get_progname_func_cb = NULL;

/******************************************************************************
 *                                                                            *
 * Purpose: initialize xml library, called before creating worker threads     *
 *                                                                            *
 ******************************************************************************/
static void	pp_xml_init(void)
{
#ifdef HAVE_LIBXML2
	xmlInitParser();
#endif
}

/******************************************************************************
 *                                                                            *
 * Purpose: release xml library resources                                     *
 *                                                                            *
 ******************************************************************************/
static void	pp_xml_destroy(void)
{
#ifdef HAVE_LIBXML2
	xmlCleanupParser();
#endif
}

/******************************************************************************
 *                                                                            *
 * Purpose: initialize curl library, called before creating worker threads    *
 *                                                                            *
 ******************************************************************************/
static void	pp_curl_init(void)
{
#ifdef HAVE_LIBCURL
	curl_global_init(CURL_GLOBAL_DEFAULT);
#endif
}

/******************************************************************************
 *                                                                            *
 * Purpose: release curl library resources                                    *
 *                                                                            *
 ******************************************************************************/
static void	pp_curl_destroy(void)
{
#ifdef HAVE_LIBCURL
	curl_global_cleanup();
#endif
}

void	zbx_init_library_preproc(zbx_prepare_value_func_t prepare_value_cb, zbx_flush_value_func_t flush_value_cb,
		zbx_get_progname_f get_progname_cb)
{
	prepare_value_func_cb = prepare_value_cb;
	flush_value_func_cb = flush_value_cb;
	get_progname_func_cb = get_progname_cb;
}

zbx_get_progname_f	preproc_get_progname_cb(void)
{
	return get_progname_func_cb;
}

/******************************************************************************
 *                                                                            *
 * Purpose: create preprocessing manager                                      *
 *                                                                            *
 * Parameters: workers_num      - [IN] number of workers to create            *
 *             finished_cb      - [IN] callback to call after finishing       *
 *                                     task (optional)                        *
 *             finished_data    - [IN] callback data (optional)               *
 *             config_source_ip - [IN]                                        *
 *             config_timeout   - [IN]                                        *
 *             error            - [OUT]                                       *
 *                                                                            *
 * Return value: The created manager or NULL on error.                        *
 *                                                                            *
 ******************************************************************************/
static zbx_pp_manager_t	*zbx_pp_manager_create(int workers_num, zbx_pp_notify_cb_t finished_cb,
		void *finished_data, const char *config_source_ip, int config_timeout, char **error)
{
	int			i, ret = FAIL, started_num = 0;
	time_t			time_start;
	struct timespec		poll_delay = {0, 1e8};
	zbx_pp_manager_t	*manager;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, workers_num);

	pp_xml_init();
	pp_curl_init();
#ifdef HAVE_NETSNMP
	preproc_init_snmp();
#endif
	manager = (zbx_pp_manager_t *)zbx_malloc(NULL, sizeof(zbx_pp_manager_t));
	memset(manager, 0, sizeof(zbx_pp_manager_t));

	if (SUCCEED != pp_task_queue_init(&manager->queue, error))
		goto out;

	manager->timekeeper = zbx_timekeeper_create(workers_num, NULL);

	manager->workers_num = workers_num;
	manager->workers = (zbx_pp_worker_t *)zbx_calloc(NULL, (size_t)workers_num, sizeof(zbx_pp_worker_t));

	for (i = 0; i < workers_num; i++)
	{
		if (SUCCEED != pp_worker_init(&manager->workers[i], i + 1, &manager->queue, manager->timekeeper,
				config_source_ip, error))
		{
			goto out;
		}

		pp_worker_set_finished_cb(&manager->workers[i], finished_cb, finished_data);
	}

	zbx_hashset_create_ext(&manager->items, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
			(zbx_clean_func_t)zbx_pp_item_clear, ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC,
			ZBX_DEFAULT_MEM_FREE_FUNC);

	if (FAIL == zbx_ipc_async_socket_open(&manager->rtc, ZBX_IPC_SERVICE_RTC, config_timeout, error))
		goto out;

	/* wait for threads to start */
	time_start = time(NULL);

#define PP_STARTUP_TIMEOUT	10

	while (started_num != workers_num)
	{
		if (time_start + PP_STARTUP_TIMEOUT < time(NULL))
		{
			*error = zbx_strdup(NULL, "timeout occurred while waiting for workers to start");
			goto out;
		}

		pthread_mutex_lock(&manager->queue.lock);
		started_num = manager->queue.workers_num;
		pthread_mutex_unlock(&manager->queue.lock);

		nanosleep(&poll_delay, NULL);
	}

#undef PP_STARTUP_TIMEOUT

	ret = SUCCEED;
out:
	if (FAIL == ret)
	{
		for (i = 0; i < manager->workers_num; i++)
			pp_worker_stop(&manager->workers[i]);

		pp_task_queue_destroy(&manager->queue);
		zbx_free(manager);

		manager = NULL;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() ret:%s error:%s", __func__, zbx_result_string(ret),
			ZBX_NULL2EMPTY_STR(*error));

	return manager;
}

/******************************************************************************
 *                                                                            *
 * Purpose: destroy preprocessing manager                                     *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_free(zbx_pp_manager_t *manager)
{
	int	i;

	pp_task_queue_lock(&manager->queue);
	for (i = 0; i < manager->workers_num; i++)
		pp_worker_stop(&manager->workers[i]);

	pp_task_queue_notify_all(&manager->queue);
	pp_task_queue_unlock(&manager->queue);

	for (i = 0; i < manager->workers_num; i++)
		pp_worker_destroy(&manager->workers[i]);

	zbx_free(manager->workers);

	pp_task_queue_destroy(&manager->queue);
	zbx_hashset_destroy(&manager->items);

	zbx_timekeeper_free(manager->timekeeper);

	zbx_dc_um_shared_handle_release(manager->um_handle);

#ifdef HAVE_NETSNMP
	preproc_shutdown_snmp();
#endif
	pp_curl_destroy();
	pp_xml_destroy();

	zbx_ipc_async_socket_close(&manager->rtc);

	zbx_free(manager);
}

/******************************************************************************
 *                                                                            *
 * Purpose: queue value for preprocessing test                                *
 *                                                                            *
 * Parameters: manager - [IN]                                                 *
 *             preproc - [IN] item preprocessing data                         *
 *             value   - [IN] value to preprocess, its contents will be       *
 *                            directly copied over and cleared by the task    *
 *             ts      - [IN] value timestamp                                 *
 *             client  - [IN] request source                                  *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_queue_test(zbx_pp_manager_t *manager, zbx_pp_item_preproc_t *preproc, zbx_variant_t *value,
		zbx_timespec_t ts, zbx_ipc_client_t *client)
{
	zbx_pp_task_t	*task = pp_task_test_create(preproc, value, ts, client);

	pp_task_queue_lock(&manager->queue);
	pp_task_queue_push_test(&manager->queue, task);
	pp_task_queue_notify(&manager->queue);
	pp_task_queue_unlock(&manager->queue);
}

/******************************************************************************
 *                                                                            *
 * Purpose: queue value/value_sec preprocessing tasks                         *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_queue_value_preproc(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks)
{
	zbx_prof_start(__func__, ZBX_PROF_MUTEX);
	pp_task_queue_lock(&manager->queue);
	zbx_prof_end_wait();

	for (int i = 0; i < tasks->values_num; i++)
		pp_task_queue_push(&manager->queue, tasks->values[i]);

	pp_task_queue_notify(&manager->queue);

	pp_task_queue_unlock(&manager->queue);
	zbx_prof_end();
}

/******************************************************************************
 *                                                                            *
 * Purpose: create preprocessing task from request                            *
 *                                                                            *
 * Parameters: manager   - [IN]                                               *
 *             itemid    - [IN] item identifier                               *
 *             value     - [IN] value to preprocess, its contents will be     *
 *                              directly copied over and cleared by the task  *
 *             ts        - [IN] value timestamp                               *
 *             value_opt - [IN] optional value data (optional)                *
 *                                                                            *
 * Return value: The created task or NULL if the data can be flushed directly.*
 *                                                                            *
 ******************************************************************************/
static zbx_pp_task_t	*zbx_pp_manager_create_task(zbx_pp_manager_t *manager, zbx_uint64_t itemid,
		zbx_variant_t *value, zbx_timespec_t ts, const zbx_pp_value_opt_t *value_opt)
{
	zbx_pp_item_t	*item;

	if (ZBX_VARIANT_NONE == value->type)
		return NULL;

	if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &itemid)))
		return NULL;

	if (0 == item->preproc->dep_itemids_num && 0 == item->preproc->steps_num)
		return NULL;

	if (ZBX_PP_PROCESS_PARALLEL == item->preproc->mode)
	{
		return pp_task_value_create(item->itemid, item->preproc, manager->um_handle, value, ts, value_opt,
				NULL);
	}
	else
	{
		return pp_task_value_seq_create(item->itemid, item->preproc, manager->um_handle, value, ts, value_opt,
				NULL);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: get first dependent item with preprocessing that can be cached    *
 *                                                                            *
 * Parameters: manager     - [IN]                                             *
 *             itemids     - [IN] dependent itemids                           *
 *             itemids_num - [IN] number of dependent itemids                 *
 *                                                                            *
 * Return value: The first dependent item with cacheable preprocessing data   *
 *               or NULL.                                                     *
 *                                                                            *
 ******************************************************************************/
static zbx_pp_item_t	*pp_manager_get_cacheable_dependent_item(zbx_pp_manager_t *manager, zbx_uint64_t *itemids,
		int itemids_num)
{
	zbx_pp_item_t	*item;

	for (int i = 0; i < itemids_num; i++)
	{
		if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &itemids[i])))
			continue;

		if (SUCCEED == pp_cache_is_supported(item->preproc))
			return item;
	}

	return NULL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: create and queue tasks for dependent items                        *
 *                                                                            *
 * Parameters: manager        - [IN] manager                                  *
 *             preproc        - [IN] master item preprocessing data           *
 *             um_handle      - [IN] shared user macro cache handle           *
 *             exclude_itemid - [IN] dependent itemid to exclude, can be 0    *
 *             value          - [IN] value                                    *
 *             ts             - [IN] value timestamp                          *
 *             cache          - [IN] preprocessing cache                      *
 *                                   (optional, can be NULL)                  *
 *                                                                            *
 * Comments: This function called within task queue lock.                     *
 *                                                                            *
 ******************************************************************************/
static void	pp_manager_queue_dependents(zbx_pp_manager_t *manager, zbx_pp_item_preproc_t *preproc,
		zbx_dc_um_shared_handle_t *um_handle, zbx_uint64_t exclude_itemid, const zbx_variant_t *value,
		zbx_timespec_t ts, zbx_pp_cache_t *cache)
{
	int	queued_num = 0;

	if (0 == preproc->dep_itemids_num)
		return;

	cache = pp_cache_copy(cache);

	if (NULL == cache)
		cache = pp_cache_create(preproc, value);

	for (int i = 0; i < preproc->dep_itemids_num; i++)
	{
		zbx_pp_item_t	*item;
		zbx_pp_task_t	*new_task;

		/* skip already preprocessed dependent item */
		if (preproc->dep_itemids[i] == exclude_itemid)
			continue;

		/* skip disabled/removed items */
		if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &preproc->dep_itemids[i])))
			continue;

		if (ZBX_PP_PROCESS_PARALLEL == item->preproc->mode)
		{
			new_task = pp_task_value_create(item->itemid, item->preproc, um_handle, NULL, ts, NULL,
					cache);
		}
		else
		{
			new_task = pp_task_value_seq_create(item->itemid, item->preproc, um_handle, NULL, ts,
					NULL, cache);
		}

		pp_task_queue_push_immediate(&manager->queue, new_task);
		queued_num++;
	}

	if (0 < queued_num)
		pp_task_queue_notify(&manager->queue);

	pp_cache_release(cache);
}


/******************************************************************************
 *                                                                            *
 * Purpose: queue new tasks in response to finished value task                *
 *                                                                            *
 * Parameters: manager - [IN] manager                                         *
 *             task    - [IN] finished value task                             *
 *                                                                            *
 * Comments: This function called within task queue lock.                     *
 *                                                                            *
 ******************************************************************************/
static void	pp_manager_queue_value_task_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_pp_task_value_t	*d = (zbx_pp_task_value_t *)PP_TASK_DATA(task);
	zbx_pp_item_t		*item;

	if (ZBX_VARIANT_NONE == d->result.type)
		return;

	if (NULL != (item = pp_manager_get_cacheable_dependent_item(manager, d->preproc->dep_itemids,
			d->preproc->dep_itemids_num)))
	{
		zbx_pp_task_t	*dep_task;
		zbx_variant_t	value;

		dep_task = pp_task_dependent_create(task->itemid, d->preproc);
		zbx_pp_task_dependent_t	*d_dep = (zbx_pp_task_dependent_t *)PP_TASK_DATA(dep_task);

		d_dep->cache = pp_cache_create(item->preproc, &d->result);
		zbx_variant_set_none(&value);

		d_dep->primary = pp_task_value_create(item->itemid, item->preproc, d->um_handle, &value, d->ts,
				NULL, d_dep->cache);

		pp_task_queue_push_immediate(&manager->queue, dep_task);
		pp_task_queue_notify(&manager->queue);
	}
	else
		pp_manager_queue_dependents(manager, d->preproc, d->um_handle, 0, &d->result, d->ts, NULL);
}

/******************************************************************************
 *                                                                            *
 * Purpose: queue new tasks in response to finished dependent task            *
 *                                                                            *
 * Parameters: manager - [IN] manager                                         *
 *             task    - [IN] finished dependent task                         *
 *                                                                            *
 * Comments: This function called within task queue lock.                     *
 *                                                                            *
 ******************************************************************************/
static zbx_pp_task_t	*pp_manager_queue_dependent_task_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_pp_task_dependent_t	*d = (zbx_pp_task_dependent_t *)PP_TASK_DATA(task);
	zbx_pp_task_t		*task_value = d->primary;
	zbx_pp_task_value_t	*dp = (zbx_pp_task_value_t *)PP_TASK_DATA(task_value);

	pp_manager_queue_value_task_result(manager, d->primary);
	pp_manager_queue_dependents(manager, d->preproc, dp->um_handle, task_value->itemid, &dp->result, dp->ts, d->cache);

	d->primary = NULL;
	pp_task_free(task);

	return task_value;
}

/******************************************************************************
 *                                                                            *
 * Purpose: requeue sequence task                                             *
 *                                                                            *
 * Parameters: manager  - [IN] manager                                        *
 *             task_seq - [IN] finished sequence task                         *
 *                                                                            *
 * Comments: This function called within task queue lock.                     *
 *                                                                            *
 ******************************************************************************/
static zbx_pp_task_t	*pp_manager_requeue_next_sequence_task(zbx_pp_manager_t *manager, zbx_pp_task_t *task_seq)
{
	zbx_pp_task_sequence_t	*d_seq = (zbx_pp_task_sequence_t *)PP_TASK_DATA(task_seq);
	zbx_pp_task_t		*task = NULL, *tmp_task;

	if (SUCCEED == zbx_list_pop(&d_seq->tasks, (void **)&task))
	{
		switch (task->type)
		{
			case ZBX_PP_TASK_VALUE:
			case ZBX_PP_TASK_VALUE_SEQ:
				pp_manager_queue_value_task_result(manager, task);
				break;
			case ZBX_PP_TASK_DEPENDENT:
				task = pp_manager_queue_dependent_task_result(manager, task);
				break;
			default:
				THIS_SHOULD_NEVER_HAPPEN;
				break;
		}
	}

	if (SUCCEED == zbx_list_peek(&d_seq->tasks, (void **)&tmp_task))
	{
		pp_task_queue_push_immediate(&manager->queue, task_seq);
		pp_task_queue_notify(&manager->queue);
	}
	else
	{
		pp_task_queue_remove_sequence(&manager->queue, task_seq->itemid);
		pp_task_free(task_seq);
	}

	return task;
}

/******************************************************************************
 *                                                                            *
 * Purpose: process finished tasks                                            *
 *                                                                            *
 * Parameters: manager        - [IN] manager                                  *
 *             tasks          - [OUT] finished tasks                          *
 *             pending_num    - [OUT] remaining pending tasks                 *
 *             processing_num - [OUT] processed tasks                         *
 *             finished_num   - [OUT] finished tasks                          *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_process_finished(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks,
		zbx_uint64_t *pending_num, zbx_uint64_t *processing_num, zbx_uint64_t *finished_num)
{
#define PP_FINISHED_TASK_BATCH_SIZE	100

	zbx_pp_task_t	*task;
	static time_t	timekeeper_clock = 0;
	time_t		now;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_pp_task_ptr_reserve(tasks, PP_FINISHED_TASK_BATCH_SIZE);
	zbx_prof_start(__func__, ZBX_PROF_MUTEX);
	pp_task_queue_lock(&manager->queue);
	zbx_prof_end_wait();
	while (PP_FINISHED_TASK_BATCH_SIZE > tasks->values_num)
	{
		if (NULL != (task = pp_task_queue_pop_finished(&manager->queue)))
		{
			switch (task->type)
			{
				case ZBX_PP_TASK_VALUE:
					pp_manager_queue_value_task_result(manager, task);
					break;
				case ZBX_PP_TASK_DEPENDENT:
					task = pp_manager_queue_dependent_task_result(manager, task);
					break;
				case ZBX_PP_TASK_SEQUENCE:
					task = pp_manager_requeue_next_sequence_task(manager, task);
					break;
				default:
					break;
			}
		}

		if (NULL == task)
			break;

		zbx_vector_pp_task_ptr_append(tasks, task);
	}

	*pending_num = manager->queue.pending_num;
	*finished_num = manager->queue.finished_num;
	*processing_num = manager->queue.processing_num;

	pp_task_queue_unlock(&manager->queue);
	zbx_prof_end();
	now = time(NULL);
	if (now != timekeeper_clock)
	{
		zbx_timekeeper_collect(manager->timekeeper);
		timekeeper_clock = now;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() values_num:%d", __func__, tasks->values_num);

#undef PP_FINISHED_TASK_BATCH_SIZE
}

/******************************************************************************
 *                                                                            *
 * Purpose: dump cached item information into log                             *
 *                                                                            *
 * Parameters: manager - [IN] manager                                         *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_dump_items(zbx_pp_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_pp_item_t		*item;

	zbx_hashset_iter_reset(&manager->items, &iter);

	while (NULL != (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter)))
	{
		zabbix_log(LOG_LEVEL_TRACE, "itemid:" ZBX_FS_UI64 " hostid:" ZBX_FS_UI64 " revision:" ZBX_FS_UI64
				" type:%u value_type:%u mode:%u flags:%u",
				item->itemid, item->preproc->hostid, item->revision, item->preproc->type,
				item->preproc->value_type, item->preproc->mode, item->preproc->flags);

		zabbix_log(LOG_LEVEL_TRACE, "  preprocessing steps:");
		for (int i = 0; i < item->preproc->steps_num; i++)
		{
			zabbix_log(LOG_LEVEL_TRACE, "    type:%d params:'%s' err_handler:%d err_params:'%s'",
					item->preproc->steps[i].type,
					ZBX_NULL2EMPTY_STR(item->preproc->steps[i].params),
					item->preproc->steps[i].error_handler,
					ZBX_NULL2EMPTY_STR(item->preproc->steps[i].error_handler_params));
		}

		zabbix_log(LOG_LEVEL_TRACE, "  dependent items:");
		for (int i = 0; i < item->preproc->dep_itemids_num; i++)
			zabbix_log(LOG_LEVEL_TRACE, "    " ZBX_FS_UI64, item->preproc->dep_itemids[i]);
	}

}

/******************************************************************************
 *                                                                            *
 * Purpose: get item configuration data for reading and updates               *
 *                                                                            *
 ******************************************************************************/
zbx_hashset_t  *zbx_pp_manager_items(zbx_pp_manager_t *manager)
{
	return &manager->items;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get diagnostic statistics                                         *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_get_diag_stats(zbx_pp_manager_t *manager, zbx_uint64_t *preproc_num,
		zbx_uint64_t *pending_num, zbx_uint64_t *finished_num, zbx_uint64_t *sequences_num)
{
	*preproc_num = (zbx_uint64_t)manager->items.num_data;
	*pending_num = manager->queue.pending_num;
	*finished_num = manager->queue.finished_num;
	*sequences_num = (zbx_uint64_t)manager->queue.sequences.num_data;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get task sequence statistics                                      *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_get_sequence_stats(zbx_pp_manager_t *manager, zbx_vector_pp_top_stats_ptr_t *stats)
{
	pp_task_queue_get_sequence_stats(&manager->queue, stats);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get worker usage statistics                                       *
 *                                                                            *
 ******************************************************************************/
static void	zbx_pp_manager_get_worker_usage(zbx_pp_manager_t *manager, zbx_vector_dbl_t *worker_usage)
{
	(void)zbx_timekeeper_get_usage(manager->timekeeper, worker_usage);
}

/******************************************************************************
 *                                                                            *
 * Purpose: synchronize preprocessing manager with configuration cache data   *
 *                                                                            *
 * Parameters: manager - [IN] manager to be synchronized                      *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_sync_configuration(zbx_pp_manager_t *manager)
{
	zbx_uint64_t	old_revision, revision;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	old_revision = revision = manager->revision;
	zbx_dc_config_get_preprocessable_items(&manager->items, &manager->um_handle, &revision);
	manager->revision = revision;

	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && revision != old_revision)
		zbx_pp_manager_dump_items(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size:%d revision:" ZBX_FS_UI64 "->" ZBX_FS_UI64, __func__,
			manager->items.num_data, old_revision, revision);
}

/******************************************************************************
 *                                                                            *
 * Purpose: frees resources allocated by preprocessor item value              *
 *                                                                            *
 * Parameters: value - [IN] value to be freed                                 *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_value_clear(zbx_preproc_item_value_t *value)
{
	zbx_free(value->error);
	zbx_free(value->ts);

	if (NULL != value->result)
	{
		zbx_free_agent_result(value->result);
		zbx_free(value->result);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: extract value, timestamp and optional data from preprocessing     *
 *          item value.                                                       *
 *                                                                            *
 * Parameters: value - [IN] preprocessing item value                          *
 *             var   - [OUT] extracted value (including error message)        *
 *             ts    - [OUT] extracted timestamp                              *
 *             opt   - [OUT] extracted optional data                          *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_value_extract_data(zbx_preproc_item_value_t *value, zbx_variant_t *var, zbx_timespec_t *ts,
		zbx_pp_value_opt_t *opt)
{
	opt->flags = ZBX_PP_VALUE_OPT_NONE;

	if (NULL != value->ts)
	{
		*ts = *value->ts;
	}
	else
	{
		ts->sec = 0;
		ts->ns = 0;
	}

	if (ITEM_STATE_NOTSUPPORTED == value->state)
	{
		if (NULL != value->error)
		{
			zbx_variant_set_error(var, value->error);
			value->error = NULL;
		}
		else if (NULL != value->result && ZBX_ISSET_MSG(value->result))
			zbx_variant_set_error(var, zbx_strdup(NULL, value->result->msg));
		else
			zbx_variant_set_error(var, zbx_strdup(NULL, "Unknown error."));

		return;
	}

	if (NULL == value->result)
	{
		zbx_variant_set_none(var);
		return;
	}

	if (ZBX_ISSET_LOG(value->result))
	{
		zbx_variant_set_str(var, value->result->log->value);
		value->result->log->value = NULL;

		opt->source = value->result->log->source;
		value->result->log->source = NULL;

		opt->logeventid = value->result->log->logeventid;
		opt->severity = value->result->log->severity;
		opt->timestamp = value->result->log->timestamp;

		opt->flags |= ZBX_PP_VALUE_OPT_LOG;
	}
	else if (ZBX_ISSET_UI64(value->result))
	{
		zbx_variant_set_ui64(var, value->result->ui64);
	}
	else if (ZBX_ISSET_DBL(value->result))
	{
		zbx_variant_set_dbl(var, value->result->dbl);
	}
	else if (ZBX_ISSET_STR(value->result))
	{
		zbx_variant_set_str(var, value->result->str);
		value->result->str = NULL;
	}
	else if (ZBX_ISSET_TEXT(value->result))
	{
		zbx_variant_set_str(var, value->result->text);
		value->result->text = NULL;
	}
	else if (ZBX_ISSET_BIN(value->result))
	{
		THIS_SHOULD_NEVER_HAPPEN;
		exit(EXIT_FAILURE);
	}
	else
		zbx_variant_set_none(var);

	if (ZBX_ISSET_META(value->result))
	{
		opt->lastlogsize = value->result->lastlogsize;
		opt->mtime = value->result->mtime;

		opt->flags |= ZBX_PP_VALUE_OPT_META;
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush preprocessed value                                          *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             itemid     - [IN] item identifier                              *
 *             value_type - [IN] item value type                              *
 *             flags      - [IN] item flags                                   *
 *             value      - [IN] preprocessed item value                      *
 *             ts         - [IN] value timestamp                              *
 *             value_opt  - [IN] optional value data                          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_flush_value(zbx_pp_manager_t *manager, zbx_uint64_t itemid, unsigned char value_type,
		unsigned char flags, zbx_variant_t *value, zbx_timespec_t ts, zbx_pp_value_opt_t *value_opt)
{
	flush_value_func_cb(manager, itemid, value_type, flags, value, ts, value_opt);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle new preprocessing request                                  *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             message    - [IN] packed preprocessing request                 *
 *             direct_num - [OUT] number of directly flushed values           *
 *                                                                            *
 *  Return value: The number of requests queued for preprocessing             *
 *                                                                            *
 ******************************************************************************/
static zbx_uint64_t	preprocessor_add_request(zbx_pp_manager_t *manager, zbx_ipc_message_t *message,
		zbx_uint64_t *direct_num)
{
	zbx_uint32_t			offset = 0;
	zbx_preproc_item_value_t	value;
	zbx_uint64_t			queued_num = 0;
	zbx_vector_pp_task_ptr_t	tasks;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_pp_task_ptr_create(&tasks);
	zbx_vector_pp_task_ptr_reserve(&tasks, ZBX_PREPROCESSING_BATCH_SIZE);

	preprocessor_sync_configuration(manager);

	while (offset < message->size)
	{
		zbx_variant_t		var;
		zbx_pp_value_opt_t	var_opt;
		zbx_timespec_t		ts;
		zbx_pp_task_t		*task;

		offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
		preproc_item_value_extract_data(&value, &var, &ts, &var_opt);

		if (NULL == (task = zbx_pp_manager_create_task(manager, value.itemid, &var, ts, &var_opt)))
		{
			(*direct_num)++;
			/* allow empty values */
			preprocessing_flush_value(manager, value.itemid, value.item_value_type, value.item_flags,
					&var, ts, &var_opt);

			zbx_variant_clear(&var);
			zbx_pp_value_opt_clear(&var_opt);
		}
		else
			zbx_vector_pp_task_ptr_append(&tasks, task);

		preproc_item_value_clear(&value);
	}

	if (0 != tasks.values_num)
		zbx_pp_manager_queue_value_preproc(manager, &tasks);

	queued_num = tasks.values_num;
	zbx_vector_pp_task_ptr_destroy(&tasks);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return queued_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle new preprocessing test request                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] request source                                  *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_test_request(zbx_pp_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_pp_item_preproc_t	*preproc;
	zbx_variant_t		value;
	zbx_timespec_t		ts;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	preproc = zbx_pp_item_preproc_create(0, 0, 0, 0);
	zbx_preprocessor_unpack_test_request(preproc, &value, &ts, message->data);
	zbx_pp_manager_queue_test(manager, preproc, &value, ts, client);
	zbx_pp_item_preproc_release(preproc);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

static void	preprocessor_reply_queue_size(zbx_pp_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_uint64_t	pending_num = manager->queue.pending_num;

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_QUEUE, (unsigned char *)&pending_num, sizeof(pending_num));
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush processed value task                                        *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             tasks   - [IN] processed tasks                                 *
 *                                                                            *
 ******************************************************************************/
static void	prpeprocessor_flush_value_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_variant_t		*value;
	unsigned char		value_type, flags;
	zbx_timespec_t		ts;
	zbx_pp_value_opt_t	*value_opt;

	zbx_pp_value_task_get_data(task, &value_type, &flags, &value, &ts, &value_opt);

	if (SUCCEED == prepare_value_func_cb(value, value_opt))
		preprocessing_flush_value(manager, task->itemid, value_type, flags, value, ts, value_opt);
}

/******************************************************************************
 *                                                                            *
 * Purpose: send back result of processed test task                           *
 *                                                                            *
 * Parameters: tasks - [IN] processed tasks                                   *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_test_result(zbx_pp_task_t *task)
{
	unsigned char		*data;
	zbx_uint32_t		len;
	zbx_ipc_client_t	*client;
	zbx_variant_t		*result;
	zbx_pp_result_t		*results;
	int			results_num;
	zbx_pp_history_t	*history;

	zbx_pp_test_task_get_data(task, &client, &result, &results, &results_num, &history);

	len = zbx_preprocessor_pack_test_result(&data, results, results_num, history);

	zbx_pp_test_task_history_release(task, &history);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TEST_RESULT, data, len);
	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush processed tasks                                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             tasks   - [IN] processed tasks                                 *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_tasks(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks)
{
	for (int i = 0; i < tasks->values_num; i++)
	{
		switch (tasks->values[i]->type)
		{
			case ZBX_PP_TASK_VALUE:
			case ZBX_PP_TASK_VALUE_SEQ:	/* value and value_seq task contents are identical */
				prpeprocessor_flush_value_result(manager, tasks->values[i]);
				break;
			case ZBX_PP_TASK_TEST:
				preprocessor_reply_test_result(tasks->values[i]);
				break;
			default:
				/* the internal tasks (dependent/sequence) shouldn't get here */
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: respond to diagnostic information request                         *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] request source                                  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_diag_info(zbx_pp_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_uint64_t	preproc_num, pending_num, finished_num, sequences_num;
	unsigned char	*data;
	zbx_uint32_t	data_len;

	zbx_pp_manager_get_diag_stats(manager, &preproc_num, &pending_num, &finished_num, &sequences_num);
	data_len = zbx_preprocessor_pack_diag_stats(&data, preproc_num, pending_num, finished_num, sequences_num);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);

	zbx_free(data);
}

static int	preprocessor_compare_top_stats(const void *d1, const void *d2)
{
	const zbx_pp_top_stats_t *s1 = *(const zbx_pp_top_stats_t * const *)d1;
	const zbx_pp_top_stats_t *s2 = *(const zbx_pp_top_stats_t * const *)d2;

	return s2->tasks_num - s1->tasks_num;
}

static void	zbx_pp_manager_items_preproc_peak(zbx_pp_manager_t *manager, zbx_vector_pp_top_stats_ptr_t *stats)
{
	zbx_hashset_iter_t	iter;
	zbx_pp_item_t		*item;

	zbx_hashset_iter_reset(&manager->items, &iter);

	while (NULL != (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter)))
	{
		zbx_pp_top_stats_t	*stat;

		if (NULL ==  item->preproc || 1 >= item->preproc->refcount_peak - 1)
			continue;

		stat = (zbx_pp_top_stats_t *)zbx_malloc(NULL, sizeof(zbx_pp_top_stats_t));
		stat->tasks_num =  item->preproc->refcount_peak - 1;
		stat->itemid = item->itemid;
		zbx_vector_pp_top_stats_ptr_append(stats, stat);
	}
}

static void	zbx_pp_manager_items_preproc_peak_reset(zbx_pp_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_pp_item_t		*item;

	zbx_hashset_iter_reset(&manager->items, &iter);

	while (NULL != (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter)))
	{
		if (NULL ==  item->preproc)
			continue;

		item->preproc->refcount_peak = 1;
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: respond to top sequences request                                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] request source                                  *
 *             message - [IN] request message                                 *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_top_stats(zbx_pp_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message, zbx_uint32_t code)
{
	int				limit;
	zbx_vector_pp_top_stats_ptr_t	stats;
	unsigned char			*data;
	zbx_uint32_t			data_len;

	zbx_vector_pp_top_stats_ptr_create(&stats);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	if (ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES == code)
		zbx_pp_manager_get_sequence_stats(manager, &stats);
	else
		zbx_pp_manager_items_preproc_peak(manager, &stats);

	if (limit > stats.values_num)
		limit = stats.values_num;

	zbx_vector_pp_top_stats_ptr_sort(&stats, preprocessor_compare_top_stats);

	data_len = zbx_preprocessor_pack_top_stats_result(&data, &stats, limit);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_STATS_RESULT, data, data_len);

	zbx_free(data);
	zbx_vector_pp_top_stats_ptr_clear_ext(&stats, (zbx_pp_top_stats_ptr_free_func_t)zbx_ptr_free);
	zbx_vector_pp_top_stats_ptr_destroy(&stats);
}

/******************************************************************************
 *                                                                            *
 * Purpose: respond to worker usage statistics request                        *
 *                                                                            *
 * Parameters: manager     - [IN] preprocessing manager                       *
 *             workers_num - [IN] number of preprocessing workers             *
 *             client      - [IN] request source                              *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_usage_stats(zbx_pp_manager_t *manager, int workers_num, zbx_ipc_client_t *client)
{
	zbx_vector_dbl_t	usage;
	unsigned char		*data;
	zbx_uint32_t		data_len;

	zbx_vector_dbl_create(&usage);
	zbx_pp_manager_get_worker_usage(manager, &usage);

	data_len = zbx_preprocessor_pack_usage_stats(&data, &usage, workers_num);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);

	zbx_free(data);
	zbx_vector_dbl_destroy(&usage);
}

static void	preprocessor_finished_task_cb(void *data)
{
	zbx_ipc_service_alert((zbx_ipc_service_t *)data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: change worker log level                                           *
 *                                                                            *
 ******************************************************************************/
static void	pp_manager_change_worker_loglevel(zbx_pp_manager_t *manager, int worker_num, int direction)
{
	if (0 > worker_num || manager->workers_num < worker_num)
	{
		zabbix_log(LOG_LEVEL_INFORMATION, "Cannot change log level for preprocessing worker #%d:"
				" no such instance", worker_num);
		return;
	}

	for (int i = 0; i < manager->workers_num; i++)
	{
		if (0 != worker_num && worker_num != i + 1)
			continue;

		zbx_change_component_log_level(&manager->workers[i].logger, direction);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: change log level for the specified worker(s)                      *
 *                                                                            *
 * Parameters: manager   - [IN] preprocessing manager                         *
 *             direction - [IN] 1) increase, -1) decrease                     *
 *             data      - [IN] rtc data in json format                       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_change_loglevel(zbx_pp_manager_t *manager, int direction, const char *data)
{
	char	*error = NULL;
	pid_t	pid;
	int	proc_type, proc_num;

	if (SUCCEED != zbx_rtc_get_command_target(data, &pid, &proc_type, &proc_num, NULL, &error))
	{
		zabbix_log(LOG_LEVEL_WARNING, "Cannot change log level: %s", error);
		zbx_free(error);
		return;
	}

	if (0 != pid)
	{
		zabbix_log(LOG_LEVEL_WARNING, "Cannot change log level for preprocessing worker by pid");
		return;
	}

	pp_manager_change_worker_loglevel(manager, proc_num, direction);
}

ZBX_THREAD_ENTRY(zbx_pp_manager_thread, args)
{
#define PP_MANAGER_DELAY_SEC	0
#define PP_MANAGER_DELAY_NS	5e8

	zbx_ipc_service_t			service;
	char					*error = NULL;
	zbx_ipc_client_t			*client;
	zbx_ipc_message_t			*message;
	double					time_stat, time_idle = 0, time_flush, time_vps_update, time_trim;
	zbx_timespec_t				timeout = {PP_MANAGER_DELAY_SEC, PP_MANAGER_DELAY_NS};
	const zbx_thread_info_t			*info = &((zbx_thread_args_t *)args)->info;
	int					server_num = ((zbx_thread_args_t *)args)->info.server_num,
						process_num = ((zbx_thread_args_t *)args)->info.process_num;
	unsigned char				process_type = ((zbx_thread_args_t *)args)->info.process_type;
	zbx_thread_pp_manager_args		*pp_args = ((zbx_thread_args_t *)args)->args;
	zbx_pp_manager_t			*manager;
	zbx_vector_pp_task_ptr_t		tasks;
	zbx_uint32_t				rtc_msgs[] = {ZBX_RTC_LOG_LEVEL_INCREASE, ZBX_RTC_LOG_LEVEL_DECREASE};
	zbx_uint64_t				pending_num, finished_num, processed_num = 0, queued_num = 0,
						processing_num = 0;

	const zbx_thread_pp_manager_args	*pp_manager_args_in = (const zbx_thread_pp_manager_args *)
						(((zbx_thread_args_t *)args)->args);

#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
			server_num, get_process_type_string(process_type), process_num);

	zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

	if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	if (NULL == (manager = zbx_pp_manager_create(pp_args->workers_num, preprocessor_finished_task_cb,
			(void *)&service, pp_manager_args_in->config_source_ip, pp_manager_args_in->config_timeout,
			&error)))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot initialize preprocessing manager: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	/* subscribe for worker log level rtc messages */
	zbx_rtc_subscribe_service(ZBX_PROCESS_TYPE_PREPROCESSOR, 0, rtc_msgs, ARRSIZE(rtc_msgs),
			pp_args->config_timeout, ZBX_IPC_SERVICE_PREPROCESSING);

	zbx_vector_pp_task_ptr_create(&tasks);

	/* initialize statistics */
	time_stat = zbx_time();
	time_flush = time_stat;
	time_vps_update = time_stat;
	time_trim = time_stat;

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	while (ZBX_IS_RUNNING())
	{
		double		time_now = zbx_time();
		zbx_uint64_t	direct_num = 0;

		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
					ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
					get_process_type_string(process_type), process_num,
					queued_num, processed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			processed_num = 0;
			queued_num = 0;
		}

		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);

		int	ret = zbx_ipc_service_recv(&service, &timeout, &client, &message);

		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

		double	sec = zbx_time();

		zbx_update_env(get_process_type_string(process_type), sec);

		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_PREPROCESSOR_REQUEST:
					queued_num += preprocessor_add_request(manager, message, &direct_num);
					break;
				case ZBX_IPC_PREPROCESSOR_QUEUE:
					preprocessor_reply_queue_size(manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
					preprocessor_add_test_request(manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
					preprocessor_reply_diag_info(manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES:
				case ZBX_IPC_PREPROCESSOR_TOP_PEAK:
					preprocessor_reply_top_stats(manager, client, message, message->code);
					break;
				case ZBX_IPC_PREPROCESSOR_USAGE_STATS:
					preprocessor_reply_usage_stats(manager, pp_args->workers_num, client);
					break;
				case ZBX_RTC_LOG_LEVEL_INCREASE:
					preprocessor_change_loglevel(manager, 1, (const char *)message->data);
					break;
				case ZBX_RTC_LOG_LEVEL_DECREASE:
					preprocessor_change_loglevel(manager, -1, (const char *)message->data);
					break;
				case ZBX_RTC_SHUTDOWN:
					zabbix_log(LOG_LEVEL_DEBUG, "shutdown message received, terminating...");
					goto out;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);

		zbx_pp_manager_process_finished(manager, &tasks, &pending_num, &processing_num, &finished_num);

		if (0 < tasks.values_num)
		{
			processed_num += (unsigned int)tasks.values_num;
			preprocessor_flush_tasks(manager, &tasks);
			zbx_pp_tasks_clear(&tasks);
		}

		if (0 != finished_num || 0 != direct_num)
		{
			timeout.sec = 0;
			timeout.ns = 0;
		}
		else
		{
			timeout.sec = PP_MANAGER_DELAY_SEC;
			timeout.ns = PP_MANAGER_DELAY_NS;
		}

		if (0 == pending_num + processing_num + finished_num + direct_num || 1 < sec - time_flush)
		{
			if (0 != zbx_dc_flush_history())
			{
				zbx_rtc_notify_generic(&manager->rtc, ZBX_PROCESS_TYPE_HISTSYNCER, 1,
						ZBX_RTC_HISTORY_SYNC_NOTIFY, NULL, 0);
			}

			time_flush = sec;
		}

		/* trigger vps monitor update at least once per second */
		if (1 <= sec - time_vps_update)
		{
			zbx_vps_monitor_add_collected(0);
			time_vps_update = sec;
		}

		/* release memory in case of peak periods */
		if (SEC_PER_DAY <= sec - time_trim)
		{
#ifdef	HAVE_MALLOC_TRIM
			malloc_trim(128 * ZBX_MEBIBYTE);
#endif
			zbx_pp_manager_items_preproc_peak_reset(manager);
			time_trim = sec;
		}
	}
out:
	zbx_setproctitle("%s #%d [terminating]", get_process_type_string(process_type), process_num);

	zbx_vector_pp_task_ptr_destroy(&tasks);
	zbx_pp_manager_free(manager);

	zbx_ipc_service_close(&service);

	exit(EXIT_SUCCESS);
#undef STAT_INTERVAL
#undef PP_MANAGER_DELAY_SEC
#undef PP_MANAGER_DELAY_NS
}