/*
** Zabbix
** Copyright (C) 2001-2022 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "common.h"

#include "dbcache.h"
#include "daemon.h"
#include "zbxself.h"
#include "log.h"
#include "zbxserver.h"
#include "sysinfo.h"
#include "zbxserialize.h"
#include "zbxipcservice.h"
#include "zbxlld.h"

#include "preprocessing.h"
#include "preproc_manager.h"
#include "zbxalgo.h"
#include "../../libs/zbxalgo/vectorimpl.h"
#include "preproc_history.h"

extern unsigned char	process_type, program_type;
extern int		server_num, process_num, CONFIG_PREPROCESSOR_FORKS;

#define ZBX_PREPROCESSING_MANAGER_DELAY	1

#define ZBX_PREPROC_PRIORITY_NONE	0
#define ZBX_PREPROC_PRIORITY_FIRST	1

typedef enum
{
	REQUEST_STATE_QUEUED		= 0,		/* requires preprocessing */
	REQUEST_STATE_PROCESSING	= 1,		/* is being preprocessed  */
	REQUEST_STATE_DONE		= 2,		/* value is set, waiting for flush */
	REQUEST_STATE_PENDING		= 3		/* value requires preprocessing, */
							/* but is waiting on other request to complete */
}
zbx_preprocessing_states_t;

typedef struct preprocessing_request zbx_preprocessing_request_t;

ZBX_PTR_VECTOR_DECL(preprocessing_request, zbx_preprocessing_request_t *)
ZBX_PTR_VECTOR_IMPL(preprocessing_request, zbx_preprocessing_request_t *)

/* preprocessing request */
struct preprocessing_request
{
	zbx_preprocessing_states_t		state;		/* request state */
	zbx_preprocessing_request_t		*pending;	/* the request waiting on this request to complete */
	zbx_preproc_item_value_t		value;		/* unpacked item value */
	zbx_preproc_op_t			*steps;		/* preprocessing steps */
	int					steps_num;	/* number of preprocessing steps */
	unsigned char				value_type;	/* value type from configuration */
								/* at the beginning of preprocessing queue */
	zbx_vector_preprocessing_request_t	flush_queue;	/* processed request waiting to be flushed */
};

/* preprocessing worker data */
typedef struct
{
	zbx_ipc_client_t	*client;	/* the connected preprocessing worker client */
	void			*task;		/* the current task data */
}
zbx_preprocessing_worker_t;

/* item link index */
typedef struct
{
	zbx_uint64_t		itemid;		/* item id */
	zbx_list_item_t		*queue_item;	/* queued item */
}
zbx_item_link_t;

/* direct request to be forwarded to worker, bypassing the preprocessing queue */
typedef struct
{
	zbx_ipc_client_t	*client;	/* the IPC client sending forward message to worker */
	zbx_ipc_message_t	message;
}
zbx_preprocessing_direct_request_t;

/* preprocessing manager data */
typedef struct
{
	zbx_preprocessing_worker_t	*workers;	/* preprocessing worker array */
	int				worker_count;	/* preprocessing worker count */
	zbx_list_t			queue;		/* queue of item values */
	zbx_hashset_t			item_config;	/* item configuration L2 cache */
	zbx_hashset_t			history_cache;	/* item value history cache */
	zbx_hashset_t			linked_items;	/* linked items placed in queue */
	int				cache_ts;	/* cache timestamp */
	zbx_uint64_t			processed_num;	/* processed value counter */
	zbx_uint64_t			queued_num;	/* queued value counter */
	zbx_uint64_t			preproc_num;	/* queued values with preprocessing steps */
	zbx_list_iterator_t		priority_tail;	/* iterator to the last queued priority item */

	zbx_list_t			direct_queue;	/* Queue of external requests that have to be */
							/* forwarded to workers for preprocessing.    */
}
zbx_preprocessing_manager_t;

static void	preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
		zbx_preproc_item_value_t *source_value, zbx_list_item_t *master);

/* cleanup functions */

static void	preproc_item_clear(zbx_preproc_item_t *item)
{
	int	i;

	zbx_free(item->dep_itemids);

	for (i = 0; i < item->preproc_ops_num; i++)
	{
		zbx_free(item->preproc_ops[i].params);
		zbx_free(item->preproc_ops[i].error_handler_params);
	}

	zbx_free(item->preproc_ops);
}

static void	request_free_steps(zbx_preprocessing_request_t *request)
{
	while (0 < request->steps_num)
	{
		request->steps_num--;
		zbx_free(request->steps[request->steps_num].params);
		zbx_free(request->steps[request->steps_num].error_handler_params);
	}

	zbx_free(request->steps);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_sync_configuration                                  *
 *                                                                            *
 * Purpose: synchronize preprocessing manager with configuration cache data   *
 *                                                                            *
 * Parameters: manager - [IN] the manager to be synchronized                  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_sync_configuration(zbx_preprocessing_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	int			ts;
	zbx_preproc_history_t	*vault;
	zbx_preproc_item_t	*item;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	ts = manager->cache_ts;
	DCconfig_get_preprocessable_items(&manager->item_config, &manager->cache_ts);

	if (ts != manager->cache_ts)
	{
		/* drop items with removed preprocessing steps from preprocessing history cache */
		zbx_hashset_iter_reset(&manager->history_cache, &iter);
		while (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_iter_next(&iter)))
		{
			if (NULL != zbx_hashset_search(&manager->item_config, &vault->itemid))
				continue;

			zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
			zbx_vector_ptr_destroy(&vault->history);
			zbx_hashset_iter_remove(&iter);
		}

		/* reset preprocessing history for an item if its preprocessing step was modified */
		zbx_hashset_iter_reset(&manager->item_config, &iter);
		while (NULL != (item = (zbx_preproc_item_t *)zbx_hashset_iter_next(&iter)))
		{
			if (ts >= item->update_time)
				continue;

			if (NULL == (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
					&item->itemid)))
			{
				continue;
			}

			zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
			zbx_vector_ptr_destroy(&vault->history);
			zbx_hashset_remove_direct(&manager->history_cache, vault);
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size: %d, history cache size: %d", __func__,
			manager->item_config.num_data, manager->history_cache.num_data);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_create_task                                         *
 *                                                                            *
 * Purpose: create preprocessing task for request                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             request - [IN] preprocessing request                           *
 *             task    - [OUT] preprocessing task data                        *
 *                                                                            *
 ******************************************************************************/
static zbx_uint32_t	preprocessor_create_task(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_request_t *request, unsigned char **task)
{
	zbx_variant_t		value;
	zbx_preproc_history_t	*vault;
	zbx_vector_ptr_t	*phistory;

	if (ISSET_LOG(request->value.result_ptr->result))
		zbx_variant_set_str(&value, request->value.result_ptr->result->log->value);
	else if (ISSET_UI64(request->value.result_ptr->result))
		zbx_variant_set_ui64(&value, request->value.result_ptr->result->ui64);
	else if (ISSET_DBL(request->value.result_ptr->result))
		zbx_variant_set_dbl(&value, request->value.result_ptr->result->dbl);
	else if (ISSET_STR(request->value.result_ptr->result))
		zbx_variant_set_str(&value, request->value.result_ptr->result->str);
	else if (ISSET_TEXT(request->value.result_ptr->result))
		zbx_variant_set_str(&value, request->value.result_ptr->result->text);
	else
		THIS_SHOULD_NEVER_HAPPEN;

	if (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
				&request->value.itemid)))
	{
		phistory = &vault->history;
	}
	else
		phistory = NULL;


	return zbx_preprocessor_pack_task(task, request->value.itemid, request->value_type, request->value.ts, &value,
			phistory, request->steps, request->steps_num);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_set_request_state_done                              *
 *                                                                            *
 * Purpose: set request state to done and handle linked items                 *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             request    - [IN] preprocessing request                        *
 *             queue_item - [IN/OUT] in - queued item                         *
 *                                   out - new position in queue              *
 *                                                                            *
 ******************************************************************************/
static	void	preprocessor_set_request_state_done(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_request_t *request, zbx_list_item_t **queue_item)
{
	zbx_item_link_t			*index;
	zbx_list_iterator_t		iterator, next_iterator;
	zbx_preprocessing_request_t	*prev;

	request->state = REQUEST_STATE_DONE;

	/* value processed - the pending value can now be processed */
	if (NULL != request->pending)
		request->pending->state = REQUEST_STATE_QUEUED;

	if (NULL != (index = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &request->value.itemid)) &&
			*queue_item == index->queue_item)
	{
		zbx_hashset_remove_direct(&manager->linked_items, index);
	}

	if (NULL == manager->queue.head)
		return;

	zbx_list_iterator_init(&manager->queue, &iterator);
	if (iterator.next == *queue_item)
		return;

	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		if (iterator.next == *queue_item)
			break;
	}

	*queue_item = iterator.current;

	prev = (zbx_preprocessing_request_t *)iterator.current->data;
	zbx_vector_preprocessing_request_append(&prev->flush_queue, request);

	next_iterator = iterator;
	if (SUCCEED == zbx_list_iterator_next(&next_iterator))
	{
		if (SUCCEED == zbx_list_iterator_equal(&next_iterator, &manager->priority_tail))
			manager->priority_tail = iterator;
	}

	(void)zbx_list_iterator_remove_next(&iterator);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_next_task                                       *
 *                                                                            *
 * Purpose: gets next task to be sent to worker                               *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [OUT] the serialized task to be sent                 *
 *                                                                            *
 * Return value: pointer to the task object                                   *
 *                                                                            *
 ******************************************************************************/
static void	*preprocessor_get_next_task(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_list_iterator_t			iterator;
	zbx_preprocessing_request_t		*request = NULL;
	void					*task = NULL;
	zbx_preprocessing_direct_request_t	*direct_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
	{
		*message = direct_request->message;
		zbx_ipc_message_init(&direct_request->message);
		task = direct_request;
		goto out;
	}

	zbx_list_iterator_init(&manager->queue, &iterator);
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		zbx_list_iterator_peek(&iterator, (void **)&request);

		if (REQUEST_STATE_QUEUED != request->state)
			continue;

		if (ITEM_STATE_NOTSUPPORTED == request->value.state)
		{
			zbx_preproc_history_t	*vault;
			zbx_list_item_t		*node = iterator.current;

			if (NULL != (vault = (zbx_preproc_history_t *) zbx_hashset_search(&manager->history_cache,
					&request->value.itemid)))
			{
				zbx_vector_ptr_clear_ext(&vault->history,
						(zbx_clean_func_t) zbx_preproc_op_history_free);
				zbx_vector_ptr_destroy(&vault->history);
				zbx_hashset_remove_direct(&manager->history_cache, vault);
			}

			preprocessor_set_request_state_done(manager, request, &node);
			continue;
		}

		task = iterator.current;
		request->state = REQUEST_STATE_PROCESSING;
		message->code = ZBX_IPC_PREPROCESSOR_REQUEST;
		message->size = preprocessor_create_task(manager, request, &message->data);
		request_free_steps(request);
		break;
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return task;
}

static void	preproc_item_result_free(zbx_preproc_item_value_t *value)
{
	if (0 == --(value->result_ptr->refcount))
	{
		if (NULL != value->result_ptr->result)
		{
			free_result(value->result_ptr->result);
			zbx_free(value->result_ptr->result);
		}
		zbx_free(value->result_ptr);
	}
	else
		value->result_ptr = NULL;
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_worker_by_client                                *
 *                                                                            *
 * Purpose: get worker data by IPC client                                     *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *                                                                            *
 * Return value: pointer to the worker data                                   *
 *                                                                            *
 ******************************************************************************/
static zbx_preprocessing_worker_t	*preprocessor_get_worker_by_client(zbx_preprocessing_manager_t *manager,
		zbx_ipc_client_t *client)
{
	int				i;
	zbx_preprocessing_worker_t	*worker = NULL;

	for (i = 0; i < manager->worker_count; i++)
	{
		if (client == manager->workers[i].client)
		{
			worker = &manager->workers[i];
			break;
		}
	}

	if (NULL == worker)
	{
		THIS_SHOULD_NEVER_HAPPEN;
		exit(EXIT_FAILURE);
	}

	return worker;
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_free_worker                                     *
 *                                                                            *
 * Purpose: get worker without active preprocessing task                      *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *                                                                            *
 * Return value: pointer to the worker data or NULL if none                   *
 *                                                                            *
 ******************************************************************************/
static zbx_preprocessing_worker_t	*preprocessor_get_free_worker(zbx_preprocessing_manager_t *manager)
{
	int	i;

	for (i = 0; i < manager->worker_count; i++)
	{
		if (NULL == manager->workers[i].task)
			return &manager->workers[i];
	}

	return NULL;
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_assign_tasks                                        *
 *                                                                            *
 * Purpose: assign available queued preprocessing tasks to free workers       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_assign_tasks(zbx_preprocessing_manager_t *manager)
{
	zbx_preprocessing_worker_t	*worker;
	void				*data;
	zbx_ipc_message_t		message;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	while (NULL != (worker = preprocessor_get_free_worker(manager)) &&
			NULL != (data = preprocessor_get_next_task(manager, &message)))
	{
		if (FAIL == zbx_ipc_client_send(worker->client, message.code, message.data, message.size))
		{
			zabbix_log(LOG_LEVEL_CRIT, "cannot send data to preprocessing worker");
			exit(EXIT_FAILURE);
		}

		worker->task = data;
		zbx_ipc_message_clean(&message);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preproc_item_value_clear                                         *
 *                                                                            *
 * Purpose: frees resources allocated by preprocessor item value              *
 *                                                                            *
 * Parameters: value - [IN] value to be freed                                 *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_value_clear(zbx_preproc_item_value_t *value)
{
	zbx_free(value->error);
	preproc_item_result_free(value);
	zbx_free(value->ts);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_free_request                                        *
 *                                                                            *
 * Purpose: free preprocessing request                                        *
 *                                                                            *
 * Parameters: request - [IN] request data to be freed                        *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_free_request(zbx_preprocessing_request_t *request)
{
	zbx_vector_preprocessing_request_clear_ext(&request->flush_queue, preprocessor_free_request);
	zbx_vector_preprocessing_request_destroy(&request->flush_queue);

	preproc_item_value_clear(&request->value);
	request_free_steps(request);
	zbx_free(request);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_free_direct_request                                 *
 *                                                                            *
 * Purpose: free preprocessing direct request                                 *
 *                                                                            *
 * Parameters: forward - [IN] forward data to be freed                        *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_free_direct_request(zbx_preprocessing_direct_request_t *direct_request)
{
	zbx_ipc_client_release(direct_request->client);
	zbx_ipc_message_clean(&direct_request->message);
	zbx_free(direct_request);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_flush_value                                         *
 *                                                                            *
 * Purpose: add new value to the local history cache or send to LLD manager   *
 *                                                                            *
 * Parameters: value - [IN] value to be added or sent                         *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_value(const zbx_preproc_item_value_t *value)
{
	if (0 == (value->item_flags & ZBX_FLAG_DISCOVERY_RULE) || 0 == (program_type & ZBX_PROGRAM_TYPE_SERVER))
	{
		dc_add_history(value->itemid, value->item_value_type, value->item_flags, value->result_ptr->result,
				value->ts, value->state, value->error);
	}
	else
		zbx_lld_process_agent_result(value->itemid, value->hostid, value->result_ptr->result, value->ts,
				value->error);
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush preprocessing request and all requests waiting on it        *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_flush_request(zbx_preprocessing_manager_t *manager, zbx_preprocessing_request_t *request)
{
	int	i;

	preprocessor_flush_value(&request->value);

	manager->processed_num++;
	manager->queued_num--;

	for (i = 0; i < request->flush_queue.values_num; i++)
		preprocessing_flush_request(manager, request->flush_queue.values[i]);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessing_flush_queue                                        *
 *                                                                            *
 * Purpose: add all sequential processed values from beginning of the queue   *
 *          to the local history cache                                        *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *                                                                            *
 ******************************************************************************/
static void	preprocessing_flush_queue(zbx_preprocessing_manager_t *manager)
{
	zbx_preprocessing_request_t	*request;
	zbx_list_iterator_t		iterator;

	zbx_list_iterator_init(&manager->queue, &iterator);
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		zbx_list_iterator_peek(&iterator, (void **)&request);

		if (REQUEST_STATE_DONE != request->state)
			break;

		preprocessing_flush_request(manager, request);
		preprocessor_free_request(request);

		if (SUCCEED == zbx_list_iterator_equal(&iterator, &manager->priority_tail))
			zbx_list_iterator_clear(&manager->priority_tail);

		zbx_list_pop(&manager->queue, NULL);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_link_items                                          *
 *                                                                            *
 * Purpose: create relation between item values within value queue            *
 *                                                                            *
 * Parameters: manager     - [IN] preprocessing manager                       *
 *             enqueued_at - [IN] position in value queue                     *
 *             item        - [IN] item configuration data                     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_link_items(zbx_preprocessing_manager_t *manager, zbx_list_item_t *enqueued_at,
		zbx_preproc_item_t *item)
{
	int				i;
	zbx_preprocessing_request_t	*request, *dep_request;
	zbx_item_link_t			*index, index_local;
	zbx_preproc_op_t		*op;

	for (i = 0; i < item->preproc_ops_num; i++)
	{
		op = &item->preproc_ops[i];

		if (ZBX_PREPROC_DELTA_VALUE == op->type || ZBX_PREPROC_DELTA_SPEED == op->type)
			break;

		if (ZBX_PREPROC_THROTTLE_VALUE == op->type || ZBX_PREPROC_THROTTLE_TIMED_VALUE == op->type)
			break;
	}

	if (i != item->preproc_ops_num)
	{
		/* existing linked item*/
		if (NULL != (index = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &item->itemid)))
		{
			dep_request = (zbx_preprocessing_request_t *)(enqueued_at->data);
			request = (zbx_preprocessing_request_t *)(index->queue_item->data);

			if (REQUEST_STATE_DONE != request->state)
			{
				request->pending = dep_request;
				dep_request->state = REQUEST_STATE_PENDING;
			}

			index->queue_item = enqueued_at;
		}
		else
		{
			index_local.itemid = item->itemid;
			index_local.queue_item = enqueued_at;

			zbx_hashset_insert(&manager->linked_items, &index_local, sizeof(zbx_item_link_t));
		}
	}
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_copy_value                                          *
 *                                                                            *
 * Purpose: create a copy of existing item value                              *
 *                                                                            *
 * Parameters: target  - [OUT] created copy                                   *
 *             source  - [IN]  value to be copied                             *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_copy_value(zbx_preproc_item_value_t *target, zbx_preproc_item_value_t *source)
{
	memcpy(target, source, sizeof(zbx_preproc_item_value_t));

	if (NULL != source->error)
		target->error = zbx_strdup(NULL, source->error);

	if (NULL != source->ts)
	{
		target->ts = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));
		memcpy(target->ts, source->ts, sizeof(zbx_timespec_t));
	}
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_enqueue                                             *
 *                                                                            *
 * Purpose: enqueue preprocessing request                                     *
 *                                                                            *
 * Parameters: manage   - [IN] preprocessing manager                          *
 *             value    - [IN] item value                                     *
 *             master   - [IN] request should be enqueued after this item     *
 *                             (NULL for the end of the queue)                *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_enqueue(zbx_preprocessing_manager_t *manager, zbx_preproc_item_value_t *value,
		zbx_list_item_t *master)
{
	zbx_preprocessing_request_t	*request;
	zbx_preproc_item_t		*item, item_local;
	zbx_list_item_t			*enqueued_at;
	int				i;
	zbx_preprocessing_states_t	state;
	unsigned char			priority = ZBX_PREPROC_PRIORITY_NONE;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, value->itemid);

	item_local.itemid = value->itemid;
	item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &item_local);

	/* override priority based on item type */
	if (NULL != item && ITEM_TYPE_INTERNAL == item->type)
		priority = ZBX_PREPROC_PRIORITY_FIRST;

	if (NULL == item || 0 == item->preproc_ops_num || (ITEM_STATE_NOTSUPPORTED != value->state &&
			(NULL == value->result_ptr->result || 0 == ISSET_VALUE(value->result_ptr->result))))
	{
		state = REQUEST_STATE_DONE;

		if (NULL == manager->queue.head)
		{
			/* queue is empty and item is done, it can be flushed */
			preprocessor_flush_value(value);
			manager->processed_num++;
			preprocessor_enqueue_dependent(manager, value, NULL);
			preproc_item_value_clear(value);

			goto out;
		}
	}
	else
		state = REQUEST_STATE_QUEUED;

	request = (zbx_preprocessing_request_t *)zbx_malloc(NULL, sizeof(zbx_preprocessing_request_t));
	memset(request, 0, sizeof(zbx_preprocessing_request_t));
	zbx_vector_preprocessing_request_create(&request->flush_queue);
	memcpy(&request->value, value, sizeof(zbx_preproc_item_value_t));
	request->state = state;

	if (REQUEST_STATE_QUEUED == state && ITEM_STATE_NOTSUPPORTED != value->state)
	{
		request->value_type = item->value_type;
		request->steps = (zbx_preproc_op_t *)zbx_malloc(NULL, sizeof(zbx_preproc_op_t) * item->preproc_ops_num);
		request->steps_num = item->preproc_ops_num;

		for (i = 0; i < item->preproc_ops_num; i++)
		{
			request->steps[i].type = item->preproc_ops[i].type;
			request->steps[i].params = zbx_strdup(NULL, item->preproc_ops[i].params);
			request->steps[i].error_handler = item->preproc_ops[i].error_handler;
			request->steps[i].error_handler_params = zbx_strdup(NULL,
					item->preproc_ops[i].error_handler_params);
		}

		manager->preproc_num++;
	}

	/* priority items are enqueued at the beginning of the line */
	if (NULL == master && ZBX_PREPROC_PRIORITY_FIRST == priority)
	{
		if (SUCCEED == zbx_list_iterator_isset(&manager->priority_tail))
		{
			/* insert after the last internal item */
			zbx_list_insert_after(&manager->queue, manager->priority_tail.current, request, &enqueued_at);
			zbx_list_iterator_update(&manager->priority_tail);
		}
		else
		{
			/* no internal items in queue, insert at the beginning */
			zbx_list_prepend(&manager->queue, request, &enqueued_at);
			zbx_list_iterator_init(&manager->queue, &manager->priority_tail);
		}

		zbx_list_iterator_next(&manager->priority_tail);
	}
	else
	{
		zbx_list_insert_after(&manager->queue, master, request, &enqueued_at);
		zbx_list_iterator_update(&manager->priority_tail);

		/* move internal item tail position if we are inserting after last internal item */
		if (NULL != master && master == manager->priority_tail.current)
			zbx_list_iterator_next(&manager->priority_tail);
	}

	if (REQUEST_STATE_QUEUED == request->state)
		preprocessor_link_items(manager, enqueued_at, item);

	/* if no preprocessing is needed, dependent items are enqueued */
	if (REQUEST_STATE_DONE == request->state)
		preprocessor_enqueue_dependent(manager, value, enqueued_at);

	manager->queued_num++;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_enqueue_dependent                                   *
 *                                                                            *
 * Purpose: enqueue dependent items (if any)                                  *
 *                                                                            *
 * Parameters: manager      - [IN] preprocessing manager                      *
 *             source_value - [IN] master item value                          *
 *             master       - [IN] dependent item should be enqueued after    *
 *                                 this item                                  *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
		zbx_preproc_item_value_t *source_value, zbx_list_item_t *master)
{
	int				i;
	zbx_preproc_item_t		*item, item_local;
	zbx_preproc_item_value_t	value;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, source_value->itemid);

	if (NULL != source_value->result_ptr->result && ISSET_VALUE(source_value->result_ptr->result))
	{
		item_local.itemid = source_value->itemid;
		if (NULL != (item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &item_local)) &&
				0 != item->dep_itemids_num)
		{
			/* result is shared between all dependent items, new result will be created after preprocessing */
			source_value->result_ptr->refcount += item->dep_itemids_num;

			for (i = item->dep_itemids_num - 1; i >= 0; i--)
			{
				preprocessor_copy_value(&value, source_value);
				value.itemid = item->dep_itemids[i].first;
				value.item_flags = item->dep_itemids[i].second;
				preprocessor_enqueue(manager, &value, master);
			}

			preprocessor_assign_tasks(manager);
			preprocessing_flush_queue(manager);
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_add_request                                         *
 *                                                                            *
 * Purpose: handle new preprocessing request                                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_uint32_t			offset = 0;
	zbx_preproc_item_value_t	value;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	preprocessor_sync_configuration(manager);

	while (offset < message->size)
	{
		offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
		preprocessor_enqueue(manager, &value, NULL);
	}

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_add_test_request                                    *
 *                                                                            *
 * Purpose: handle new preprocessing test request                             *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             message - [IN] packed preprocessing request                    *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_test_request(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_direct_request_t	*direct_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_ipc_client_addref(client);
	direct_request = zbx_malloc(NULL, sizeof(zbx_preprocessing_direct_request_t));
	direct_request->client = client;
	zbx_ipc_message_copy(&direct_request->message, message);
	zbx_list_append(&manager->direct_queue, direct_request, NULL);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: create_result_with_meta                                          *
 *                                                                            *
 * Purpose: create new result and copy meta information from previous result  *
 *                                                                            *
 * Parameters: result_old - [IN] result that can contain meta information     *
 *                                                                            *
 * Return value: pointer newly allocated result                               *
 *                                                                            *
 ******************************************************************************/
static AGENT_RESULT	*create_result_with_meta(const AGENT_RESULT *result_old)
{
	AGENT_RESULT	*result;

	result = zbx_malloc(NULL, sizeof(AGENT_RESULT));

	init_result(result);

	if (0 == ISSET_META(result_old))
		return result;

	result->type = AR_META;
	result->lastlogsize = result_old->lastlogsize;
	result->mtime = result_old->mtime;

	return result;
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_set_variant_result                                  *
 *                                                                            *
 * Purpose: get result data from variant and error message                    *
 *                                                                            *
 * Parameters: request - [IN/OUT] preprocessing request                       *
 *             value   - [IN] variant value                                   *
 *             error   - [IN] error message (if any)                          *
 *                                                                            *
 ******************************************************************************/
static int	preprocessor_set_variant_result(zbx_preprocessing_request_t *request,
		zbx_variant_t *value, char *error)
{
	int		type, ret = FAIL;
	zbx_log_t	*log;

	if (NULL != error)
	{
		/* on error item state is set to ITEM_STATE_NOTSUPPORTED */
		request->value.state = ITEM_STATE_NOTSUPPORTED;
		request->value.error = error;
		ret = FAIL;

		goto out;
	}

	if (ZBX_VARIANT_NONE == value->type)
	{
		AGENT_RESULT	*result;

		result = create_result_with_meta(request->value.result_ptr->result);

		preproc_item_result_free(&request->value);
		request->value.result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
		request->value.result_ptr->refcount = 1;
		request->value.result_ptr->result = result;

		ret = FAIL;

		goto out;
	}

	switch (request->value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			type = ZBX_VARIANT_DBL;
			break;
		case ITEM_VALUE_TYPE_UINT64:
			type = ZBX_VARIANT_UI64;
			break;
		default:
			/* ITEM_VALUE_TYPE_STR, ITEM_VALUE_TYPE_TEXT, ITEM_VALUE_TYPE_LOG */
			type = ZBX_VARIANT_STR;
	}

	if (FAIL != (ret = zbx_variant_convert(value, type)))
	{
		/* old result is shared between dependent and master items, it cannot be modified, create new result */
		AGENT_RESULT	*result;

		result = create_result_with_meta(request->value.result_ptr->result);

		switch (request->value_type)
		{
			case ITEM_VALUE_TYPE_FLOAT:
				SET_DBL_RESULT(result, value->data.dbl);
				break;
			case ITEM_VALUE_TYPE_STR:
				SET_STR_RESULT(result, value->data.str);
				break;
			case ITEM_VALUE_TYPE_LOG:
				log = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t));

				if (ISSET_LOG(request->value.result_ptr->result))
				{
					*log = *request->value.result_ptr->result->log;
					if (NULL != log->source)
						log->source = zbx_strdup(NULL, log->source);
				}
				else
					memset(log, 0, sizeof(zbx_log_t));

				log->value = value->data.str;
				SET_LOG_RESULT(result, log);
				break;
			case ITEM_VALUE_TYPE_UINT64:
				SET_UI64_RESULT(result, value->data.ui64);
				break;
			case ITEM_VALUE_TYPE_TEXT:
				SET_TEXT_RESULT(result, value->data.str);
				break;
		}

		preproc_item_result_free(&request->value);
		request->value.result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
		request->value.result_ptr->refcount = 1;
		request->value.result_ptr->result = result;

		zbx_variant_set_none(value);
	}
	else
	{
		zbx_free(request->value.error);
		request->value.error = zbx_dsprintf(NULL, "Value \"%s\" of type \"%s\" is not suitable for"
			" value type \"%s\"", zbx_variant_value_desc(value), zbx_variant_type_desc(value),
			zbx_item_value_type_string((zbx_item_value_type_t)request->value_type));

		request->value.state = ITEM_STATE_NOTSUPPORTED;
		ret = FAIL;
	}

out:
	return ret;
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_add_result                                          *
 *                                                                            *
 * Purpose: handle preprocessing result                                       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing result                     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_add_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t	*worker;
	zbx_preprocessing_request_t	*request;
	zbx_variant_t			value;
	char				*error;
	zbx_vector_ptr_t		history;
	zbx_preproc_history_t		*vault;
	zbx_list_item_t			*node;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	worker = preprocessor_get_worker_by_client(manager, client);
	node = (zbx_list_item_t *)worker->task;
	request = (zbx_preprocessing_request_t *)node->data;

	zbx_vector_ptr_create(&history);
	zbx_preprocessor_unpack_result(&value, &history, &error, message->data);

	if (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
			&request->value.itemid)))
	{
		zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
	}

	if (0 != history.values_num)
	{
		if (NULL == vault)
		{
			zbx_preproc_history_t	history_local;

			history_local.itemid = request->value.itemid;
			vault = (zbx_preproc_history_t *)zbx_hashset_insert(&manager->history_cache, &history_local,
					sizeof(history_local));
			zbx_vector_ptr_create(&vault->history);
		}

		zbx_vector_ptr_append_array(&vault->history, history.values, history.values_num);
		zbx_vector_ptr_clear(&history);
	}
	else
	{
		if (NULL != vault)
		{
			zbx_vector_ptr_destroy(&vault->history);
			zbx_hashset_remove_direct(&manager->history_cache, vault);
		}
	}

	preprocessor_set_request_state_done(manager, request, &node);

	if (FAIL != preprocessor_set_variant_result(request, &value, error))
		preprocessor_enqueue_dependent(manager, &request->value, node);

	worker->task = NULL;
	zbx_variant_clear(&value);

	manager->preproc_num--;

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zbx_vector_ptr_destroy(&history);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_flush_test_result                                   *
 *                                                                            *
 * Purpose: handle preprocessing result                                       *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] packed preprocessing result                     *
 *                                                                            *
 * Comments: Preprocessing testing results are directly forwarded to source   *
 *           client as they are.                                              *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_flush_test_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t		*worker;
	zbx_preprocessing_direct_request_t	*direct_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	worker = preprocessor_get_worker_by_client(manager, client);
	direct_request = (zbx_preprocessing_direct_request_t *)worker->task;

	/* forward the response to the client */
	if (SUCCEED == zbx_ipc_client_connected(direct_request->client))
		zbx_ipc_client_send(direct_request->client, message->code, message->data, message->size);

	worker->task = NULL;
	preprocessor_free_direct_request(direct_request);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

static	void	preprocessor_get_items_totals(zbx_preprocessing_manager_t *manager, int *total, int *queued,
		int *processing, int *done, int *pending)
{
#define ZBX_MAX_REQUEST_STATE_PRINT_LIMIT	25

	zbx_preproc_item_stats_t	*item;
	zbx_list_iterator_t		iterator;
	zbx_preprocessing_request_t	*request;
	zbx_hashset_t			items;

	*total = 0;
	*queued = 0;
	*processing = 0;
	*done = 0;
	*pending = 0;

	zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_list_iterator_init(&manager->queue, &iterator);
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		zbx_list_iterator_peek(&iterator, (void **)&request);

		if (NULL == (item = zbx_hashset_search(&items, &request->value.itemid)))
		{
			zbx_preproc_item_stats_t	item_local = {.itemid = request->value.itemid};

			item = zbx_hashset_insert(&items, &item_local, sizeof(item_local));
		}

		switch(request->state)
		{
			case REQUEST_STATE_QUEUED:
				if (*queued < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
				{
					zabbix_log(LOG_LEVEL_DEBUG, "oldest queued itemid: " ZBX_FS_UI64
							" values:%d pos:%d", item->itemid, item->values_num, *total);
				}
				(*queued)++;
				break;
			case REQUEST_STATE_PROCESSING:
				if (*processing < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
				{
					zabbix_log(LOG_LEVEL_DEBUG, "oldest processing itemid: " ZBX_FS_UI64
							" values:%d pos:%d", item->itemid, item->values_num, *total);
				}
				(*processing)++;
				break;
			case REQUEST_STATE_DONE:
				if (*done < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
				{
					zabbix_log(LOG_LEVEL_DEBUG, "oldest done itemid: " ZBX_FS_UI64
							" values:%d pos:%d", item->itemid, item->values_num, *total);
				}
				(*done)++;
				break;
			case REQUEST_STATE_PENDING:
				if (*pending < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
				{
					zabbix_log(LOG_LEVEL_DEBUG, "oldest pending itemid: " ZBX_FS_UI64
							" values:%d pos:%d", item->itemid, item->values_num, *total);
				}
				(*pending)++;
				break;
		}

		item->values_num++;
		(*total)++;
	}

	zbx_hashset_destroy(&items);
#undef ZBX_MAX_REQUEST_STATE_PRINT_LIMIT
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_diag_stats                                      *
 *                                                                            *
 * Purpose: return diagnostic statistics                                      *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_get_diag_stats(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client)
{
	unsigned char	*data;
	zbx_uint32_t	data_len;
	int		total, queued, processing, done, pending;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	preprocessor_get_items_totals(manager, &total, &queued, &processing, &done, &pending);

	data_len = zbx_preprocessor_pack_diag_stats(&data, total, queued, processing, done, pending);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);
	zbx_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preproc_sort_item_by_values_desc                                 *
 *                                                                            *
 * Purpose: compare item statistics by value                                  *
 *                                                                            *
 ******************************************************************************/
static int	preproc_sort_item_by_values_desc(const void *d1, const void *d2)
{
	zbx_preproc_item_stats_t	*i1 = *(zbx_preproc_item_stats_t **)d1;
	zbx_preproc_item_stats_t	*i2 = *(zbx_preproc_item_stats_t **)d2;

	return i2->values_num - i1->values_num;
}

static	void	preprocessor_get_items_view(zbx_preprocessing_manager_t *manager, zbx_hashset_t *items,
		zbx_vector_ptr_t *view)
{
	zbx_preproc_item_stats_t	*item;
	zbx_list_iterator_t		iterator;
	zbx_preprocessing_request_t	*request;

	zbx_list_iterator_init(&manager->queue, &iterator);
	while (SUCCEED == zbx_list_iterator_next(&iterator))
	{
		zbx_list_iterator_peek(&iterator, (void **)&request);

		if (NULL == (item = zbx_hashset_search(items, &request->value.itemid)))
		{
			zbx_preproc_item_stats_t	item_local = {.itemid = request->value.itemid};

			item = zbx_hashset_insert(items, &item_local, sizeof(item_local));
			zbx_vector_ptr_append(view, item);
		}
		/* There might be processed, but not yet flushed items at the start of queue with    */
		/* freed preprocessing steps and steps_num being zero. Because of that keep updating */
		/* items steps_num to have preprocessing steps of last queued item.                  */
		item->steps_num = request->steps_num;
		item->values_num++;
	}
}


/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_top_items                                       *
 *                                                                            *
 * Purpose: return diagnostic top view                                        *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] the message with request                        *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_get_top_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	int			limit;
	unsigned char		*data;
	zbx_uint32_t		data_len;
	zbx_hashset_t		items;
	zbx_vector_ptr_t	view;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&view);

	preprocessor_get_items_view(manager, &items, &view);

	zbx_vector_ptr_sort(&view, preproc_sort_item_by_values_desc);

	data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_item_stats_t **)view.values,
			MIN(limit, view.values_num));
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
	zbx_free(data);

	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&items);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

static void	preprocessor_get_oldest_preproc_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	int			limit, i;
	unsigned char		*data;
	zbx_uint32_t		data_len;
	zbx_hashset_t		items;
	zbx_vector_ptr_t	view, view_preproc;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&view);
	zbx_vector_ptr_create(&view_preproc);
	zbx_vector_ptr_reserve(&view_preproc, limit);

	preprocessor_get_items_view(manager, &items, &view);

	for (i = 0; i < view.values_num && 0 < limit; i++)
	{
		zbx_preproc_item_stats_t	*item;

		item = (zbx_preproc_item_stats_t *)view.values[i];

		/* only items with preprocessing can slow down queue */
		if (0 == item->steps_num)
			continue;

		zbx_vector_ptr_append(&view_preproc, item);
		limit--;
	}

	data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_item_stats_t **)view_preproc.values,
			MIN(limit, view_preproc.values_num));
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
	zbx_free(data);

	zbx_vector_ptr_destroy(&view_preproc);
	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&items);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_init_manager                                        *
 *                                                                            *
 * Purpose: initializes preprocessing manager                                 *
 *                                                                            *
 * Parameters: manager - [IN] the manager to initialize                       *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_init_manager(zbx_preprocessing_manager_t *manager)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers: %d", __func__, CONFIG_PREPROCESSOR_FORKS);

	memset(manager, 0, sizeof(zbx_preprocessing_manager_t));

	manager->workers = (zbx_preprocessing_worker_t *)zbx_calloc(NULL, CONFIG_PREPROCESSOR_FORKS,
			sizeof(zbx_preprocessing_worker_t));
	zbx_list_create(&manager->queue);
	zbx_list_create(&manager->direct_queue);
	zbx_hashset_create_ext(&manager->item_config, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
			(zbx_clean_func_t)preproc_item_clear,
			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
	zbx_hashset_create(&manager->linked_items, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_hashset_create(&manager->history_cache, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_register_worker                                     *
 *                                                                            *
 * Purpose: registers preprocessing worker                                    *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected preprocessing worker              *
 *             message - [IN] message received by preprocessing manager       *
 *                                                                            *
 ******************************************************************************/
static void preprocessor_register_worker(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t	*worker = NULL;
	pid_t				ppid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	memcpy(&ppid, message->data, sizeof(ppid));

	if (ppid != getppid())
	{
		zbx_ipc_client_close(client);
		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
	}
	else
	{
		if (CONFIG_PREPROCESSOR_FORKS == manager->worker_count)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			exit(EXIT_FAILURE);
		}

		worker = (zbx_preprocessing_worker_t *)&manager->workers[manager->worker_count++];
		worker->client = client;

		preprocessor_assign_tasks(manager);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: preprocessor_destroy_manager                                     *
 *                                                                            *
 * Purpose: destroy preprocessing manager                                     *
 *                                                                            *
 * Parameters: manager - [IN] the manager to destroy                          *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_destroy_manager(zbx_preprocessing_manager_t *manager)
{

	zbx_preprocessing_request_t		*request;
	zbx_preprocessing_direct_request_t	*direct_request;

	zbx_free(manager->workers);

	/* this is the place where values are lost */
	while (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
		preprocessor_free_direct_request(direct_request);

	zbx_list_destroy(&manager->direct_queue);

	while (SUCCEED == zbx_list_pop(&manager->queue, (void **)&request))
		preprocessor_free_request(request);

	zbx_list_destroy(&manager->queue);

	zbx_hashset_destroy(&manager->item_config);
	zbx_hashset_destroy(&manager->linked_items);
	zbx_hashset_destroy(&manager->history_cache);
}

ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
{
	zbx_ipc_service_t		service;
	char				*error = NULL;
	zbx_ipc_client_t		*client;
	zbx_ipc_message_t		*message;
	zbx_preprocessing_manager_t	manager;
	int				ret;
	double				time_stat, time_idle = 0, time_now, time_flush, sec;

#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */

	process_type = ((zbx_thread_args_t *)args)->process_type;
	server_num = ((zbx_thread_args_t *)args)->server_num;
	process_num = ((zbx_thread_args_t *)args)->process_num;

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
			server_num, get_process_type_string(process_type), process_num);

	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);

	if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	preprocessor_init_manager(&manager);

	/* initialize statistics */
	time_stat = zbx_time();
	time_flush = time_stat;

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();

		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
					ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
					get_process_type_string(process_type), process_num,
					manager.queued_num, manager.processed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			manager.processed_num = 0;
		}

		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
		ret = zbx_ipc_service_recv(&service, ZBX_PREPROCESSING_MANAGER_DELAY, &client, &message);
		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
		sec = zbx_time();
		zbx_update_env(sec);

		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_PREPROCESSOR_WORKER:
					preprocessor_register_worker(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_REQUEST:
					preprocessor_add_request(&manager, message);
					break;
				case ZBX_IPC_PREPROCESSOR_RESULT:
					preprocessor_add_result(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_QUEUE:
					zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
							sizeof(zbx_uint64_t));
					break;
				case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
					preprocessor_add_test_request(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_TEST_RESULT:
					preprocessor_flush_test_result(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
					preprocessor_get_diag_stats(&manager, client);
					break;
				case ZBX_IPC_PREPROCESSOR_TOP_ITEMS:
					preprocessor_get_top_items(&manager, client, message);
					break;
				case ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEMS:
					preprocessor_get_oldest_preproc_items(&manager, client, message);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);

		if (0 == manager.preproc_num || 1 < time_now - time_flush)
		{
			dc_flush_history();
			time_flush = time_now;
		}
	}

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	while (1)
		zbx_sleep(SEC_PER_MIN);

	zbx_ipc_service_close(&service);
	preprocessor_destroy_manager(&manager);
#undef STAT_INTERVAL
}