/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "pp_queue.h" #include "pp_task.h" #include "zbxcommon.h" #include "zbxalgo.h" #define PP_TASK_QUEUE_INIT_NONE 0x00 #define PP_TASK_QUEUE_INIT_LOCK 0x01 #define PP_TASK_QUEUE_INIT_EVENT 0x02 ZBX_PTR_VECTOR_IMPL(pp_top_stats_ptr, zbx_pp_top_stats_t *) /* task sequence registry by itemid */ typedef struct { zbx_uint64_t itemid; zbx_pp_task_t *task; } zbx_pp_item_task_sequence_t; /****************************************************************************** * * * Purpose: initialize task queue * * * * Parameters: queue - [IN] task queue * * error - [OUT] * * * * Return value: SUCCEED - the task queue was initialized successfully * * FAIL - otherwise * * * ******************************************************************************/ int pp_task_queue_init(zbx_pp_queue_t *queue, char **error) { int err, ret = FAIL; queue->workers_num = 0; queue->pending_num = 0; queue->finished_num = 0; queue->processing_num = 0; zbx_list_create(&queue->pending); zbx_list_create(&queue->immediate); zbx_list_create(&queue->finished); zbx_hashset_create(&queue->sequences, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); if (0 != (err = pthread_mutex_init(&queue->lock, NULL))) { *error = zbx_dsprintf(NULL, "cannot initialize task queue mutex: %s", zbx_strerror(err)); goto out; } queue->init_flags |= PP_TASK_QUEUE_INIT_LOCK; if (0 != (err = pthread_cond_init(&queue->event, NULL))) { *error = zbx_dsprintf(NULL, "cannot initialize task queue conditional variable: %s", zbx_strerror(err)); goto out; } queue->init_flags |= PP_TASK_QUEUE_INIT_EVENT; ret = SUCCEED; out: if (FAIL == ret) pp_task_queue_destroy(queue); return ret; } /****************************************************************************** * * * Purpose: clear task list * * * ******************************************************************************/ static void pp_task_queue_clear_tasks(zbx_list_t *tasks) { zbx_pp_task_t *task = NULL; while (SUCCEED == zbx_list_pop(tasks, (void **)&task)) pp_task_free(task); } /****************************************************************************** * * * Purpose: destroy task queue * * * ******************************************************************************/ void pp_task_queue_destroy(zbx_pp_queue_t *queue) { if (0 != (queue->init_flags & PP_TASK_QUEUE_INIT_LOCK)) pthread_mutex_destroy(&queue->lock); if (0 != (queue->init_flags & PP_TASK_QUEUE_INIT_EVENT)) pthread_cond_destroy(&queue->event); pp_task_queue_clear_tasks(&queue->pending); zbx_list_destroy(&queue->pending); pp_task_queue_clear_tasks(&queue->immediate); zbx_list_destroy(&queue->immediate); pp_task_queue_clear_tasks(&queue->finished); zbx_list_destroy(&queue->finished); zbx_hashset_destroy(&queue->sequences); queue->init_flags = PP_TASK_QUEUE_INIT_NONE; } /****************************************************************************** * * * Purpose: lock task queue * * * ******************************************************************************/ void pp_task_queue_lock(zbx_pp_queue_t *queue) { pthread_mutex_lock(&queue->lock); } /****************************************************************************** * * * Purpose: unlock task queue * * * ******************************************************************************/ void pp_task_queue_unlock(zbx_pp_queue_t *queue) { pthread_mutex_unlock(&queue->lock); } /****************************************************************************** * * * Purpose: register a new worker * * * ******************************************************************************/ void pp_task_queue_register_worker(zbx_pp_queue_t *queue) { queue->workers_num++; } /****************************************************************************** * * * Purpose: deregister a worker * * * ******************************************************************************/ void pp_task_queue_deregister_worker(zbx_pp_queue_t *queue) { queue->workers_num--; } /****************************************************************************** * * * Purpose: add task to an existing sequence or create/append to a new one * * * * Parameters: queue - [IN] task queue * * task - [IN] task to add * * * * Return value: The created sequence task or NULL if task was added to an * * existing sequence. * * * ******************************************************************************/ static zbx_pp_task_t *pp_task_queue_add_sequence(zbx_pp_queue_t *queue, zbx_pp_task_t *task) { zbx_pp_item_task_sequence_t *sequence; zbx_pp_task_t *new_task; if (NULL == (sequence = (zbx_pp_item_task_sequence_t *)zbx_hashset_search(&queue->sequences, &task->itemid))) { zbx_pp_item_task_sequence_t sequence_local = {.itemid = task->itemid}; sequence = (zbx_pp_item_task_sequence_t *)zbx_hashset_insert(&queue->sequences, &sequence_local, sizeof(sequence_local)); sequence->task = pp_task_sequence_create(task->itemid); new_task = sequence->task; } else new_task = NULL; zbx_pp_task_sequence_t *d_seq = (zbx_pp_task_sequence_t *)PP_TASK_DATA(sequence->task); (void)zbx_list_append(&d_seq->tasks, task, NULL); return new_task; } /****************************************************************************** * * * Purpose: queue task to be processed before normal tasks * * * * Parameters: queue - [IN] task queue * * task - [IN] task to push * * * ******************************************************************************/ void pp_task_queue_push_immediate(zbx_pp_queue_t *queue, zbx_pp_task_t *task) { switch (task->type) { case ZBX_PP_TASK_VALUE_SEQ: case ZBX_PP_TASK_DEPENDENT: queue->pending_num++; if (NULL == (task = pp_task_queue_add_sequence(queue, task))) return; break; case ZBX_PP_TASK_SEQUENCE: /* sequence task is just a container for other tasks - it does not affect statistics, */ /* so there is no need to increment queue->pending_num */ break; default: queue->pending_num++; break; } (void)zbx_list_append(&queue->immediate, task, NULL); } /****************************************************************************** * * * Purpose: remove task sequence * * * * Parameters: queue - [IN] task queue * * itemid - [IN] task sequence itemid * * * ******************************************************************************/ void pp_task_queue_remove_sequence(zbx_pp_queue_t *queue, zbx_uint64_t itemid) { zbx_hashset_remove(&queue->sequences, &itemid); } /****************************************************************************** * * * Purpose: queue test task to be processed * * * * Parameters: queue - [IN] task queue * * task - [IN] task * * * ******************************************************************************/ void pp_task_queue_push_test(zbx_pp_queue_t *queue, zbx_pp_task_t *task) { queue->pending_num++; (void)zbx_list_append(&queue->immediate, task, NULL); } /****************************************************************************** * * * Purpose: queue normal task to be processed * * * * Parameters: queue - [IN] task queue * * task - [IN] task * * * * Comments: This function is used to push tasks created by new preprocessing * * or testing requests. * * * ******************************************************************************/ void pp_task_queue_push(zbx_pp_queue_t *queue, zbx_pp_task_t *task) { zbx_pp_task_value_t *d = (zbx_pp_task_value_t *)PP_TASK_DATA(task); queue->pending_num++; if (ITEM_TYPE_INTERNAL != d->preproc->type) { (void)zbx_list_append(&queue->pending, task, NULL); return; } if (ZBX_PP_TASK_VALUE == task->type) { (void)zbx_list_append(&queue->immediate, task, NULL); return; } zbx_pp_task_t *seq_task; if (NULL != (seq_task = pp_task_queue_add_sequence(queue, task))) (void)zbx_list_append(&queue->immediate, seq_task, NULL); } /****************************************************************************** * * * Purpose: pop task from task queue * * * * Parameters: queue - [IN] task queue * * * * Return value: The popped task or NULL if there are no tasks to be * * processed. * * * * Comments: This function is used by workers to pop tasks for processing. * * Sequence tasks will be moved to existing tasks sequences or * * returned if there are no registered sequences for this item. * * * ******************************************************************************/ zbx_pp_task_t *pp_task_queue_pop_new(zbx_pp_queue_t *queue) { zbx_pp_task_t *task = NULL; if (SUCCEED == zbx_list_pop(&queue->immediate, (void **)&task)) { /* while sequence tasks do not affect statistics, the first task in sequence */ /* does, so the statistics can be updated for all tasks */ queue->pending_num--; queue->processing_num++; return (zbx_pp_task_t *)task; } while (SUCCEED == zbx_list_pop(&queue->pending, (void **)&task)) { if (ZBX_PP_TASK_VALUE_SEQ == task->type) { /* task is being moved from pending to immediate queue */ /* while still pending, so statistics are not affected */ task = pp_task_queue_add_sequence(queue, task); } if (NULL != task) { queue->pending_num--; queue->processing_num++; return task; } } return NULL; } /****************************************************************************** * * * Purpose: push finished task into queue * * * * Parameters: queue - [IN] task queue * * task - [IN] task * * * ******************************************************************************/ void pp_task_queue_push_finished(zbx_pp_queue_t *queue, zbx_pp_task_t *task) { queue->finished_num++; queue->processing_num--; (void)zbx_list_append(&queue->finished, task, NULL); } /****************************************************************************** * * * Purpose: pop finished task from queue * * * * Parameters: queue - [IN] task queue * * * * Return value: The popped task or NULL if there are no finished tasks. * * * ******************************************************************************/ zbx_pp_task_t *pp_task_queue_pop_finished(zbx_pp_queue_t *queue) { zbx_pp_task_t *task; if (SUCCEED == zbx_list_pop(&queue->finished, (void **)&task)) { queue->finished_num--; return task; } return NULL; } /****************************************************************************** * * * Purpose: wait for queue notifications * * * * Parameters: queue - [IN] task queue * * error - [IN] * * * * Return value: SUCCEED - the wait succeeded * * FAIL - an error has occurred * * * * Comments: This function is used by workers to wait for new tasks. * * * ******************************************************************************/ int pp_task_queue_wait(zbx_pp_queue_t *queue, char **error) { int err; if (0 != (err = pthread_cond_wait(&queue->event, &queue->lock))) { *error = zbx_dsprintf(NULL, "cannot wait for conditional variable: %s", zbx_strerror(err)); return FAIL; } return SUCCEED; } /****************************************************************************** * * * Purpose: notify one worker * * * * Parameters: queue - [IN] task queue * * * * Comments: This function is used by manager to notify a worker when a new * * task has been queued. * * * ******************************************************************************/ void pp_task_queue_notify(zbx_pp_queue_t *queue) { int err; if (0 != (err = pthread_cond_signal(&queue->event))) { zabbix_log(LOG_LEVEL_WARNING, "cannot signal conditional variable: %s", zbx_strerror(err)); } } /****************************************************************************** * * * Purpose: notify all workers * * * * Parameters: queue - [IN] task queue * * * * Comments: This function is used by manager to notify workers when either * * multiple tasks have been pushed or when stopping workers. * * * ******************************************************************************/ void pp_task_queue_notify_all(zbx_pp_queue_t *queue) { int err; if (0 != (err = pthread_cond_broadcast(&queue->event))) { zabbix_log(LOG_LEVEL_WARNING, "cannot broadcast conditional variable: %s", zbx_strerror(err)); } } /****************************************************************************** * * * Purpose: get registered task sequence statistics sorted by number of tasks * * in descending order * * * * Parameters: queue - [IN] task queue * * stats - [IN] sequence statistics * * * ******************************************************************************/ void pp_task_queue_get_sequence_stats(zbx_pp_queue_t *queue, zbx_vector_pp_top_stats_ptr_t *stats) { zbx_hashset_iter_t iter; zbx_pp_item_task_sequence_t *sequence; zbx_pp_top_stats_t *stat; zbx_list_iterator_t li; pp_task_queue_lock(queue); zbx_hashset_iter_reset(&queue->sequences, &iter); while (NULL != (sequence = (zbx_pp_item_task_sequence_t *)zbx_hashset_iter_next(&iter))) { stat = (zbx_pp_top_stats_t *)zbx_malloc(NULL, sizeof(zbx_pp_top_stats_t)); stat->tasks_num = 0; stat->itemid = sequence->itemid; zbx_pp_task_sequence_t *d_seq = (zbx_pp_task_sequence_t *)PP_TASK_DATA(sequence->task); if (NULL != d_seq->tasks.head) { zbx_list_iterator_init(&d_seq->tasks, &li); do { stat->tasks_num++; } while (SUCCEED == zbx_list_iterator_next(&li)); } zbx_vector_pp_top_stats_ptr_append(stats, stat); } pp_task_queue_unlock(queue); }