/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/
#include "zbxdiscoverer.h"
#include "zbxlog.h"
#include "zbxcacheconfig.h"
#include "zbxicmpping.h"
#include "zbxdiscovery.h"
#include "zbxexpression.h"
#include "zbxself.h"
#include "zbxrtc.h"
#include "zbxnix.h"
#include "zbxnum.h"
#include "zbxtime.h"
#include "zbxip.h"
#include "zbxsysinfo.h"
#include "zbx_rtc_constants.h"
#include "discoverer_queue.h"
#include "discoverer_job.h"
#include "discoverer_async.h"
#include "zbx_discoverer_constants.h"
#include "discoverer_taskprep.h"
#include "discoverer_int.h"
#include "zbxtimekeeper.h"
#include "zbxpoller.h"
#include "zbxalgo.h"
#include "zbxcomms.h"
#include "zbxdb.h"
#include "zbxdbhigh.h"
#include "zbxipcservice.h"
#include "zbxstr.h"
#include "zbxthreads.h"
#ifdef HAVE_LDAP
# include
#endif
/* per-thread worker identifier, printed as the "[%d]" prefix of worker log messages in this file */
static ZBX_THREAD_LOCAL int log_worker_id;
/* callbacks provided by the hosting program; NOTE(review): the place where they are assigned is not */
/* visible in this part of the file */
static zbx_get_progname_f zbx_get_progname_cb = NULL;
static zbx_get_program_type_f zbx_get_program_type_cb = NULL;
/* pointer vector implementations for discovered services, per-host results and discovery jobs */
ZBX_PTR_VECTOR_IMPL(discoverer_services_ptr, zbx_discoverer_dservice_t*)
ZBX_PTR_VECTOR_IMPL(discoverer_results_ptr, zbx_discoverer_results_t*)
ZBX_PTR_VECTOR_IMPL(discoverer_jobs_ptr, zbx_discoverer_job_t*)
/* NOTE(review): presumably a number of seconds - confirm the unit at the point of use */
#define ZBX_DISCOVERER_STARTUP_TIMEOUT 30
/* discoverer manager state used by the functions below (queue, job references, results, */
/* results lock and incomplete check counters) */
static zbx_discoverer_manager_t dmanager;
/* vector implementations for port ranges, discovery checks and discovery rule errors */
ZBX_VECTOR_IMPL(portrange, zbx_range_t)
ZBX_PTR_VECTOR_IMPL(ds_dcheck_ptr, zbx_ds_dcheck_t *)
ZBX_PTR_VECTOR_IMPL(discoverer_drule_error, zbx_discoverer_drule_error_t)
/******************************************************************************
* *
* Purpose: releases the error text owned by a discovery rule error entry *
* *
* Parameters: value - [IN] error entry, passed by value; only its 'error' *
* member is freed *
* *
******************************************************************************/
void zbx_discoverer_drule_error_free(zbx_discoverer_drule_error_t value)
{
zbx_free(value.error);
}
/******************************************************************************
 *
 * Purpose: calculates hash of a check count entry
 *
 * Parameters: data - [IN] pointer to zbx_discoverer_check_count_t
 *
 * Return value: hash combining the discovery rule id and the ip string
 *
 ******************************************************************************/
static zbx_hash_t	discoverer_check_count_hash(const void *data)
{
	const zbx_discoverer_check_count_t	*entry = (const zbx_discoverer_check_count_t *)data;
	zbx_hash_t				seed = ZBX_DEFAULT_UINT64_HASH_FUNC(&entry->druleid);

	/* the rule id hash is used as the seed of the string hash, so both fields contribute */
	return ZBX_DEFAULT_STRING_HASH_ALGO(entry->ip, strlen(entry->ip), seed);
}
/******************************************************************************
 *
 * Purpose: compares two check count entries
 *
 * Parameters: d1, d2 - [IN] pointers to zbx_discoverer_check_count_t
 *
 * Comments: entries are ordered by discovery rule id first and by the ip
 *           string (strcmp) when the rule ids are equal
 *
 ******************************************************************************/
static int	discoverer_check_count_compare(const void *d1, const void *d2)
{
	const zbx_discoverer_check_count_t	*lhs = (const zbx_discoverer_check_count_t *)d1;
	const zbx_discoverer_check_count_t	*rhs = (const zbx_discoverer_check_count_t *)d2;

	ZBX_RETURN_IF_NOT_EQUAL(lhs->druleid, rhs->druleid);

	return strcmp(lhs->ip, rhs->ip);
}
/******************************************************************************
 *
 * Purpose: calculates hash of a discovery result entry
 *
 * Parameters: data - [IN] pointer to zbx_discoverer_results_t
 *
 * Return value: hash combining the discovery rule id and the ip string
 *
 ******************************************************************************/
static zbx_hash_t	discoverer_result_hash(const void *data)
{
	const zbx_discoverer_results_t	*entry = (const zbx_discoverer_results_t *)data;
	zbx_hash_t			seed = ZBX_DEFAULT_UINT64_HASH_FUNC(&entry->druleid);

	/* the rule id hash is used as the seed of the string hash, so both fields contribute */
	return ZBX_DEFAULT_STRING_HASH_ALGO(entry->ip, strlen(entry->ip), seed);
}
/******************************************************************************
 *
 * Purpose: compares two discovery result entries
 *
 * Parameters: d1, d2 - [IN] pointers to zbx_discoverer_results_t
 *
 * Comments: entries are ordered by discovery rule id first and by the ip
 *           string (strcmp) when the rule ids are equal
 *
 ******************************************************************************/
static int	discoverer_result_compare(const void *d1, const void *d2)
{
	const zbx_discoverer_results_t	*lhs = (const zbx_discoverer_results_t *)d1;
	const zbx_discoverer_results_t	*rhs = (const zbx_discoverer_results_t *)d2;

	ZBX_RETURN_IF_NOT_EQUAL(lhs->druleid, rhs->druleid);

	return strcmp(lhs->ip, rhs->ip);
}
/******************************************************************************
 *
 * Purpose: frees a discovery check together with the strings and the port
 *          range vector it owns
 *
 * Parameters: ds_dcheck - [IN] check to free; the pointer is invalid afterwards
 *
 ******************************************************************************/
void	discoverer_ds_dcheck_free(zbx_ds_dcheck_t *ds_dcheck)
{
	zbx_free(ds_dcheck->dcheck.key_);
	zbx_free(ds_dcheck->dcheck.ports);

	/* the SNMP specific strings are released only for SNMP check types */
	switch (ds_dcheck->dcheck.type)
	{
		case SVC_SNMPv1:
		case SVC_SNMPv2c:
		case SVC_SNMPv3:
			zbx_free(ds_dcheck->dcheck.snmp_community);
			zbx_free(ds_dcheck->dcheck.snmpv3_securityname);
			zbx_free(ds_dcheck->dcheck.snmpv3_authpassphrase);
			zbx_free(ds_dcheck->dcheck.snmpv3_privpassphrase);
			zbx_free(ds_dcheck->dcheck.snmpv3_contextname);
			break;
		default:
			break;
	}

	zbx_vector_portrange_destroy(&ds_dcheck->portranges);
	zbx_free(ds_dcheck);
}
/******************************************************************************
 *
 * Purpose: decreases the number of outstanding checks registered for a
 *          discovery rule / ip pair
 *
 * Parameters: check_counts - [IN/OUT] hashset of zbx_discoverer_check_count_t
 *             druleid      - [IN]
 *             ip           - [IN]
 *             count        - [IN] number of checks to subtract
 *
 * Return value: SUCCEED - the counter was found, was not zero and has been
 *                         decreased
 *               FAIL    - there is no counter for the pair or it is already 0
 *
 ******************************************************************************/
static int	discoverer_check_count_decrease(zbx_hashset_t *check_counts, zbx_uint64_t druleid, const char *ip,
		zbx_uint64_t count)
{
	zbx_discoverer_check_count_t	key, *found;

	key.druleid = druleid;
	zbx_strlcpy(key.ip, ip, sizeof(key.ip));

	found = zbx_hashset_search(check_counts, &key);

	if (NULL == found)
		return FAIL;

	if (0 == found->count)
		return FAIL;

	found->count -= count;

	return SUCCEED;
}
/******************************************************************************
* *
* Purpose: checks that a non-zero check counter still exists for the *
* discovery rule / ip pair *
* *
* Return value: result of discoverer_check_count_decrease() called with a *
* count of 0, so the counter itself is left unchanged *
* *
******************************************************************************/
static int discoverer_drule_check(zbx_hashset_t *check_counts, zbx_uint64_t druleid, const char *ip)
{
return discoverer_check_count_decrease(check_counts, druleid, ip, 0);
}
/******************************************************************************
 *
 * Purpose: gets the global timeout configured for an item type, resolves
 *          macros in it and validates the result
 *
 * Parameters: type        - [IN] item type passed to
 *                                zbx_dc_get_global_item_type_timeout()
 *             timeout_sec - [OUT] filled by zbx_validate_item_timeout()
 *             error_val   - [OUT] error buffer passed to the validation
 *             error_len   - [IN] size of the error buffer
 *
 * Return value: result of zbx_validate_item_timeout()
 *
 ******************************************************************************/
static int	dcheck_get_timeout(unsigned char type, int *timeout_sec, char *error_val, size_t error_len)
{
	char	*timeout_str = zbx_dc_get_global_item_type_timeout(type);
	int	res;

	zbx_substitute_simple_macros(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
			NULL, NULL, &timeout_str, ZBX_MACRO_TYPE_COMMON, NULL, 0);

	res = zbx_validate_item_timeout(timeout_str, timeout_sec, error_val, error_len);

	/* the timeout string is owned by this function */
	zbx_free(timeout_str);

	return res;
}
/******************************************************************************
 *
 * Purpose: checks if service is available
 *
 * Parameters: dcheck - [IN] discovery check (type and timeout are used)
 *             ip     - [IN]
 *             port   - [IN]
 *             error  - [OUT] set when the check type cannot be processed
 *
 * Return value: SUCCEED - service is UP, FAIL - service not discovered
 *
 * Comments: only SVC_LDAP is handled here (when compiled with HAVE_LDAP); the
 *           check is executed as a net.tcp.service[] agent check
 *
 ******************************************************************************/
static int	discoverer_service(const zbx_dc_dcheck_t *dcheck, char *ip, int port, char **error)
{
	int		ret = FAIL;
	const char	*service = NULL;
	AGENT_RESULT	result;

	zabbix_log(LOG_LEVEL_DEBUG, "[%d] In %s()", log_worker_id, __func__);

	zbx_init_agent_result(&result);

	if (SVC_LDAP == dcheck->type)
	{
#ifdef HAVE_LDAP
		service = "ldap";
#else
		*error = zbx_strdup(*error, "Support for LDAP checks was not compiled in.");
#endif
	}
	else
		*error = zbx_dsprintf(*error, "Unsupported check type %u.", dcheck->type);

	/* a service name is set only for supported check types */
	if (NULL != service)
	{
		char	key[MAX_STRING_LEN];

		zbx_snprintf(key, sizeof(key), "net.tcp.service[%s,%s,%d]", service, ip, port);

		/* service is UP only if the check succeeded and returned a non-zero numeric value */
		if (SUCCEED == zbx_execute_agent_check(key, 0, &result, dcheck->timeout) &&
				NULL != ZBX_GET_UI64_RESULT(&result) && 0 != result.ui64)
		{
			ret = SUCCEED;
		}
	}

	zbx_free_agent_result(&result);

	zabbix_log(LOG_LEVEL_DEBUG, "[%d] End of %s() ret:%s", log_worker_id, __func__, zbx_result_string(ret));

	return ret;
}
/* frees a discovered service entry; used as the element destructor of the services vector */
static void service_free(zbx_discoverer_dservice_t *service)
{
zbx_free(service);
}
/******************************************************************************
 *
 * Purpose: releases everything owned by a result entry (ip, dns name and the
 *          services vector with its elements) without freeing the entry
 *
 ******************************************************************************/
static void	results_clear(zbx_discoverer_results_t *result)
{
	zbx_vector_discoverer_services_ptr_t	*services = &result->services;

	zbx_free(result->ip);
	zbx_free(result->dnsname);

	zbx_vector_discoverer_services_ptr_clear_ext(services, service_free);
	zbx_vector_discoverer_services_ptr_destroy(services);
}
/* releases the contents of a result entry with results_clear() and then frees the entry itself */
void results_free(zbx_discoverer_results_t *result)
{
results_clear(result);
zbx_free(result);
}
/******************************************************************************
 *
 * Purpose: parses a comma separated list of ports and port ranges
 *          ("from-to") and appends every item to the ranges vector
 *
 * Parameters: ports  - [IN] port list
 *             ranges - [OUT] parsed ranges are appended here
 *
 * Comments: the list is copied into a local buffer of MAX_STRING_LEN / 8 + 1
 *           bytes before parsing; numbers are converted with atoi()
 *
 ******************************************************************************/
void	dcheck_port_ranges_get(const char *ports, zbx_vector_portrange_t *ranges)
{
	char	buf[MAX_STRING_LEN / 8 + 1];
	char	*token = buf;

	zbx_strscpy(buf, ports);

	while ('\0' != *token)
	{
		zbx_range_t	range;
		char		*sep = strchr(token, ','), *dash;

		/* temporarily terminate the current list item */
		if (NULL != sep)
			*sep = '\0';

		if (NULL == (dash = strchr(token, '-')))
		{
			range.from = range.to = atoi(token);
		}
		else
		{
			*dash = '\0';
			range.from = atoi(token);
			range.to = atoi(dash + 1);
			*dash = '-';
		}

		zbx_vector_portrange_append(ranges, range);

		if (NULL == sep)
			break;

		*sep = ',';
		token = sep + 1;
	}
}
/******************************************************************************
 *
 * Purpose: passes the services discovered on one ip to the service update
 *          callback and derives the resulting host status
 *
 * Return value: DOBJECT_STATUS_DOWN when there are no services, otherwise the
 *               status of the first service, replaced by DOBJECT_STATUS_UP as
 *               soon as any service is up
 *
 * Comments: when no services were discovered the host is looked up with
 *           discovery_find_host_cb(); if dhost->dhostid is set afterwards,
 *           discovery_update_service_down_cb() is called with the ids
 *           collected by the update callback
 *
 ******************************************************************************/
static int	process_services(void *handle, zbx_uint64_t druleid, zbx_db_dhost *dhost, const char *ip,
		const char *dns, time_t now, zbx_uint64_t unique_dcheckid,
		const zbx_vector_discoverer_services_ptr_t *services, zbx_add_event_func_t add_event_cb,
		zbx_discovery_update_service_func_t discovery_update_service_cb,
		zbx_discovery_update_service_down_func_t discovery_update_service_down_cb,
		zbx_discovery_find_host_func_t discovery_find_host_cb)
{
	int			status = -1;
	zbx_vector_uint64_t	seen_ids;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_uint64_create(&seen_ids);

	if (0 == services->values_num)
	{
		discovery_find_host_cb(druleid, ip, dhost);
		status = DOBJECT_STATUS_DOWN;
	}
	else
	{
		for (int n = 0; n < services->values_num; n++)
		{
			zbx_discoverer_dservice_t	*svc = (zbx_discoverer_dservice_t *)services->values[n];

			/* the first service defines the status, any service that is up overrides it */
			if (-1 == status || DOBJECT_STATUS_UP == svc->status)
				status = svc->status;

			discovery_update_service_cb(handle, druleid, svc->dcheckid, unique_dcheckid, dhost,
					ip, dns, svc->port, svc->status, svc->value, now, &seen_ids,
					add_event_cb);
		}
	}

	if (0 != dhost->dhostid)
		discovery_update_service_down_cb(dhost->dhostid, now, &seen_ids);

	zbx_vector_uint64_destroy(&seen_ids);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return status;
}
/******************************************************************************
* *
* Purpose: cleans dservices and dhosts not present in drule *
* *
* Parameters: druleid - [IN] discovery rule to clean up *
* *
* Comments: a discovered service is removed when its ip is no longer in the *
* ip range stored for the rule; a discovered host is removed when *
* it has no services at all or when none of its services remains *
* in the ip range. Nothing is done if the rule is not found. *
* *
******************************************************************************/
static void discoverer_clean_services(zbx_uint64_t druleid)
{
zbx_db_result_t result;
zbx_db_row_t row;
char *iprange = NULL;
zbx_vector_uint64_t keep_dhostids, del_dhostids, del_dserviceids;
zbx_uint64_t dhostid, dserviceid;
char *sql = NULL;
size_t sql_alloc = 0, sql_offset;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
/* current ip range of the rule; iprange stays NULL when the rule row does not exist */
result = zbx_db_select("select iprange from drules where druleid=" ZBX_FS_UI64, druleid);
if (NULL != (row = zbx_db_fetch(result)))
iprange = zbx_strdup(iprange, row[0]);
zbx_db_free_result(result);
/* the vectors are not created yet at this point, so the 'out' label skips their destruction */
if (NULL == iprange)
goto out;
zbx_vector_uint64_create(&keep_dhostids);
zbx_vector_uint64_create(&del_dhostids);
zbx_vector_uint64_create(&del_dserviceids);
/* left join: hosts without services are returned once with a NULL dserviceid */
result = zbx_db_select(
"select dh.dhostid,ds.dserviceid,ds.ip"
" from dhosts dh"
" left join dservices ds"
" on dh.dhostid=ds.dhostid"
" where dh.druleid=" ZBX_FS_UI64,
druleid);
while (NULL != (row = zbx_db_fetch(result)))
{
ZBX_STR2UINT64(dhostid, row[0]);
if (SUCCEED == zbx_db_is_null(row[1]))
{
/* host without any service */
zbx_vector_uint64_append(&del_dhostids, dhostid);
}
else if (SUCCEED != zbx_ip_in_list(iprange, row[2]))
{
/* service ip is outside of the ip range: the service goes, the host is a removal candidate */
ZBX_STR2UINT64(dserviceid, row[1]);
zbx_vector_uint64_append(&del_dhostids, dhostid);
zbx_vector_uint64_append(&del_dserviceids, dserviceid);
}
else
zbx_vector_uint64_append(&keep_dhostids, dhostid);
}
zbx_db_free_result(result);
zbx_free(iprange);
if (0 != del_dserviceids.values_num)
{
/* remove dservices */
zbx_vector_uint64_sort(&del_dserviceids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
sql_offset = 0;
zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "delete from dservices where");
zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "dserviceid",
del_dserviceids.values, del_dserviceids.values_num);
zbx_db_execute("%s", sql);
/* remove dhosts */
zbx_vector_uint64_sort(&keep_dhostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
zbx_vector_uint64_uniq(&keep_dhostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
zbx_vector_uint64_sort(&del_dhostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
zbx_vector_uint64_uniq(&del_dhostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
/* a host that still has a service inside the ip range must not be deleted; */
/* i-- re-checks the element moved into slot i by the unordered removal */
for (int i = 0; i < del_dhostids.values_num; i++)
{
dhostid = del_dhostids.values[i];
if (FAIL != zbx_vector_uint64_bsearch(&keep_dhostids, dhostid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
zbx_vector_uint64_remove_noorder(&del_dhostids, i--);
}
}
if (0 != del_dhostids.values_num)
{
/* the unordered removal above may have broken the order, so sort again */
zbx_vector_uint64_sort(&del_dhostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
/* the sql buffer is reused from the beginning */
sql_offset = 0;
zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "delete from dhosts where");
zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "dhostid",
del_dhostids.values, del_dhostids.values_num);
zbx_db_execute("%s", sql);
}
zbx_free(sql);
zbx_vector_uint64_destroy(&del_dserviceids);
zbx_vector_uint64_destroy(&del_dhostids);
zbx_vector_uint64_destroy(&keep_dhostids);
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
 *
 * Purpose: removes the incomplete check counters that belong to the removed
 *          discovery rules
 *
 * Parameters: manager      - [IN/OUT] owner of the incomplete_checks_count
 *                                     hashset
 *             del_druleids - [IN] ids of the removed discovery rules
 *
 ******************************************************************************/
static void	process_results_incompletecheckscount_remove(zbx_discoverer_manager_t *manager,
		zbx_vector_uint64_t *del_druleids)
{
	zbx_hashset_iter_t		iter;
	zbx_discoverer_check_count_t	*entry;

	zbx_hashset_iter_reset(&manager->incomplete_checks_count, &iter);

	while (NULL != (entry = (zbx_discoverer_check_count_t *)zbx_hashset_iter_next(&iter)))
	{
		for (int k = 0; k < del_druleids->values_num; k++)
		{
			if (entry->druleid == del_druleids->values[k])
			{
				/* the entry is gone after removal, so stop looking at it */
				zbx_hashset_iter_remove(&iter);
				break;
			}
		}
	}
}
/******************************************************************************
 *
 * Purpose: drops the stored results and the incomplete check counters of
 *          every discovery rule that has a registered error
 *
 * Parameters: manager      - [IN/OUT] owner of the results and
 *                                     incomplete_checks_count hashsets
 *             drule_errors - [IN] errors, one discovery rule id per entry
 *
 ******************************************************************************/
static void	process_results_incompleteresult_remove(zbx_discoverer_manager_t *manager,
		zbx_vector_discoverer_drule_error_t *drule_errors)
{
	for (int n = 0; n < drule_errors->values_num; n++)
	{
		const zbx_uint64_t		failed_id = drule_errors->values[n].druleid;
		zbx_hashset_iter_t		it;
		zbx_discoverer_results_t	*res;
		zbx_discoverer_check_count_t	*cnt;

		zbx_hashset_iter_reset(&manager->results, &it);

		while (NULL != (res = (zbx_discoverer_results_t *)zbx_hashset_iter_next(&it)))
		{
			if (failed_id == res->druleid)
			{
				/* release the data owned by the entry before the hashset drops it */
				results_clear(res);
				zbx_hashset_iter_remove(&it);
			}
		}

		zbx_hashset_iter_reset(&manager->incomplete_checks_count, &it);

		while (NULL != (cnt = (zbx_discoverer_check_count_t *)zbx_hashset_iter_next(&it)))
		{
			if (failed_id == cnt->druleid)
				zbx_hashset_iter_remove(&it);
		}
	}
}
/******************************************************************************
* *
* Purpose: takes the finished results out of the manager and stores them *
* through the discovery callbacks *
* *
* Parameters: manager - [IN/OUT] results, results lock and *
* incomplete check counters *
* del_druleids - [IN] removed discovery rules; their *
* results and counters are dropped *
* incomplete_druleids - [OUT] cleared, then filled with the *
* rules that still have results left *
* in the manager *
* unsaved_checks - [OUT] number of services in results *
* that were not taken in this call *
* drule_errors - [IN] rules with errors; their results *
* and counters are dropped *
* *
* Return value: 1 - the batch limit (DISCOVERER_BATCH_RESULTS_NUM services) *
* was reached, 0 - otherwise *
* *
******************************************************************************/
static int process_results(zbx_discoverer_manager_t *manager, zbx_vector_uint64_t *del_druleids,
zbx_hashset_t *incomplete_druleids, zbx_uint64_t *unsaved_checks,
zbx_vector_discoverer_drule_error_t *drule_errors, const zbx_events_funcs_t *events_cbs,
zbx_discovery_open_func_t discovery_open_cb, zbx_discovery_close_func_t discovery_close_cb,
zbx_discovery_update_host_func_t discovery_update_host_cb,
zbx_discovery_update_service_func_t discovery_update_service_cb,
zbx_discovery_update_service_down_func_t discovery_update_service_down_cb,
zbx_discovery_find_host_func_t discovery_find_host_cb)
{
/* number of services after which no more results are taken in this call */
#define DISCOVERER_BATCH_RESULTS_NUM 1000
zbx_uint64_t res_check_total = 0,res_check_count = 0;
zbx_vector_discoverer_results_ptr_t results;
zbx_discoverer_results_t *result, *result_tmp;
zbx_hashset_iter_t iter;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() del_druleids:%d", __func__, del_druleids->values_num);
zbx_vector_discoverer_results_ptr_create(&results);
zbx_hashset_clear(incomplete_druleids);
/* the manager hashsets are accessed only while results_lock is held */
pthread_mutex_lock(&manager->results_lock);
/* protection against returning values from removed revision of druleid */
process_results_incompletecheckscount_remove(manager, del_druleids);
zbx_hashset_iter_reset(&manager->results, &iter);
while (NULL != (result = (zbx_discoverer_results_t *)zbx_hashset_iter_next(&iter)))
{
zbx_discoverer_check_count_t *check_count, cmp;
cmp.druleid = result->druleid;
zbx_strlcpy(cmp.ip, result->ip, sizeof(cmp.ip));
/* results of removed rules are discarded */
if (FAIL != zbx_vector_uint64_bsearch(del_druleids, cmp.druleid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
{
results_clear(result);
zbx_hashset_iter_remove(&iter);
continue;
}
res_check_total += (zbx_uint64_t)result->services.values_num;
/* the result stays in the manager if the batch is full or the ip still has pending checks; */
/* check_count is left unassigned when the first condition short-circuits, which is safe */
/* only because that path continues without reading it */
if (DISCOVERER_BATCH_RESULTS_NUM <= res_check_count ||
(NULL != (check_count = zbx_hashset_search(&manager->incomplete_checks_count, &cmp)) &&
0 != check_count->count))
{
zbx_hashset_insert(incomplete_druleids, &cmp.druleid, sizeof(zbx_uint64_t));
continue;
}
res_check_count += (zbx_uint64_t)result->services.values_num;
if (NULL != check_count)
zbx_hashset_remove_direct(&manager->incomplete_checks_count, check_count);
/* shallow copy: the copy takes over ip, dnsname and the services vector, the hashset entry */
/* is removed without results_clear() */
result_tmp = (zbx_discoverer_results_t*)zbx_malloc(NULL, sizeof(zbx_discoverer_results_t));
memcpy(result_tmp, result, sizeof(zbx_discoverer_results_t));
zbx_vector_discoverer_results_ptr_append(&results, result_tmp);
zbx_hashset_iter_remove(&iter);
}
process_results_incompleteresult_remove(manager, drule_errors);
zabbix_log(LOG_LEVEL_DEBUG, "%s() results=%d checks:" ZBX_FS_UI64 "/" ZBX_FS_UI64 " del_druleids=%d"
" incomplete_druleids=%d", __func__, results.values_num, res_check_count, res_check_total,
del_druleids->values_num, incomplete_druleids->num_data);
pthread_mutex_unlock(&manager->results_lock);
/* the taken results are stored outside of the lock, one database transaction per result */
if (0 != results.values_num)
{
void *handle = discovery_open_cb();
for (int i = 0; i < results.values_num; i++)
{
zbx_db_dhost dhost;
int host_status;
result = results.values[i];
if (NULL == result->dnsname)
{
zabbix_log(LOG_LEVEL_WARNING,
"Missing 'dnsname', result skipped (druleid=" ZBX_FS_UI64 ", ip: '%s')",
result->druleid, result->ip);
continue;
}
memset(&dhost, 0, sizeof(zbx_db_dhost));
zbx_db_begin();
host_status = process_services(handle, result->druleid, &dhost, result->ip, result->dnsname,
result->now, result->unique_dcheckid, &result->services,
events_cbs->add_event_cb, discovery_update_service_cb,
discovery_update_service_down_cb, discovery_find_host_cb);
discovery_update_host_cb(handle, result->druleid, &dhost, result->ip, result->dnsname,
host_status, result->now, events_cbs->add_event_cb);
/* the event processing and cleaning callbacks are optional */
if (NULL != events_cbs->process_events_cb)
events_cbs->process_events_cb(NULL, NULL, NULL);
if (NULL != events_cbs->clean_events_cb)
events_cbs->clean_events_cb();
zbx_db_commit();
}
discovery_close_cb(handle);
}
*unsaved_checks = res_check_total - res_check_count;
zbx_vector_discoverer_results_ptr_clear_ext(&results, results_free);
zbx_vector_discoverer_results_ptr_destroy(&results);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s() ret:%d", __func__,
DISCOVERER_BATCH_RESULTS_NUM <= res_check_count ? 1 : 0);
return DISCOVERER_BATCH_RESULTS_NUM <= res_check_count ? 1 : 0;
#undef DISCOVERER_BATCH_RESULTS_NUM
}
/******************************************************************************
 *
 * Purpose: reports finished discovery rules through the rule update callback
 *
 * Parameters: del_jobs            - [IN/OUT] ids of the finished rules; the
 *                                            reported ids are removed
 *             drule_errors        - [IN/OUT] rule errors; the error passed to
 *                                            the callback is removed and freed
 *             incomplete_druleids - [IN] rules that are skipped and stay in
 *                                        del_jobs
 *
 ******************************************************************************/
static void	process_job_finalize(zbx_vector_uint64_t *del_jobs, zbx_vector_discoverer_drule_error_t *drule_errors,
		zbx_hashset_t *incomplete_druleids, zbx_discovery_open_func_t discovery_open_cb,
		zbx_discovery_close_func_t discovery_close_cb,
		zbx_discovery_update_drule_func_t discovery_udpate_drule_cb)
{
	void	*handle;
	time_t	now;

	if (0 == del_jobs->values_num)
		return;

	/* multiple errors can duplicate druleid */
	zbx_vector_uint64_sort(del_jobs, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_uint64_uniq(del_jobs, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	now = time(NULL);
	handle = discovery_open_cb();

	/* iterate from the end so that removing an element does not shift the unvisited ones */
	for (int idx = del_jobs->values_num - 1; idx >= 0; idx--)
	{
		zbx_discoverer_drule_error_t	key = {.druleid = del_jobs->values[idx]};
		char				*err_msg = NULL;
		int				pos;

		if (NULL != zbx_hashset_search(incomplete_druleids, &key.druleid))
			continue;

		pos = zbx_vector_discoverer_drule_error_search(drule_errors, key,
				ZBX_DEFAULT_UINT64_COMPARE_FUNC);

		if (FAIL != pos)
		{
			/* the error text is taken over from the vector entry and freed below */
			err_msg = drule_errors->values[pos].error;
			zbx_vector_discoverer_drule_error_remove(drule_errors, pos);
		}

		discovery_udpate_drule_cb(handle, key.druleid, err_msg, now);
		zbx_free(err_msg);
		zbx_vector_uint64_remove(del_jobs, idx);
	}

	discovery_close_cb(handle);
}
/******************************************************************************
 *
 * Purpose: resolves macros in the update interval of a discovery rule and
 *          converts it to a number
 *
 * Parameters: delay          - [IN] configured update interval
 *             delay_resolved - [IN/OUT] reusable buffer that receives the
 *                                       interval with macros resolved
 *             delay_int      - [OUT] converted interval, or
 *                                    ZBX_DEFAULT_INTERVAL on failure
 *
 * Return value: result of zbx_is_time_suffix()
 *
 ******************************************************************************/
static int	drule_delay_get(const char *delay, char **delay_resolved, int *delay_int)
{
	int	res;

	*delay_resolved = zbx_strdup(*delay_resolved, delay);

	zbx_substitute_simple_macros(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
			delay_resolved, ZBX_MACRO_TYPE_COMMON, NULL, 0);

	res = zbx_is_time_suffix(*delay_resolved, delay_int, ZBX_LENGTH_UNLIMITED);

	if (SUCCEED != res)
		*delay_int = ZBX_DEFAULT_INTERVAL;

	return res;
}
/******************************************************************************
 *
 * Purpose: creates discovery jobs for the discovery rules returned by
 *          zbx_dc_drules_get() and requeues every such rule
 *
 * Parameters: nextcheck           - [OUT] next check time reported by
 *                                         zbx_dc_drules_get(), FAIL if it
 *                                         reported 0
 *             incomplete_druleids - [IN] rules with results still pending;
 *                                        they are requeued without a new job
 *             jobs                - [OUT] jobs created for the rules that
 *                                         produced at least one task
 *             check_counts        - [IN/OUT] passed to process_rule()
 *             drule_errors        - [OUT] errors of rejected rules
 *             err_druleids        - [OUT] ids of rejected rules
 *
 * Return value: number of rules passed to process_rule()
 *
 * Comments: a rule is rejected when its update interval or the global
 *           timeout of one of its check types is invalid. Global timeouts
 *           are resolved once per call and cached in tmt_*.
 *           NOTE(review): the dcheck loop stops at the unique check, so the
 *           checks after it do not get 'timeout' assigned here - confirm
 *           this is intended.
 *
 ******************************************************************************/
static int	process_discovery(int *nextcheck, zbx_hashset_t *incomplete_druleids,
		zbx_vector_discoverer_jobs_ptr_t *jobs, zbx_hashset_t *check_counts,
		zbx_vector_discoverer_drule_error_t *drule_errors, zbx_vector_uint64_t *err_druleids)
{
	int				rule_count = 0, delay, i, tmt_simple = 0, tmt_agent = 0, tmt_snmp = 0;
	char				*delay_str = NULL;
	zbx_dc_um_handle_t		*um_handle;
	time_t				now, nextcheck_loc;
	zbx_vector_dc_drule_ptr_t	drules;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	now = time(NULL);

	zbx_vector_dc_drule_ptr_create(&drules);
	zbx_dc_drules_get(now, &drules, &nextcheck_loc);

	*nextcheck = 0 == nextcheck_loc ? FAIL : (int)nextcheck_loc;

	um_handle = zbx_dc_open_user_macros();

	for (int k = 0; ZBX_IS_RUNNING() && k < drules.values_num; k++)
	{
		zbx_hashset_t			tasks;
		zbx_hashset_iter_t		iter;
		zbx_discoverer_task_t		*task, *task_out;
		zbx_discoverer_job_t		*job, cmp;
		zbx_dc_drule_t			*drule = drules.values[k];
		zbx_vector_ds_dcheck_ptr_t	*ds_dchecks_common;
		zbx_vector_iprange_t		*ipranges;
		char				error[MAX_STRING_LEN];

		now = time(NULL);

		cmp.druleid = drule->druleid;

		/* job references are accessed only while the queue is locked */
		discoverer_queue_lock(&dmanager.queue);
		i = zbx_vector_discoverer_jobs_ptr_bsearch(&dmanager.job_refs, &cmp,
				ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
		discoverer_queue_unlock(&dmanager.queue);

		/* the rule still has a job or unsaved results: only requeue it */
		if (FAIL != i || NULL != zbx_hashset_search(incomplete_druleids, &drule->druleid))
		{
			(void)drule_delay_get(drule->delay_str, &delay_str, &delay);
			goto next;
		}

		if (SUCCEED != drule_delay_get(drule->delay_str, &delay_str, &delay))
		{
			zbx_snprintf(error, sizeof(error), "Invalid update interval \"%s\".", delay_str);
			discoverer_queue_append_error(drule_errors, drule->druleid, error);
			zbx_vector_uint64_append(err_druleids, drule->druleid);
			goto next;
		}

		for (i = 0; i < drule->dchecks.values_num; i++)
		{
			zbx_dc_dcheck_t	*dcheck = (zbx_dc_dcheck_t*)drule->dchecks.values[i];
			char		err[MAX_STRING_LEN];

			if (SVC_AGENT == dcheck->type)
			{
				if (0 == tmt_agent && FAIL == dcheck_get_timeout(ITEM_TYPE_ZABBIX, &tmt_agent,
						err, sizeof(err)))
				{
					zbx_snprintf(error, sizeof(error), "Invalid global timeout for Zabbix Agent"
							" checks: \"%s\"", err);
					discoverer_queue_append_error(drule_errors, drule->druleid, error);
					zbx_vector_uint64_append(err_druleids, drule->druleid);
					goto next;
				}

				dcheck->timeout = tmt_agent;
			}
			else if (SVC_SNMPv1 == dcheck->type || SVC_SNMPv2c == dcheck->type ||
					SVC_SNMPv3 == dcheck->type)
			{
				if (0 == tmt_snmp && FAIL == dcheck_get_timeout(ITEM_TYPE_SNMP, &tmt_snmp,
						err, sizeof(err)))
				{
					zbx_snprintf(error, sizeof(error), "Invalid global timeout for SNMP checks"
							": \"%s\"", err);
					discoverer_queue_append_error(drule_errors, drule->druleid, error);
					zbx_vector_uint64_append(err_druleids, drule->druleid);
					goto next;
				}

				dcheck->timeout = tmt_snmp;
			}
			else
			{
				if (0 == tmt_simple && FAIL == dcheck_get_timeout(ITEM_TYPE_SIMPLE, &tmt_simple,
						err, sizeof(err)))
				{
					zbx_snprintf(error, sizeof(error), "Invalid global timeout for simple checks"
							": \"%s\"", err);
					discoverer_queue_append_error(drule_errors, drule->druleid, error);
					zbx_vector_uint64_append(err_druleids, drule->druleid);
					goto next;
				}

				dcheck->timeout = tmt_simple;
			}

			if (0 != dcheck->uniq)
			{
				drule->unique_dcheckid = dcheck->dcheckid;
				break;
			}
		}

		zbx_hashset_create(&tasks, 1, discoverer_task_hash, discoverer_task_compare);

		/* the allocation size is taken from the pointed-to type so it cannot drift from the declaration */
		ds_dchecks_common = (zbx_vector_ds_dcheck_ptr_t *)zbx_malloc(NULL, sizeof(*ds_dchecks_common));
		zbx_vector_ds_dcheck_ptr_create(ds_dchecks_common);

		ipranges = (zbx_vector_iprange_t *)zbx_malloc(NULL, sizeof(*ipranges));
		zbx_vector_iprange_create(ipranges);

		process_rule(drule, &tasks, check_counts, ds_dchecks_common, ipranges, drule_errors, err_druleids);

		if (0 != tasks.num_data)
		{
			/* the job is given the common checks and the ip ranges */
			job = discoverer_job_create(drule, ds_dchecks_common, ipranges);

			zbx_hashset_iter_reset(&tasks, &iter);

			/* tasks are copied shallowly into the job task list before the hashset is destroyed */
			while (NULL != (task = (zbx_discoverer_task_t*)zbx_hashset_iter_next(&iter)))
			{
				task_out = (zbx_discoverer_task_t*)zbx_malloc(NULL, sizeof(zbx_discoverer_task_t));
				memcpy(task_out, task, sizeof(zbx_discoverer_task_t));
				(void)zbx_list_append(&job->tasks, task_out, NULL);
			}

			zbx_vector_discoverer_jobs_ptr_append(jobs, job);
		}
		else
		{
			/* no job was created, so the containers are released here */
			zbx_vector_ds_dcheck_ptr_clear_ext(ds_dchecks_common, discoverer_ds_dcheck_free);
			zbx_vector_ds_dcheck_ptr_destroy(ds_dchecks_common);
			zbx_free(ds_dchecks_common);
			zbx_vector_iprange_destroy(ipranges);
			zbx_free(ipranges);
		}

		zbx_hashset_destroy(&tasks);

		rule_count++;
next:
		/* stale dhosts/dservices are cleaned only when running as server */
		if (0 != (zbx_get_program_type_cb() & ZBX_PROGRAM_TYPE_SERVER))
			discoverer_clean_services(drule->druleid);

		zbx_dc_drule_queue(now, drule->druleid, delay);
	}

	zbx_dc_close_user_macros(um_handle);
	zbx_free(delay_str);

	zbx_vector_dc_drule_ptr_clear_ext(&drules, zbx_discovery_drule_free);
	zbx_vector_dc_drule_ptr_destroy(&drules);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() rule_count:%d nextcheck:%d", __func__, rule_count, *nextcheck);

	return rule_count;	/* performance metric */
}
/******************************************************************************
 *
 * Purpose: removes the job from the manager job references (if it is
 *          registered there) and frees it
 *
 * Parameters: job - [IN] job to remove; the pointer is invalid afterwards
 *
 ******************************************************************************/
static void	discoverer_job_remove(zbx_discoverer_job_t *job)
{
	zbx_discoverer_job_t	key = {.druleid = job->druleid};
	int			idx;

	idx = zbx_vector_discoverer_jobs_ptr_bsearch(&dmanager.job_refs, &key,
			ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	if (FAIL != idx)
		zbx_vector_discoverer_jobs_ptr_remove(&dmanager.job_refs, idx);

	discoverer_job_free(job);
}
/******************************************************************************
 *
 * Purpose: allocates a discovered service entry with an empty value
 *
 * Parameters: port     - [IN]
 *             dcheckid - [IN]
 *
 * Return value: new entry owned by the caller
 *
 * Comments: 'status' is not initialized here and must be set by the caller
 *
 ******************************************************************************/
zbx_discoverer_dservice_t	*result_dservice_create(const unsigned short port,
		const zbx_uint64_t dcheckid)
{
	zbx_discoverer_dservice_t	*svc = (zbx_discoverer_dservice_t *)zbx_malloc(NULL, sizeof(*svc));

	svc->port = port;
	svc->dcheckid = dcheckid;
	svc->value[0] = '\0';

	return svc;
}
/******************************************************************************
 *
 * Purpose: allocates an empty result entry for a discovery rule
 *
 * Parameters: druleid         - [IN]
 *             unique_dcheckid - [IN]
 *
 * Return value: new entry owned by the caller; ip and dnsname are NULL, the
 *               services vector is empty, the timestamp is the current time
 *
 ******************************************************************************/
zbx_discoverer_results_t	*discoverer_result_create(zbx_uint64_t druleid, const zbx_uint64_t unique_dcheckid)
{
	zbx_discoverer_results_t	*res = (zbx_discoverer_results_t *)zbx_malloc(NULL, sizeof(*res));

	res->druleid = druleid;
	res->unique_dcheckid = unique_dcheckid;
	res->now = time(NULL);
	res->processed_checks_per_ip = 0;
	res->ip = NULL;
	res->dnsname = NULL;

	zbx_vector_discoverer_services_ptr_create(&res->services);

	return res;
}
/******************************************************************************
 *
 * Purpose: returns the result entry of a discovery rule / ip pair, creating
 *          an empty one if it is not registered yet
 *
 * Parameters: hr_dst          - [IN/OUT] results hashset
 *             druleid         - [IN]
 *             unique_dcheckid - [IN] stored only in a newly created entry
 *             ip              - [IN] copied into a newly created entry
 *
 * Return value: entry stored in the hashset
 *
 * Comments: a new entry gets an empty services vector, the current time and
 *           an empty dns name
 *
 ******************************************************************************/
static zbx_discoverer_results_t	*discoverer_results_host_reg(zbx_hashset_t *hr_dst, zbx_uint64_t druleid,
		zbx_uint64_t unique_dcheckid, char *ip)
{
	zbx_discoverer_results_t	key = {.druleid = druleid, .ip = ip}, *entry;

	if (NULL != (entry = zbx_hashset_search(hr_dst, &key)))
		return entry;

	entry = zbx_hashset_insert(hr_dst, &key, sizeof(key));

	zbx_vector_discoverer_services_ptr_create(&entry->services);

	/* the inserted copy still points to the caller's string, replace it with an owned one */
	entry->ip = zbx_strdup(NULL, ip);
	entry->now = time(NULL);
	entry->unique_dcheckid = unique_dcheckid;
	entry->dnsname = zbx_strdup(NULL, "");

	return entry;
}
/* vector of zbx_fping_host_t elements (stored by value) used to collect the hosts of one ping batch */
ZBX_PTR_VECTOR_DECL(fping_host, zbx_fping_host_t)
ZBX_PTR_VECTOR_IMPL(fping_host, zbx_fping_host_t)
/******************************************************************************
 *
 * Purpose: merges the outcome of one ping batch into the results hashset
 *
 * Parameters: incomplete_checks_count - [IN/OUT] counters, decreased by one
 *                                                per host
 *             results                 - [IN/OUT] results hashset
 *             druleid                 - [IN]
 *             dcheckid                - [IN] check the services are
 *                                            registered for
 *             unique_dcheckid         - [IN]
 *             hosts                   - [IN] pinged hosts
 *
 * Return value: SUCCEED - all hosts were merged
 *               FAIL    - a counter could not be decreased; the hosts from
 *                         that one on are not merged
 *
 * Comments: every host gets a result entry; a service with status
 *           DOBJECT_STATUS_UP is added only for hosts with a non-zero 'rcv'
 *
 ******************************************************************************/
static int	discoverer_icmp_result_merge(zbx_hashset_t *incomplete_checks_count, zbx_hashset_t *results,
		const zbx_uint64_t druleid, const zbx_uint64_t dcheckid, const zbx_uint64_t unique_dcheckid,
		const zbx_vector_fping_host_t *hosts)
{
	zabbix_log(LOG_LEVEL_DEBUG, "[%d] In %s()", log_worker_id, __func__);

	for (int n = 0; n < hosts->values_num; n++)
	{
		zbx_fping_host_t		*host = &hosts->values[n];
		zbx_discoverer_results_t	*res;

		if (FAIL == discoverer_check_count_decrease(incomplete_checks_count, druleid, host->addr, 1))
			return FAIL;	/* config revision id was changed */

		/* we must register at least 1 empty result per ip */
		res = discoverer_results_host_reg(results, druleid, unique_dcheckid, host->addr);

		if (0 != host->rcv)
		{
			zbx_discoverer_dservice_t	*svc;

			/* keep an already known dns name, take the resolved one only if none is stored */
			if (NULL == res->dnsname || ('\0' == *res->dnsname && '\0' != *host->dnsname))
				res->dnsname = zbx_strdup(res->dnsname, host->dnsname);

			svc = result_dservice_create(0, dcheckid);
			svc->status = DOBJECT_STATUS_UP;
			zbx_vector_discoverer_services_ptr_append(&res->services, svc);
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "[%d] End of %s() results:%d", log_worker_id, __func__, hosts->values_num);

	return SUCCEED;
}
/******************************************************************************
* *
* Purpose: pings the remaining ip addresses of a task in batches and merges *
* the outcome of every batch into the manager results *
* *
* Parameters: druleid - [IN] *
* task - [IN/OUT] ip range iteration state is advanced *
* dcheck_idx - [IN] index of the check in task->ds_dchecks *
* concurrency_max - [IN] batch size, 0 - use *
* queue->checks_per_worker_max *
* stop - [IN] non-zero value ends the processing *
* queue - [IN/OUT] pending check counters *
* error - [OUT] ping error message *
* *
* Return value: SUCCEED or the result of the failed zbx_ping() call *
* *
******************************************************************************/
static int discoverer_icmp(const zbx_uint64_t druleid, zbx_discoverer_task_t *task,
const int dcheck_idx, int concurrency_max, int *stop, zbx_discoverer_queue_t *queue, char **error)
{
char err[ZBX_ITEM_ERROR_LEN_MAX], ip[ZBX_INTERFACE_IP_LEN_MAX];
int i, ret = SUCCEED, abort = SUCCEED;
zbx_uint64_t count = 0;
zbx_vector_fping_host_t hosts;
const zbx_dc_dcheck_t *dcheck = &task->ds_dchecks.values[dcheck_idx]->dcheck;
zbx_fping_host_t host;
zabbix_log(LOG_LEVEL_DEBUG, "[%d] In %s() ranges:%d range id:%d dcheck_idx:%d task state count:%d",
log_worker_id, __func__, task->range.ipranges->values_num, task->range.id,
dcheck_idx, task->range.state.count);
zbx_vector_fping_host_create(&hosts);
if (0 == concurrency_max)
concurrency_max = queue->checks_per_worker_max;
/* reserve space for the total volume of all ip ranges of the task */
for (i = 0; i < task->range.ipranges->values_num; i++)
count += zbx_iprange_volume(&task->range.ipranges->values[i]);
zbx_vector_fping_host_reserve(&hosts, (size_t)hosts.values_num + (size_t)count);
do
{
memset(&host, 0, sizeof(host));
/* NOTE(review): TASK_IP2STR presumably formats the current ip of the task state into 'ip' */
TASK_IP2STR(task, ip);
task->range.state.count--;
host.addr = zbx_strdup(NULL, ip);
zbx_vector_fping_host_append(&hosts, host);
/* keep collecting addresses until a full batch is available */
if (concurrency_max > hosts.values_num)
continue;
if (SUCCEED != (ret = zbx_ping(&hosts.values[0], hosts.values_num, 3, 0, 0, dcheck->timeout * 1000,
dcheck->allow_redirect, 1, err, sizeof(err))))
{
zabbix_log(LOG_LEVEL_DEBUG, "[%d] %s() %d icmp checks failed with err:%s",
log_worker_id, __func__, concurrency_max, err);
*error = zbx_strdup(*error, err);
/* the collected hosts are released by the cleanup after the loop */
break;
}
else
{
/* a FAIL from the merge ends the loop through 'abort' */
pthread_mutex_lock(&dmanager.results_lock);
abort = discoverer_icmp_result_merge(&dmanager.incomplete_checks_count, &dmanager.results,
druleid, dcheck->dcheckid, task->unique_dcheckid, &hosts);
pthread_mutex_unlock(&dmanager.results_lock);
}
for (i = 0; i < hosts.values_num; i++)
{
zbx_str_free(hosts.values[i].addr);
zbx_str_free(hosts.values[i].dnsname);
}
(void)discovery_pending_checks_count_decrease(queue, concurrency_max, 0,
(zbx_uint64_t)hosts.values_num);
zbx_vector_fping_host_clear(&hosts);
}
while (0 == *stop && SUCCEED == abort && 0 != task->range.state.count &&
SUCCEED == zbx_iprange_uniq_iter(task->range.ipranges->values, task->range.ipranges->values_num,
&task->range.state.index_ip, task->range.state.ipaddress));
/* ping the last, incomplete batch unless processing was stopped or a ping has failed */
if (0 == *stop && 0 != hosts.values_num && ret == SUCCEED)
{
if (SUCCEED != (ret = zbx_ping(&hosts.values[0], hosts.values_num, 3, 0, 0, dcheck->timeout * 1000,
dcheck->allow_redirect, 1, err, sizeof(err))))
{
zabbix_log(LOG_LEVEL_DEBUG, "[%d] %s() %d icmp checks failed with err:%s", log_worker_id,
__func__, concurrency_max, err);
*error = zbx_strdup(*error, err);
}
else
{
pthread_mutex_lock(&dmanager.results_lock);
(void)discoverer_icmp_result_merge(&dmanager.incomplete_checks_count, &dmanager.results,
druleid, dcheck->dcheckid, task->unique_dcheckid, &hosts);
pthread_mutex_unlock(&dmanager.results_lock);
}
}
/* release whatever is still collected (nothing if the last batch was cleared inside the loop) */
for (i = 0; i < hosts.values_num; i++)
{
zbx_str_free(hosts.values[i].addr);
zbx_str_free(hosts.values[i].dnsname);
}
(void)discovery_pending_checks_count_decrease(queue, concurrency_max, 0, (zbx_uint64_t)hosts.values_num);
zbx_vector_fping_host_destroy(&hosts);
zabbix_log(LOG_LEVEL_DEBUG, "[%d] End of %s() task state count:%d", log_worker_id, __func__,
task->range.state.count);
return ret;
}
/******************************************************************************
* *
* Purpose: moves a result and its services into the results hashset and *
* frees the source result *
* *
* Parameters: src - [IN] result to move; freed by this function *
* hr_dst - [IN/OUT] destination results hashset *
* *
******************************************************************************/
static void discoverer_results_move_value(zbx_discoverer_results_t *src, zbx_hashset_t *hr_dst)
{
zbx_discoverer_results_t *dst;
if (NULL == src->dnsname)
src->dnsname = zbx_strdup(NULL, "");
if (NULL == (dst = zbx_hashset_search(hr_dst, src)))
{
/* the inserted copy takes over ip and dnsname, but gets a services vector of its own; */
/* the source pointers are reset so that results_free() below does not free them */
dst = zbx_hashset_insert(hr_dst, src, sizeof(zbx_discoverer_results_t));
zbx_vector_discoverer_services_ptr_create(&dst->services);
src->dnsname = NULL;
src->ip = NULL;
}
else if ('\0' == *dst->dnsname && '\0' != *src->dnsname)
{
/* the existing entry has no dns name yet: take over the one of the source */
zbx_free(dst->dnsname);
dst->dnsname = src->dnsname;
src->dnsname = NULL;
}
/* service pointers are handed over to dst; the source vector is emptied without freeing them */
zbx_vector_discoverer_services_ptr_append_array(&dst->services, src->services.values,
src->services.values_num);
zbx_vector_discoverer_services_ptr_clear(&src->services);
results_free(src);
}
/******************************************************************************
 *
 * Purpose: moves finished per-ip results from a result vector into the
 *          results hashset
 *
 * Parameters: hr_dst - [IN/OUT] destination results hashset
 *             vr_src - [IN/OUT] source results; the moved ones are removed
 *             task   - [IN] provides the discovery rule id and the number of
 *                           checks per ip
 *             force  - [IN] 0 - move only the results with all checks of the
 *                           ip processed; otherwise move all results
 *
 * Return value: SUCCEED - merge completed
 *               FAIL    - a check counter was not found or was zero (config
 *                         revision id was changed); merging stops there
 *
 ******************************************************************************/
int	discoverer_results_partrange_merge(zbx_hashset_t *hr_dst, zbx_vector_discoverer_results_ptr_t *vr_src,
		zbx_discoverer_task_t *task, int force)
{
	int		ret = SUCCEED;
	zbx_uint64_t	druleid = task->ds_dchecks.values[0]->dcheck.druleid;

	zabbix_log(LOG_LEVEL_DEBUG, "[%d] In %s() src:%d dst:%d", log_worker_id, __func__, vr_src->values_num,
			hr_dst->num_data);

	/* checking that config revision id was changed */
	if (0 == force && 0 != vr_src->values_num)
	{
		ret = discoverer_drule_check(&dmanager.incomplete_checks_count, druleid,
				vr_src->values[0]->ip);
	}

	/* iterate from the end so that removing an element does not shift the unvisited ones */
	for (int idx = vr_src->values_num - 1; idx >= 0 && SUCCEED == ret; idx--)
	{
		zbx_discoverer_results_t	*item = vr_src->values[idx];

		if (0 != force || item->processed_checks_per_ip == task->range.state.checks_per_ip)
		{
			ret = discoverer_check_count_decrease(&dmanager.incomplete_checks_count, druleid,
					item->ip, item->processed_checks_per_ip);

			if (FAIL == ret)
				break;	/* config revision id was changed */

			discoverer_results_move_value(item, hr_dst);
			zbx_vector_discoverer_results_ptr_remove(vr_src, idx);
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "[%d] End of %s() src:%d dst:%d", log_worker_id, __func__, vr_src->values_num,
			hr_dst->num_data);

	return ret;
}
/******************************************************************************
 *                                                                            *
 * Purpose: runs ICMP checks of a task, one dcheck after another              *
 *                                                                            *
 * Return value: result of the last discoverer_icmp() call, SUCCEED if none   *
 *               was made                                                     *
 *                                                                            *
 ******************************************************************************/
static int	discoverer_net_check_icmp(zbx_uint64_t druleid, zbx_discoverer_task_t *task, int concurrency_max,
		int *stop, zbx_discoverer_queue_t *queue, char **error)
{
	int	dcheck_idx = task->range.state.index_dcheck, ret = SUCCEED;

	while (dcheck_idx < task->ds_dchecks.values_num && SUCCEED == ret && 0 != task->range.state.count)
	{
		ret = discoverer_icmp(druleid, task, dcheck_idx, concurrency_max, stop, queue, error);

		/* rewind the range position to the first address before the next dcheck */
		task->range.state.index_ip = 0;
		zbx_iprange_first(task->range.ipranges->values, task->range.state.ipaddress);

		dcheck_idx++;
	}

	if (FAIL != ret)
		return ret;

	/* on failure the remaining range count is taken off the pending checks counter */
	(void)discovery_pending_checks_count_decrease(queue, concurrency_max, 0, task->range.state.count);

	return ret;
}
/******************************************************************************
 *                                                                            *
 * Purpose: performs one check for the current ip and port of a task and      *
 *          registers the outcome in the shared results                       *
 *                                                                            *
 * Return value: SUCCEED - check was performed (service may be up or down)    *
 *               FAIL    - check failed and error message was set             *
 *                                                                            *
 ******************************************************************************/
static int	discoverer_net_check_common(zbx_uint64_t druleid, zbx_discoverer_task_t *task, char **error)
{
	int				ret = SUCCEED;
	char				dns[ZBX_INTERFACE_DNS_LEN_MAX];
	char				ip[ZBX_INTERFACE_IP_LEN_MAX];
	zbx_dc_dcheck_t			*dcheck = &task->ds_dchecks.values[task->range.state.index_dcheck]->dcheck;
	zbx_discoverer_dservice_t	*up_service = NULL;
	zbx_discoverer_results_t	*host = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "[%d] In %s() dchecks:%d key[0]:%s", log_worker_id, __func__,
			task->ds_dchecks.values_num, 0 != task->ds_dchecks.values_num ?
			task->ds_dchecks.values[0]->dcheck.key_ : "empty");

	TASK_IP2STR(task, ip);

	if (SUCCEED != discoverer_service(dcheck, ip, (unsigned short)task->range.state.port, error))
	{
		/* a failed check without an error message is not an error, no service gets recorded for it */
		if (NULL != *error)
		{
			ret = FAIL;
			goto err;
		}
	}
	else
	{
		up_service = result_dservice_create((unsigned short)task->range.state.port, dcheck->dcheckid);
		up_service->status = DOBJECT_STATUS_UP;
		zbx_gethost_by_ip(ip, dns, sizeof(dns));	/* dns is filled only on this path */
	}

	pthread_mutex_lock(&dmanager.results_lock);

	if (SUCCEED != discoverer_check_count_decrease(&dmanager.incomplete_checks_count, druleid, ip, 1))
	{
		service_free(up_service);	/* drule revision has been changed or drule aborted */
	}
	else
	{
		/* we must register at least 1 empty result per ip */
		host = discoverer_results_host_reg(&dmanager.results, druleid, task->unique_dcheckid, ip);

		if (NULL != up_service)
		{
			/* set the DNS name when it is missing, or empty while a resolved name is available */
			if (NULL == host->dnsname || ('\0' == *host->dnsname && '\0' != *dns))
				host->dnsname = zbx_strdup(host->dnsname, dns);

			zbx_vector_discoverer_services_ptr_append(&host->services, up_service);
		}
	}

	pthread_mutex_unlock(&dmanager.results_lock);
err:
	/* NOTE(review): the registered result is read here after results_lock was released - confirm that */
	/* nothing else can modify or release it at this point */
	zabbix_log(LOG_LEVEL_DEBUG, "[%d] End of %s() ip:%s dresult services:%d rdns:%s", log_worker_id, __func__,
			ip, NULL != host ? host->services.values_num : -1, NULL != host ? host->dnsname : "");

	return ret;
}
/******************************************************************************
 *                                                                            *
 * Purpose: tells whether checks of the given type are processed              *
 *          asynchronously                                                    *
 *                                                                            *
 * Return value: SUCCEED - type is in the asynchronous list                   *
 *               FAIL    - any other type                                     *
 *                                                                            *
 ******************************************************************************/
int	dcheck_is_async(zbx_ds_dcheck_t *ds_dcheck)
{
	static const int	async_types[] =
	{
		SVC_AGENT, SVC_ICMPPING, SVC_SNMPv1, SVC_SNMPv2c, SVC_SNMPv3, SVC_TCP, SVC_SMTP, SVC_FTP,
		SVC_POP, SVC_NNTP, SVC_IMAP, SVC_HTTP, SVC_HTTPS, SVC_SSH, SVC_TELNET
	};
	size_t			idx;

	for (idx = 0; idx < sizeof(async_types) / sizeof(async_types[0]); idx++)
	{
		if (async_types[idx] == ds_dcheck->dcheck.type)
			return SUCCEED;
	}

	return FAIL;
}
/******************************************************************************
 *                                                                            *
 * Purpose: worker thread entry point, takes jobs from the queue and runs     *
 *          their tasks until the stop flag of the worker is raised           *
 *                                                                            *
 * Parameters: net_check_worker - [IN/OUT] worker (zbx_discoverer_worker_t)   *
 *                                                                            *
 * Return value: always NULL                                                  *
 *                                                                            *
 ******************************************************************************/
static void	*discoverer_worker_entry(void *net_check_worker)
{
	int			err;
	sigset_t		mask;
	zbx_discoverer_worker_t	*worker = (zbx_discoverer_worker_t*)net_check_worker;
	zbx_discoverer_queue_t	*queue = worker->queue;

	zabbix_log(LOG_LEVEL_INFORMATION, "thread started [%s #%d]",
			get_process_type_string(ZBX_PROCESS_TYPE_DISCOVERER), worker->worker_id);

	log_worker_id = worker->worker_id;

	/* the signals listed below are blocked in this thread */
	sigemptyset(&mask);
	sigaddset(&mask, SIGQUIT);
	sigaddset(&mask, SIGALRM);
	sigaddset(&mask, SIGTERM);
	sigaddset(&mask, SIGUSR1);
	sigaddset(&mask, SIGUSR2);
	sigaddset(&mask, SIGHUP);
	sigaddset(&mask, SIGINT);

	/* pthread_sigmask() reports failure with a positive error number, it never returns a negative value */
	if (0 != (err = pthread_sigmask(SIG_BLOCK, &mask, NULL)))
		zabbix_log(LOG_LEVEL_WARNING, "cannot block the signals: %s", zbx_strerror(err));

	zbx_init_icmpping_env(get_process_type_string(ZBX_PROCESS_TYPE_DISCOVERER), worker->worker_id);
	worker->stop = 0;

	discoverer_queue_lock(queue);
	discoverer_queue_register_worker(queue);

	/* the queue lock is held every time the loop condition is evaluated, it is released only */
	/* while a task is being processed */
	while (0 == worker->stop)
	{
		char			*error = NULL;
		int			ret;
		zbx_discoverer_job_t	*job;

		if (NULL != (job = discoverer_queue_pop(queue)))
		{
			int			concurrency_max;
			unsigned char		dcheck_type;
			zbx_uint64_t		druleid;
			zbx_discoverer_task_t	*task;

			if (NULL == (task = discoverer_task_pop(job, queue->checks_per_worker_max)))
			{
				/* the job has no more tasks: remove it now, or mark it for removal by the */
				/* last worker that is still busy with it */
				if (0 == job->workers_used)
				{
					zbx_vector_uint64_append(&queue->del_jobs, job->druleid);
					discoverer_job_remove(job);
				}
				else
					job->status = DISCOVERER_JOB_STATUS_REMOVING;

				continue;
			}

			/* only checks that are not asynchronous are taken off the pending counter here */
			if (FAIL == dcheck_is_async(task->ds_dchecks.values[0]))
				queue->pending_checks_count--;

			job->workers_used++;

			/* the job returns to the queue for other workers unless a job with non-asynchronous */
			/* checks has reached its concurrency limit, in which case it waits */
			if (0 == job->concurrency_max || job->workers_used != job->concurrency_max ||
					SUCCEED == dcheck_is_async(task->ds_dchecks.values[0]))
			{
				discoverer_queue_push(queue, job);
				discoverer_queue_notify(queue);
			}
			else
				job->status = DISCOVERER_JOB_STATUS_WAITING;

			/* job fields are copied while the lock is still held */
			druleid = job->druleid;
			concurrency_max = job->concurrency_max;

			discoverer_queue_unlock(queue);

			/* process checks */

			zbx_timekeeper_update(worker->timekeeper, worker->worker_id - 1, ZBX_PROCESS_STATE_BUSY);

			if (FAIL == dcheck_is_async(task->ds_dchecks.values[0]))
			{
				ret = discoverer_net_check_common(druleid, task, &error);
			}
			else if (SVC_ICMPPING == GET_DTYPE(task))
			{
				ret = discoverer_net_check_icmp(druleid, task, concurrency_max, &worker->stop, queue,
						&error);
			}
			else
			{
				ret = discovery_net_check_range(druleid, task, concurrency_max, &worker->stop,
						&dmanager, log_worker_id, &error);
			}

			if (FAIL == ret)
			{
				zabbix_log(LOG_LEVEL_DEBUG, "[%d] Discovery rule " ZBX_FS_UI64 " error:%s",
						worker->worker_id, druleid, ZBX_NULL2STR(error));
			}

			/* the check type is needed after the task has been freed */
			dcheck_type = GET_DTYPE(task);
			discoverer_task_free(task);
			zbx_timekeeper_update(worker->timekeeper, worker->worker_id - 1, ZBX_PROCESS_STATE_IDLE);

			/* proceed to the next job */

			discoverer_queue_lock(queue);
			job->workers_used--;

			if (NULL != error)
			{
				error = zbx_dsprintf(error, "'%s' checks failed: \"%s\"",
						zbx_dservice_type_string(dcheck_type), error);
				discoverer_job_abort(job, &queue->pending_checks_count, &queue->errors, error);
				zbx_free(error);
			}

			/* presumably gives back the SNMPv3 worker slot taken when the task was handed out */
			if (SVC_SNMPv3 == dcheck_type)
				queue->snmpv3_allowed_workers++;

			if (DISCOVERER_JOB_STATUS_WAITING == job->status)
			{
				job->status = DISCOVERER_JOB_STATUS_QUEUED;
				discoverer_queue_push(queue, job);
			}
			else if (DISCOVERER_JOB_STATUS_REMOVING == job->status && 0 == job->workers_used)
			{
				zbx_vector_uint64_append(&queue->del_jobs, job->druleid);
				discoverer_job_remove(job);
			}

			continue;
		}

		/* nothing to do: wait on the queue, a failed wait stops the worker */
		if (SUCCEED != discoverer_queue_wait(queue, &error))
		{
			zabbix_log(LOG_LEVEL_WARNING, "[%d] %s", worker->worker_id, error);
			zbx_free(error);
			worker->stop = 1;
		}
	}

	discoverer_queue_deregister_worker(queue);
	discoverer_queue_unlock(queue);

	zabbix_log(LOG_LEVEL_INFORMATION, "thread stopped [%s #%d]",
			get_process_type_string(ZBX_PROCESS_TYPE_DISCOVERER), worker->worker_id);

	return (void*)0;
}
/******************************************************************************
 *                                                                            *
 * Purpose: prepares worker data and starts its thread                        *
 *                                                                            *
 * Return value: SUCCEED - thread was created                                 *
 *               FAIL    - pthread_create() failed, error message is set      *
 *                                                                            *
 ******************************************************************************/
static int	discoverer_worker_init(zbx_discoverer_worker_t *worker, zbx_discoverer_queue_t *queue,
		zbx_timekeeper_t *timekeeper, void *func(void *), char **error)
{
	int	rc;

	worker->flags = DISCOVERER_WORKER_INIT_NONE;
	worker->queue = queue;
	worker->timekeeper = timekeeper;
	worker->stop = 1;

	rc = pthread_create(&worker->thread, NULL, func, (void *)worker);

	if (0 == rc)
	{
		/* the flag records that there is a thread to stop and join later */
		worker->flags |= DISCOVERER_WORKER_INIT_THREAD;

		return SUCCEED;
	}

	*error = zbx_dsprintf(NULL, "cannot create thread: %s", zbx_strerror(rc));

	return FAIL;
}
/******************************************************************************
 *                                                                            *
 * Purpose: waits for worker thread to finish, if it was started              *
 *                                                                            *
 ******************************************************************************/
static void	discoverer_worker_destroy(zbx_discoverer_worker_t *worker)
{
	/* the thread exit value is not needed, so no storage is passed for it */
	if (0 != (worker->flags & DISCOVERER_WORKER_INIT_THREAD))
		(void)pthread_join(worker->thread, NULL);

	worker->flags = DISCOVERER_WORKER_INIT_NONE;
}
/******************************************************************************
 *                                                                            *
 * Purpose: raises stop flag of a worker whose thread was started             *
 *                                                                            *
 ******************************************************************************/
static void	discoverer_worker_stop(zbx_discoverer_worker_t *worker)
{
	if (0 == (worker->flags & DISCOVERER_WORKER_INIT_THREAD))
		return;

	worker->stop = 1;
}
/******************************************************************************
 *                                                                            *
 * Purpose: one-time library setup that must precede worker thread creation   *
 *                                                                            *
 ******************************************************************************/
static void	discoverer_libs_init(void)
{
#if defined(HAVE_NETSNMP)
	zbx_init_library_mt_snmp(zbx_get_progname_cb());
#endif
#if defined(HAVE_LIBCURL)
	curl_global_init(CURL_GLOBAL_DEFAULT);
#endif
#if defined(HAVE_LDAP)
	/* NOTE(review): the call retrieves nothing (all arguments are NULL/0); presumably it is made only */
	/* to trigger the internal initialization of the LDAP library before threads exist - confirm */
	ldap_get_option(NULL, 0, NULL);
#endif
}
/******************************************************************************
 *                                                                            *
 * Purpose: releases resources held by the libraries set up at startup        *
 *                                                                            *
 ******************************************************************************/
static void	discoverer_libs_destroy(void)
{
#if defined(HAVE_NETSNMP)
	zbx_shutdown_library_mt_snmp(zbx_get_progname_cb());
#endif
#if defined(HAVE_LIBCURL)
	curl_global_cleanup();
#endif
}
/******************************************************************************
 *                                                                            *
 * Purpose: initializes discovery manager and starts its worker threads       *
 *                                                                            *
 * Parameters: manager - [OUT] manager to initialize                          *
 *             args_in - [IN] discoverer thread arguments                     *
 *             info    - [IN] thread information                              *
 *             error   - [OUT] error message, set on failure                  *
 *                                                                            *
 * Return value: SUCCEED - manager is ready, all workers have registered      *
 *               FAIL    - initialization failed                              *
 *                                                                            *
 * Comments: assumes args_in->workers_num is positive, it is used as a        *
 *           divisor                                                          *
 *                                                                            *
 ******************************************************************************/
static int	discoverer_manager_init(zbx_discoverer_manager_t *manager, zbx_thread_discoverer_args *args_in,
		const zbx_thread_info_t *info, char **error)
{
#	define SNMPV3_WORKERS_MAX	1

	int		i, err, ret = FAIL, started_num = 0, checks_per_worker_max;
	time_t		time_start;
	struct timespec	poll_delay = {0, 100000000};	/* 0.1 second between start-up polls */
#ifdef HAVE_GETRLIMIT
	struct rlimit	rlim;
#endif

	memset(manager, 0, sizeof(zbx_discoverer_manager_t));
	manager->config_timeout = args_in->config_timeout;
	manager->source_ip = args_in->config_source_ip;
	manager->progname = args_in->zbx_get_progname_cb_arg();
	manager->process_type = info->process_type;

#ifdef HAVE_GETRLIMIT
	/* an unlimited soft limit cannot restrict the workers, it is treated as if the limit was not available */
	if (0 == getrlimit(RLIMIT_NOFILE, &rlim) && RLIM_INFINITY != rlim.rlim_cur)
	{
		/* we will consume not more than 3/5 of all FD; the share is calculated in rlim_t */
		/* because a large limit does not fit into int */
		rlim_t	fd_per_worker = rlim.rlim_cur / 5 * 3 / (rlim_t)args_in->workers_num;

		if (0 == fd_per_worker)
		{
			*error = zbx_dsprintf(NULL, "cannot initialize maximum number of concurrent checks per worker,"
					" user limit of file descriptors is insufficient");
			return FAIL;
		}

		if ((rlim_t)DISCOVERER_JOB_TASKS_INPROGRESS_MAX > fd_per_worker)
		{
			/* the share is below the default maximum here, so it fits into int */
			checks_per_worker_max = (int)fd_per_worker;

			zabbix_log(LOG_LEVEL_WARNING, "for a discovery process with %d workers, the user limit of %d"
					" file descriptors is insufficient. The maximum number of concurrent checks"
					" per worker has been reduced to %d", args_in->workers_num,
					(int)rlim.rlim_cur, checks_per_worker_max);
		}
		else
			checks_per_worker_max = DISCOVERER_JOB_TASKS_INPROGRESS_MAX;
	}
	else
#endif
		checks_per_worker_max = DISCOVERER_JOB_TASKS_INPROGRESS_MAX;

	if (0 != (err = pthread_mutex_init(&manager->results_lock, NULL)))
	{
		*error = zbx_dsprintf(NULL, "cannot initialize results mutex: %s", zbx_strerror(err));
		return FAIL;
	}

	if (SUCCEED != discoverer_queue_init(&manager->queue, SNMPV3_WORKERS_MAX, checks_per_worker_max, error))
	{
		pthread_mutex_destroy(&manager->results_lock);
		return FAIL;
	}

	discoverer_libs_init();

	zbx_hashset_create(&manager->results, 1, discoverer_result_hash, discoverer_result_compare);
	zbx_hashset_create(&manager->incomplete_checks_count, 1, discoverer_check_count_hash,
			discoverer_check_count_compare);

	zbx_vector_discoverer_jobs_ptr_create(&manager->job_refs);

	manager->timekeeper = zbx_timekeeper_create(args_in->workers_num, NULL);

	manager->workers_num = args_in->workers_num;
	manager->workers = (zbx_discoverer_worker_t*)zbx_calloc(NULL, (size_t)args_in->workers_num,
			sizeof(zbx_discoverer_worker_t));

	for (i = 0; i < args_in->workers_num; i++)
	{
		manager->workers[i].worker_id = i + 1;

		if (SUCCEED != discoverer_worker_init(&manager->workers[i], &manager->queue, manager->timekeeper,
				discoverer_worker_entry, error))
		{
			goto out;
		}
	}

	/* wait for threads to start */

	time_start = time(NULL);

	/* a worker counts as started once it has registered itself in the queue */
	while (started_num != args_in->workers_num)
	{
		if (time_start + ZBX_DISCOVERER_STARTUP_TIMEOUT < time(NULL))
		{
			*error = zbx_strdup(NULL, "timeout occurred while waiting for workers to start");
			goto out;
		}

		discoverer_queue_lock(&manager->queue);
		started_num = manager->queue.workers_num;
		discoverer_queue_unlock(&manager->queue);

		nanosleep(&poll_delay, NULL);
	}

	ret = SUCCEED;
out:
	if (FAIL == ret)
	{
		/* NOTE(review): worker threads that were already created are only flagged to stop here, */
		/* they are not joined and the workers array is not released; the visible caller */
		/* terminates the process right after a failure */
		for (i = 0; i < manager->workers_num; i++)
			discoverer_worker_stop(&manager->workers[i]);

		discoverer_queue_destroy(&manager->queue);

		zbx_hashset_destroy(&manager->results);
		zbx_hashset_destroy(&manager->incomplete_checks_count);
		zbx_vector_discoverer_jobs_ptr_destroy(&manager->job_refs);

		zbx_timekeeper_free(manager->timekeeper);
		pthread_mutex_destroy(&manager->results_lock);
		discoverer_libs_destroy();
	}

	return ret;

#	undef SNMPV3_WORKERS_MAX
}
/******************************************************************************
 *                                                                            *
 * Purpose: stops worker threads and releases discovery manager resources     *
 *                                                                            *
 ******************************************************************************/
static void	discoverer_manager_free(zbx_discoverer_manager_t *manager)
{
	int				idx;
	zbx_hashset_iter_t		iter;
	zbx_discoverer_results_t	*entry;

	/* stop flags are raised and the workers are notified while the queue lock is held */
	discoverer_queue_lock(&manager->queue);

	for (idx = 0; idx < manager->workers_num; idx++)
		discoverer_worker_stop(&manager->workers[idx]);

	discoverer_queue_notify_all(&manager->queue);
	discoverer_queue_unlock(&manager->queue);

	/* every worker thread is joined before anything the workers use is destroyed */
	idx = 0;

	while (idx < manager->workers_num)
	{
		discoverer_worker_destroy(&manager->workers[idx]);
		idx++;
	}

	zbx_free(manager->workers);

	discoverer_queue_destroy(&manager->queue);
	zbx_timekeeper_free(manager->timekeeper);
	zbx_hashset_destroy(&manager->incomplete_checks_count);

	zbx_vector_discoverer_jobs_ptr_clear(&manager->job_refs);
	zbx_vector_discoverer_jobs_ptr_destroy(&manager->job_refs);

	/* results that were left in the hashset are cleared one by one before the hashset itself goes */
	zbx_hashset_iter_reset(&manager->results, &iter);

	for (;;)
	{
		entry = (zbx_discoverer_results_t *)zbx_hashset_iter_next(&iter);

		if (NULL == entry)
			break;

		results_clear(entry);
	}

	zbx_hashset_destroy(&manager->results);

	pthread_mutex_destroy(&manager->results_lock);

	discoverer_libs_destroy();
}
/******************************************************************************
 *                                                                            *
 * Purpose: sends worker usage statistics in reply to a client request        *
 *                                                                            *
 * Parameters: manager - [IN] discovery manager                               *
 *             client  - [IN] client that asked for the statistics            *
 *                                                                            *
 ******************************************************************************/
static void	discoverer_reply_usage_stats(zbx_discoverer_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_vector_dbl_t	worker_usage;
	unsigned char		*payload;
	zbx_uint32_t		payload_len;

	zbx_vector_dbl_create(&worker_usage);

	/* the return value of the usage query is deliberately ignored */
	(void)zbx_timekeeper_get_usage(manager->timekeeper, &worker_usage);

	payload_len = zbx_discovery_pack_usage_stats(&payload, &worker_usage, manager->workers_num);
	zbx_ipc_client_send(client, ZBX_IPC_DISCOVERER_USAGE_STATS_RESULT, payload, payload_len);

	zbx_free(payload);
	zbx_vector_dbl_destroy(&worker_usage);
}
/******************************************************************************
* *
* Purpose: periodically tries to find new hosts and services *
* *
******************************************************************************/
ZBX_THREAD_ENTRY(zbx_discoverer_thread, args)
{
zbx_thread_discoverer_args *discoverer_args_in = (zbx_thread_discoverer_args *)
(((zbx_thread_args_t *)args)->args);
double sec;
int nextcheck = 0;
zbx_ipc_service_t ipc_service;
zbx_ipc_client_t *client;
zbx_ipc_message_t *message;
zbx_timespec_t sleeptime = { .sec = DISCOVERER_DELAY, .ns = 0 };
const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info;
char *error = NULL;
zbx_vector_uint64_pair_t revisions;
zbx_vector_uint64_t del_druleids, del_jobs;
zbx_vector_discoverer_drule_error_t drule_errors;
zbx_hashset_t incomplete_druleids;
zbx_uint32_t rtc_msgs[] = {ZBX_RTC_SNMP_CACHE_RELOAD};
zbx_uint64_t rev_last = 0;
zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
info->server_num, get_process_type_string(info->process_type), info->process_num);
zbx_get_progname_cb = discoverer_args_in->zbx_get_progname_cb_arg;
zbx_get_program_type_cb = discoverer_args_in->zbx_get_program_type_cb_arg;
zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);
#if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
zbx_tls_init_child(discoverer_args_in->zbx_config_tls, discoverer_args_in->zbx_get_program_type_cb_arg,
zbx_dc_get_psk_by_identity);
#endif
zbx_get_progname_cb = discoverer_args_in->zbx_get_progname_cb_arg;
zbx_setproctitle("%s #%d [connecting to the database]", get_process_type_string(info->process_type),
info->process_num);
zbx_db_connect(ZBX_DB_CONNECT_NORMAL);
if (FAIL == zbx_ipc_service_start(&ipc_service, ZBX_IPC_SERVICE_DISCOVERER, &error))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot start discoverer service: %s", error);
zbx_free(error);
exit(EXIT_FAILURE);
}
if (FAIL == discoverer_manager_init(&dmanager, discoverer_args_in, info, &error))
{
zabbix_log(LOG_LEVEL_ERR, "Cannot initialize discovery manager: %s", error);
zbx_free(error);
zbx_ipc_service_close(&ipc_service);
exit(EXIT_FAILURE);
}
zbx_rtc_subscribe_service(ZBX_PROCESS_TYPE_DISCOVERYMANAGER, 0, rtc_msgs, ARRSIZE(rtc_msgs),
discoverer_args_in->config_timeout, ZBX_IPC_SERVICE_DISCOVERER);
zbx_vector_uint64_pair_create(&revisions);
zbx_vector_uint64_create(&del_druleids);
zbx_vector_uint64_create(&del_jobs);
zbx_hashset_create(&incomplete_druleids, 1, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
zbx_vector_discoverer_drule_error_create(&drule_errors);
zbx_setproctitle("%s #%d [started]", get_process_type_string(info->process_type), info->process_num);
while (ZBX_IS_RUNNING())
{
int processing_rules_num, more_results, is_drules_rev_updated;
zbx_uint64_t unsaved_checks;
sec = zbx_time();
zbx_update_env(get_process_type_string(info->process_type), sec);
/* update local drules revisions */
zbx_vector_uint64_clear(&del_druleids);
zbx_vector_uint64_pair_clear(&revisions);
is_drules_rev_updated = zbx_dc_drule_revisions_get(&rev_last, &revisions);
discoverer_queue_lock(&dmanager.queue);
if (SUCCEED == is_drules_rev_updated)
{
for (int i = 0; i < dmanager.job_refs.values_num; i++)
{
int k;
zbx_uint64_pair_t revision;
zbx_discoverer_job_t *job = dmanager.job_refs.values[i];
revision.first = job->druleid;
if (FAIL == (k = zbx_vector_uint64_pair_bsearch(&revisions, revision,
ZBX_DEFAULT_UINT64_COMPARE_FUNC)) ||
revisions.values[k].second != job->drule_revision)
{
zbx_vector_uint64_append(&del_druleids, job->druleid);
dmanager.queue.pending_checks_count -= discoverer_job_tasks_free(job);
zabbix_log(LOG_LEVEL_DEBUG, "%s() changed revision of druleid:" ZBX_FS_UI64,
__func__, job->druleid);
}
}
nextcheck = 0;
}
processing_rules_num = dmanager.job_refs.values_num;
zbx_vector_discoverer_drule_error_append_array(&drule_errors, dmanager.queue.errors.values,
dmanager.queue.errors.values_num);
zbx_vector_discoverer_drule_error_clear(&dmanager.queue.errors);
zbx_vector_uint64_append_array(&del_jobs, dmanager.queue.del_jobs.values,
dmanager.queue.del_jobs.values_num);
zbx_vector_uint64_clear(&dmanager.queue.del_jobs);
discoverer_queue_unlock(&dmanager.queue);
zbx_vector_uint64_sort(&del_druleids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
more_results = process_results(&dmanager, &del_druleids, &incomplete_druleids, &unsaved_checks,
&drule_errors, discoverer_args_in->events_cbs, discoverer_args_in->discovery_open_cb,
discoverer_args_in->discovery_close_cb, discoverer_args_in->discovery_update_host_cb,
discoverer_args_in->discovery_update_service_cb,
discoverer_args_in->discovery_update_service_down_cb,
discoverer_args_in->discovery_find_host_cb);
process_job_finalize(&del_jobs, &drule_errors, &incomplete_druleids,
discoverer_args_in->discovery_open_cb, discoverer_args_in->discovery_close_cb,
discoverer_args_in->discovery_update_drule_cb);
zbx_setproctitle("%s #%d [processing %d rules, " ZBX_FS_UI64 " unsaved checks]",
get_process_type_string(info->process_type), info->process_num, processing_rules_num,
unsaved_checks);
/* process discovery rules and create net check jobs */
sec = zbx_time();
if ((int)sec >= nextcheck)
{
int rule_count;
zbx_vector_discoverer_jobs_ptr_t jobs;
zbx_hashset_t check_counts;
zbx_vector_discoverer_jobs_ptr_create(&jobs);
zbx_hashset_create(&check_counts, 1, discoverer_check_count_hash,
discoverer_check_count_compare);
rule_count = process_discovery(&nextcheck, &incomplete_druleids, &jobs, &check_counts,
&drule_errors, &del_jobs);
if (0 < rule_count)
{
zbx_hashset_iter_t iter;
zbx_discoverer_check_count_t *count;
zbx_uint64_t queued = 0;
zbx_hashset_iter_reset(&check_counts, &iter);
pthread_mutex_lock(&dmanager.results_lock);
while (NULL != (count = (zbx_discoverer_check_count_t *)zbx_hashset_iter_next(&iter)))
{
queued += count->count;
zbx_hashset_insert(&dmanager.incomplete_checks_count, count,
sizeof(zbx_discoverer_check_count_t));
}
pthread_mutex_unlock(&dmanager.results_lock);
discoverer_queue_lock(&dmanager.queue);
dmanager.queue.pending_checks_count += queued;
for (int i = 0; i < jobs.values_num; i++)
{
zbx_discoverer_job_t *job = jobs.values[i];
discoverer_queue_push(&dmanager.queue, job);
zbx_vector_discoverer_jobs_ptr_append(&dmanager.job_refs, job);
}
zbx_vector_discoverer_jobs_ptr_sort(&dmanager.job_refs,
ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
discoverer_queue_notify_all(&dmanager.queue);
discoverer_queue_unlock(&dmanager.queue);
}
zbx_vector_discoverer_jobs_ptr_destroy(&jobs);
zbx_hashset_destroy(&check_counts);
}
/* update sleeptime */
sleeptime.sec = 0 != more_results ? 0 : zbx_calculate_sleeptime(nextcheck, DISCOVERER_DELAY);
zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);
(void)zbx_ipc_service_recv(&ipc_service, &sleeptime, &client, &message);
zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);
if (NULL != message)
{
zbx_uint64_t count;
switch (message->code)
{
case ZBX_IPC_DISCOVERER_QUEUE:
discoverer_queue_lock(&dmanager.queue);
count = dmanager.queue.pending_checks_count;
discoverer_queue_unlock(&dmanager.queue);
zbx_ipc_client_send(client, ZBX_IPC_DISCOVERER_QUEUE, (unsigned char *)&count,
sizeof(count));
break;
case ZBX_IPC_DISCOVERER_USAGE_STATS:
discoverer_reply_usage_stats(&dmanager, client);
break;
#ifdef HAVE_NETSNMP
case ZBX_RTC_SNMP_CACHE_RELOAD:
zbx_clear_cache_snmp(info->process_type, info->process_num);
break;
#endif
case ZBX_RTC_SHUTDOWN:
zabbix_log(LOG_LEVEL_DEBUG, "shutdown message received, terminating...");
goto out;
}
zbx_ipc_message_free(message);
}
if (NULL != client)
zbx_ipc_client_release(client);
zbx_timekeeper_collect(dmanager.timekeeper);
}
out:
zbx_setproctitle("%s #%d [terminating]", get_process_type_string(info->process_type), info->process_num);
zbx_vector_uint64_pair_destroy(&revisions);
zbx_vector_uint64_destroy(&del_druleids);
zbx_vector_uint64_destroy(&del_jobs);
zbx_vector_discoverer_drule_error_clear_ext(&drule_errors, zbx_discoverer_drule_error_free);
zbx_vector_discoverer_drule_error_destroy(&drule_errors);
zbx_hashset_destroy(&incomplete_druleids);
discoverer_manager_free(&dmanager);
zbx_ipc_service_close(&ipc_service);
exit(EXIT_SUCCESS);
}