/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "async_manager.h" #include "async_worker.h" #include "async_queue.h" #include "zbxstr.h" #include "zbxalgo.h" #include "zbxpoller.h" ZBX_PTR_VECTOR_IMPL(interface_status, zbx_interface_status_t *) ZBX_PTR_VECTOR_IMPL(poller_item, zbx_poller_item_t *) struct zbx_async_manager { zbx_async_worker_t *workers; int workers_num; zbx_async_queue_t queue; }; zbx_async_manager_t *zbx_async_manager_create(int workers_num, zbx_async_notify_cb_t finished_cb, void *finished_data, zbx_thread_poller_args *poller_args_in, char **error) { int ret = FAIL, started_num = 0; time_t time_start; struct timespec poll_delay = {0, 1e8}; zbx_async_manager_t *manager; zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, workers_num); manager = (zbx_async_manager_t *)zbx_malloc(NULL, sizeof(zbx_async_manager_t)); memset(manager, 0, sizeof(zbx_async_manager_t)); if (SUCCEED != async_task_queue_init(&manager->queue, poller_args_in, error)) goto out; manager->workers_num = workers_num; manager->workers = (zbx_async_worker_t *)zbx_calloc(NULL, (size_t)workers_num, sizeof(zbx_async_worker_t)); for (int i = 0; i < workers_num; i++) { if (SUCCEED != async_worker_init(&manager->workers[i], &manager->queue, poller_args_in->progname, error)) { goto out; } async_worker_set_finished_cb(&manager->workers[i], finished_cb, finished_data); } /* wait for threads to start */ time_start = time(NULL); #define PP_STARTUP_TIMEOUT 10 while (started_num != workers_num) { if (time_start + PP_STARTUP_TIMEOUT < time(NULL)) { *error = zbx_strdup(NULL, "timeout occurred while waiting for workers to start"); goto out; } pthread_mutex_lock(&manager->queue.lock); started_num = manager->queue.workers_num; pthread_mutex_unlock(&manager->queue.lock); nanosleep(&poll_delay, NULL); } #undef PP_STARTUP_TIMEOUT ret = SUCCEED; out: if (FAIL == ret) { for (int i = 0; i < manager->workers_num; i++) async_worker_stop(&manager->workers[i]); async_task_queue_destroy(&manager->queue); zbx_free(manager); manager = NULL; } zabbix_log(LOG_LEVEL_DEBUG, "End of %s() ret:%s error:%s", __func__, zbx_result_string(ret), ZBX_NULL2EMPTY_STR(*error)); return manager; } void zbx_async_manager_free(zbx_async_manager_t *manager) { async_task_queue_lock(&manager->queue); for (int i = 0; i < manager->workers_num; i++) async_worker_stop(&manager->workers[i]); async_task_queue_notify(&manager->queue); async_task_queue_unlock(&manager->queue); for (int i = 0; i < manager->workers_num; i++) async_worker_destroy(&manager->workers[i]); zbx_free(manager->workers); async_task_queue_destroy(&manager->queue); zbx_free(manager); } void zbx_async_manager_queue_get(zbx_async_manager_t *manager, zbx_vector_poller_item_t *poller_items) { async_task_queue_lock(&manager->queue); if (0 != manager->queue.poller_items.values_num) { zbx_vector_poller_item_append_array(poller_items, manager->queue.poller_items.values, manager->queue.poller_items.values_num); zbx_vector_poller_item_clear(&manager->queue.poller_items); } async_task_queue_unlock(&manager->queue); } void zbx_async_manager_requeue(zbx_async_manager_t *manager, zbx_uint64_t itemid, int errcode, int lastclock) { async_task_queue_lock(&manager->queue); zbx_vector_uint64_append(&manager->queue.itemids, itemid); zbx_vector_int32_append(&manager->queue.errcodes, errcode); zbx_vector_int32_append(&manager->queue.lastclocks, lastclock); async_task_queue_unlock(&manager->queue); } void zbx_async_manager_requeue_flush(zbx_async_manager_t *manager) { async_task_queue_lock(&manager->queue); if (0 != manager->queue.itemids.values_num) async_task_queue_notify(&manager->queue); async_task_queue_unlock(&manager->queue); } void zbx_async_manager_queue_sync(zbx_async_manager_t *manager) { async_task_queue_lock(&manager->queue); manager->queue.check_queue = 1; async_task_queue_notify(&manager->queue); async_task_queue_unlock(&manager->queue); } void zbx_async_manager_interfaces_flush(zbx_async_manager_t *manager, zbx_hashset_t *interfaces) { zbx_hashset_iter_t iter; zbx_interface_status_t *interface_status; async_task_queue_lock(&manager->queue); zbx_hashset_iter_reset(interfaces, &iter); while (NULL != (interface_status = (zbx_interface_status_t *)zbx_hashset_iter_next(&iter))) { zbx_interface_status_t *interface_status_ptr = zbx_malloc(NULL, sizeof(zbx_interface_status_t)); *interface_status_ptr = *interface_status; zbx_vector_interface_status_append(&manager->queue.interfaces, interface_status_ptr); interface_status->key_orig = NULL; interface_status->error = NULL; } async_task_queue_unlock(&manager->queue); zbx_hashset_clear(interfaces); } void zbx_interface_status_clean(zbx_interface_status_t *interface_status) { zbx_free(interface_status->key_orig); zbx_free(interface_status->error); } void zbx_interface_status_free(zbx_interface_status_t *interface_status) { zbx_interface_status_clean(interface_status); zbx_free(interface_status); } void zbx_poller_item_free(zbx_poller_item_t *poller_item) { zbx_clean_items(poller_item->items, poller_item->num, poller_item->results); zbx_dc_config_clean_items(poller_item->items, NULL, (size_t)poller_item->num); zbx_free(poller_item->results); zbx_free(poller_item->errcodes); zbx_free(poller_item->items); zbx_free(poller_item); }