#include "async_manager.h"
#include "async_worker.h"
#include "async_queue.h"
#include "zbxstr.h"
#include "zbxalgo.h"
#include "zbxpoller.h"
/* Instantiate ZBX_PTR_VECTOR_IMPL for zbx_interface_status_t and zbx_poller_item_t pointers. */
/* NOTE(review): the macro is defined outside this file; presumably it emits the function */
/* bodies for vector types declared in a header - confirm against the macro definition. */
ZBX_PTR_VECTOR_IMPL(interface_status, zbx_interface_status_t *)
ZBX_PTR_VECTOR_IMPL(poller_item, zbx_poller_item_t *)
/* Async manager state: the worker array, its length and the queue handed to every worker. */
struct zbx_async_manager
{
/* array of workers_num elements, allocated in zbx_async_manager_create() */
zbx_async_worker_t *workers;
/* number of elements in the workers array */
int workers_num;
/* queue set up by async_task_queue_init(); its address is passed to each worker on init, */
/* and queue.workers_num is read under queue.lock while waiting for the workers to start */
zbx_async_queue_t queue;
};
zbx_async_manager_t *zbx_async_manager_create(int workers_num, zbx_async_notify_cb_t finished_cb,
void *finished_data, zbx_thread_poller_args *poller_args_in, char **error)
{
int ret = FAIL, started_num = 0;
time_t time_start;
struct timespec poll_delay = {0, 1e8};
zbx_async_manager_t *manager;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, workers_num);
manager = (zbx_async_manager_t *)zbx_malloc(NULL, sizeof(zbx_async_manager_t));
memset(manager, 0, sizeof(zbx_async_manager_t));
if (SUCCEED != async_task_queue_init(&manager->queue, poller_args_in, error))
goto out;
manager->workers_num = workers_num;
manager->workers = (zbx_async_worker_t *)zbx_calloc(NULL, (size_t)workers_num, sizeof(zbx_async_worker_t));
for (int i = 0; i < workers_num; i++)
{
if (SUCCEED != async_worker_init(&manager->workers[i], &manager->queue,
poller_args_in->zbx_get_progname_cb_arg(), error))
{
goto out;
}
async_worker_set_finished_cb(&manager->workers[i], finished_cb, finished_data);
}
time_start = time(NULL);
#define PP_STARTUP_TIMEOUT 10
while (started_num != workers_num)
{
if (time_start + PP_STARTUP_TIMEOUT < time(NULL))
{
*error = zbx_strdup(NULL, "timeout occurred while waiting for workers to start");
goto out;
}
pthread_mutex_lock(&manager->queue.lock);
started_num = manager->queue.workers_num;
pthread_mutex_unlock(&manager->queue.lock);
nanosleep(&poll_delay, NULL);
}
#undef PP_STARTUP_TIMEOUT
ret = SUCCEED;
out:
if (FAIL == ret)