/* ** Zabbix ** Copyright (C) 2001-2023 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ #include "pp_manager.h" #include "pp_worker.h" #include "pp_queue.h" #include "pp_item.h" #include "pp_task.h" #include "preproc_snmp.h" #include "zbxpreproc.h" #include "zbxcommon.h" #include "zbxalgo.h" #include "zbxtimekeeper.h" #include "zbxself.h" #include "zbxstr.h" #include "zbxcachehistory.h" #include "zbxprof.h" #ifdef HAVE_LIBXML2 # include <libxml/xpath.h> #endif #define PP_STARTUP_TIMEOUT 10 /****************************************************************************** * * * Purpose: initialize xml library, called before creating worker threads * * * ******************************************************************************/ static void pp_xml_init(void) { #ifdef HAVE_LIBXML2 xmlInitParser(); #endif } /****************************************************************************** * * * Purpose: release xml library resources * * * ******************************************************************************/ static void pp_xml_destroy(void) { #ifdef HAVE_LIBXML2 xmlCleanupParser(); #endif } /****************************************************************************** * * * Purpose: initialize curl library, called before creating worker threads * * * ******************************************************************************/ static void pp_curl_init(void) { #ifdef HAVE_LIBCURL curl_global_init(CURL_GLOBAL_DEFAULT); #endif } /****************************************************************************** * * * Purpose: release curl library resources * * * ******************************************************************************/ static void pp_curl_destroy(void) { #ifdef HAVE_LIBCURL curl_global_cleanup(); #endif } /****************************************************************************** * * * Purpose: create preprocessing manager * * * * Parameters: program_type - [IN] the component type (server/proxy) * * workers_num - [IN] the number of workers to create * * finished_cb - [IN] a callback to call after finishing * * task (optional) * * finished_data - [IN] the callback data (optional) * * error - [OUT] the error message * * * * Return value: The created manager or NULL on error. * * * ******************************************************************************/ zbx_pp_manager_t *zbx_pp_manager_create(int workers_num, zbx_pp_notify_cb_t finished_cb, void *finished_data, char **error) { int i, ret = FAIL, started_num = 0; time_t time_start; struct timespec poll_delay = {0, 1e8}; zbx_pp_manager_t *manager; zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, workers_num); pp_xml_init(); pp_curl_init(); #ifdef HAVE_NETSNMP preproc_init_snmp(); #endif manager = (zbx_pp_manager_t *)zbx_malloc(NULL, sizeof(zbx_pp_manager_t)); memset(manager, 0, sizeof(zbx_pp_manager_t)); if (SUCCEED != pp_task_queue_init(&manager->queue, error)) goto out; manager->timekeeper = zbx_timekeeper_create(workers_num, NULL); manager->workers_num = workers_num; manager->workers = (zbx_pp_worker_t *)zbx_calloc(NULL, (size_t)workers_num, sizeof(zbx_pp_worker_t)); for (i = 0; i < workers_num; i++) { if (SUCCEED != pp_worker_init(&manager->workers[i], i + 1, &manager->queue, manager->timekeeper, error)) goto out; pp_worker_set_finished_cb(&manager->workers[i], finished_cb, finished_data); } zbx_hashset_create_ext(&manager->items, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)pp_item_clear, ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC); /* wait for threads to start */ time_start = time(NULL); while (started_num != workers_num) { if (time_start + PP_STARTUP_TIMEOUT < time(NULL)) { *error = zbx_strdup(NULL, "timeout occurred while waiting for workers to start"); goto out; } pthread_mutex_lock(&manager->queue.lock); started_num = manager->queue.workers_num; pthread_mutex_unlock(&manager->queue.lock); nanosleep(&poll_delay, NULL); } ret = SUCCEED; out: if (FAIL == ret) { for (i = 0; i < manager->workers_num; i++) pp_worker_stop(&manager->workers[i]); pp_task_queue_destroy(&manager->queue); zbx_free(manager); manager = NULL; } zabbix_log(LOG_LEVEL_DEBUG, "End of %s() ret:%s error:%s", __func__, zbx_result_string(ret), ZBX_NULL2EMPTY_STR(*error)); return manager; } /****************************************************************************** * * * Purpose: destroy preprocessing manager * * * ******************************************************************************/ void zbx_pp_manager_free(zbx_pp_manager_t *manager) { int i; pp_task_queue_lock(&manager->queue); for (i = 0; i < manager->workers_num; i++) pp_worker_stop(&manager->workers[i]); pp_task_queue_notify_all(&manager->queue); pp_task_queue_unlock(&manager->queue); for (i = 0; i < manager->workers_num; i++) pp_worker_destroy(&manager->workers[i]); zbx_free(manager->workers); pp_task_queue_destroy(&manager->queue); zbx_hashset_destroy(&manager->items); zbx_timekeeper_free(manager->timekeeper); #ifdef HAVE_NETSNMP preproc_shutdown_snmp(); #endif pp_curl_destroy(); pp_xml_destroy(); zbx_free(manager); } /****************************************************************************** * * * Purpose: queue value for preprocessing test * * * * Parameters: manager - [IN] the manager * * preproc - [IN] the item preprocessing data * * value - [IN] the value to preprocess, its contents will be * * directly copied over and cleared by the task * * ts - [IN] the value timestamp * * client - [IN] the request source * * * ******************************************************************************/ void zbx_pp_manager_queue_test(zbx_pp_manager_t *manager, zbx_pp_item_preproc_t *preproc, zbx_variant_t *value, zbx_timespec_t ts, zbx_ipc_client_t *client) { zbx_pp_task_t *task; task = pp_task_test_create(preproc, value, ts, client); pp_task_queue_lock(&manager->queue); pp_task_queue_push_test(&manager->queue, task); pp_task_queue_notify(&manager->queue); pp_task_queue_unlock(&manager->queue); } /****************************************************************************** * * * Purpose: queue value/value_sec preprocessing tasks * * * ******************************************************************************/ void zbx_pp_manager_queue_value_preproc(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks) { zbx_prof_start(__func__, ZBX_PROF_MUTEX); pp_task_queue_lock(&manager->queue); zbx_prof_end_wait(); for (int i = 0; i < tasks->values_num; i++) pp_task_queue_push(&manager->queue, tasks->values[i]); pp_task_queue_notify(&manager->queue); pp_task_queue_unlock(&manager->queue); zbx_prof_end(); } /****************************************************************************** * * * Purpose: create preprocessing task from request * * * * Parameters: manager - [IN] the manager * * itemid - [IN] the item identifier * * value - [IN] the value to preprocess, its contents will be * * directly copied over and cleared by the task * * ts - [IN] the value timestamp * * value_opt - [IN] the optional value data (optional) * * * * Return value: The created task or NULL if the data can be flushed directly.* * * ******************************************************************************/ zbx_pp_task_t *zbx_pp_manager_create_task(zbx_pp_manager_t *manager, zbx_uint64_t itemid, zbx_variant_t *value, zbx_timespec_t ts, const zbx_pp_value_opt_t *value_opt) { zbx_pp_item_t *item; if (ZBX_VARIANT_NONE == value->type) return NULL; if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &itemid))) return NULL; if (0 == item->preproc->dep_itemids_num && 0 == item->preproc->steps_num) return NULL; if (ZBX_PP_PROCESS_PARALLEL == item->preproc->mode) return pp_task_value_create(item->itemid, item->preproc, value, ts, value_opt, NULL); else return pp_task_value_seq_create(item->itemid, item->preproc, value, ts, value_opt, NULL); } /****************************************************************************** * * * Purpose: get first dependent item with preprocessing that can be cached * * * * Parameters: manager - [IN] the manager * * itemids - [IN] the dependent itemids * * itemids_num - [IN] the number of dependent itemids * * * * Return value: The first dependent item with cacheable preprocessing data * * or NULL. * * * ******************************************************************************/ static zbx_pp_item_t *pp_manager_get_cacheable_dependent_item(zbx_pp_manager_t *manager, zbx_uint64_t *itemids, int itemids_num) { zbx_pp_item_t *item; for (int i = 0; i < itemids_num; i++) { if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &itemids[i]))) continue; if (SUCCEED == pp_cache_is_supported(item->preproc)) return item; } return NULL; } /****************************************************************************** * * * Purpose: create and queue tasks for dependent items * * * * Parameters: manager - [IN] the manager * * preproc - [IN] the master item preprocessing data * * exclude_itemid - [IN] the dependent itemid to exclude, can be 0* * ts - [IN] the value timestamp * * cache - [IN] the preprocessing cache * * (optional, can be NULL) * * * * Comments: This function called within task queue lock. * * * ******************************************************************************/ static void pp_manager_queue_dependents(zbx_pp_manager_t *manager, zbx_pp_item_preproc_t *preproc, zbx_uint64_t exclude_itemid, const zbx_variant_t *value, zbx_timespec_t ts, zbx_pp_cache_t *cache) { int queued_num = 0; if (0 == preproc->dep_itemids_num) return; cache = pp_cache_copy(cache); if (NULL == cache) cache = pp_cache_create(preproc, value); for (int i = 0; i < preproc->dep_itemids_num; i++) { zbx_pp_item_t *item; zbx_pp_task_t *new_task; /* skip already preprocessed dependent item */ if (preproc->dep_itemids[i] == exclude_itemid) continue; /* skip disabled/removed items */ if (NULL == (item = (zbx_pp_item_t *)zbx_hashset_search(&manager->items, &preproc->dep_itemids[i]))) continue; if (ZBX_PP_PROCESS_PARALLEL == item->preproc->mode) new_task = pp_task_value_create(item->itemid, item->preproc, NULL, ts, NULL, cache); else new_task = pp_task_value_seq_create(item->itemid, item->preproc, NULL, ts, NULL, cache); pp_task_queue_push_immediate(&manager->queue, new_task); queued_num++; } if (0 < queued_num) pp_task_queue_notify(&manager->queue); pp_cache_release(cache); } /****************************************************************************** * * * Purpose: queue new tasks in response to finished value task * * * * Parameters: manager - [IN] the manager * * task - [IN] the finished value task * * * * Comments: This function called within task queue lock. * * * ******************************************************************************/ static void pp_manager_queue_value_task_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task) { zbx_pp_task_value_t *d = (zbx_pp_task_value_t *)PP_TASK_DATA(task); zbx_pp_item_t *item; if (ZBX_VARIANT_NONE == d->result.type) return; if (NULL != (item = pp_manager_get_cacheable_dependent_item(manager, d->preproc->dep_itemids, d->preproc->dep_itemids_num))) { zbx_pp_task_t *dep_task; zbx_variant_t value; dep_task = pp_task_dependent_create(task->itemid, d->preproc); zbx_pp_task_dependent_t *d_dep = (zbx_pp_task_dependent_t *)PP_TASK_DATA(dep_task); d_dep->cache = pp_cache_create(item->preproc, &d->result); zbx_variant_set_none(&value); d_dep->primary = pp_task_value_create(item->itemid, item->preproc, &value, d->ts, NULL, d_dep->cache); pp_task_queue_push_immediate(&manager->queue, dep_task); pp_task_queue_notify(&manager->queue); } else pp_manager_queue_dependents(manager, d->preproc, 0, &d->result, d->ts, NULL); } /****************************************************************************** * * * Purpose: queue new tasks in response to finished dependent task * * * * Parameters: manager - [IN] the manager * * task - [IN] the finished dependent task * * * * Comments: This function called within task queue lock. * * * ******************************************************************************/ static zbx_pp_task_t *pp_manager_queue_dependent_task_result(zbx_pp_manager_t *manager, zbx_pp_task_t *task) { zbx_pp_task_dependent_t *d = (zbx_pp_task_dependent_t *)PP_TASK_DATA(task); zbx_pp_task_t *task_value = d->primary; zbx_pp_task_value_t *dp = (zbx_pp_task_value_t *)PP_TASK_DATA(task_value); pp_manager_queue_value_task_result(manager, d->primary); pp_manager_queue_dependents(manager, d->preproc, task_value->itemid, &dp->result, dp->ts, d->cache); d->primary = NULL; pp_task_free(task); return task_value; } /****************************************************************************** * * * Purpose: requeue sequence task * * * * Parameters: manager - [IN] the manager * * task - [IN] the finished sequence task * * * * Comments: This function called within task queue lock. * * * ******************************************************************************/ static zbx_pp_task_t *pp_manager_requeue_next_sequence_task(zbx_pp_manager_t *manager, zbx_pp_task_t *task_seq) { zbx_pp_task_sequence_t *d_seq = (zbx_pp_task_sequence_t *)PP_TASK_DATA(task_seq); zbx_pp_task_t *task = NULL, *tmp_task; if (SUCCEED == zbx_list_pop(&d_seq->tasks, (void **)&task)) { switch (task->type) { case ZBX_PP_TASK_VALUE: case ZBX_PP_TASK_VALUE_SEQ: pp_manager_queue_value_task_result(manager, task); break; case ZBX_PP_TASK_DEPENDENT: task = pp_manager_queue_dependent_task_result(manager, task); break; default: THIS_SHOULD_NEVER_HAPPEN; break; } } if (SUCCEED == zbx_list_peek(&d_seq->tasks, (void **)&tmp_task)) { pp_task_queue_push_immediate(&manager->queue, task_seq); pp_task_queue_notify(&manager->queue); } else { pp_task_queue_remove_sequence(&manager->queue, task_seq->itemid); pp_task_free(task_seq); } return task; } #define PP_FINISHED_TASK_BATCH_SIZE 100 /****************************************************************************** * * * Purpose: process finished tasks * * * * Parameters: manager - [IN] the manager * * * ******************************************************************************/ void zbx_pp_manager_process_finished(zbx_pp_manager_t *manager, zbx_vector_pp_task_ptr_t *tasks, zbx_uint64_t *pending_num, zbx_uint64_t *processing_num, zbx_uint64_t *finished_num) { zbx_pp_task_t *task; static time_t timekeeper_clock = 0; time_t now; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_vector_pp_task_ptr_reserve(tasks, PP_FINISHED_TASK_BATCH_SIZE); zbx_prof_start(__func__, ZBX_PROF_MUTEX); pp_task_queue_lock(&manager->queue); zbx_prof_end_wait(); while (PP_FINISHED_TASK_BATCH_SIZE > tasks->values_num) { if (NULL != (task = pp_task_queue_pop_finished(&manager->queue))) { switch (task->type) { case ZBX_PP_TASK_VALUE: pp_manager_queue_value_task_result(manager, task); break; case ZBX_PP_TASK_DEPENDENT: task = pp_manager_queue_dependent_task_result(manager, task); break; case ZBX_PP_TASK_SEQUENCE: task = pp_manager_requeue_next_sequence_task(manager, task); break; default: break; } } if (NULL == task) break; zbx_vector_pp_task_ptr_append(tasks, task); } *pending_num = manager->queue.pending_num; *finished_num = manager->queue.finished_num; *processing_num = manager->queue.processing_num; pp_task_queue_unlock(&manager->queue); zbx_prof_end(); now = time(NULL); if (now != timekeeper_clock) { zbx_timekeeper_collect(manager->timekeeper); timekeeper_clock = now; } zabbix_log(LOG_LEVEL_DEBUG, "End of %s() values_num:%d", __func__, tasks->values_num); } /****************************************************************************** * * * Purpose: dump cached item information into log * * * * Parameters: manager - [IN] the manager * * * ******************************************************************************/ void zbx_pp_manager_dump_items(zbx_pp_manager_t *manager) { zbx_hashset_iter_t iter; zbx_pp_item_t *item; zbx_hashset_iter_reset(&manager->items, &iter); while (NULL != (item = (zbx_pp_item_t *)zbx_hashset_iter_next(&iter))) { zabbix_log(LOG_LEVEL_TRACE, "itemid:" ZBX_FS_UI64 " hostid:" ZBX_FS_UI64 " revision:" ZBX_FS_UI64 " type:%u value_type:%u mode:%u flags:%u", item->itemid, item->hostid, item->revision, item->preproc->type, item->preproc->value_type, item->preproc->mode, item->preproc->flags); zabbix_log(LOG_LEVEL_TRACE, " preprocessing steps:"); for (int i = 0; i < item->preproc->steps_num; i++) { zabbix_log(LOG_LEVEL_TRACE, " type:%d params:'%s' err_handler:%d err_params:'%s'", item->preproc->steps[i].type, ZBX_NULL2EMPTY_STR(item->preproc->steps[i].params), item->preproc->steps[i].error_handler, ZBX_NULL2EMPTY_STR(item->preproc->steps[i].error_handler_params)); } zabbix_log(LOG_LEVEL_TRACE, " dependent items:"); for (int i = 0; i < item->preproc->dep_itemids_num; i++) zabbix_log(LOG_LEVEL_TRACE, " " ZBX_FS_UI64, item->preproc->dep_itemids[i]); } } /****************************************************************************** * * * Purpose: get manager configuration revision * * * ******************************************************************************/ zbx_uint64_t zbx_pp_manager_get_revision(const zbx_pp_manager_t *manager) { return manager->revision; } /****************************************************************************** * * * Purpose: set manager configuration revision * * * ******************************************************************************/ void zbx_pp_manager_set_revision(zbx_pp_manager_t *manager, zbx_uint64_t revision) { manager->revision = revision; } /****************************************************************************** * * * Purpose: get item configuration data for reading and updates * * * ******************************************************************************/ zbx_hashset_t *zbx_pp_manager_items(zbx_pp_manager_t *manager) { return &manager->items; } /****************************************************************************** * * * Purpose: get number of pending preprocessing tasks * * * ******************************************************************************/ zbx_uint64_t zbx_pp_manager_get_pending_num(zbx_pp_manager_t *manager) { return manager->queue.pending_num; } /****************************************************************************** * * * Purpose: get diagnostic statistics * * * ******************************************************************************/ void zbx_pp_manager_get_diag_stats(zbx_pp_manager_t *manager, zbx_uint64_t *preproc_num, zbx_uint64_t *pending_num, zbx_uint64_t *finished_num, zbx_uint64_t *sequences_num) { *preproc_num = (zbx_uint64_t)manager->items.num_data; *pending_num = manager->queue.pending_num; *finished_num = manager->queue.finished_num; *sequences_num = (zbx_uint64_t)manager->queue.sequences.num_data; } /****************************************************************************** * * * Purpose: get task sequence statistics * * * ******************************************************************************/ void zbx_pp_manager_get_sequence_stats(zbx_pp_manager_t *manager, zbx_vector_pp_sequence_stats_ptr_t *sequences) { pp_task_queue_get_sequence_stats(&manager->queue, sequences); } /****************************************************************************** * * * Purpose: get worker usage statistics * * * ******************************************************************************/ void zbx_pp_manager_get_worker_usage(zbx_pp_manager_t *manager, zbx_vector_dbl_t *worker_usage) { (void)zbx_timekeeper_get_usage(manager->timekeeper, worker_usage); } /****************************************************************************** * * * Purpose: change worker log level * * * ******************************************************************************/ void zbx_pp_manager_change_worker_loglevel(zbx_pp_manager_t *manager, int worker_num, int direction) { if (0 > worker_num || manager->workers_num < worker_num) { zabbix_log(LOG_LEVEL_INFORMATION, "Cannot change log level for preprocessing worker #%d:" " no such instance", worker_num); return; } for (int i = 0; i < manager->workers_num; i++) { if (0 != worker_num && worker_num != i + 1) continue; zbx_change_component_log_level(&manager->workers[i].logger, direction); } }