/*
** Zabbix
** Copyright (C) 2001-2022 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "preproc_manager.h"

#include "daemon.h"
#include "zbxself.h"
#include "log.h"
#include "zbxlld.h"
#include "preprocessing.h"
#include "preproc_history.h"
#include "../../libs/zbxalgo/vectorimpl.h"
#include "preproc_manager.h"

/* variables defined in other source files */
extern ZBX_THREAD_LOCAL unsigned char	process_type;
extern unsigned char			program_type;
extern ZBX_THREAD_LOCAL int		server_num, process_num;
extern int				CONFIG_PREPROCESSOR_FORKS;

/* NOTE(review): presumably the manager loop delay in seconds - confirm where it is used */
#define ZBX_PREPROCESSING_MANAGER_DELAY	1

/* NOTE(review): presumably request priorities used when enqueueing values - confirm where they are used */
#define ZBX_PREPROC_PRIORITY_NONE	0
#define ZBX_PREPROC_PRIORITY_FIRST	1

/* processing state of a preprocessing request placed in the manager queue */
typedef enum
{
	REQUEST_STATE_QUEUED		= 0,		/* requires preprocessing */
	REQUEST_STATE_PROCESSING	= 1,		/* is being preprocessed  */
	REQUEST_STATE_DONE		= 2,		/* value is set, waiting for flush */
	REQUEST_STATE_PENDING		= 3,		/* value requires preprocessing, */
							/* but is waiting on other request to complete */
}
zbx_preprocessing_states_t;

/* request kind - tells which request structure a request base pointer can be cast to */
typedef enum
{
	ZBX_PREPROC_ITEM,	/* item preprocessing request */
	ZBX_PREPROC_DEPS	/* dependent item preprocessing request */
}
zbx_preprocessing_kind_t;

typedef struct zbx_preprocessing_request_base zbx_preprocessing_request_base_t;

/* vector of request base pointers, used for the flush queue of a request */
ZBX_PTR_VECTOR_DECL(preprocessing_request_base, zbx_preprocessing_request_base_t *)
ZBX_PTR_VECTOR_IMPL(preprocessing_request_base, zbx_preprocessing_request_base_t *)

/* common data shared by item and dependent item preprocessing requests */
struct zbx_preprocessing_request_base
{
	zbx_preprocessing_kind_t		kind;		/* the kind of the enclosing request structure */
	zbx_preprocessing_states_t		state;		/* the current processing state */
	zbx_preprocessing_request_base_t	*pending;	/* the request waiting on this request to complete */
	zbx_vector_preprocessing_request_base_t	flush_queue;	/* processed request waiting to be flushed */
};

/* item value preprocessing request (kind ZBX_PREPROC_ITEM) */
typedef struct preprocessing_request
{
	zbx_preprocessing_request_base_t	base;		/* common data for various requests - must be the */
								/* first field of preprocessing request structure */
	zbx_preproc_item_value_t		value;		/* unpacked item value */
	zbx_preproc_op_t			*steps;		/* preprocessing steps */
	int					steps_num;	/* number of preprocessing steps */
	unsigned char				value_type;	/* value type from configuration */
								/* at the beginning of preprocessing queue */
}
zbx_preprocessing_request_t;

/* bulk dependent item preprocessing request (kind ZBX_PREPROC_DEPS) */
typedef struct
{
	zbx_preprocessing_request_base_t	base;		/* common data for various requests - must be the */
								/* first field of preprocessing request structure */
	zbx_uint64_t				hostid;		/* host id passed to LLD when flushing results */
	zbx_uint64_t				master_itemid;	/* master item id, used to find dependent items */
	unsigned char				value_type;	/* value type for items without preproc config */
								/* inherited from master item                  */
	zbx_variant_t				value;		/* value packed into dependent item messages */
	zbx_timespec_t				ts;		/* timestamp packed into messages and used */
								/* when flushing the results               */

	zbx_vector_ipcmsg_t			messages;	/* IPC messages with dependent item preproc data */

	zbx_preproc_dep_result_t		*results;	/* dependent item preprocessing results */
	int					results_alloc;	/* number of results flushed to history/LLD */
	int					results_offset;	/* number of results released when the request */
								/* is freed; NOTE(review): presumably the      */
								/* number of results received so far - confirm */
}
zbx_preprocessing_dep_request_t;

/* preprocessing worker data */
typedef struct
{
	zbx_ipc_client_t	*client;	/* the connected preprocessing worker client */
	void			*task;		/* the current task data, NULL when the worker is free */
}
zbx_preprocessing_worker_t;

/* item link index - refers to the most recently linked queue item of an item */
typedef struct
{
	zbx_uint64_t			itemid;		/* item id (master item id for dependent item requests) */
	zbx_preprocessing_kind_t	kind;		/* kind of the linked request */
	zbx_list_item_t			*queue_item;	/* queued item */
}
zbx_item_link_t;

/* direct request to be forwarded to worker, bypassing the preprocessing queue */
typedef struct
{
	zbx_ipc_client_t	*client;	/* the IPC client sending forward message to worker */
	zbx_ipc_message_t	message;	/* the message to be forwarded to worker */
}
zbx_preprocessing_direct_request_t;

/* preprocessing manager data */
typedef struct
{
	zbx_preprocessing_worker_t	*workers;	/* preprocessing worker array */
	int				worker_count;	/* preprocessing worker count */
	zbx_list_t			queue;		/* queue of item values (preprocessing requests) */
	zbx_hashset_t			item_config;	/* item configuration L2 cache */
	zbx_hashset_t			history_cache;	/* item value history cache */
	zbx_hashset_t			linked_items;	/* linked items placed in queue */
	int				cache_ts;	/* cache timestamp, compared during configuration */
							/* synchronization to detect changes              */
	zbx_uint64_t			processed_num;	/* processed value counter */
	zbx_uint64_t			queued_num;	/* queued value counter */
	zbx_uint64_t			preproc_num;	/* queued values with preprocessing steps */
	zbx_list_iterator_t		priority_tail;	/* iterator to the last queued priority item */

	zbx_list_t			direct_queue;	/* Queue of external requests that have to be */
							/* forwarded to workers for preprocessing.    */
}
zbx_preprocessing_manager_t;

/* forward declarations of static functions used before their definitions */

static void	preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager, zbx_uint64_t hostid,
		zbx_uint64_t itemid, AGENT_RESULT *ar, unsigned char value_type, const zbx_timespec_t *ts);

static void	preprocessor_update_history(zbx_preprocessing_manager_t *manager, zbx_uint64_t itemid,
		zbx_vector_ptr_t *history);

/* cleanup functions */

/******************************************************************************
 *                                                                            *
 * Purpose: release memory referenced by an item preprocessing configuration  *
 *                                                                            *
 * Parameters: item - [IN] the item configuration to be cleared               *
 *                                                                            *
 * Comments: frees the dependent item id array and the preprocessing steps    *
 *           with their parameters; the item structure itself is not freed    *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_clear(zbx_preproc_item_t *item)
{
	int	op_idx = 0;

	zbx_free(item->dep_itemids);

	while (op_idx < item->preproc_ops_num)
	{
		zbx_preproc_op_t	*op = &item->preproc_ops[op_idx++];

		zbx_free(op->params);
		zbx_free(op->error_handler_params);
	}

	zbx_free(item->preproc_ops);
}

/******************************************************************************
 *                                                                            *
 * Purpose: free preprocessing steps attached to a preprocessing request      *
 *                                                                            *
 * Parameters: request - [IN/OUT] the request whose steps are released        *
 *                                                                            *
 * Comments: steps_num is counted down to zero while the steps are freed      *
 *                                                                            *
 ******************************************************************************/
static void	request_free_steps(zbx_preprocessing_request_t *request)
{
	/* release step parameters starting from the last step */
	for (; 0 < request->steps_num; request->steps_num--)
	{
		zbx_preproc_op_t	*step = &request->steps[request->steps_num - 1];

		zbx_free(step->params);
		zbx_free(step->error_handler_params);
	}

	zbx_free(request->steps);
}

/******************************************************************************
 *                                                                            *
 * Purpose: synchronize preprocessing manager with configuration cache data   *
 *                                                                            *
 * Parameters: manager - [IN/OUT] the manager to be synchronized              *
 *                                                                            *
 * Comments: when the configuration timestamp changes, history is dropped for *
 *           items no longer in the configuration and for items updated since *
 *           the previous synchronization                                     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_sync_configuration(zbx_preprocessing_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_preproc_history_t	*vault;
	zbx_preproc_item_t	*item;
	int			prev_ts;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	prev_ts = manager->cache_ts;
	DCconfig_get_preprocessable_items(&manager->item_config, &manager->cache_ts);

	/* unchanged timestamp - the history cache is left as it is */
	if (prev_ts == manager->cache_ts)
		goto out;

	/* purge history of items that are not present in item configuration anymore */
	zbx_hashset_iter_reset(&manager->history_cache, &iter);
	while (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_iter_next(&iter)))
	{
		if (NULL == zbx_hashset_search(&manager->item_config, &vault->itemid))
		{
			zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
			zbx_vector_ptr_destroy(&vault->history);
			zbx_hashset_iter_remove(&iter);
		}
	}

	/* purge history of items updated after the previous synchronization */
	zbx_hashset_iter_reset(&manager->item_config, &iter);
	while (NULL != (item = (zbx_preproc_item_t *)zbx_hashset_iter_next(&iter)))
	{
		if (prev_ts >= item->update_time)
			continue;

		vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache, &item->itemid);

		if (NULL != vault)
		{
			zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
			zbx_vector_ptr_destroy(&vault->history);
			zbx_hashset_remove_direct(&manager->history_cache, vault);
		}
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size: %d, history cache size: %d", __func__,
			manager->item_config.num_data, manager->history_cache.num_data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: convert agent result to variant value                             *
 *                                                                            *
 * Parameters: ar    - [IN] the agent result                                  *
 *             value - [OUT] the resulting variant                            *
 *                                                                            *
 * Comments: result types are checked in order: log, unsigned integer,        *
 *           floating point, string, text; the first one set is used          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_ar_to_variant(AGENT_RESULT *ar, zbx_variant_t *value)
{
	if (ISSET_LOG(ar))
	{
		zbx_variant_set_str(value, ar->log->value);
		return;
	}

	if (ISSET_UI64(ar))
	{
		zbx_variant_set_ui64(value, ar->ui64);
		return;
	}

	if (ISSET_DBL(ar))
	{
		zbx_variant_set_dbl(value, ar->dbl);
		return;
	}

	if (ISSET_STR(ar))
	{
		zbx_variant_set_str(value, ar->str);
		return;
	}

	if (ISSET_TEXT(ar))
	{
		zbx_variant_set_str(value, ar->text);
		return;
	}

	/* a result without any supported value type set is not expected here */
	THIS_SHOULD_NEVER_HAPPEN;
}

/******************************************************************************
 *                                                                            *
 * Purpose: create preprocessing task for request                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             request - [IN] preprocessing request                           *
 *             task    - [OUT] preprocessing task data                        *
 *                                                                            *
 * Return value: result of zbx_preprocessor_pack_task(), which the caller     *
 *               stores as the IPC message size                               *
 *                                                                            *
 * Comments: a not supported value is passed to the task as an empty string   *
 *                                                                            *
 ******************************************************************************/
static zbx_uint32_t	preprocessor_create_task(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_request_t *request, unsigned char **task)
{
	zbx_variant_t		input;
	zbx_preproc_history_t	*vault;
	zbx_vector_ptr_t	*steps_history;

	if (ITEM_STATE_NOTSUPPORTED != request->value.state)
		preprocessing_ar_to_variant(request->value.result, &input);
	else
		zbx_variant_set_str(&input, "");

	/* pass the cached preprocessing history of the item, if there is any */
	vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache, &request->value.itemid);
	steps_history = (NULL != vault ? &vault->history : NULL);

	return zbx_preprocessor_pack_task(task, request->value.itemid, request->value_type, request->value.ts, &input,
			steps_history, request->steps, request->steps_num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: set request state to done and handle linked items                 *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             base       - [IN] preprocessing request                        *
 *             queue_item - [IN] queued item                                  *
 *                                                                            *
 * Comments: unless the request is at the head of the queue it is removed     *
 *           from the queue and appended to the flush queue of the request    *
 *           preceding it                                                     *
 *                                                                            *
 ******************************************************************************/
static	void	preprocessor_set_request_state_done(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_request_base_t *base, const zbx_list_item_t *queue_item)
{
	zbx_item_link_t				*index, index_local;
	zbx_list_iterator_t			iterator, next_iterator;
	zbx_preprocessing_request_t		*request;
	zbx_preprocessing_dep_request_t		*dep_request;
	zbx_preprocessing_request_base_t	*prev;

	base->state = REQUEST_STATE_DONE;

	/* value processed - the pending value can now be processed */
	if (NULL != base->pending)
		base->pending->state = REQUEST_STATE_QUEUED;

	/* the link index is searched by item id (master item id for dependent item requests) and kind */
	switch (base->kind)
	{
		case ZBX_PREPROC_ITEM:
			request = (zbx_preprocessing_request_t *)base;
			index_local.itemid = request->value.itemid;
			break;
		case ZBX_PREPROC_DEPS:
			dep_request = (zbx_preprocessing_dep_request_t *)base;
			index_local.itemid = dep_request->master_itemid;
			break;
	}

	index_local.kind = base->kind;

	/* drop the link index entry only if it still refers to this queue item */
	if (NULL != (index = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &index_local)) &&
			queue_item == index->queue_item)
	{
		zbx_hashset_remove_direct(&manager->linked_items, index);
	}

	if (NULL == manager->queue.head)
		return;

	/* NOTE(review): presumably a freshly initialized iterator has the queue head as its next item, */
	/* so this leaves a request at the head of the queue in place - confirm against zbx_list API    */
	zbx_list_iterator_init(&manager->queue, &iterator);
	if (iterator.next == queue_item)
		return;

	/* advance until the current iterator item is the one preceding queue_item */
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		if (iterator.next == queue_item)
			break;
	}

	/* the done request will be flushed together with the preceding request */
	prev = (zbx_preprocessing_request_base_t *)iterator.current->data;
	zbx_vector_preprocessing_request_base_append(&prev->flush_queue, base);

	/* if the item being removed is the priority tail, move the tail to the preceding item */
	next_iterator = iterator;
	if (SUCCEED == zbx_list_iterator_next(&next_iterator))
	{
		if (SUCCEED == zbx_list_iterator_equal(&next_iterator, &manager->priority_tail))
			manager->priority_tail = iterator;
	}

	(void)zbx_list_iterator_remove_next(&iterator);
}

/******************************************************************************
 *                                                                            *
 * Purpose: create message(s) for dependent item bulk preprocessing           *
 *                                                                            *
 * Parameters: manager  - [IN] preprocessing manager                          *
 *             request  - [IN] preprocessing request                          *
 *                                                                            *
 * Return value: the number of created messages                               *
 *                                                                            *
 * Comments: dependent items without preprocessing configuration inherit the  *
 *           value type of the request and are packed without steps           *
 *                                                                            *
 ******************************************************************************/
static int	preprocessor_create_dep_message(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_dep_request_t *request)
{
	int			idx;
	zbx_preproc_dep_t	*deps;
	zbx_preproc_item_t	*master_item;

	master_item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &request->master_itemid);

	if (NULL == master_item)
		return 0;

	if (0 == master_item->dep_itemids_num)
		return 0;

	deps = (zbx_preproc_dep_t *)zbx_malloc(NULL, (size_t)master_item->dep_itemids_num * sizeof(zbx_preproc_dep_t));

	for (idx = 0; idx < master_item->dep_itemids_num; idx++)
	{
		zbx_preproc_dep_t	*dep = &deps[idx];
		zbx_preproc_history_t	*vault = NULL;
		zbx_preproc_item_t	*item;

		dep->itemid = master_item->dep_itemids[idx].first;
		dep->flags = (unsigned char)master_item->dep_itemids[idx].second;

		item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config,
				&master_item->dep_itemids[idx].first);

		if (NULL != item)
		{
			dep->value_type = item->value_type;
			dep->steps = item->preproc_ops;
			dep->steps_num = item->preproc_ops_num;

			vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
					&item->itemid);
		}
		else
		{
			/* items without preprocessing do not have history */
			dep->value_type = request->value_type;
			dep->steps = NULL;
			dep->steps_num = 0;
		}

		if (NULL != vault)
			dep->history = vault->history;
		else
			zbx_vector_ptr_create(&dep->history);
	}

	zbx_preprocessor_pack_dep_request(&request->value, &request->ts, deps, master_item->dep_itemids_num,
			&request->messages);

	zbx_free(deps);

	return request->messages.values_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: returns next dependent item preprocessing message                 *
 *                                                                            *
 * Parameters: request - [IN] the dependent item preprocessing request        *
 *             message - [OUT] the next message to be sent                    *
 *                                                                            *
 * Return value: SUCCEED - the next message was returned                      *
 *               FAIL    - no more messages to send                           *
 *                                                                            *
 * Comments: the first pending message is copied into the output structure    *
 *           and removed from the request                                     *
 *                                                                            *
 ******************************************************************************/
static int	preprocessor_dep_request_next_message(zbx_preprocessing_dep_request_t *request,
		zbx_ipc_message_t *message)
{
	zbx_ipc_message_t	*head;

	if (0 == request->messages.values_num)
		return FAIL;

	head = request->messages.values[0];

	/* copy the message structure by value, then release only the structure itself */
	*message = *head;
	zbx_vector_ipcmsg_remove(&request->messages, 0);
	zbx_free(head);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: gets next task to be sent to worker                               *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [OUT] the serialized task to be sent                 *
 *                                                                            *
 * Return value: pointer to the task object, NULL if no task is available     *
 *                                                                            *
 * Comments: direct requests are served before the queue. Requests that need  *
 *           no worker are completed, releasing the requests pending on them  *
 *                                                                            *
 ******************************************************************************/
static void	*preprocessor_get_next_task(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_list_iterator_t			iterator;
	zbx_preprocessing_request_base_t	*base;
	zbx_preprocessing_request_t		*request = NULL;
	void					*task = NULL;
	zbx_preprocessing_direct_request_t	*direct_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* direct requests bypass the preprocessing queue and are served first */
	if (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
	{
		/* move the message contents to the output, leaving the direct request with an empty message */
		*message = direct_request->message;
		zbx_ipc_message_init(&direct_request->message);
		task = direct_request;
		goto out;
	}

	zbx_list_iterator_init(&manager->queue, &iterator);
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		int process_notsupported = 0;

		zbx_list_iterator_peek(&iterator, (void **)&base);

		if (REQUEST_STATE_QUEUED != base->state)
			continue;

		switch (base->kind)
		{
			case ZBX_PREPROC_DEPS:
				if (0 == preprocessor_create_dep_message(manager,
						(zbx_preprocessing_dep_request_t *)base))
				{
					/* Nothing to send to workers. The request must be completed the same */
					/* way as item requests, otherwise a request pending on it would stay */
					/* in pending state and the item link would keep referring to it.     */
					preprocessor_set_request_state_done(manager, base, iterator.current);
					continue;
				}
				(void)preprocessor_dep_request_next_message((zbx_preprocessing_dep_request_t *)base,
						message);
				break;
			case ZBX_PREPROC_ITEM:
				request = (zbx_preprocessing_request_t *)base;
				/* not supported values are sent to workers only if the first step validates them */
				if (NULL != request->steps &&
						ZBX_PREPROC_VALIDATE_NOT_SUPPORTED == request->steps[0].type)
				{
					process_notsupported = 1;
				}

				if (ITEM_STATE_NOTSUPPORTED == request->value.state && 0 == process_notsupported)
				{
					zbx_preproc_history_t	*vault;

					/* the value is not preprocessed - drop the item preprocessing history */
					if (NULL != (vault = (zbx_preproc_history_t *) zbx_hashset_search(
							&manager->history_cache, &request->value.itemid)))
					{
						zbx_vector_ptr_clear_ext(&vault->history,
								(zbx_clean_func_t) zbx_preproc_op_history_free);
						zbx_vector_ptr_destroy(&vault->history);
						zbx_hashset_remove_direct(&manager->history_cache, vault);
					}

					preprocessor_set_request_state_done(manager, base, iterator.current);
					continue;
				}

				message->code = ZBX_IPC_PREPROCESSOR_REQUEST;
				message->size = preprocessor_create_task(manager, request, &message->data);
				/* the steps are packed into the task and are not needed anymore */
				request_free_steps(request);
				break;
		}

		/* the queue item serves as the task object for queued requests */
		task = iterator.current;
		base->state = REQUEST_STATE_PROCESSING;
		break;
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return task;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get worker data by IPC client                                     *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *                                                                            *
 * Return value: pointer to the worker data                                   *
 *                                                                            *
 * Comments: terminates the process if the client is not a known worker       *
 *                                                                            *
 ******************************************************************************/
static zbx_preprocessing_worker_t	*preprocessor_get_worker_by_client(zbx_preprocessing_manager_t *manager,
		zbx_ipc_client_t *client)
{
	int	idx = 0;

	/* stop at the first worker registered with the client */
	while (idx < manager->worker_count && client != manager->workers[idx].client)
		idx++;

	if (idx == manager->worker_count)
	{
		THIS_SHOULD_NEVER_HAPPEN;
		exit(EXIT_FAILURE);
	}

	return &manager->workers[idx];
}

/******************************************************************************
 *                                                                            *
 * Purpose: get worker without active preprocessing task                      *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *                                                                            *
 * Return value: pointer to the worker data or NULL if none                   *
 *                                                                            *
 ******************************************************************************/
static zbx_preprocessing_worker_t	*preprocessor_get_free_worker(zbx_preprocessing_manager_t *manager)
{
	zbx_preprocessing_worker_t	*idle = NULL;
	int				idx;

	/* the first worker without an assigned task is returned */
	for (idx = 0; NULL == idle && idx < manager->worker_count; idx++)
	{
		if (NULL == manager->workers[idx].task)
			idle = &manager->workers[idx];
	}

	return idle;
}

/******************************************************************************
 *                                                                            *
 * Purpose: assign available queued preprocessing tasks to free workers       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *                                                                            *
 * Comments: terminates the process if a task cannot be sent to a worker      *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_assign_tasks(zbx_preprocessing_manager_t *manager)
{
	zbx_ipc_message_t		message;
	zbx_preprocessing_worker_t	*worker;
	void				*task;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	for (;;)
	{
		/* a task is taken from the queue only when there is a worker to handle it */
		if (NULL == (worker = preprocessor_get_free_worker(manager)))
			break;

		if (NULL == (task = preprocessor_get_next_task(manager, &message)))
			break;

		if (FAIL == zbx_ipc_client_send(worker->client, message.code, message.data, message.size))
		{
			zabbix_log(LOG_LEVEL_CRIT, "cannot send data to preprocessing worker");
			exit(EXIT_FAILURE);
		}

		worker->task = task;
		zbx_ipc_message_clean(&message);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: frees resources allocated by preprocessor item value              *
 *                                                                            *
 * Parameters: value - [IN] value to be freed                                 *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_value_clear(zbx_preproc_item_value_t *value)
{
	/* the result is optional - release its contents before the result itself */
	if (NULL != value->result)
	{
		free_result(value->result);
		zbx_free(value->result);
	}

	zbx_free(value->ts);
	zbx_free(value->error);
}

/******************************************************************************
 *                                                                            *
 * Purpose: free preprocessing request                                        *
 *                                                                            *
 * Parameters: base - [IN] request data to be freed                           *
 *                                                                            *
 * Comments: This handles freeing normal and dependent item requests,         *
 *           including the requests stored in the flush queue                 *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_free_request(zbx_preprocessing_request_base_t *base)
{
	/* requests stored in the flush queue are freed recursively */
	zbx_vector_preprocessing_request_base_clear_ext(&base->flush_queue, preprocessor_free_request);
	zbx_vector_preprocessing_request_base_destroy(&base->flush_queue);

	if (ZBX_PREPROC_ITEM == base->kind)
	{
		zbx_preprocessing_request_t	*item_request = (zbx_preprocessing_request_t *)base;

		preproc_item_value_clear(&item_request->value);
		request_free_steps(item_request);
	}
	else if (ZBX_PREPROC_DEPS == base->kind)
	{
		zbx_preprocessing_dep_request_t	*deps_request = (zbx_preprocessing_dep_request_t *)base;

		/* NOTE(review): results are released by results_offset while flushing uses results_alloc; */
		/* presumably both are equal for a completed request - confirm                             */
		zbx_preprocessor_free_dep_results(deps_request->results, deps_request->results_offset);
		zbx_variant_clear(&deps_request->value);
		zbx_vector_ipcmsg_clear_ext(&deps_request->messages, zbx_ipc_message_free);
		zbx_vector_ipcmsg_destroy(&deps_request->messages);
	}

	zbx_free(base);
}

/******************************************************************************
 *                                                                            *
 * Purpose: free preprocessing direct request                                 *
 *                                                                            *
 * Parameters: direct_request - [IN] forward data to be freed                 *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_free_direct_request(zbx_preprocessing_direct_request_t *direct_request)
{
	zbx_ipc_client_t	*sender = direct_request->client;
	zbx_ipc_message_t	*forwarded = &direct_request->message;

	/* release the client reference and message contents before the request itself */
	zbx_ipc_client_release(sender);
	zbx_ipc_message_clean(forwarded);

	zbx_free(direct_request);
}

/******************************************************************************
 *                                                                            *
 * Purpose: add new value to the local history cache or send to LLD manager   *
 *                                                                            *
 * Parameters: value - [IN] value to be added or sent                         *
 *                                                                            *
 * Comments: discovery rule values are sent to LLD manager only on server     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_value(const zbx_preproc_item_value_t *value)
{
	if (0 != (value->item_flags & ZBX_FLAG_DISCOVERY_RULE) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
	{
		zbx_lld_process_agent_result(value->itemid, value->hostid, value->result, value->ts,
				value->error);
		return;
	}

	/* everything else goes to the history cache */
	dc_add_history(value->itemid, value->item_value_type, value->item_flags, value->result,
			value->ts, value->state, value->error);
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush results of a dependent item preprocessing request           *
 *                                                                            *
 * Parameters: manager - [IN/OUT] preprocessing manager                       *
 *             request - [IN] the dependent item preprocessing request        *
 *                                                                            *
 * Comments: a result with an error is flushed as a not supported value       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_dep_results(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_dep_request_t *request)
{
	int	idx;

	for (idx = 0; idx < request->results_alloc; idx++)
	{
		zbx_preproc_dep_result_t	*result = &request->results[idx];
		unsigned char			state = ITEM_STATE_NOTSUPPORTED;

		if (NULL == result->error)
			state = ITEM_STATE_NORMAL;

		/* discovery rule results are sent to LLD only when running as server */
		if (0 != (result->flags & ZBX_FLAG_DISCOVERY_RULE) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
		{
			zbx_lld_process_agent_result(result->itemid, request->hostid, &result->value, &request->ts,
					result->error);
			continue;
		}

		dc_add_history(result->itemid, result->value_type, result->flags, &result->value, &request->ts,
				state, result->error);
	}

	manager->processed_num += (zbx_uint64_t)request->results_alloc;
	manager->preproc_num--;
}

/******************************************************************************
 *                                                                            *
 * Purpose: recursively flush processed request and the other processed       *
 *          requests that were waiting on this request to be finished         *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             base    - [IN] the preprocessing request                       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_flush_request(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_request_base_t *base)
{
	int	idx = 0;

	if (ZBX_PREPROC_ITEM == base->kind)
	{
		zbx_preprocessing_request_t	*item_request = (zbx_preprocessing_request_t *)base;

		preprocessor_flush_value(&item_request->value);
		manager->processed_num++;
		manager->queued_num--;
	}
	else if (ZBX_PREPROC_DEPS == base->kind)
	{
		preprocessor_flush_dep_results(manager, (zbx_preprocessing_dep_request_t *)base);
	}

	/* flush the requests that were completed while this request was still queued before them */
	while (idx < base->flush_queue.values_num)
		preprocessing_flush_request(manager, base->flush_queue.values[idx++]);
}

/******************************************************************************
 *                                                                            *
 * Purpose: add all sequential processed values from beginning of the queue   *
 *          to the local history cache                                        *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *                                                                            *
 * Comments: flushing stops at the first request that is not done             *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_flush_queue(zbx_preprocessing_manager_t *manager)
{
	zbx_list_iterator_t			pos;
	zbx_preprocessing_request_base_t	*flushed;

	for (zbx_list_iterator_init(&manager->queue, &pos); SUCCEED == zbx_list_iterator_next(&pos); )
	{
		zbx_list_iterator_peek(&pos, (void **)&flushed);

		if (REQUEST_STATE_DONE != flushed->state)
			return;

		preprocessing_flush_request(manager, flushed);

		/* the priority tail must not refer to an item that is being removed from the queue */
		if (SUCCEED == zbx_list_iterator_equal(&pos, &manager->priority_tail))
			zbx_list_iterator_clear(&manager->priority_tail);

		zbx_list_pop(&manager->queue, NULL);
		preprocessor_free_request(flushed);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: link a queued request to the previous request of the same item    *
 *                                                                            *
 * Parameters: manager     - [IN/OUT] preprocessing manager                   *
 *             itemid      - [IN] the item id used as link index key          *
 *             kind        - [IN] the request kind used as link index key     *
 *             enqueued_at - [IN] position of the new request in value queue  *
 *                                                                            *
 * Comments: the new request is set to pending state if the previously        *
 *           linked request is not done yet                                   *
 *                                                                            *
 ******************************************************************************/
static void	preproc_link_nodes(zbx_preprocessing_manager_t *manager, zbx_uint64_t itemid,
		zbx_preprocessing_kind_t kind, zbx_list_item_t *enqueued_at)
{
	zbx_item_link_t				*link, key;
	zbx_preprocessing_request_base_t	*prev_request, *new_request;

	key.itemid = itemid;
	key.kind = kind;

	if (NULL == (link = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &key)))
	{
		/* first linked request of the item - start a new link index entry */
		key.queue_item = enqueued_at;
		zbx_hashset_insert(&manager->linked_items, &key, sizeof(key));
		return;
	}

	/* existing linked item */
	new_request = (zbx_preprocessing_request_base_t *)(enqueued_at->data);
	prev_request = (zbx_preprocessing_request_base_t *)(link->queue_item->data);

	if (REQUEST_STATE_DONE != prev_request->state)
	{
		prev_request->pending = new_request;
		new_request->state = REQUEST_STATE_PENDING;
	}

	link->queue_item = enqueued_at;
}

/******************************************************************************
 *                                                                            *
 * Purpose: create relation between item values within value queue            *
 *                                                                            *
 * Parameters: manager     - [IN] preprocessing manager                       *
 *             enqueued_at - [IN] position in value queue                     *
 *             item        - [IN] item configuration data                     *
 *                                                                            *
 * Comments: items without dependent items are linked only if they have       *
 *           delta or throttling preprocessing steps                          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_link_items(zbx_preprocessing_manager_t *manager, zbx_list_item_t *enqueued_at,
		zbx_preproc_item_t *item)
{
	/* allow out of order processing for items without dependent items */
	/* or preprocessing steps requiring serial processing              */
	if (0 == item->dep_itemids_num)
	{
		int	idx, serial = 0;

		for (idx = 0; 0 == serial && idx < item->preproc_ops_num; idx++)
		{
			const zbx_preproc_op_t	*op = &item->preproc_ops[idx];

			if (ZBX_PREPROC_DELTA_VALUE == op->type || ZBX_PREPROC_DELTA_SPEED == op->type ||
					ZBX_PREPROC_THROTTLE_VALUE == op->type ||
					ZBX_PREPROC_THROTTLE_TIMED_VALUE == op->type)
			{
				serial = 1;
			}
		}

		if (0 == serial)
			return;
	}

	preproc_link_nodes(manager, item->itemid, ZBX_PREPROC_ITEM, enqueued_at);
}

/******************************************************************************
 *                                                                            *
 * Purpose: enqueue dependent items (if any) by preproc value                 *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             value   - [IN] the item preproc value                          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_enqueue_dependent_value(zbx_preprocessing_manager_t *manager,
		zbx_preproc_item_value_t *value)
{
	/* values without result have nothing to pass to dependent items */
	if (NULL != value->result)
	{
		preprocessor_enqueue_dependent(manager, value->hostid, value->itemid, value->result,
				value->item_value_type, value->ts);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: enqueue preprocessing request                                     *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             value   - [IN] item value                                      *
 *                                                                            *
 * Comments: Values that need no preprocessing are flushed immediately when   *
 *           the queue is empty, otherwise they are queued in done state.     *
 *           The value contents are copied by plain memcpy() into the         *
 *           queued request.                                                  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_enqueue(zbx_preprocessing_manager_t *manager, zbx_preproc_item_value_t *value)
{
	zbx_preprocessing_request_t	*request;
	zbx_preproc_item_t		*item, item_local;
	zbx_list_item_t			*enqueued_at;
	int				i;
	zbx_preprocessing_states_t	state;
	unsigned char			priority = ZBX_PREPROC_PRIORITY_NONE;
	int				notsupp_shift;	/* only set and read when state is REQUEST_STATE_QUEUED */

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, value->itemid);

	item_local.itemid = value->itemid;
	item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &item_local);

	/* override priority based on item type */
	if (NULL != item && ITEM_TYPE_INTERNAL == item->type)
		priority = ZBX_PREPROC_PRIORITY_FIRST;

	/* no preprocessing is needed when the item is not configured, has no steps */
	/* or when a value in supported state carries no result data                */
	if (NULL == item || 0 == item->preproc_ops_num || (ITEM_STATE_NOTSUPPORTED != value->state &&
			(NULL == value->result || 0 == ISSET_VALUE(value->result))))
	{
		state = REQUEST_STATE_DONE;

		if (NULL == manager->queue.head)
		{
			/* queue is empty and item is done, it can be flushed */
			preprocessor_flush_value(value);
			manager->processed_num++;
			preprocessor_enqueue_dependent_value(manager, value);
			preproc_item_value_clear(value);

			goto out;
		}
	}
	else
		state = REQUEST_STATE_QUEUED;

	request = (zbx_preprocessing_request_t *)zbx_malloc(NULL, sizeof(zbx_preprocessing_request_t));
	memset(request, 0, sizeof(zbx_preprocessing_request_t));
	request->base.kind = ZBX_PREPROC_ITEM;
	zbx_vector_preprocessing_request_base_create(&request->base.flush_queue);
	/* shallow copy - pointers held by the value are now shared with the request */
	memcpy(&request->value, value, sizeof(zbx_preproc_item_value_t));
	request->base.state = state;

	if (REQUEST_STATE_QUEUED == state)
	{
		/* notsupp_shift is the index of the first step to copy into the request: */
		/*   -1 - not supported value, no steps are copied (unless overridden     */
		/*        by the 'validate not supported' step below)                     */
		/*    0 - all steps are copied                                            */
		/*    1 - supported value, the 'validate not supported' step is skipped   */
		notsupp_shift = ITEM_STATE_NOTSUPPORTED == value->state ? -1 : 0;

		for (i = 0; i < item->preproc_ops_num; i++)
		{
			if (ZBX_PREPROC_VALIDATE_NOT_SUPPORTED == item->preproc_ops[i].type)
			{
				notsupp_shift = ITEM_STATE_NOTSUPPORTED != value->state;
				/* this step is expected only at the first position */
				if (0 != i)
					THIS_SHOULD_NEVER_HAPPEN;
				break;
			}
		}
	}

	if (REQUEST_STATE_QUEUED == state && 0 <= notsupp_shift)
	{
		request->value_type = item->value_type;
		if (0 < item->preproc_ops_num - notsupp_shift)
		{
			/* the request gets its own copy of the steps starting from notsupp_shift */
			request->steps = (zbx_preproc_op_t *)zbx_malloc(NULL, sizeof(zbx_preproc_op_t) *
					(size_t)(item->preproc_ops_num - notsupp_shift));
			request->steps_num = item->preproc_ops_num - notsupp_shift;

			for (i = 0; i < item->preproc_ops_num - notsupp_shift; i++)
			{
				request->steps[i].type = item->preproc_ops[i + notsupp_shift].type;
				request->steps[i].params = zbx_strdup(NULL, item->preproc_ops[i + notsupp_shift].params);
				request->steps[i].error_handler = item->preproc_ops[i + notsupp_shift].error_handler;
				request->steps[i].error_handler_params = zbx_strdup(NULL,
						item->preproc_ops[i + notsupp_shift].error_handler_params);
			}
		}
		else
			request->base.state = REQUEST_STATE_DONE;	/* no steps left after skipping */

		manager->preproc_num++;
	}

	/* priority items are enqueued at the beginning of the line */
	if (ZBX_PREPROC_PRIORITY_FIRST == priority)
	{
		if (SUCCEED == zbx_list_iterator_isset(&manager->priority_tail))
		{
			/* insert after the last internal item */
			zbx_list_insert_after(&manager->queue, manager->priority_tail.current, request, &enqueued_at);
			zbx_list_iterator_update(&manager->priority_tail);
		}
		else
		{
			/* no internal items in queue, insert at the beginning */
			zbx_list_prepend(&manager->queue, request, &enqueued_at);
			zbx_list_iterator_init(&manager->queue, &manager->priority_tail);
		}

		/* move the priority tail to the request that was just inserted */
		zbx_list_iterator_next(&manager->priority_tail);
	}
	else
	{
		/* NOTE(review): insert after NULL presumably appends at the end of the queue - confirm */
		zbx_list_insert_after(&manager->queue, NULL, request, &enqueued_at);
		zbx_list_iterator_update(&manager->priority_tail);
	}

	if (REQUEST_STATE_QUEUED == request->base.state)
		preprocessor_link_items(manager, enqueued_at, item);

	/* if no preprocessing is needed, dependent items are enqueued */
	if (REQUEST_STATE_DONE == request->base.state)
		preprocessor_enqueue_dependent_value(manager, value);

	manager->queued_num++;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: enqueue dependent items (if any)                                  *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             hostid     - [IN] host identifier                              *
 *             itemid     - [IN] master item identifier                       *
 *             ar         - [IN] master item value                            *
 *             value_type - [IN] master item value type                       *
 *             ts         - [IN] value timestamp, can be NULL                 *
 *                                                                            *
 * Comments: Nothing is enqueued when the value is not set or the master      *
 *           item is not configured or has no dependent items.                *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager, zbx_uint64_t hostid,
		zbx_uint64_t itemid, AGENT_RESULT *ar, unsigned char value_type, const zbx_timespec_t *ts)
{
	zbx_preproc_item_t		*master, master_local;
	zbx_preprocessing_dep_request_t	*dep_request;
	zbx_variant_t			value;
	zbx_list_item_t			*enqueued_at;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, itemid);

	if (0 == ISSET_VALUE(ar))
		goto out;

	master_local.itemid = itemid;
	master = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &master_local);

	if (NULL == master || 0 == master->dep_itemids_num)
		goto out;

	dep_request = zbx_malloc(NULL, sizeof(zbx_preprocessing_dep_request_t));

	dep_request->base.kind = ZBX_PREPROC_DEPS;
	dep_request->base.state = REQUEST_STATE_QUEUED;
	dep_request->base.pending = NULL;
	zbx_vector_preprocessing_request_base_create(&dep_request->base.flush_queue);

	dep_request->hostid = hostid;
	dep_request->master_itemid = itemid;
	dep_request->value_type = value_type;

	/* a missing timestamp is stored as zero timestamp */
	if (NULL != ts)
		dep_request->ts = *ts;
	else
		dep_request->ts = (zbx_timespec_t){0, 0};

	/* the intermediate variant is filled without allocation - it must not be cleared after the copy */
	preprocessing_ar_to_variant(ar, &value);
	zbx_variant_copy(&dep_request->value, &value);

	zbx_vector_ipcmsg_create(&dep_request->messages);

	dep_request->results = NULL;
	dep_request->results_alloc = 0;
	dep_request->results_offset = 0;

	zbx_list_append(&manager->queue, dep_request, &enqueued_at);

	preproc_link_nodes(manager, itemid, ZBX_PREPROC_DEPS, enqueued_at);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	manager->preproc_num++;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle new preprocessing request                                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 * Comments: The message can contain several packed values, each of them is   *
 *           unpacked and enqueued separately.                                *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_uint32_t			offset;
	zbx_preproc_item_value_t	value;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	preprocessor_sync_configuration(manager);

	/* the unpack function returns the number of bytes consumed by one value */
	for (offset = 0; offset < message->size; )
	{
		offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
		preprocessor_enqueue(manager, &value);
	}

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle new preprocessing test request                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 * Comments: A reference to the client is taken for the queued request.       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_test_request(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_direct_request_t	*test_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	test_request = zbx_malloc(NULL, sizeof(zbx_preprocessing_direct_request_t));

	zbx_ipc_client_addref(client);
	test_request->client = client;

	/* the request keeps its own copy of the message */
	zbx_ipc_message_copy(&test_request->message, message);

	zbx_list_append(&manager->direct_queue, test_request, NULL);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get result data from variant and error message                    *
 *                                                                            *
 * Parameters: request - [IN/OUT] preprocessing request                       *
 *             value   - [IN] variant value                                   *
 *             error   - [IN] error message (if any)                          *
 *                                                                            *
 * Return value: FAIL    - preprocessing error was set, the value is empty    *
 *                         or cannot be converted to item value type          *
 *               other   - the value was stored in request result             *
 *                                                                            *
 * Comments: The error message is not copied - the request stores the         *
 *           passed pointer. On success string data of the variant is moved   *
 *           into the result and the variant is reset to none.                *
 *                                                                            *
 ******************************************************************************/
static int	preprocessor_set_variant_result(zbx_preprocessing_request_t *request,
		zbx_variant_t *value, char *error)
{
	int		variant_type, ret;
	zbx_log_t	*log;

	if (NULL != error)
	{
		/* on error item state is set to ITEM_STATE_NOTSUPPORTED */
		request->value.state = ITEM_STATE_NOTSUPPORTED;
		zbx_free(request->value.error);
		request->value.error = error;

		return FAIL;
	}

	if (ZBX_VARIANT_NONE == value->type)
	{
		/* no value was produced - drop old result data and error */
		if (NULL != request->value.result)
		{
			free_result(request->value.result);
		}

		zbx_free(request->value.error);
		request->value.state = ITEM_STATE_NORMAL;

		return FAIL;
	}

	/* numeric item value types have matching variant types, */
	/* all other value types (STR, TEXT, LOG) are strings    */
	if (ITEM_VALUE_TYPE_FLOAT == request->value_type)
		variant_type = ZBX_VARIANT_DBL;
	else if (ITEM_VALUE_TYPE_UINT64 == request->value_type)
		variant_type = ZBX_VARIANT_UI64;
	else
		variant_type = ZBX_VARIANT_STR;

	if (FAIL == (ret = zbx_variant_convert(value, variant_type)))
	{
		zbx_free(request->value.error);
		request->value.error = zbx_dsprintf(NULL, "Value \"%s\" of type \"%s\" is not suitable for"
			" value type \"%s\"", zbx_variant_value_desc(value), zbx_variant_type_desc(value),
			zbx_item_value_type_string((zbx_item_value_type_t)request->value_type));

		request->value.state = ITEM_STATE_NOTSUPPORTED;

		return FAIL;
	}

	/* old result is shared between dependent and master items, it cannot be modified, create new result */

	if (NULL == request->value.result)
	{
		request->value.result = (AGENT_RESULT *)zbx_malloc(NULL, sizeof(AGENT_RESULT));
		init_result(request->value.result);
	}
	else if (ITEM_VALUE_TYPE_LOG != request->value_type)
	{
		/* log results are kept to preserve eventlog related information */
		free_result(request->value.result);
	}

	if (ITEM_STATE_NOTSUPPORTED == request->value.state)
		request->value.state = ITEM_STATE_NORMAL;

	switch (request->value_type)
	{
		case ITEM_VALUE_TYPE_UINT64:
			SET_UI64_RESULT(request->value.result, value->data.ui64);
			break;
		case ITEM_VALUE_TYPE_FLOAT:
			SET_DBL_RESULT(request->value.result, value->data.dbl);
			break;
		case ITEM_VALUE_TYPE_STR:
			SET_STR_RESULT(request->value.result, value->data.str);
			break;
		case ITEM_VALUE_TYPE_TEXT:
			SET_TEXT_RESULT(request->value.result, value->data.str);
			break;
		case ITEM_VALUE_TYPE_LOG:
			if (0 == ISSET_LOG(request->value.result))
			{
				log = zbx_malloc(NULL, sizeof(zbx_log_t));
				memset(log, 0, sizeof(zbx_log_t));
				SET_LOG_RESULT(request->value.result, log);
			}
			else
			{
				/* reuse existing log data, only the text is replaced */
				log = GET_LOG_RESULT(request->value.result);
				zbx_free(log->value);
			}
			log->value = value->data.str;
			break;
	}

	/* the data now belongs to the result, detach it from the variant */
	zbx_variant_set_none(value);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: replace cached preprocessing history of an item                   *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             itemid  - [IN] the item identifier                             *
 *             history - [IN] the new preprocessing history                   *
 *                                                                            *
 * Comments: Ownership of history values is moved to the cache and the        *
 *           history vector is cleared. An empty history removes the          *
 *           cached entry.                                                    *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_update_history(zbx_preprocessing_manager_t *manager, zbx_uint64_t itemid,
		zbx_vector_ptr_t *history)
{
	zbx_preproc_history_t	*vault;

	/* old history of the item is always discarded */
	if (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
			&itemid)))
	{
		zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
	}

	if (0 != history->values_num)
	{
		if (NULL == vault)
		{
			zbx_preproc_history_t	history_local;

			history_local.itemid = itemid;
			vault = (zbx_preproc_history_t *)zbx_hashset_insert(&manager->history_cache, &history_local,
					sizeof(history_local));
			zbx_vector_ptr_create(&vault->history);
		}

		/* move the values - the source vector is emptied without freeing them */
		zbx_vector_ptr_append_array(&vault->history, history->values, history->values_num);
		zbx_vector_ptr_clear(history);
	}
	else
	{
		/* no history left for the item - remove its cache entry */
		if (NULL != vault)
		{
			zbx_vector_ptr_destroy(&vault->history);
			zbx_hashset_remove_direct(&manager->history_cache, vault);
		}
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle preprocessing result                                       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing result                     *
 *                                                                            *
 * Comments: The request is taken from the task assigned to the worker that   *
 *           is connected through the client.                                 *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t	*worker;
	zbx_preprocessing_request_t	*request;
	zbx_variant_t			value;
	char				*error;
	zbx_vector_ptr_t		history;
	zbx_list_item_t			*node;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	worker = preprocessor_get_worker_by_client(manager, client);
	node = (zbx_list_item_t *)worker->task;
	request = (zbx_preprocessing_request_t *)node->data;

	zbx_vector_ptr_create(&history);
	zbx_preprocessor_unpack_result(&value, &history, &error, message->data);

	/* store the new history, the history vector is left empty afterwards */
	preprocessor_update_history(manager, request->value.itemid, &history);

	preprocessor_set_request_state_done(manager, (zbx_preprocessing_request_base_t *)request, worker->task);

	/* dependent items are enqueued only when the result value was set */
	if (FAIL != preprocessor_set_variant_result(request, &value, error))
		preprocessor_enqueue_dependent_value(manager, &request->value);

	worker->task = NULL;
	zbx_variant_clear(&value);

	manager->preproc_num--;

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zbx_vector_ptr_destroy(&history);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: finish dependent item request when all results are received       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             request - [IN] dependent item preprocessing request            *
 *             worker  - [IN] worker that processed the request               *
 *                                                                            *
 * Comments: Does nothing until results_offset has reached results_alloc.     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_finalize_dep_results(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_dep_request_t *request, zbx_preprocessing_worker_t *worker)
{
	int	i;

	if (request->results_alloc != request->results_offset)
		return;

	for (i = 0; i < request->results_alloc; i++)
	{
		/* failed results neither update history nor feed their own dependent items */
		if (NULL != request->results[i].error)
			continue;

		preprocessor_update_history(manager, request->results[i].itemid, &request->results[i].history);

		preprocessor_enqueue_dependent(manager, request->hostid, request->results[i].itemid,
				&request->results[i].value, request->results[i].value_type, &request->ts);
	}

	preprocessor_set_request_state_done(manager, (zbx_preprocessing_request_base_t *)request,
			(zbx_list_item_t *)worker->task);

	/* release the worker for the next task */
	worker->task = NULL;

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle dependent item batch preprocessing result                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing result                     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_process_dep_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t	*worker;
	zbx_preprocessing_dep_request_t	*dep_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the worker task is the queue node holding the dependent item request */
	worker = preprocessor_get_worker_by_client(manager, client);
	dep_request = (zbx_preprocessing_dep_request_t *)((zbx_list_item_t *)worker->task)->data;

	zbx_preprocessor_unpack_dep_result(&dep_request->results_alloc, &dep_request->results_offset,
			&dep_request->results, message->data);

	preprocessor_finalize_dep_results(manager, dep_request, worker);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle next dependent item batch preprocessing result             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing result                     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_process_dep_result_cont(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t	*worker;
	zbx_preprocessing_dep_request_t	*dep_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the worker task is the queue node holding the dependent item request */
	worker = preprocessor_get_worker_by_client(manager, client);
	dep_request = (zbx_preprocessing_dep_request_t *)((zbx_list_item_t *)worker->task)->data;

	/* results are unpacked starting at the current results offset */
	zbx_preprocessor_unpack_dep_result_cont(&dep_request->results_offset,
			dep_request->results + dep_request->results_offset, message->data);

	preprocessor_finalize_dep_results(manager, dep_request, worker);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: send next message of dependent item request to the worker         *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *                                                                            *
 * Comments: Nothing is sent when the request has no next message.            *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_next_dep_request(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_preprocessing_worker_t	*worker;
	zbx_preprocessing_dep_request_t	*dep_request;
	zbx_ipc_message_t		next;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the worker task is the queue node holding the dependent item request */
	worker = preprocessor_get_worker_by_client(manager, client);
	dep_request = (zbx_preprocessing_dep_request_t *)((zbx_list_item_t *)worker->task)->data;

	if (SUCCEED == preprocessor_dep_request_next_message(dep_request, &next))
	{
		zbx_ipc_client_send(client, next.code, next.data, next.size);
		zbx_ipc_message_clean(&next);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle preprocessing result                                       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing result                     *
 *                                                                            *
 * Comments: Preprocessing testing results are directly forwarded to source   *
 *           client as they are.                                              *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_test_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t		*worker;
	zbx_preprocessing_direct_request_t	*test_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* take the test request from the worker and release the worker */
	worker = preprocessor_get_worker_by_client(manager, client);
	test_request = (zbx_preprocessing_direct_request_t *)worker->task;
	worker->task = NULL;

	/* forward the response to the client unless it has disconnected */
	if (SUCCEED == zbx_ipc_client_connected(test_request->client))
	{
		zbx_ipc_client_send(test_request->client, message->code, message->data, message->size);
	}

	preprocessor_free_direct_request(test_request);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: account one queued value in item and request state statistics     *
 *                                                                            *
 * Parameters: itemid     - [IN] item identifier                              *
 *             state      - [IN] request state                                *
 *             items      - [IN/OUT] per item statistics                      *
 *             total      - [IN/OUT] total number of values                   *
 *             queued     - [IN/OUT] number of queued values                  *
 *             processing - [IN/OUT] number of values being processed         *
 *             done       - [IN/OUT] number of done values                    *
 *             pending    - [IN/OUT] number of pending values                 *
 *                                                                            *
 * Comments: The first values of every state are also written to debug log.   *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_item_stats(zbx_uint64_t itemid, zbx_preprocessing_states_t state, zbx_hashset_t *items,
		int *total, int *queued, int *processing, int *done, int *pending)
{
#define ZBX_MAX_REQUEST_STATE_PRINT_LIMIT	25

	zbx_preproc_item_stats_t	*stats;
	int				*counter = NULL;

	stats = zbx_hashset_search(items, &itemid);

	if (NULL == stats)
	{
		zbx_preproc_item_stats_t	stats_local = {.itemid = itemid};

		stats = zbx_hashset_insert(items, &stats_local, sizeof(stats_local));
	}

	/* select the counter of the state and log the value while the counter is below the print limit */
	switch(state)
	{
		case REQUEST_STATE_QUEUED:
			counter = queued;
			if (*counter < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
			{
				zabbix_log(LOG_LEVEL_DEBUG, "oldest queued itemid: " ZBX_FS_UI64
						" values:%d pos:%d", stats->itemid, stats->values_num, *total);
			}
			break;
		case REQUEST_STATE_PROCESSING:
			counter = processing;
			if (*counter < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
			{
				zabbix_log(LOG_LEVEL_DEBUG, "oldest processing itemid: " ZBX_FS_UI64
						" values:%d pos:%d", stats->itemid, stats->values_num, *total);
			}
			break;
		case REQUEST_STATE_DONE:
			counter = done;
			if (*counter < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
			{
				zabbix_log(LOG_LEVEL_DEBUG, "oldest done itemid: " ZBX_FS_UI64
						" values:%d pos:%d", stats->itemid, stats->values_num, *total);
			}
			break;
		case REQUEST_STATE_PENDING:
			counter = pending;
			if (*counter < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
			{
				zabbix_log(LOG_LEVEL_DEBUG, "oldest pending itemid: " ZBX_FS_UI64
						" values:%d pos:%d", stats->itemid, stats->values_num, *total);
			}
			break;
	}

	if (NULL != counter)
		(*counter)++;

	stats->values_num++;
	(*total)++;

#undef ZBX_MAX_REQUEST_STATE_PRINT_LIMIT
}

/******************************************************************************
 *                                                                            *
 * Purpose: count values in preprocessing queue by request state              *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             total      - [OUT] total number of values                      *
 *             queued     - [OUT] number of queued values                     *
 *             processing - [OUT] number of values being processed            *
 *             done       - [OUT] number of done values                       *
 *             pending    - [OUT] number of pending values                    *
 *                                                                            *
 * Comments: A dependent item request is counted once for every dependent     *
 *           item of its master item. Requests of master items that are no    *
 *           longer found in item configuration are not counted.              *
 *                                                                            *
 ******************************************************************************/
static	void	preprocessor_get_items_totals(zbx_preprocessing_manager_t *manager, int *total, int *queued,
		int *processing, int *done, int *pending)
{
	zbx_list_iterator_t			iterator;
	zbx_hashset_t				items;
	zbx_preprocessing_request_base_t	*base;
	zbx_preprocessing_request_t		*request;
	zbx_preprocessing_dep_request_t		*dep_request;
	zbx_preproc_item_t			*master_item;

	*total = 0;
	*queued = 0;
	*processing = 0;
	*done = 0;
	*pending = 0;

	/* temporary per item statistics, used only while walking the queue */
	zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_list_iterator_init(&manager->queue, &iterator);
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		zbx_list_iterator_peek(&iterator, (void **)&base);

		switch (base->kind)
		{
			case ZBX_PREPROC_ITEM:
				request = (zbx_preprocessing_request_t *)base;
				preprocessor_add_item_stats(request->value.itemid, base->state, &items, total, queued,
						processing, done, pending);
				break;
			case ZBX_PREPROC_DEPS:
				dep_request = (zbx_preprocessing_dep_request_t *)base;
				if (NULL != (master_item = (zbx_preproc_item_t *)zbx_hashset_search(
						&manager->item_config, &dep_request->master_itemid)))
				{
					int	i;

					/* every dependent item shares the state of the request */
					for (i = 0; i < master_item->dep_itemids_num; i++)
					{
						preprocessor_add_item_stats(master_item->dep_itemids[i].first,
								base->state, &items, total, queued, processing, done,
								pending);
					}
				}
				break;
		}
	}

	zbx_hashset_destroy(&items);
}

/******************************************************************************
 *                                                                            *
 * Purpose: return diagnostic statistics                                      *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_get_diag_stats(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client)
{
	int		total, queued, processing, done, pending;
	unsigned char	*payload;
	zbx_uint32_t	payload_len;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	preprocessor_get_items_totals(manager, &total, &queued, &processing, &done, &pending);

	/* the packed buffer is allocated by the pack function and released after sending */
	payload_len = zbx_preprocessor_pack_diag_stats(&payload, total, queued, processing, done, pending);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, payload, payload_len);
	zbx_free(payload);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: compare item statistics by value                                  *
 *                                                                            *
 * Comments: The arguments point to item statistics pointers. The result is   *
 *           the value count of the second item minus that of the first.      *
 *                                                                            *
 ******************************************************************************/
static int	preproc_sort_item_by_values_desc(const void *d1, const void *d2)
{
	const zbx_preproc_item_stats_t * const	*lhs = (const zbx_preproc_item_stats_t * const *)d1;
	const zbx_preproc_item_stats_t * const	*rhs = (const zbx_preproc_item_stats_t * const *)d2;

	return (*rhs)->values_num - (*lhs)->values_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: count one more value for an item in the diagnostic view,          *
 *          creating the item statistics entry on first use                   *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager (source of item           *
 *                            configuration)                                  *
 *             itemid  - [IN] item identifier                                 *
 *             items   - [IN/OUT] statistics entries indexed by itemid        *
 *             view    - [IN/OUT] statistics entries in order of first        *
 *                                appearance                                  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_item_view(zbx_preprocessing_manager_t *manager, zbx_uint64_t itemid,
		zbx_hashset_t *items, zbx_vector_ptr_t *view)
{
	zbx_preproc_item_stats_t	*stats = zbx_hashset_search(items, &itemid);

	if (NULL == stats)
	{
		zbx_preproc_item_stats_t	new_stats = {.itemid = itemid};
		zbx_preproc_item_t		*config;

		stats = zbx_hashset_insert(items, &new_stats, sizeof(new_stats));

		/* the number of steps stays zero when the item has no cached configuration */
		config = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &itemid);

		if (NULL != config)
			stats->steps_num = config->preproc_ops_num;

		zbx_vector_ptr_append(view, stats);
	}

	stats->values_num++;
}

/******************************************************************************
 *                                                                            *
 * Purpose: build per item statistics for the requests found in the manager   *
 *          queue                                                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             items   - [OUT] statistics entries indexed by itemid           *
 *             view    - [OUT] statistics entries in order of first           *
 *                             appearance in the queue                        *
 *                                                                            *
 * Comments: An item request is counted for its own item. A dependent item    *
 *           request is counted once for every dependent item of its master   *
 *           item, provided the master item is found in the item              *
 *           configuration cache.                                             *
 *                                                                            *
 ******************************************************************************/
static	void	preprocessor_get_items_view(zbx_preprocessing_manager_t *manager, zbx_hashset_t *items,
		zbx_vector_ptr_t *view)
{
	zbx_list_iterator_t	queue_it;

	zbx_list_iterator_init(&manager->queue, &queue_it);

	while (SUCCEED == zbx_list_iterator_next(&queue_it))
	{
		zbx_preprocessing_request_base_t	*entry;

		zbx_list_iterator_peek(&queue_it, (void **)&entry);

		if (ZBX_PREPROC_ITEM == entry->kind)
		{
			zbx_preprocessing_request_t	*item_request = (zbx_preprocessing_request_t *)entry;

			preprocessor_add_item_view(manager, item_request->value.itemid, items, view);
		}
		else if (ZBX_PREPROC_DEPS == entry->kind)
		{
			zbx_preprocessing_dep_request_t	*deps = (zbx_preprocessing_dep_request_t *)entry;
			zbx_preproc_item_t		*master;
			int				n;

			master = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config,
					&deps->master_itemid);

			/* nothing to count when the master item is not configured */
			if (NULL == master)
				continue;

			for (n = 0; n < master->dep_itemids_num; n++)
				preprocessor_add_item_view(manager, master->dep_itemids[n].first, items, view);
		}
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: send the items having the most values in the queue to the         *
 *          requesting client                                                 *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client that receives the reply              *
 *             message - [IN] the message with request; the item limit is     *
 *                            unpacked from its data                          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_get_top_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_vector_ptr_t	sorted;
	zbx_hashset_t		stats;
	zbx_uint32_t		reply_len;
	unsigned char		*reply;
	int			limit, send_num;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_hashset_create(&stats, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&sorted);

	preprocessor_get_items_view(manager, &stats, &sorted);

	/* items with the most queued values come first */
	zbx_vector_ptr_sort(&sorted, preproc_sort_item_by_values_desc);

	/* report no more entries than requested or available */
	send_num = MIN(limit, sorted.values_num);

	reply_len = zbx_preprocessor_pack_top_items_result(&reply, (zbx_preproc_item_stats_t **)sorted.values,
			send_num);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, reply, reply_len);
	zbx_free(reply);

	/* the vector only references entries owned by the hashset */
	zbx_vector_ptr_destroy(&sorted);
	zbx_hashset_destroy(&stats);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: send the first queued items that have preprocessing steps to the  *
 *          requesting client                                                 *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client that receives the reply              *
 *             message - [IN] the message with request; the item limit is     *
 *                            unpacked from its data                          *
 *                                                                            *
 * Comments: Items are reported in the order they are first met while walking *
 *           the manager queue. At most 'limit' items are reported; a zero or *
 *           negative limit results in an empty reply.                        *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_get_oldest_preproc_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	int			limit, i;
	unsigned char		*data;
	zbx_uint32_t		data_len;
	zbx_hashset_t		items;
	zbx_vector_ptr_t	view, view_preproc;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&view);
	zbx_vector_ptr_create(&view_preproc);

	preprocessor_get_items_view(manager, &items, &view);

	/* the result can hold neither more entries than requested nor more than the items seen, */
	/* so do not reserve memory for the raw (possibly large or negative) limit               */
	if (0 < limit && 0 < view.values_num)
		zbx_vector_ptr_reserve(&view_preproc, (size_t)MIN(limit, view.values_num));

	/* 'limit' is left untouched - the number of collected items is the loop bound */
	for (i = 0; i < view.values_num && view_preproc.values_num < limit; i++)
	{
		zbx_preproc_item_stats_t	*item;

		item = (zbx_preproc_item_stats_t *)view.values[i];

		/* only items with preprocessing can slow down queue */
		if (0 == item->steps_num)
			continue;

		zbx_vector_ptr_append(&view_preproc, item);
	}

	/* view_preproc never holds more than 'limit' entries, so all of them are sent;     */
	/* using a limit decremented by the loop here would truncate or empty the result   */
	data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_item_stats_t **)view_preproc.values,
			view_preproc.values_num);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
	zbx_free(data);

	/* both vectors only reference entries owned by the hashset */
	zbx_vector_ptr_destroy(&view_preproc);
	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&items);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: hash function for item link entries                               *
 *                                                                            *
 * Parameters: d - [IN] pointer to zbx_item_link_t                            *
 *                                                                            *
 * Return value: hash of the item identifier combined with the link kind      *
 *               (taken as a single byte)                                     *
 *                                                                            *
 ******************************************************************************/
static zbx_hash_t	preproc_item_link_hash(const void *d)
{
	const zbx_item_link_t	*item_link = (const zbx_item_link_t *)d;
	unsigned char		kind_byte = item_link->kind;
	zbx_hash_t		seed = ZBX_DEFAULT_UINT64_HASH_FUNC(&item_link->itemid);

	/* the itemid hash is used as the seed when hashing the kind byte */
	return ZBX_DEFAULT_STRING_HASH_ALGO(&kind_byte, 1, seed);
}

/******************************************************************************
 *                                                                            *
 * Purpose: compare item link entries by item identifier and then by kind     *
 *                                                                            *
 * Parameters: d1 - [IN] pointer to the first zbx_item_link_t                 *
 *             d2 - [IN] pointer to the second zbx_item_link_t                *
 *                                                                            *
 * Return value: 0 if both entries have the same item identifier and kind,    *
 *               non-zero otherwise                                           *
 *                                                                            *
 ******************************************************************************/
static int	preproc_item_link_compare(const void *d1, const void *d2)
{
	const zbx_item_link_t	*first = (const zbx_item_link_t *)d1;
	const zbx_item_link_t	*second = (const zbx_item_link_t *)d2;
	int			first_kind, second_kind;

	/* item identifier is the primary key */
	ZBX_RETURN_IF_NOT_EQUAL(first->itemid, second->itemid);

	/* same item - the kind decides */
	first_kind = (int)first->kind;
	second_kind = (int)second->kind;

	return first_kind - second_kind;
}

/******************************************************************************
 *                                                                            *
 * Purpose: reset the preprocessing manager and create its containers         *
 *                                                                            *
 * Parameters: manager - [OUT] the manager to initialize                      *
 *                                                                            *
 * Comments: The structure is zeroed first, then the worker array             *
 *           (CONFIG_PREPROCESSOR_FORKS entries), both request queues and the *
 *           item configuration, linked item and history caches are created.  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_init_manager(zbx_preprocessing_manager_t *manager)
{
	const size_t	workers_num = (size_t)CONFIG_PREPROCESSOR_FORKS;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers: %d", __func__, CONFIG_PREPROCESSOR_FORKS);

	memset(manager, 0, sizeof(*manager));

	/* one zero-initialized slot per configured preprocessing worker */
	manager->workers = (zbx_preprocessing_worker_t *)zbx_calloc(NULL, workers_num, sizeof(*manager->workers));

	/* request queues */
	zbx_list_create(&manager->queue);
	zbx_list_create(&manager->direct_queue);

	/* item configuration entries are cleaned with preproc_item_clear() when removed */
	zbx_hashset_create_ext(&manager->item_config, 0, ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)preproc_item_clear,
			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);

	/* linked items are keyed by itemid and kind */
	zbx_hashset_create(&manager->linked_items, 0, preproc_item_link_hash, preproc_item_link_compare);

	zbx_hashset_create(&manager->history_cache, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: registers a connected preprocessing worker with the manager       *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected preprocessing worker              *
 *             message - [IN] message received by preprocessing manager; its  *
 *                            data starts with the parent pid reported by the *
 *                            worker                                          *
 *                                                                            *
 * Comments: A connection reporting a parent pid different from the parent of *
 *           this process is closed. Registering more workers than            *
 *           CONFIG_PREPROCESSOR_FORKS terminates the process.                *
 *                                                                            *
 ******************************************************************************/
static void preprocessor_register_worker(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	pid_t	reported_ppid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	memcpy(&reported_ppid, message->data, sizeof(reported_ppid));

	if (getppid() == reported_ppid)
	{
		zbx_preprocessing_worker_t	*slot;

		/* all worker slots are already taken */
		if (manager->worker_count == CONFIG_PREPROCESSOR_FORKS)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			exit(EXIT_FAILURE);
		}

		slot = (zbx_preprocessing_worker_t *)&manager->workers[manager->worker_count];
		manager->worker_count++;
		slot->client = client;

		/* the new worker may pick up already queued requests */
		preprocessor_assign_tasks(manager);
	}
	else
	{
		zbx_ipc_client_close(client);
		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: release all resources held by the preprocessing manager           *
 *                                                                            *
 * Parameters: manager - [IN] the manager to destroy                          *
 *                                                                            *
 * Comments: Requests still waiting in the queues are freed without being     *
 *           processed.                                                       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_destroy_manager(zbx_preprocessing_manager_t *manager)
{
	void	*entry;

	zbx_free(manager->workers);

	/* this is the place where values are lost */
	while (SUCCEED == zbx_list_pop(&manager->direct_queue, &entry))
	{
		preprocessor_free_direct_request((zbx_preprocessing_direct_request_t *)entry);
	}

	zbx_list_destroy(&manager->direct_queue);

	while (SUCCEED == zbx_list_pop(&manager->queue, &entry))
	{
		preprocessor_free_request((zbx_preprocessing_request_base_t *)entry);
	}

	zbx_list_destroy(&manager->queue);

	/* caches are destroyed only after the queues have been emptied */
	zbx_hashset_destroy(&manager->item_config);
	zbx_hashset_destroy(&manager->linked_items);
	zbx_hashset_destroy(&manager->history_cache);
}

/******************************************************************************
 *                                                                            *
 * Purpose: preprocessing manager process entry point                         *
 *                                                                            *
 * Parameters: args - [IN] thread arguments (zbx_thread_args_t) providing the *
 *                         process type, server number and process number     *
 *                                                                            *
 * Comments: Starts the preprocessing IPC service, initializes the manager    *
 *           and then serves IPC messages until ZBX_IS_RUNNING() becomes      *
 *           false. The process exits with failure if the IPC service cannot  *
 *           be started.                                                      *
 *                                                                            *
 ******************************************************************************/
ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
{
	zbx_ipc_service_t		service;
	char				*error = NULL;
	zbx_ipc_client_t		*client;
	zbx_ipc_message_t		*message;
	zbx_preprocessing_manager_t	manager;
	int				ret;
	double				time_stat, time_idle = 0, time_now, time_flush, sec;
	zbx_timespec_t			timeout = {ZBX_PREPROCESSING_MANAGER_DELAY, 0};

#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */

	/* store the identity of this process in the thread local globals */
	process_type = ((zbx_thread_args_t *)args)->process_type;
	server_num = ((zbx_thread_args_t *)args)->server_num;
	process_num = ((zbx_thread_args_t *)args)->process_num;

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
			server_num, get_process_type_string(process_type), process_num);

	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);

	/* the manager cannot work without its IPC service */
	if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	preprocessor_init_manager(&manager);

	/* initialize statistics */
	time_stat = zbx_time();
	time_flush = time_stat;

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();

		/* refresh the process title and reset the interval counters */
		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
					ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
					get_process_type_string(process_type), process_num,
					manager.queued_num, manager.processed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			manager.processed_num = 0;
		}

		/* the process is reported as idle only while it waits for an IPC message */
		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
		ret = zbx_ipc_service_recv(&service, &timeout, &client, &message);
		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
		sec = zbx_time();
		zbx_update_env(sec);

		/* time spent in the receive call counts as idle unless it returned immediately */
		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			/* dispatch by message code; codes without a case below are ignored */
			switch (message->code)
			{
				case ZBX_IPC_PREPROCESSOR_WORKER:
					preprocessor_register_worker(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_REQUEST:
					preprocessor_add_request(&manager, message);
					break;
				case ZBX_IPC_PREPROCESSOR_RESULT:
					preprocessor_add_result(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_DEP_NEXT:
					preprocessor_next_dep_request(&manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_DEP_RESULT:
					preprocessor_process_dep_result(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_DEP_RESULT_CONT:
					preprocessor_process_dep_result_cont(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_QUEUE:
					/* reply with the raw queued value counter using the request code */
					zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
							sizeof(zbx_uint64_t));
					break;
				case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
					preprocessor_add_test_request(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_TEST_RESULT:
					preprocessor_flush_test_result(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
					preprocessor_get_diag_stats(&manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_TOP_ITEMS:
					preprocessor_get_top_items(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEMS:
					preprocessor_get_oldest_preproc_items(&manager, client, message);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);

		/* flush history when manager.preproc_num is zero or when more than a second has passed */
		/* since the previous flush (measured with the timestamp taken at the loop start)        */
		if (0 == manager.preproc_num || 1 < time_now - time_flush)
		{
			dc_flush_history();
			time_flush = time_now;
		}
	}

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	/* sleep forever once the main loop has ended */
	/* NOTE(review): presumably the process is expected to be terminated from outside - confirm */
	while (1)
		zbx_sleep(SEC_PER_MIN);

	/* not reachable because of the endless loop above */
	zbx_ipc_service_close(&service);
	preprocessor_destroy_manager(&manager);
#undef STAT_INTERVAL
}