/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "pp_worker.h" #include "pp_task.h" #include "pp_queue.h" #include "pp_execute.h" #include "zbxcommon.h" #include "zbxself.h" #include "zbxpreproc.h" #include "zbxalgo.h" #include "zbxregexp.h" #include "zbxthreads.h" #define PP_WORKER_INIT_NONE 0x00 #define PP_WORKER_INIT_THREAD 0x01 /****************************************************************************** * * * Purpose: process preprocessing testing task * * * ******************************************************************************/ static void pp_task_process_test(zbx_pp_context_t *ctx, zbx_pp_task_t *task, const char *config_source_ip) { zbx_pp_task_test_t *d = (zbx_pp_task_test_t *)PP_TASK_DATA(task); pp_execute(ctx, d->preproc, NULL, NULL, &d->value, d->ts, config_source_ip, &d->result, &d->results, &d->results_num); } /****************************************************************************** * * * Purpose: process value preprocessing task * * * ******************************************************************************/ static void pp_task_process_value(zbx_pp_context_t *ctx, zbx_pp_task_t *task, const char *config_source_ip) { zbx_pp_task_value_t *d = (zbx_pp_task_value_t *)PP_TASK_DATA(task); pp_execute(ctx, d->preproc, d->cache, d->um_handle, &d->value, d->ts, config_source_ip, &d->result, NULL, NULL); } /****************************************************************************** * * * Purpose: process dependent preprocessing task * * * ******************************************************************************/ static void pp_task_process_dependent(zbx_pp_context_t *ctx, zbx_pp_task_t *task, const char *config_source_ip) { zbx_pp_task_dependent_t *d = (zbx_pp_task_dependent_t *)PP_TASK_DATA(task); zbx_pp_task_value_t *d_first = (zbx_pp_task_value_t *)PP_TASK_DATA(d->primary); pp_execute(ctx, d_first->preproc, d->cache, d_first->um_handle, &d_first->value, d_first->ts, config_source_ip, &d_first->result, NULL, NULL); } /****************************************************************************** * * * Purpose: process first task in sequence task * * * ******************************************************************************/ static void pp_task_process_sequence(zbx_pp_context_t *ctx, zbx_pp_task_t *task_seq, const char *config_source_ip) { zbx_pp_task_sequence_t *d_seq = (zbx_pp_task_sequence_t *)PP_TASK_DATA(task_seq); zbx_pp_task_t *task; if (SUCCEED == zbx_list_peek(&d_seq->tasks, (void **)&task)) { switch (task->type) { case ZBX_PP_TASK_VALUE: case ZBX_PP_TASK_VALUE_SEQ: pp_task_process_value(ctx, task, config_source_ip); break; case ZBX_PP_TASK_DEPENDENT: pp_task_process_dependent(ctx, task, config_source_ip); break; default: THIS_SHOULD_NEVER_HAPPEN; break; } } } /****************************************************************************** * * * Purpose: preprocessing worker thread entry * * * ******************************************************************************/ static void *pp_worker_entry(void *args) { zbx_pp_worker_t *worker = (zbx_pp_worker_t *)args; zbx_pp_queue_t *queue = worker->queue; zbx_pp_task_t *in; char *error = NULL, component[MAX_ID_LEN + 1]; sigset_t mask; int err; zbx_snprintf(component, sizeof(component), "%d", worker->id); zbx_set_log_component(component, &worker->logger); zabbix_log(LOG_LEVEL_INFORMATION, "thread started [%s #%d]", get_process_type_string(ZBX_PROCESS_TYPE_PREPROCESSOR), worker->id); zbx_init_regexp_env(); sigemptyset(&mask); sigaddset(&mask, SIGTERM); sigaddset(&mask, SIGUSR1); sigaddset(&mask, SIGUSR2); sigaddset(&mask, SIGHUP); sigaddset(&mask, SIGQUIT); sigaddset(&mask, SIGINT); if (0 != (err = pthread_sigmask(SIG_BLOCK, &mask, NULL))) zabbix_log(LOG_LEVEL_WARNING, "cannot block signals: %s", zbx_strerror(err)); worker->stop = 0; pp_context_init(&worker->execute_ctx); pp_task_queue_lock(queue); pp_task_queue_register_worker(queue); while (0 == worker->stop) { if (NULL != (in = pp_task_queue_pop_new(queue))) { pp_task_queue_unlock(queue); zbx_timekeeper_update(worker->timekeeper, worker->id - 1, ZBX_PROCESS_STATE_BUSY); zabbix_log(LOG_LEVEL_TRACE, "%s() process task type:%u itemid:" ZBX_FS_UI64, __func__, in->type, in->itemid); switch (in->type) { case ZBX_PP_TASK_TEST: pp_task_process_test(&worker->execute_ctx, in, worker->config_source_ip); break; case ZBX_PP_TASK_VALUE: case ZBX_PP_TASK_VALUE_SEQ: pp_task_process_value(&worker->execute_ctx, in, worker->config_source_ip); break; case ZBX_PP_TASK_DEPENDENT: pp_task_process_dependent(&worker->execute_ctx, in, worker->config_source_ip); break; case ZBX_PP_TASK_SEQUENCE: pp_task_process_sequence(&worker->execute_ctx, in, worker->config_source_ip); break; } zbx_timekeeper_update(worker->timekeeper, worker->id - 1, ZBX_PROCESS_STATE_IDLE); pp_task_queue_lock(queue); pp_task_queue_push_finished(queue, in); if (NULL != worker->finished_cb) worker->finished_cb(worker->finished_data); continue; } if (SUCCEED != pp_task_queue_wait(queue, &error)) { zabbix_log(LOG_LEVEL_WARNING, "[%d] %s", worker->id, error); zbx_free(error); worker->stop = 1; } if (1 < queue->pending_num) pp_task_queue_notify(queue); } pp_task_queue_deregister_worker(queue); pp_task_queue_unlock(queue); zabbix_log(LOG_LEVEL_INFORMATION, "thread stopped [%s #%d]", get_process_type_string(ZBX_PROCESS_TYPE_PREPROCESSOR), worker->id); return NULL; } /****************************************************************************** * * * Purpose: initialize and start preprocessing worker * * * * Parameters: worker - [IN] preprocessing worker * * id - [IN] worker id (index) * * queue - [IN] task queue * * timekeeper - [IN] timekeeper object for busy/idle worker * * state reporting * * config_source_ip - [IN] * * error - [OUT] * * * * Return value: SUCCEED - the worker was initialized and started * * FAIL - otherwise * * * ******************************************************************************/ int pp_worker_init(zbx_pp_worker_t *worker, int id, zbx_pp_queue_t *queue, zbx_timekeeper_t *timekeeper, const char *config_source_ip, char **error) { int err, ret = FAIL; pthread_attr_t attr; worker->id = id; worker->queue = queue; worker->timekeeper = timekeeper; worker->config_source_ip = config_source_ip; zbx_pthread_init_attr(&attr); if (0 != (err = pthread_create(&worker->thread, &attr, pp_worker_entry, (void *)worker))) { *error = zbx_dsprintf(NULL, "cannot create thread: %s", zbx_strerror(err)); goto out; } worker->init_flags |= PP_WORKER_INIT_THREAD; ret = SUCCEED; out: if (FAIL == ret) pp_worker_stop(worker); return err; } /****************************************************************************** * * * Purpose: stop the worker thread * * * ******************************************************************************/ void pp_worker_stop(zbx_pp_worker_t *worker) { if (0 != (worker->init_flags & PP_WORKER_INIT_THREAD)) worker->stop = 1; } /****************************************************************************** * * * Purpose: destroy the worker * * * ******************************************************************************/ void pp_worker_destroy(zbx_pp_worker_t *worker) { if (0 != (worker->init_flags & PP_WORKER_INIT_THREAD)) { void *retval; pthread_join(worker->thread, &retval); } pp_context_destroy(&worker->execute_ctx); worker->init_flags = PP_WORKER_INIT_NONE; } /****************************************************************************** * * * Purpose: set callback to call after task is processed * * * * Parameters: worker - [IN] the preprocessing worker * * finished_cb - [IN] a callback to call after finishing * * task * * finished_data - [IN] the callback data * * * ******************************************************************************/ void pp_worker_set_finished_cb(zbx_pp_worker_t *worker, zbx_pp_notify_cb_t finished_cb, void *finished_data) { worker->finished_cb = finished_cb; worker->finished_data = finished_data; }