/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/
#include "pp_manager.h"
#include "pp_worker.h"
#include "pp_queue.h"
#include "pp_task.h"
#include "zbxpreproc.h"
#include "zbxalgo.h"
#include "zbxtimekeeper.h"
#include "zbxself.h"
#include "zbxstr.h"
#include "zbxcachehistory.h"
#include "zbxprof.h"
#include "pp_protocol.h"
#include "zbx_item_constants.h"
#include "zbxnix.h"
#include "zbxvariant.h"
#include "zbxlog.h"
#include "pp_cache.h"
#include "zbxcacheconfig.h"
#include "zbxipcservice.h"
#include "zbxthreads.h"
#include "zbxtime.h"
#include "zbxrtc.h"
#include "zbxpreprocbase.h"
#include "zbx_rtc_constants.h"
#ifdef HAVE_LIBXML2
# include
# include
#endif
#ifdef HAVE_NETSNMP
# include "preproc_snmp.h"
#endif
/* callback consulted before a processed value task is flushed; the value is flushed only when it returns SUCCEED */
static zbx_preproc_prepare_value_func_t preproc_prepare_value_func_cb = NULL;
/* callback used to flush values, both directly flushed ones and results of finished value tasks */
static zbx_preproc_flush_value_func_t preproc_flush_value_func_cb = NULL;
/* callback stored by zbx_init_library_preproc() and handed out by preproc_get_progname_cb() */
static zbx_get_progname_f get_progname_func_cb = NULL;
/******************************************************************************
 *
 * Purpose: set up the xml library parser; the manager calls this before any
 *          worker is initialised
 *
 * Comments: does nothing when built without HAVE_LIBXML2
 *
 ******************************************************************************/
static void	pp_xml_init(void)
{
#if defined(HAVE_LIBXML2)
	xmlInitParser();
#endif
}
/******************************************************************************
 *
 * Purpose: release the resources held by the xml library parser
 *
 * Comments: does nothing when built without HAVE_LIBXML2
 *
 ******************************************************************************/
static void	pp_xml_destroy(void)
{
#if defined(HAVE_LIBXML2)
	xmlCleanupParser();
#endif
}
/******************************************************************************
 *
 * Purpose: perform global curl library initialisation; the manager calls
 *          this before any worker is initialised
 *
 * Comments: does nothing when built without HAVE_LIBCURL
 *
 ******************************************************************************/
static void	pp_curl_init(void)
{
#if defined(HAVE_LIBCURL)
	curl_global_init(CURL_GLOBAL_DEFAULT);
#endif
}
/******************************************************************************
 *
 * Purpose: perform global curl library cleanup
 *
 * Comments: does nothing when built without HAVE_LIBCURL
 *
 ******************************************************************************/
static void	pp_curl_destroy(void)
{
#if defined(HAVE_LIBCURL)
	curl_global_cleanup();
#endif
}
/******************************************************************************
 *
 * Purpose: register the callbacks used by the preprocessing library
 *
 * Parameters: preproc_prepare_value_cb - [IN] stored as the value prepare
 *                                             callback
 *             preproc_flush_value_cb   - [IN] stored as the value flush
 *                                             callback
 *             get_progname_cb          - [IN] stored as the program name
 *                                             callback
 *
 ******************************************************************************/
void	zbx_init_library_preproc(zbx_preproc_prepare_value_func_t preproc_prepare_value_cb,
		zbx_preproc_flush_value_func_t preproc_flush_value_cb, zbx_get_progname_f get_progname_cb)
{
	get_progname_func_cb = get_progname_cb;
	preproc_flush_value_func_cb = preproc_flush_value_cb;
	preproc_prepare_value_func_cb = preproc_prepare_value_cb;
}
/******************************************************************************
 *
 * Purpose: get the program name callback registered with
 *          zbx_init_library_preproc()
 *
 * Return value: The stored callback (NULL until one has been registered).
 *
 ******************************************************************************/
zbx_get_progname_f	preproc_get_progname_cb(void)
{
	zbx_get_progname_f	registered_cb = get_progname_func_cb;

	return registered_cb;
}
/******************************************************************************
 *
 * Purpose: create preprocessing manager
 *
 * Parameters: workers_num         - [IN] number of workers to create
 *             pp_finished_task_cb - [IN] callback to call after finishing
 *                                        task (optional)
 *             finished_data       - [IN] callback data (optional)
 *             config_source_ip    - [IN] passed to every pp_worker_init() call
 *             config_timeout      - [IN] passed to zbx_ipc_async_socket_open()
 *                                        when opening the RTC socket
 *             error               - [OUT] error message; set directly on
 *                                         start-up timeout, otherwise
 *                                         presumably filled by the failing call
 *
 * Return value: The created manager or NULL on error.
 *
 * Comments: The xml, curl and (if compiled in) SNMP libraries are initialised
 *           before the workers. After initialisation the function reads
 *           queue.workers_num under the queue lock, sleeping poll_delay
 *           between reads, until it equals workers_num; it gives up with an
 *           error once PP_STARTUP_TIMEOUT seconds have passed.
 *
 ******************************************************************************/
static zbx_pp_manager_t *zbx_pp_manager_create(int workers_num, zbx_pp_finished_task_cb_t pp_finished_task_cb,
void *finished_data, const char *config_source_ip, int config_timeout, char **error)
{
int i, ret = FAIL, started_num = 0;
time_t time_start;
/* 0 seconds + 1e8 nanoseconds (0.1 s) between start-up polls */
struct timespec poll_delay = {0, 1e8};
zbx_pp_manager_t *manager;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, workers_num);
pp_xml_init();
pp_curl_init();
#ifdef HAVE_NETSNMP
preproc_init_snmp();
#endif
/* the manager starts zero-filled, so workers_num is 0 until it is assigned below */
manager = (zbx_pp_manager_t *)zbx_malloc(NULL, sizeof(zbx_pp_manager_t));
memset(manager, 0, sizeof(zbx_pp_manager_t));
if (SUCCEED != pp_task_queue_init(&manager->queue, error))
goto out;
manager->timekeeper = zbx_timekeeper_create(workers_num, NULL);
manager->workers_num = workers_num;
manager->workers = (zbx_pp_worker_t *)zbx_calloc(NULL, (size_t)workers_num, sizeof(zbx_pp_worker_t));
/* workers are numbered from 1 and share the manager queue and timekeeper */
for (i = 0; i < workers_num; i++)
{
if (SUCCEED != pp_worker_init(&manager->workers[i], i + 1, &manager->queue, manager->timekeeper,
config_source_ip, error))
{
goto out;
}
pp_worker_set_finished_task_cb(&manager->workers[i], pp_finished_task_cb, finished_data);
}
zbx_hashset_create_ext(&manager->items, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
(zbx_clean_func_t)zbx_pp_item_clear, ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC,
ZBX_DEFAULT_MEM_FREE_FUNC);
if (FAIL == zbx_ipc_async_socket_open(&manager->rtc, ZBX_IPC_SERVICE_RTC, config_timeout, error))
goto out;
/* wait for threads to start */
time_start = time(NULL);
#define PP_STARTUP_TIMEOUT 10
while (started_num != workers_num)
{
if (time_start + PP_STARTUP_TIMEOUT < time(NULL))
{
*error = zbx_strdup(NULL, "timeout occurred while waiting for workers to start");
goto out;
}
/* queue.workers_num is read under the queue lock */
pthread_mutex_lock(&manager->queue.lock);
started_num = manager->queue.workers_num;
pthread_mutex_unlock(&manager->queue.lock);
/* the sleep happens after every read, including the one that observes all workers started */
nanosleep(&poll_delay, NULL);
}
#undef PP_STARTUP_TIMEOUT
ret = SUCCEED;
out:
if (FAIL == ret)
{
/* NOTE(review): on failure workers are only asked to stop via pp_worker_stop(); manager->workers, */
/* the timekeeper, the items hashset and the RTC socket are not released on this path - confirm */
/* that callers treat a NULL result as fatal */
for (i = 0; i < manager->workers_num; i++)
pp_worker_stop(&manager->workers[i]);
pp_task_queue_destroy(&manager->queue);
zbx_free(manager);
manager = NULL;
}
zabbix_log(LOG_LEVEL_DEBUG, "End of %s() ret:%s error:%s", __func__, zbx_result_string(ret),
ZBX_NULL2EMPTY_STR(*error));
return manager;
}
/******************************************************************************
 *
 * Purpose: stop the workers and release the preprocessing manager
 *
 * Parameters: manager - [IN] manager to destroy
 *
 * Comments: All workers are asked to stop and the queue is notified while
 *           the queue lock is held; the workers are destroyed afterwards.
 *           Library cleanup (SNMP, curl, xml) runs after the manager owned
 *           structures have been released.
 *
 ******************************************************************************/
static void	zbx_pp_manager_free(zbx_pp_manager_t *manager)
{
	int	idx;

	pp_task_queue_lock(&manager->queue);

	for (idx = 0; manager->workers_num > idx; idx++)
	{
		zbx_pp_worker_t	*worker = &manager->workers[idx];

		pp_worker_stop(worker);
	}

	pp_task_queue_notify_all(&manager->queue);
	pp_task_queue_unlock(&manager->queue);

	for (idx = 0; manager->workers_num > idx; idx++)
	{
		zbx_pp_worker_t	*worker = &manager->workers[idx];

		pp_worker_destroy(worker);
	}

	zbx_free(manager->workers);

	pp_task_queue_destroy(&manager->queue);
	zbx_hashset_destroy(&manager->items);
	zbx_timekeeper_free(manager->timekeeper);
	zbx_dc_um_shared_handle_release(manager->um_handle);

#if defined(HAVE_NETSNMP)
	preproc_shutdown_snmp();
#endif
	pp_curl_destroy();
	pp_xml_destroy();

	zbx_ipc_async_socket_close(&manager->rtc);

	zbx_free(manager);
}
/******************************************************************************
 *
 * Purpose: create a preprocessing test task and put it into the task queue
 *
 * Parameters: manager - [IN]
 *             preproc - [IN] item preprocessing data
 *             value   - [IN] value to preprocess, its contents will be
 *                            directly copied over and cleared by the task
 *             ts      - [IN] value timestamp
 *             client  - [IN] request source
 *
 * Comments: The task is created before the queue lock is taken; pushing and
 *           notifying happen under the lock.
 *
 ******************************************************************************/
static void	zbx_pp_manager_queue_test(zbx_pp_manager_t *manager, zbx_pp_item_preproc_t *preproc, zbx_variant_t *value,
		zbx_timespec_t ts, zbx_ipc_client_t *client)
{
	zbx_pp_task_t	*test_task;

	test_task = pp_task_test_create(preproc, value, ts, client);

	pp_task_queue_lock(&manager->queue);

	pp_task_queue_push_test(&manager->queue, test_task);
	pp_task_queue_notify(&manager->queue);

	pp_task_queue_unlock(&manager->queue);
}
/******************************************************************************
 *
 * Purpose: push a batch of value/value_seq preprocessing tasks to the queue
 *
 * Parameters: manager - [IN]
 *             tasks   - [IN] tasks to queue
 *
 * Comments: The whole batch is pushed under a single queue lock followed by
 *           one notification. The lock wait is measured with the profiler.
 *
 ******************************************************************************/
static void	zbx_pp_manager_queue_value_preproc(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks)
{
	int	pos = 0;

	zbx_prof_start(__func__, ZBX_PROF_MUTEX);
	pp_task_queue_lock(&manager->queue);
	zbx_prof_end_wait();

	while (pos < tasks->values_num)
	{
		pp_task_queue_push(&manager->queue, tasks->values[pos]);
		pos++;
	}

	pp_task_queue_notify(&manager->queue);

	pp_task_queue_unlock(&manager->queue);
	zbx_prof_end();
}
/******************************************************************************
 *
 * Purpose: create preprocessing task from request
 *
 * Parameters: manager   - [IN]
 *             itemid    - [IN] item identifier
 *             value     - [IN] value to preprocess, its contents will be
 *                              directly copied over and cleared by the task
 *             ts        - [IN] value timestamp
 *             value_opt - [IN] optional value data (optional)
 *
 * Return value: The created task or NULL if the data can be flushed directly.
 *
 * Comments: NULL is returned for empty (none) values, for items missing from
 *           the manager item cache and for items having neither
 *           preprocessing steps nor dependent items.
 *
 ******************************************************************************/
static zbx_pp_task_t	*zbx_pp_manager_create_task(zbx_pp_manager_t *manager, zbx_uint64_t itemid,
		zbx_variant_t *value, zbx_timespec_t ts, const zbx_pp_value_opt_t *value_opt)
{
	zbx_pp_item_t	*item;

	if (ZBX_VARIANT_NONE == value->type)
		return NULL;

	item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &itemid);

	if (NULL == item)
		return NULL;

	if (0 == item->preproc->steps_num && 0 == item->preproc->dep_itemids_num)
		return NULL;

	if (ZBX_PP_PROCESS_PARALLEL != item->preproc->mode)
	{
		return pp_task_value_seq_create(item->itemid, item->preproc, manager->um_handle, value, ts, value_opt,
				NULL);
	}

	return pp_task_value_create(item->itemid, item->preproc, manager->um_handle, value, ts, value_opt,
			NULL);
}
/******************************************************************************
 *
 * Purpose: find the first dependent item whose preprocessing supports caching
 *
 * Parameters: manager     - [IN]
 *             itemids     - [IN] dependent itemids
 *             itemids_num - [IN] number of dependent itemids
 *
 * Return value: The first dependent item found in the manager item cache for
 *               which pp_cache_is_supported() succeeds, or NULL.
 *
 ******************************************************************************/
static zbx_pp_item_t	*pp_manager_get_cacheable_dependent_item(zbx_pp_manager_t *manager, zbx_uint64_t *itemids,
		int itemids_num)
{
	for (int i = 0; i < itemids_num; i++)
	{
		zbx_pp_item_t	*candidate;

		candidate = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &itemids[i]);

		/* itemids missing from the item cache are ignored */
		if (NULL != candidate && SUCCEED == pp_cache_is_supported(candidate->preproc))
			return candidate;
	}

	return NULL;
}
/******************************************************************************
 *
 * Purpose: create and queue tasks for dependent items
 *
 * Parameters: manager        - [IN] manager
 *             preproc        - [IN] master item preprocessing data
 *             um_handle      - [IN] shared user macro cache handle
 *             exclude_itemid - [IN] dependent itemid to exclude, can be 0
 *             value          - [IN] value
 *             ts             - [IN] value timestamp
 *             cache          - [IN] preprocessing cache
 *                                   (optional, can be NULL)
 *
 * Comments: This function called within task queue lock.
 *           The cache passed in is copied with pp_cache_copy(); when that
 *           yields NULL a new cache is created from the master preprocessing
 *           data and value. The local cache reference is released before
 *           returning. The queue is notified once, and only if at least one
 *           task was pushed.
 *
 ******************************************************************************/
static void	pp_manager_queue_dependents(zbx_pp_manager_t *manager, zbx_pp_item_preproc_t *preproc,
		zbx_dc_um_shared_handle_t *um_handle, zbx_uint64_t exclude_itemid, const zbx_variant_t *value,
		zbx_timespec_t ts, zbx_pp_cache_t *cache)
{
	zbx_pp_cache_t	*shared_cache;
	int		pushed_num = 0;

	if (0 == preproc->dep_itemids_num)
		return;

	if (NULL == (shared_cache = pp_cache_copy(cache)))
		shared_cache = pp_cache_create(preproc, value);

	for (int i = 0; i < preproc->dep_itemids_num; i++)
	{
		zbx_pp_item_t	*dep_item;
		zbx_pp_task_t	*dep_task;

		/* skip already preprocessed dependent item */
		if (exclude_itemid == preproc->dep_itemids[i])
			continue;

		dep_item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &preproc->dep_itemids[i]);

		/* skip disabled/removed items */
		if (NULL == dep_item)
			continue;

		if (ZBX_PP_PROCESS_PARALLEL != dep_item->preproc->mode)
		{
			dep_task = pp_task_value_seq_create(dep_item->itemid, dep_item->preproc, um_handle, NULL, ts,
					NULL, shared_cache);
		}
		else
		{
			dep_task = pp_task_value_create(dep_item->itemid, dep_item->preproc, um_handle, NULL, ts, NULL,
					shared_cache);
		}

		pp_task_queue_push_immediate(&manager->queue, dep_task);
		pushed_num++;
	}

	if (0 < pushed_num)
		pp_task_queue_notify(&manager->queue);

	pp_cache_release(shared_cache);
}
/******************************************************************************
 *
 * Purpose: queue new tasks in response to finished value task
 *
 * Parameters: manager - [IN] manager
 *             task    - [IN] finished value task
 *
 * Comments: This function called within task queue lock.
 *           Nothing is queued when the task result is empty (none).
 *           If one of the dependent items supports preprocessing caching, a
 *           single dependent task is queued with that item as its primary
 *           value task and a cache created from the task result. Otherwise
 *           the dependent items are queued directly.
 *
 ******************************************************************************/
static void	pp_manager_queue_value_task_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_pp_task_value_t	*d = (zbx_pp_task_value_t *)PP_TASK_DATA(task);
	zbx_pp_item_t		*cacheable;
	zbx_pp_task_t		*dep_task;
	zbx_pp_task_dependent_t	*d_dep;
	zbx_variant_t		empty;

	if (ZBX_VARIANT_NONE == d->result.type)
		return;

	cacheable = pp_manager_get_cacheable_dependent_item(manager, d->preproc->dep_itemids,
			d->preproc->dep_itemids_num);

	if (NULL == cacheable)
	{
		pp_manager_queue_dependents(manager, d->preproc, d->um_handle, 0, &d->result, d->ts, NULL);
		return;
	}

	dep_task = pp_task_dependent_create(task->itemid, d->preproc);
	d_dep = (zbx_pp_task_dependent_t *)PP_TASK_DATA(dep_task);

	d_dep->cache = pp_cache_create(cacheable->preproc, &d->result);

	/* the primary task gets an empty value and the cache created above */
	zbx_variant_set_none(&empty);
	d_dep->primary = pp_task_value_create(cacheable->itemid, cacheable->preproc, d->um_handle, &empty, d->ts,
			NULL, d_dep->cache);

	pp_task_queue_push_immediate(&manager->queue, dep_task);
	pp_task_queue_notify(&manager->queue);
}
/******************************************************************************
 *
 * Purpose: queue new tasks in response to finished dependent task
 *
 * Parameters: manager - [IN] manager
 *             task    - [IN] finished dependent task, freed by this function
 *
 * Return value: The primary value task detached from the dependent task.
 *
 * Comments: This function called within task queue lock.
 *           The primary task result is handled like any finished value task,
 *           after that the remaining dependent items (all except the primary
 *           task item) are queued with the dependent task cache.
 *
 ******************************************************************************/
static zbx_pp_task_t	*pp_manager_queue_dependent_task_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_pp_task_dependent_t	*dep = (zbx_pp_task_dependent_t *)PP_TASK_DATA(task);
	zbx_pp_task_t		*primary = dep->primary;
	zbx_pp_task_value_t	*primary_data = (zbx_pp_task_value_t *)PP_TASK_DATA(primary);

	pp_manager_queue_value_task_result(manager, primary);

	pp_manager_queue_dependents(manager, dep->preproc, primary_data->um_handle, primary->itemid,
			&primary_data->result, primary_data->ts, dep->cache);

	/* detach the primary task before the dependent task is freed */
	dep->primary = NULL;
	pp_task_free(task);

	return primary;
}
/******************************************************************************
 *
 * Purpose: take the finished task off a sequence and requeue the sequence
 *
 * Parameters: manager  - [IN] manager
 *             task_seq - [IN] finished sequence task
 *
 * Return value: The task popped from the sequence (for dependent tasks - the
 *               primary value task) or NULL if nothing could be popped.
 *
 * Comments: This function called within task queue lock.
 *           If the sequence still has tasks it is pushed back to the queue,
 *           otherwise the sequence is removed from the queue and freed.
 *
 ******************************************************************************/
static zbx_pp_task_t	*pp_manager_requeue_next_sequence_task(zbx_pp_manager_t *manager, zbx_pp_task_t *task_seq)
{
	zbx_pp_task_sequence_t	*seq = (zbx_pp_task_sequence_t *)PP_TASK_DATA(task_seq);
	zbx_pp_task_t		*finished = NULL, *next;

	if (SUCCEED == zbx_list_pop(&seq->tasks, (void **)&finished))
	{
		if (ZBX_PP_TASK_VALUE == finished->type || ZBX_PP_TASK_VALUE_SEQ == finished->type)
		{
			pp_manager_queue_value_task_result(manager, finished);
		}
		else if (ZBX_PP_TASK_DEPENDENT == finished->type)
		{
			finished = pp_manager_queue_dependent_task_result(manager, finished);
		}
		else
		{
			THIS_SHOULD_NEVER_HAPPEN;
		}
	}

	if (SUCCEED != zbx_list_peek(&seq->tasks, (void **)&next))
	{
		/* nothing left in the sequence */
		pp_task_queue_remove_sequence(&manager->queue, task_seq->itemid);
		pp_task_free(task_seq);
	}
	else
	{
		pp_task_queue_push_immediate(&manager->queue, task_seq);
		pp_task_queue_notify(&manager->queue);
	}

	return finished;
}
/******************************************************************************
 *
 * Purpose: collect finished tasks and queue their follow-up tasks
 *
 * Parameters: manager        - [IN] manager
 *             tasks          - [OUT] finished tasks
 *             pending_num    - [OUT] queue pending task counter
 *             processing_num - [OUT] queue processing task counter
 *             finished_num   - [OUT] queue finished task counter
 *
 * Comments: Tasks are collected under the queue lock until the output vector
 *           holds PP_FINISHED_TASK_BATCH_SIZE tasks, the finished queue is
 *           empty, or result handling yields no task to return.
 *           Timekeeper data is collected whenever the current second differs
 *           from the one of the previous collection.
 *
 ******************************************************************************/
static void	zbx_pp_manager_process_finished(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks,
		zbx_uint64_t *pending_num, zbx_uint64_t *processing_num, zbx_uint64_t *finished_num)
{
#define PP_FINISHED_TASK_BATCH_SIZE	100

	static time_t	timekeeper_clock = 0;
	zbx_pp_task_t	*task;
	time_t		now;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_pp_task_ptr_reserve(tasks, PP_FINISHED_TASK_BATCH_SIZE);

	zbx_prof_start(__func__, ZBX_PROF_MUTEX);
	pp_task_queue_lock(&manager->queue);
	zbx_prof_end_wait();

	while (tasks->values_num < PP_FINISHED_TASK_BATCH_SIZE)
	{
		if (NULL == (task = pp_task_queue_pop_finished(&manager->queue)))
			break;

		if (ZBX_PP_TASK_VALUE == task->type)
			pp_manager_queue_value_task_result(manager, task);
		else if (ZBX_PP_TASK_DEPENDENT == task->type)
			task = pp_manager_queue_dependent_task_result(manager, task);
		else if (ZBX_PP_TASK_SEQUENCE == task->type)
			task = pp_manager_requeue_next_sequence_task(manager, task);

		/* result handling can replace the task; stop when there is nothing to return */
		if (NULL == task)
			break;

		zbx_vector_pp_task_ptr_append(tasks, task);
	}

	*finished_num = manager->queue.finished_num;
	*processing_num = manager->queue.processing_num;
	*pending_num = manager->queue.pending_num;

	pp_task_queue_unlock(&manager->queue);
	zbx_prof_end();

	now = time(NULL);

	if (timekeeper_clock != now)
	{
		zbx_timekeeper_collect(manager->timekeeper);
		timekeeper_clock = now;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() values_num:%d", __func__, tasks->values_num);

#undef PP_FINISHED_TASK_BATCH_SIZE
}
/******************************************************************************
 *
 * Purpose: write the cached item configuration to the log at trace level
 *
 * Parameters: manager - [IN] manager
 *
 * Comments: For every cached item the general attributes, the preprocessing
 *           steps and the dependent item identifiers are logged.
 *
 ******************************************************************************/
static void	zbx_pp_manager_dump_items(zbx_pp_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_pp_item_t		*item;

	zbx_hashset_iter_reset(&manager->items, &iter);

	for (;;)
	{
		int	step_idx = 0, dep_idx = 0;

		if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter)))
			break;

		zabbix_log(LOG_LEVEL_TRACE, "itemid:" ZBX_FS_UI64 " hostid:" ZBX_FS_UI64 " revision:" ZBX_FS_UI64
				" type:%u value_type:%u mode:%u flags:%u",
				item->itemid, item->preproc->hostid, item->revision, item->preproc->type,
				item->preproc->value_type, item->preproc->mode, item->preproc->flags);

		zabbix_log(LOG_LEVEL_TRACE, " preprocessing steps:");

		while (step_idx < item->preproc->steps_num)
		{
			zabbix_log(LOG_LEVEL_TRACE, " type:%d params:'%s' err_handler:%d err_params:'%s'",
					item->preproc->steps[step_idx].type,
					ZBX_NULL2EMPTY_STR(item->preproc->steps[step_idx].params),
					item->preproc->steps[step_idx].error_handler,
					ZBX_NULL2EMPTY_STR(item->preproc->steps[step_idx].error_handler_params));
			step_idx++;
		}

		zabbix_log(LOG_LEVEL_TRACE, " dependent items:");

		while (dep_idx < item->preproc->dep_itemids_num)
		{
			zabbix_log(LOG_LEVEL_TRACE, " " ZBX_FS_UI64, item->preproc->dep_itemids[dep_idx]);
			dep_idx++;
		}
	}
}
/******************************************************************************
 *
 * Purpose: get item configuration data for reading and updates
 *
 * Parameters: manager - [IN] manager
 *
 * Return value: Pointer to the items hashset embedded in the manager (not a
 *               copy).
 *
 ******************************************************************************/
zbx_hashset_t *zbx_pp_manager_items(zbx_pp_manager_t *manager)
{
return &manager->items;
}
/******************************************************************************
 *
 * Purpose: get diagnostic statistics
 *
 * Parameters: manager       - [IN] manager
 *             preproc_num   - [OUT] number of entries in the item cache
 *             pending_num   - [OUT] queue pending task counter
 *             finished_num  - [OUT] queue finished task counter
 *             sequences_num - [OUT] number of entries in the queue sequences
 *                                   hashset
 *
 * Comments: The queue counters are read without taking the queue lock.
 *
 ******************************************************************************/
static void	zbx_pp_manager_get_diag_stats(zbx_pp_manager_t *manager, zbx_uint64_t *preproc_num,
		zbx_uint64_t *pending_num, zbx_uint64_t *finished_num, zbx_uint64_t *sequences_num)
{
	/* hashset sizes */
	*sequences_num = (zbx_uint64_t)manager->queue.sequences.num_data;
	*preproc_num = (zbx_uint64_t)manager->items.num_data;

	/* queue counters */
	*finished_num = manager->queue.finished_num;
	*pending_num = manager->queue.pending_num;
}
/******************************************************************************
 *
 * Purpose: get task sequence statistics
 *
 * Parameters: manager - [IN] manager
 *             stats   - [OUT] vector handed to
 *                             pp_task_queue_get_sequence_stats() to be filled
 *
 * Comments: Thin wrapper that forwards the manager queue to
 *           pp_task_queue_get_sequence_stats().
 *
 ******************************************************************************/
static void zbx_pp_manager_get_sequence_stats(zbx_pp_manager_t *manager, zbx_vector_pp_top_stats_ptr_t *stats)
{
pp_task_queue_get_sequence_stats(&manager->queue, stats);
}
/******************************************************************************
 *
 * Purpose: get worker usage statistics
 *
 * Parameters: manager      - [IN] manager
 *             worker_usage - [OUT] vector handed to zbx_timekeeper_get_usage()
 *                                  to be filled
 *
 * Comments: The return value of zbx_timekeeper_get_usage() is deliberately
 *           ignored.
 *
 ******************************************************************************/
static void zbx_pp_manager_get_worker_usage(zbx_pp_manager_t *manager, zbx_vector_dbl_t *worker_usage)
{
(void)zbx_timekeeper_get_usage(manager->timekeeper, worker_usage);
}
/******************************************************************************
 *
 * Purpose: synchronize preprocessing manager with configuration cache data
 *
 * Parameters: manager - [IN] manager to be synchronized
 *
 * Comments: The manager revision is replaced with the one returned by the
 *           configuration cache. The item cache is dumped to the log when
 *           trace logging is enabled and the revision has changed.
 *
 ******************************************************************************/
static void	preprocessor_sync_configuration(zbx_pp_manager_t *manager)
{
	zbx_uint64_t	revision_before, revision_after;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	revision_before = manager->revision;
	revision_after = revision_before;

	zbx_dc_config_get_preprocessable_items(&manager->items, &manager->um_handle, &revision_after);
	manager->revision = revision_after;

	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
	{
		if (revision_before != revision_after)
			zbx_pp_manager_dump_items(manager);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size:%d revision:" ZBX_FS_UI64 "->" ZBX_FS_UI64, __func__,
			manager->items.num_data, revision_before, revision_after);
}
/******************************************************************************
 *
 * Purpose: free the resources referenced by a preprocessor item value
 *
 * Parameters: value - [IN] value whose error, timestamp and result are freed
 *
 * Comments: The value structure itself is not freed.
 *
 ******************************************************************************/
static void	preproc_item_value_clear(zbx_preproc_item_value_t *value)
{
	if (NULL != value->result)
	{
		/* release the result contents first, then the result itself */
		zbx_free_agent_result(value->result);
		zbx_free(value->result);
	}

	zbx_free(value->ts);
	zbx_free(value->error);
}
/******************************************************************************
 *
 * Purpose: extract value, timestamp and optional data from preprocessing
 *          item value
 *
 * Parameters: value - [IN] preprocessing item value; the strings that are
 *                          moved into the outputs are reset to NULL
 *             var   - [OUT] extracted value (including error message)
 *             ts    - [OUT] extracted timestamp, zeroed if the value has none
 *             opt   - [OUT] extracted optional data
 *
 * Comments: For unsupported items the error variant is taken from the value
 *           error, then from the result message, and finally falls back to
 *           "Unknown error.". Binary results are not expected here and
 *           terminate the process.
 *
 ******************************************************************************/
static void	preproc_item_value_extract_data(zbx_preproc_item_value_t *value, zbx_variant_t *var, zbx_timespec_t *ts,
		zbx_pp_value_opt_t *opt)
{
	opt->flags = ZBX_PP_VALUE_OPT_NONE;

	if (NULL == value->ts)
	{
		ts->sec = 0;
		ts->ns = 0;
	}
	else
		*ts = *value->ts;

	if (ITEM_STATE_NOTSUPPORTED == value->state)
	{
		char	*errmsg;

		if (NULL != value->error)
		{
			/* ownership of the error text moves to the variant */
			errmsg = value->error;
			value->error = NULL;
		}
		else if (NULL != value->result && ZBX_ISSET_MSG(value->result))
			errmsg = zbx_strdup(NULL, value->result->msg);
		else
			errmsg = zbx_strdup(NULL, "Unknown error.");

		zbx_variant_set_error(var, errmsg);

		return;
	}

	if (NULL == value->result)
	{
		zbx_variant_set_none(var);
		return;
	}

	if (ZBX_ISSET_LOG(value->result))
	{
		/* log value and source strings are moved, not copied */
		zbx_variant_set_str(var, value->result->log->value);
		value->result->log->value = NULL;

		opt->source = value->result->log->source;
		value->result->log->source = NULL;

		opt->timestamp = value->result->log->timestamp;
		opt->severity = value->result->log->severity;
		opt->logeventid = value->result->log->logeventid;

		opt->flags |= ZBX_PP_VALUE_OPT_LOG;
	}
	else if (ZBX_ISSET_UI64(value->result))
	{
		zbx_variant_set_ui64(var, value->result->ui64);
	}
	else if (ZBX_ISSET_DBL(value->result))
	{
		zbx_variant_set_dbl(var, value->result->dbl);
	}
	else if (ZBX_ISSET_STR(value->result))
	{
		zbx_variant_set_str(var, value->result->str);
		value->result->str = NULL;
	}
	else if (ZBX_ISSET_TEXT(value->result))
	{
		zbx_variant_set_str(var, value->result->text);
		value->result->text = NULL;
	}
	else if (ZBX_ISSET_BIN(value->result))
	{
		THIS_SHOULD_NEVER_HAPPEN;
		exit(EXIT_FAILURE);
	}
	else
	{
		zbx_variant_set_none(var);
	}

	if (ZBX_ISSET_META(value->result))
	{
		opt->mtime = value->result->mtime;
		opt->lastlogsize = value->result->lastlogsize;
		opt->flags |= ZBX_PP_VALUE_OPT_META;
	}
}
/******************************************************************************
 *
 * Purpose: handle new preprocessing request
 *
 * Parameters: manager    - [IN] preprocessing manager
 *             message    - [IN] packed preprocessing request
 *             direct_num - [IN/OUT] incremented for every directly flushed
 *                                   value
 *
 * Return value: The number of requests queued for preprocessing
 *
 * Comments: The item configuration is synchronized before the values are
 *           unpacked. Values for which no task is created are flushed
 *           immediately, the rest are queued as a single batch.
 *
 ******************************************************************************/
static zbx_uint64_t	preprocessor_add_request(zbx_pp_manager_t *manager, zbx_ipc_message_t *message,
		zbx_uint64_t *direct_num)
{
	zbx_vector_pp_task_ptr_t	batch;
	zbx_preproc_item_value_t	value;
	zbx_uint64_t			queued_num = 0;
	zbx_uint32_t			offset = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_pp_task_ptr_create(&batch);
	zbx_vector_pp_task_ptr_reserve(&batch, ZBX_PREPROCESSING_BATCH_SIZE);

	preprocessor_sync_configuration(manager);

	while (offset < message->size)
	{
		zbx_variant_t		var;
		zbx_pp_value_opt_t	var_opt;
		zbx_timespec_t		ts;
		zbx_pp_task_t		*task;

		offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
		preproc_item_value_extract_data(&value, &var, &ts, &var_opt);

		task = zbx_pp_manager_create_task(manager, value.itemid, &var, ts, &var_opt);

		if (NULL != task)
		{
			zbx_vector_pp_task_ptr_append(&batch, task);
		}
		else
		{
			(*direct_num)++;

			/* allow empty values */
			preproc_flush_value_func_cb(manager, value.itemid, value.item_value_type, value.item_flags,
					&var, ts, &var_opt);

			zbx_variant_clear(&var);
			zbx_pp_value_opt_clear(&var_opt);
		}

		preproc_item_value_clear(&value);
	}

	if (0 != batch.values_num)
		zbx_pp_manager_queue_value_preproc(manager, &batch);

	queued_num = batch.values_num;

	zbx_vector_pp_task_ptr_destroy(&batch);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return queued_num;
}
/******************************************************************************
 *
 * Purpose: handle new preprocessing test request
 *
 * Parameters: manager - [IN] preprocessing manager
 *             client  - [IN] request source
 *             message - [IN] packed preprocessing request
 *
 * Comments: A temporary preprocessing data object is created for the
 *           request, filled from the message, queued as a test task and then
 *           released by this function.
 *
 ******************************************************************************/
static void	preprocessor_add_test_request(zbx_pp_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_pp_item_preproc_t	*test_preproc;
	zbx_variant_t		test_value;
	zbx_timespec_t		test_ts;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	test_preproc = zbx_pp_item_preproc_create(0, 0, 0, 0);

	zbx_preprocessor_unpack_test_request(test_preproc, &test_value, &test_ts, message->data);
	zbx_pp_manager_queue_test(manager, test_preproc, &test_value, test_ts, client);

	zbx_pp_item_preproc_release(test_preproc);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
 *
 * Purpose: respond to queue size request
 *
 * Parameters: manager - [IN] preprocessing manager
 *             client  - [IN] request source
 *
 * Comments: The queue pending task counter is sent as raw bytes of a
 *           zbx_uint64_t; it is read without taking the queue lock.
 *
 ******************************************************************************/
static void	preprocessor_reply_queue_size(zbx_pp_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_uint64_t	queue_size;

	queue_size = manager->queue.pending_num;

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_QUEUE, (unsigned char *)&queue_size, sizeof(queue_size));
}
/******************************************************************************
 *
 * Purpose: flush processed value task
 *
 * Parameters: manager - [IN] preprocessing manager
 *             task    - [IN] processed value task
 *
 * Comments: The value is flushed only if the prepare callback returns
 *           SUCCEED for it.
 *
 ******************************************************************************/
static void	prpeprocessor_flush_value_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task)
{
	zbx_pp_value_opt_t	*value_opt;
	zbx_timespec_t		ts;
	zbx_variant_t		*value;
	unsigned char		value_type, flags;

	zbx_pp_value_task_get_data(task, &value_type, &flags, &value, &ts, &value_opt);

	if (SUCCEED != preproc_prepare_value_func_cb(value, value_opt))
		return;

	preproc_flush_value_func_cb(manager, task->itemid, value_type, flags, value, ts, value_opt);
}
/******************************************************************************
 *
 * Purpose: send back result of processed test task
 *
 * Parameters: task - [IN] processed test task
 *
 * Comments: The step results and history are packed and sent to the client
 *           stored in the task. The history is released before the reply is
 *           sent.
 *
 ******************************************************************************/
static void	preprocessor_reply_test_result(zbx_pp_task_t *task)
{
	zbx_ipc_client_t	*client;
	zbx_variant_t		*result;
	zbx_pp_result_t		*results;
	zbx_pp_history_t	*history;
	unsigned char		*packed;
	zbx_uint32_t		packed_len;
	int			results_num;

	zbx_pp_test_task_get_data(task, &client, &result, &results, &results_num, &history);

	packed_len = zbx_preprocessor_pack_test_result(&packed, results, results_num, history);
	zbx_pp_test_task_history_release(task, &history);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TEST_RESULT, packed, packed_len);

	zbx_free(packed);
}
/******************************************************************************
 *
 * Purpose: flush processed tasks
 *
 * Parameters: manager - [IN] preprocessing manager
 *             tasks   - [IN] processed tasks
 *
 * Comments: Value tasks are flushed and test tasks are answered to their
 *           client; any other task type is reported as an internal error.
 *
 ******************************************************************************/
static void	preprocessor_flush_tasks(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks)
{
	int	pos = 0;

	while (pos < tasks->values_num)
	{
		zbx_pp_task_t	*task = tasks->values[pos];

		/* value and value_seq task contents are identical */
		if (ZBX_PP_TASK_VALUE == task->type || ZBX_PP_TASK_VALUE_SEQ == task->type)
		{
			prpeprocessor_flush_value_result(manager, task);
		}
		else if (ZBX_PP_TASK_TEST == task->type)
		{
			preprocessor_reply_test_result(task);
		}
		else
		{
			/* the internal tasks (dependent/sequence) shouldn't get here */
			THIS_SHOULD_NEVER_HAPPEN;
		}

		pos++;
	}
}
/******************************************************************************
 *
 * Purpose: respond to diagnostic information request
 *
 * Parameters: manager - [IN] preprocessing manager
 *             client  - [IN] request source
 *
 * Comments: The manager diagnostic counters are packed and sent with the
 *           ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT code.
 *
 ******************************************************************************/
static void	preprocessor_reply_diag_info(zbx_pp_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_uint64_t	items_num, pending, finished, sequences;
	unsigned char	*packed;
	zbx_uint32_t	packed_len;

	zbx_pp_manager_get_diag_stats(manager, &items_num, &pending, &finished, &sequences);

	packed_len = zbx_preprocessor_pack_diag_stats(&packed, items_num, pending, finished, sequences);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, packed, packed_len);

	zbx_free(packed);
}
/******************************************************************************
 *
 * Purpose: compare two top statistics entries for sorting in descending
 *          order of their task count
 *
 * Parameters: d1 - [IN] pointer to the first zbx_pp_top_stats_t pointer
 *             d2 - [IN] pointer to the second zbx_pp_top_stats_t pointer
 *
 * Return value: <0 - the first entry has more tasks (sorts first)
 *                0 - both entries have the same number of tasks
 *               >0 - the first entry has fewer tasks (sorts last)
 *
 * Comments: The counts are compared explicitly instead of being subtracted,
 *           so the result cannot overflow or be truncated when converted to
 *           int.
 *
 ******************************************************************************/
static int	preprocessor_compare_top_stats(const void *d1, const void *d2)
{
	const zbx_pp_top_stats_t	*s1 = *(const zbx_pp_top_stats_t * const *)d1;
	const zbx_pp_top_stats_t	*s2 = *(const zbx_pp_top_stats_t * const *)d2;

	if (s1->tasks_num > s2->tasks_num)
		return -1;

	if (s1->tasks_num < s2->tasks_num)
		return 1;

	return 0;
}
/******************************************************************************
 *
 * Purpose: collect peak preprocessing reference statistics of cached items
 *
 * Parameters: manager - [IN] manager
 *             stats   - [OUT] vector to which one allocated entry is appended
 *                             per reported item
 *
 * Comments: The reported value is refcount_peak - 1. Items without
 *           preprocessing data and items for which that value does not
 *           exceed 1 are skipped. The appended entries are allocated here
 *           and are not freed by this function.
 *
 ******************************************************************************/
static void	zbx_pp_manager_items_preproc_peak(zbx_pp_manager_t *manager, zbx_vector_pp_top_stats_ptr_t *stats)
{
	zbx_hashset_iter_t	iter;
	zbx_pp_item_t		*item;

	zbx_hashset_iter_reset(&manager->items, &iter);

	for (;;)
	{
		zbx_pp_top_stats_t	*entry;

		if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter)))
			break;

		if (NULL == item->preproc)
			continue;

		if (1 >= item->preproc->refcount_peak - 1)
			continue;

		entry = (zbx_pp_top_stats_t *)zbx_malloc(NULL, sizeof(zbx_pp_top_stats_t));
		entry->itemid = item->itemid;
		entry->tasks_num = item->preproc->refcount_peak - 1;

		zbx_vector_pp_top_stats_ptr_append(stats, entry);
	}
}
/******************************************************************************
 *
 * Purpose: reset peak preprocessing reference statistics of cached items
 *
 * Parameters: manager - [IN] manager
 *
 * Comments: refcount_peak is set to 1 for every cached item that has
 *           preprocessing data.
 *
 ******************************************************************************/
static void	zbx_pp_manager_items_preproc_peak_reset(zbx_pp_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_pp_item_t		*item;

	zbx_hashset_iter_reset(&manager->items, &iter);

	for (;;)
	{
		if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter)))
			break;

		if (NULL != item->preproc)
			item->preproc->refcount_peak = 1;
	}
}
/******************************************************************************
 *
 * Purpose: respond to top statistics request
 *
 * Parameters: manager - [IN] preprocessing manager
 *             client  - [IN] request source
 *             message - [IN] request message holding the entry limit
 *             code    - [IN] request code; ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES
 *                            selects task sequence statistics, any other
 *                            code selects item peak statistics
 *
 * Comments: The statistics are sorted with preprocessor_compare_top_stats()
 *           and the requested limit is capped to the number of collected
 *           entries before packing.
 *
 ******************************************************************************/
static void	preprocessor_reply_top_stats(zbx_pp_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message, zbx_uint32_t code)
{
	zbx_vector_pp_top_stats_ptr_t	top;
	unsigned char			*packed;
	zbx_uint32_t			packed_len;
	int				limit;

	zbx_vector_pp_top_stats_ptr_create(&top);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	if (ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES != code)
		zbx_pp_manager_items_preproc_peak(manager, &top);
	else
		zbx_pp_manager_get_sequence_stats(manager, &top);

	if (top.values_num < limit)
		limit = top.values_num;

	zbx_vector_pp_top_stats_ptr_sort(&top, preprocessor_compare_top_stats);

	packed_len = zbx_preprocessor_pack_top_stats_result(&packed, &top, limit);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_STATS_RESULT, packed, packed_len);
	zbx_free(packed);

	zbx_vector_pp_top_stats_ptr_clear_ext(&top, (zbx_pp_top_stats_ptr_free_func_t)zbx_ptr_free);
	zbx_vector_pp_top_stats_ptr_destroy(&top);
}
/******************************************************************************
 *                                                                            *
 * Purpose: respond to worker usage statistics request                        *
 *                                                                            *
 * Parameters: manager     - [IN] preprocessing manager                       *
 *             workers_num - [IN] number of preprocessing workers             *
 *             client      - [IN] request source                              *
 *                                                                            *
 * Comments: the reply is sent with the usage statistics result code so       *
 *           that it matches the ZBX_IPC_PREPROCESSOR_USAGE_STATS request     *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_reply_usage_stats(zbx_pp_manager_t *manager, int workers_num, zbx_ipc_client_t *client)
{
	zbx_vector_dbl_t	usage;
	unsigned char		*data;
	zbx_uint32_t		data_len;

	zbx_vector_dbl_create(&usage);
	zbx_pp_manager_get_worker_usage(manager, &usage);

	data_len = zbx_preprocessor_pack_usage_stats(&data, &usage, workers_num);

	/* answer with the result code paired with the usage statistics request */
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_USAGE_STATS_RESULT, data, data_len);

	zbx_free(data);
	zbx_vector_dbl_destroy(&usage);
}
static void pp_finished_task_cb(void *data)
{
zbx_ipc_service_alert((zbx_ipc_service_t *)data);
}
/******************************************************************************
 *                                                                            *
 * Purpose: change log level of one or all preprocessing workers              *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             worker_num - [IN] 1-based worker number, 0 - all workers       *
 *             direction  - [IN] log level change direction                   *
 *                                                                            *
 ******************************************************************************/
static void	pp_manager_change_worker_loglevel(zbx_pp_manager_t *manager, int worker_num, int direction)
{
	if (0 > worker_num || manager->workers_num < worker_num)
	{
		zabbix_log(LOG_LEVEL_INFORMATION, "Cannot change log level for preprocessing worker #%d:"
				" no such instance", worker_num);
		return;
	}

	if (0 != worker_num)
	{
		/* single worker requested, worker numbers start at 1 */
		zbx_change_component_log_level(&manager->workers[worker_num - 1].logger, direction);
		return;
	}

	/* zero addresses every worker */
	for (int idx = 0; idx < manager->workers_num; idx++)
		zbx_change_component_log_level(&manager->workers[idx].logger, direction);
}
/******************************************************************************
 *                                                                            *
 * Purpose: change log level for the specified worker(s)                      *
 *                                                                            *
 * Parameters: manager   - [IN] preprocessing manager                         *
 *             direction - [IN] 1) increase, -1) decrease                     *
 *             data      - [IN] rtc data in json format                       *
 *                                                                            *
 * Comments: targeting a worker by pid is not supported, such requests        *
 *           are logged and ignored                                           *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_change_loglevel(zbx_pp_manager_t *manager, int direction, const char *data)
{
	char	*errmsg = NULL;
	pid_t	target_pid;
	int	target_type, target_num;

	if (SUCCEED == zbx_rtc_get_command_target(data, &target_pid, &target_type, &target_num, NULL, &errmsg))
	{
		if (0 == target_pid)
		{
			pp_manager_change_worker_loglevel(manager, target_num, direction);
		}
		else
		{
			zabbix_log(LOG_LEVEL_WARNING, "Cannot change log level for preprocessing worker by pid");
		}

		return;
	}

	/* the command target could not be parsed, report the reason */
	zabbix_log(LOG_LEVEL_WARNING, "Cannot change log level: %s", errmsg);
	zbx_free(errmsg);
}
/******************************************************************************
 *                                                                            *
 * Purpose: preprocessing manager thread entry point                          *
 *                                                                            *
 * Parameters: args - [IN] zbx_thread_args_t with thread info and             *
 *                         zbx_thread_pp_manager_args in its args field       *
 *                                                                            *
 * Comments: starts the preprocessing IPC service, creates the manager,       *
 *           then serves IPC requests, flushes finished tasks and history     *
 *           until the process stops or a shutdown message arrives; the       *
 *           function ends with exit() and does not return                    *
 *                                                                            *
 ******************************************************************************/
ZBX_THREAD_ENTRY(zbx_pp_manager_thread, args)
{
/* IPC receive timeout used while finished_num and direct_num are zero (0 sec + 5e8 ns) */
#define PP_MANAGER_DELAY_SEC 0
#define PP_MANAGER_DELAY_NS 5e8
zbx_ipc_service_t service;
char *error = NULL;
zbx_ipc_client_t *client;
zbx_ipc_message_t *message;
double time_stat, time_idle = 0, time_flush, time_vps_update, time_trim;
zbx_timespec_t timeout = {PP_MANAGER_DELAY_SEC, PP_MANAGER_DELAY_NS};
const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info;
int server_num = ((zbx_thread_args_t *)args)->info.server_num,
process_num = ((zbx_thread_args_t *)args)->info.process_num;
unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type;
zbx_thread_pp_manager_args *pp_args = ((zbx_thread_args_t *)args)->args;
zbx_pp_manager_t *manager;
zbx_vector_pp_task_ptr_t tasks;
zbx_uint32_t rtc_msgs[] = {ZBX_RTC_LOG_LEVEL_INCREASE, ZBX_RTC_LOG_LEVEL_DECREASE};
zbx_uint64_t pending_num, finished_num, processed_num = 0, queued_num = 0,
processing_num = 0;
/* same thread argument structure as pp_args, accessed through a const pointer */
const zbx_thread_pp_manager_args *pp_manager_args_in = (const zbx_thread_pp_manager_args *)
(((zbx_thread_args_t *)args)->args);
#define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */
/* once in STAT_INTERVAL seconds */
zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
server_num, get_process_type_string(process_type), process_num);
zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);
/* failure to start the IPC service or to create the manager terminates the process */
if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
zbx_free(error);
exit(EXIT_FAILURE);
}
/* pp_finished_task_cb receives the service pointer as its callback data */
if (NULL == (manager = zbx_pp_manager_create(pp_args->workers_num, pp_finished_task_cb,
(void *)&service, pp_manager_args_in->config_source_ip, pp_manager_args_in->config_timeout,
&error)))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot initialize preprocessing manager: %s", error);
zbx_free(error);
exit(EXIT_FAILURE);
}
/* subscribe for worker log level rtc messages */
zbx_rtc_subscribe_service(ZBX_PROCESS_TYPE_PREPROCESSOR, 0, rtc_msgs, ARRSIZE(rtc_msgs),
pp_args->config_timeout, ZBX_IPC_SERVICE_PREPROCESSING);
zbx_vector_pp_task_ptr_create(&tasks);
/* initialize statistics */
time_stat = zbx_time();
time_flush = time_stat;
time_vps_update = time_stat;
time_trim = time_stat;
zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
/* main loop, left when ZBX_IS_RUNNING() turns false or on shutdown message */
while (ZBX_IS_RUNNING())
{
double time_now = zbx_time();
zbx_uint64_t direct_num = 0;
/* refresh process title with the counters gathered since the last refresh, then reset them */
if (STAT_INTERVAL < time_now - time_stat)
{
zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
get_process_type_string(process_type), process_num,
queued_num, processed_num, time_idle, time_now - time_stat);
time_stat = time_now;
time_idle = 0;
processed_num = 0;
queued_num = 0;
}
/* the receive call is reported as idle state to self-monitoring */
zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);
int ret = zbx_ipc_service_recv(&service, &timeout, &client, &message);
zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);
double sec = zbx_time();
zbx_update_env(get_process_type_string(process_type), sec);
/* count the receive duration as idle time unless ZBX_IPC_RECV_IMMEDIATE was returned */
if (ZBX_IPC_RECV_IMMEDIATE != ret)
time_idle += sec - time_now;
if (NULL != message)
{
switch (message->code)
{
case ZBX_IPC_PREPROCESSOR_REQUEST:
queued_num += preprocessor_add_request(manager, message, &direct_num);
break;
case ZBX_IPC_PREPROCESSOR_QUEUE:
preprocessor_reply_queue_size(manager, client);
break;
case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
preprocessor_add_test_request(manager, client, message);
break;
case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
preprocessor_reply_diag_info(manager, client);
break;
case ZBX_IPC_PREPROCESSOR_TOP_SEQUENCES:
case ZBX_IPC_PREPROCESSOR_TOP_PEAK:
preprocessor_reply_top_stats(manager, client, message, message->code);
break;
case ZBX_IPC_PREPROCESSOR_USAGE_STATS:
preprocessor_reply_usage_stats(manager, pp_args->workers_num, client);
break;
case ZBX_RTC_LOG_LEVEL_INCREASE:
preprocessor_change_loglevel(manager, 1, (const char *)message->data);
break;
case ZBX_RTC_LOG_LEVEL_DECREASE:
preprocessor_change_loglevel(manager, -1, (const char *)message->data);
break;
/* jumps to cleanup without freeing the message or releasing the client; */
/* the process exits right after the cleanup */
case ZBX_RTC_SHUTDOWN:
zabbix_log(LOG_LEVEL_DEBUG, "shutdown message received, terminating...");
goto out;
}
zbx_ipc_message_free(message);
}
if (NULL != client)
zbx_ipc_client_release(client);
/* fetch finished tasks and the pending/processing/finished counters from the manager */
zbx_pp_manager_process_finished(manager, &tasks, &pending_num, &processing_num, &finished_num);
if (0 < tasks.values_num)
{
processed_num += (unsigned int)tasks.values_num;
preprocessor_flush_tasks(manager, &tasks);
zbx_pp_tasks_clear(&tasks);
}
/* use a zero receive timeout while finished_num or direct_num is non-zero, */
/* otherwise restore the default timeout */
if (0 != finished_num || 0 != direct_num)
{
timeout.sec = 0;
timeout.ns = 0;
}
else
{
timeout.sec = PP_MANAGER_DELAY_SEC;
timeout.ns = PP_MANAGER_DELAY_NS;
}
/* flush history when all counters are zero or more than a second passed since the last flush; */
/* the history syncer is notified when zbx_dc_flush_history() returns non-zero */
if (0 == pending_num + processing_num + finished_num + direct_num || 1 < sec - time_flush)
{
if (0 != zbx_dc_flush_history())
{
zbx_rtc_notify_generic(&manager->rtc, ZBX_PROCESS_TYPE_HISTSYNCER, 1,
ZBX_RTC_HISTORY_SYNC_NOTIFY, NULL, 0);
}
time_flush = sec;
}
/* trigger vps monitor update at least once per second */
if (1 <= sec - time_vps_update)
{
zbx_vps_monitor_add_collected(0);
time_vps_update = sec;
}
/* release memory in case of peak periods */
if (SEC_PER_DAY <= sec - time_trim)
{
#ifdef HAVE_MALLOC_TRIM
malloc_trim(128 * ZBX_MEBIBYTE);
#endif
/* item preprocessing peak values are reset on the same schedule */
zbx_pp_manager_items_preproc_peak_reset(manager);
time_trim = sec;
}
}
out:
zbx_setproctitle("%s #%d [terminating]", get_process_type_string(process_type), process_num);
zbx_vector_pp_task_ptr_destroy(&tasks);
zbx_pp_manager_free(manager);
zbx_ipc_service_close(&service);
exit(EXIT_SUCCESS);
#undef STAT_INTERVAL
#undef PP_MANAGER_DELAY_SEC
#undef PP_MANAGER_DELAY_NS
}