/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see .
**/
#include "active_checks.h"
#include "../agent_conf/agent_conf.h"
#include "../logfiles/logfiles.h"
#include "../metrics/metrics.h"
#include "zbxcfg.h"
#include "zbxlog.h"
#include "zbxsysinfo.h"
#include "zbxcommshigh.h"
#include "zbxthreads.h"
#include "zbxcrypto.h"
#include "zbxjson.h"
#include "zbxregexp.h"
#include "zbxstr.h"
#include "zbxnum.h"
#include "zbxtime.h"
#include "zbx_rtc_constants.h"
#include "zbx_item_constants.h"
#include "zbxalgo.h"
#include "zbxparam.h"
#include "zbxexpr.h"
#if defined(ZABBIX_SERVICE)
# include "zbxwinservice.h"
#elif !defined(_WINDOWS)
# include "zbxnix.h"
#endif
typedef struct
{
zbx_uint64_t itemid;
char *value;
unsigned char state;
zbx_uint64_t lastlogsize;
int timestamp;
char *source;
int severity;
zbx_timespec_t ts;
int logeventid;
int mtime;
unsigned char flags;
zbx_uint64_t id;
}
active_buffer_element_t;
ZBX_PTR_VECTOR_DECL(command_result_ptr, struct zbx_command_result *)
typedef struct zbx_command_result
{
zbx_uint64_t id;
char *value;
unsigned char state;
}
zbx_command_result_t;
ZBX_PTR_VECTOR_IMPL(command_result_ptr, zbx_command_result_t *)
typedef struct
{
active_buffer_element_t *data;
int count;
int pcount;
int lastsent;
int first_error;
}
active_buffer_t;
typedef struct _zbx_active_command_t zbx_active_command_t;
ZBX_PTR_VECTOR_DECL(active_command_ptr, zbx_active_command_t *)
struct _zbx_active_command_t
{
zbx_uint64_t command_id;
char *key;
int timeout;
};
ZBX_PTR_VECTOR_IMPL(active_command_ptr, zbx_active_command_t *)
static ZBX_THREAD_LOCAL active_buffer_t buffer;
static ZBX_THREAD_LOCAL zbx_vector_command_result_ptr_t command_results;
static ZBX_THREAD_LOCAL zbx_vector_active_metrics_ptr_t active_metrics;
static ZBX_THREAD_LOCAL zbx_vector_active_command_ptr_t active_commands;
static ZBX_THREAD_LOCAL zbx_hashset_t commands_hash;
static ZBX_THREAD_LOCAL zbx_vector_expression_t regexps;
static ZBX_THREAD_LOCAL char *session_token;
static ZBX_THREAD_LOCAL zbx_uint64_t last_valueid = 0;
static ZBX_THREAD_LOCAL zbx_vector_pre_persistent_t pre_persistent_vec; /* used for staging of data going */
/* into persistent files */
/* used for deleting inactive persistent files */
static ZBX_THREAD_LOCAL zbx_vector_persistent_inactive_t persistent_inactive_vec;
#define ZBX_HISTORY_UPLOAD_ENABLED 0
#define ZBX_HISTORY_UPLOAD_DISABLED (-1)
static ZBX_THREAD_LOCAL int history_upload = ZBX_HISTORY_UPLOAD_ENABLED;
typedef struct
{
zbx_uint64_t id;
time_t ttl;
}
zbx_cmd_hash_t;
#ifndef _WINDOWS
static volatile sig_atomic_t need_update_userparam;
#endif
static void send_back_unsupported_item(zbx_uint64_t itemid, const char *key, char *error, const char *config_hostname,
zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls, int config_timeout,
const char *config_source_ip, int config_buffer_send, int config_buffer_size);
static void init_active_metrics(int config_buffer_size)
{
size_t sz;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (NULL == buffer.data)
{
zabbix_log(LOG_LEVEL_DEBUG, "buffer: first allocation for %d elements", config_buffer_size);
sz = (size_t)config_buffer_size * sizeof(active_buffer_element_t);
buffer.data = (active_buffer_element_t *)zbx_malloc(buffer.data, sz);
memset(buffer.data, 0, sz);
buffer.count = 0;
buffer.pcount = 0;
buffer.lastsent = (int)time(NULL);
buffer.first_error = 0;
}
zbx_vector_command_result_ptr_create(&command_results);
zbx_vector_active_metrics_ptr_create(&active_metrics);
zbx_vector_active_command_ptr_create(&active_commands);
zbx_hashset_create(&commands_hash, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
zbx_vector_expression_create(®exps);
zbx_vector_pre_persistent_create(&pre_persistent_vec);
zbx_vector_persistent_inactive_create(&persistent_inactive_vec);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
static void free_active_metric(zbx_active_metric_t *metric)
{
zbx_free(metric->key);
zbx_free(metric->delay);
for (int i = 0; i < metric->logfiles_num; i++)
zbx_free(metric->logfiles[i].filename);
zbx_free(metric->logfiles);
#if !defined(_WINDOWS) && !defined(__MINGW32__)
zbx_free(metric->persistent_file_name);
#endif
zbx_free(metric);
}
static void free_active_command(zbx_active_command_t *command)
{
zbx_free(command->key);
zbx_free(command);
}
static void free_command_result(zbx_command_result_t *result)
{
zbx_free(result->value);
zbx_free(result);
}
static void clean_command_hash(void)
{
zbx_cmd_hash_t *cmd_hash;
zbx_hashset_iter_t iter;
zbx_hashset_iter_reset(&commands_hash, &iter);
while (NULL != (cmd_hash = (zbx_cmd_hash_t *)zbx_hashset_iter_next(&iter)))
{
int i;
zbx_command_result_t *result, result_loc;
if (cmd_hash->ttl > time(NULL))
continue;
result_loc.id = cmd_hash->id;
if (FAIL != (i = zbx_vector_command_result_ptr_search(&command_results, &result_loc,
ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC)))
{
result = (zbx_command_result_t *)command_results.values[i];
zbx_vector_command_result_ptr_remove_noorder(&command_results, i);
free_command_result(result);
}
zbx_hashset_iter_remove(&iter);
}
}
#ifdef _WINDOWS
static void free_active_metrics(void)
{
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
zbx_regexp_clean_expressions(®exps);
zbx_vector_expression_destroy(®exps);
zbx_vector_active_metrics_ptr_clear_ext(&active_metrics, (zbx_clean_func_t)free_active_metric);
zbx_vector_active_metrics_ptr_destroy(&active_metrics);
zbx_vector_command_result_ptr_clear_ext(&command_results, (zbx_clean_func_t)free_command_result);
zbx_vector_command_result_ptr_destroy(&command_results);
zbx_vector_active_command_ptr_clear_ext(&active_commands, (zbx_clean_func_t)free_command_result);
zbx_vector_active_command_ptr_destroy(&active_commands);
zbx_hashset_destroy(&commands_hash);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
#endif
static int get_min_nextcheck(void)
{
int min = -1;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
for (int i = 0; i < active_metrics.values_num; i++)
{
const zbx_active_metric_t *metric = (const zbx_active_metric_t *)active_metrics.values[i];
if (metric->nextcheck < min || -1 == min)
min = metric->nextcheck;
}
if (-1 == min)
min = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, min);
return min;
}
static void add_check(const char *key, zbx_uint64_t itemid, const char *delay, zbx_uint64_t lastlogsize, int mtime,
int timeout)
{
zbx_active_metric_t *metric;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() key:'%s' refresh:%s lastlogsize:" ZBX_FS_UI64 " mtime:%d timeout:%d",
__func__, key, delay, lastlogsize, mtime, timeout);
for (int i = 0; i < active_metrics.values_num; i++)
{
metric = active_metrics.values[i];
if (metric->itemid != itemid)
continue;
if (0 != strcmp(metric->key, key))
{
zbx_free(metric->key);
metric->key = zbx_strdup(NULL, key);
metric->lastlogsize = lastlogsize;
metric->mtime = mtime;
metric->big_rec = 0;
metric->use_ino = 0;
metric->error_count = 0;
for (int j = 0; j < metric->logfiles_num; j++)
zbx_free(metric->logfiles[j].filename);
zbx_free(metric->logfiles);
metric->logfiles_num = 0;
metric->start_time = 0.0;
metric->processed_bytes = 0;
#if !defined(_WINDOWS) && !defined(__MINGW32__)
if (NULL != metric->persistent_file_name)
{
char *error = NULL;
zabbix_log(LOG_LEVEL_DEBUG, "%s() removing persistent file '%s'",
__func__, metric->persistent_file_name);
zbx_remove_from_persistent_inactive_list(&persistent_inactive_vec, metric->itemid);
if (SUCCEED != zbx_remove_persistent_file(metric->persistent_file_name, &error))
{
/* log error and continue operation */
zabbix_log(LOG_LEVEL_WARNING, "cannot remove persistent file \"%s\": %s",
metric->persistent_file_name, error);
zbx_free(error);
}
zbx_free(metric->persistent_file_name);
}
#endif
}
#if !defined(_WINDOWS) && !defined(__MINGW32__)
else if (NULL != metric->persistent_file_name)
{
/* the metric is active, but it could have been placed on inactive list earlier */
zbx_remove_from_persistent_inactive_list(&persistent_inactive_vec, metric->itemid);
}
#endif
/* replace metric */
if (0 != strcmp(metric->delay, delay))
{
metric->nextcheck = 0;
metric->delay = zbx_strdup(metric->delay, delay);
}
metric->timeout = timeout;
goto out;
}
metric = (zbx_active_metric_t *)zbx_malloc(NULL, sizeof(zbx_active_metric_t));
/* add new metric */
metric->itemid = itemid;
metric->key = zbx_strdup(NULL, key);
metric->delay = zbx_strdup(NULL, delay);
metric->nextcheck = 0;
metric->state = ITEM_STATE_NORMAL;
metric->lastlogsize = lastlogsize;
metric->mtime = mtime;
metric->timeout = timeout;
/* existing log[], log.count[] and eventlog[] data can be skipped */
metric->skip_old_data = (0 != metric->lastlogsize ? 0 : 1);
metric->big_rec = 0;
metric->use_ino = 0;
metric->error_count = 0;
metric->logfiles_num = 0;
metric->logfiles = NULL;
metric->flags = ZBX_METRIC_FLAG_NEW;
if ('l' == metric->key[0] && 'o' == metric->key[1] && 'g' == metric->key[2])
{
if ('[' == metric->key[3]) /* log[ */
metric->flags |= ZBX_METRIC_FLAG_LOG_LOG;
else if (0 == strncmp(metric->key + 3, "rt[", 3)) /* logrt[ */
metric->flags |= ZBX_METRIC_FLAG_LOG_LOGRT;
else if (0 == strncmp(metric->key + 3, ".count[", 7)) /* log.count[ */
metric->flags |= ZBX_METRIC_FLAG_LOG_LOG | ZBX_METRIC_FLAG_LOG_COUNT;
else if (0 == strncmp(metric->key + 3, "rt.count[", 9)) /* logrt.count[ */
metric->flags |= ZBX_METRIC_FLAG_LOG_LOGRT | ZBX_METRIC_FLAG_LOG_COUNT;
}
else if (0 == strncmp(metric->key, "eventlog", 8))
{
if ('[' == metric->key[8])
metric->flags |= ZBX_METRIC_FLAG_LOG_EVENTLOG;
else if (0 == strncmp(metric->key + 8, ".count[", 7))
metric->flags |= ZBX_METRIC_FLAG_LOG_EVENTLOG | ZBX_METRIC_FLAG_LOG_COUNT;
}
metric->start_time = 0.0;
metric->processed_bytes = 0;
metric->persistent_file_name = NULL; /* initialized but not used on Microsoft Windows */
zbx_vector_active_metrics_ptr_append(&active_metrics, metric);
out:
if (0 == metric->nextcheck)
{
char *error = NULL;
int nextcheck = 0, scheduling = FAIL;
if (SUCCEED == zbx_get_agent_item_nextcheck(metric->itemid, metric->delay, (int)time(NULL),
&nextcheck, &scheduling, &error))
{
/* first poll of new items without scheduling checks must be done as soon as possible */
if (SUCCEED == scheduling)
metric->nextcheck = nextcheck;
}
else
{
/* item nextcheck is calculated when item is being polled - */
/* invalid interval error will be generated then */
zbx_free(error);
}
}
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
static void add_command(const char *key, zbx_uint64_t id, int timeout)
{
zbx_active_command_t *command;
zbx_cmd_hash_t cmd_hash_loc;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() key:'%s' id:" ZBX_FS_UI64 " timeout: %d", __func__, key, id, timeout);
if (NULL == zbx_hashset_search(&commands_hash, &id))
{
cmd_hash_loc.id = id;
cmd_hash_loc.ttl = time(NULL) + SEC_PER_HOUR;
zbx_hashset_insert(&commands_hash, &cmd_hash_loc, sizeof(cmd_hash_loc));
command = (zbx_active_command_t *)zbx_malloc(NULL, sizeof(zbx_active_command_t));
command->key = zbx_strdup(NULL, key);
command->command_id = id;
command->timeout = timeout;
zbx_vector_active_command_ptr_append(&active_commands, command);
}
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/********************************************************************************
* *
* Purpose: parses list of active checks received from server *
* *
* Parameters: *
* str - [IN] NULL terminated string received from server *
* host - [IN] address of host *
* port - [IN] port number on host *
* config_revision_local - [IN/OUT] revision of processed configuration *
* config_timeout - [IN] global timeout value for checks without *
* timeouts *
* *
* Comments: *
* String is represented as "ZBX_EOF" termination list, with '\n' *
* delimiter between elements. *
* Each element represented as: *
* ::: *
* *
********************************************************************************/
static int parse_list_of_checks(char *str, const char *host, unsigned short port,
zbx_uint32_t *config_revision_local, int config_timeout, const char *config_hostname,
zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls, const char *config_source_ip,
int config_buffer_send, int config_buffer_size)
{
const char *p;
size_t name_alloc = 0, delay_alloc = 0;
char *name = NULL, *delay = NULL, expression[MAX_STRING_LEN], tmp[MAX_STRING_LEN] = {0},
exp_delimiter, error[MAX_STRING_LEN];
zbx_uint64_t lastlogsize, itemid;
struct zbx_json_parse jp, jp_data, jp_row;
zbx_active_metric_t *metric;
zbx_vector_uint64_t received_itemids;
int mtime, expression_type, case_sensitive, timeout, i, ret = FAIL;
zbx_uint32_t config_revision;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
zbx_vector_uint64_create(&received_itemids);
if (SUCCEED != zbx_json_open(str, &jp))
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active checks: %s", zbx_json_strerror());
goto out;
}
if (SUCCEED != zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_RESPONSE, tmp, sizeof(tmp), NULL))
{
if (SUCCEED == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_ERROR, tmp, sizeof(tmp), NULL))
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active checks from [%s:%hu]: %s", host, port,
tmp);
}
else
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active checks: cannot find tag: %s",
ZBX_PROTO_TAG_RESPONSE);
}
goto out;
}
if (0 != strcmp(tmp, ZBX_PROTO_VALUE_SUCCESS))
{
if (SUCCEED == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_INFO, tmp, sizeof(tmp), NULL))
zabbix_log(LOG_LEVEL_ERR, "no active checks on server [%s:%hu]: %s", host, port, tmp);
else
zabbix_log(LOG_LEVEL_ERR, "no active checks on server");
ret = SUCCEED;
goto out;
}
if (FAIL == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_CONFIG_REVISION, tmp, sizeof(tmp), NULL))
{
config_revision = 0;
}
else if (FAIL == zbx_is_uint32(tmp, &config_revision))
{
zabbix_log(LOG_LEVEL_WARNING, "\"%s\" is not a valid revision", tmp);
goto out;
}
if (FAIL != zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_HISTORY_UPLOAD, tmp, sizeof(tmp), NULL) &&
0 == strcmp(tmp, ZBX_PROTO_VALUE_HISTORY_UPLOAD_DISABLED))
{
history_upload = ZBX_HISTORY_UPLOAD_DISABLED;
}
else
history_upload = ZBX_HISTORY_UPLOAD_ENABLED;
if (SUCCEED != zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_DATA, &jp_data))
{
if (0 != *config_revision_local)
{
ret = SUCCEED;
goto out;
}
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active checks: %s", zbx_json_strerror());
goto out;
}
if (*config_revision_local > config_revision)
{
zbx_hashset_iter_t iter;
zbx_hashset_iter_reset(&commands_hash, &iter);
while (NULL != zbx_hashset_iter_next(&iter))
zbx_hashset_iter_remove(&iter);
zbx_vector_active_command_ptr_clear_ext(&active_commands, free_active_command);
zbx_vector_command_result_ptr_clear_ext(&command_results, free_command_result);
}
*config_revision_local = config_revision;
p = NULL;
while (NULL != (p = zbx_json_next(&jp_data, p)))
{
/* {"data":[{"key":"system.cpu.num",...,...},{...},...]}
* ^------------------------------^
*/ if (SUCCEED != zbx_json_brackets_open(p, &jp_row))
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active checks: %s", zbx_json_strerror());
goto out;
}
if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_KEY, &name, &name_alloc, NULL) ||
'\0' == *name)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", ZBX_PROTO_TAG_KEY);
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_ITEMID, tmp, sizeof(tmp), NULL) ||
SUCCEED != zbx_is_uint64(tmp, &itemid))
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", ZBX_PROTO_TAG_ITEMID);
continue;
}
if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_DELAY, &delay, &delay_alloc, NULL) ||
'\0' == *delay)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", ZBX_PROTO_TAG_DELAY);
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LASTLOGSIZE, tmp, sizeof(tmp), NULL) ||
SUCCEED != zbx_is_uint64(tmp, &lastlogsize))
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", ZBX_PROTO_TAG_LASTLOGSIZE);
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_MTIME, tmp, sizeof(tmp), NULL) ||
'\0' == *tmp)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", ZBX_PROTO_TAG_MTIME);
mtime = 0;
}
else
mtime = atoi(tmp);
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_TIMEOUT, tmp, sizeof(tmp), NULL) ||
'\0' == *tmp)
{
timeout = config_timeout;
}
else if (FAIL == zbx_validate_item_timeout(tmp, &timeout, error, sizeof(error)))
{
send_back_unsupported_item(itemid, name, error, config_hostname, addrs, config_tls,
config_timeout, config_source_ip, config_buffer_send, config_buffer_size);
continue;
}
add_check(zbx_alias_get(name), itemid, delay, lastlogsize, mtime, timeout);
/* remember what was received */
zbx_vector_uint64_append(&received_itemids, itemid);
}
zbx_vector_uint64_sort(&received_itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
/* remove what wasn't received */
for (i = 0; i < active_metrics.values_num; i++)
{
int found = 0;
metric = active_metrics.values[i];
found = zbx_vector_uint64_bsearch(&received_itemids, metric->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
if (FAIL == found)
{
#if !defined(_WINDOWS) && !defined(__MINGW32__)
if (NULL != metric->persistent_file_name)
{
zbx_add_to_persistent_inactive_list(&persistent_inactive_vec, metric->itemid,
metric->persistent_file_name);
}
#endif
zbx_vector_active_metrics_ptr_remove_noorder(&active_metrics, i);
free_active_metric(metric);
i--; /* consider the same index on the next run */
}
}
zbx_regexp_clean_expressions(®exps);
if (SUCCEED == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_REGEXP, &jp_data))
{
p = NULL;
while (NULL != (p = zbx_json_next(&jp_data, p)))
{
/* {"regexp":[{"name":"regexp1",...,...},{...},...]}
* ^------------------------^
*/ if (SUCCEED != zbx_json_brackets_open(p, &jp_row))
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active checks: %s",
zbx_json_strerror());
goto out;
}
if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, "name", &name, &name_alloc, NULL))
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", "name");
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, "expression", expression, sizeof(expression),
NULL) || '\0' == *expression)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", "expression");
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, "expression_type", tmp, sizeof(tmp), NULL) ||
'\0' == *tmp)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", "expression_type");
continue;
}
expression_type = atoi(tmp);
if (SUCCEED != zbx_json_value_by_name(&jp_row, "exp_delimiter", tmp, sizeof(tmp), NULL))
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", "exp_delimiter");
continue;
}
exp_delimiter = tmp[0];
if (SUCCEED != zbx_json_value_by_name(&jp_row, "case_sensitive", tmp,
sizeof(tmp), NULL) || '\0' == *tmp)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", "case_sensitive");
continue;
}
case_sensitive = atoi(tmp);
zbx_add_regexp_ex(®exps, name, expression, expression_type, exp_delimiter, case_sensitive);
}
}
ret = SUCCEED;
out:
zbx_vector_uint64_destroy(&received_itemids);
zbx_free(delay);
zbx_free(name);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
static int parse_list_of_commands(char *str, int config_timeout)
{
const char *p;
char *cmd = NULL, tmp[MAX_STRING_LEN], error[MAX_STRING_LEN], *key = NULL;
int timeout, ret = FAIL;
zbx_uint64_t command_id;
struct zbx_json_parse jp, jp_data, jp_row;
size_t cmd_alloc = 0, key_alloc;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (SUCCEED != zbx_json_open(str, &jp))
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active commands: %s", zbx_json_strerror());
goto out;
}
if (SUCCEED == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_COMMANDS, &jp_data))
{
p = NULL;
while (NULL != (p = zbx_json_next(&jp_data, p)))
{
size_t offset = 0;
if (SUCCEED != zbx_json_brackets_open(p, &jp_row))
{
zabbix_log(LOG_LEVEL_ERR, "cannot parse list of active commands: %s",
zbx_json_strerror());
goto out;
}
if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_COMMAND, &cmd, &cmd_alloc,
NULL) || '\0' == *cmd)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"",
ZBX_PROTO_TAG_COMMAND);
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_WAIT, tmp, sizeof(tmp), NULL) ||
'\0' == *tmp)
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"",
ZBX_PROTO_TAG_WAIT);
continue;
}
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_TIMEOUT, tmp, sizeof(tmp), NULL) ||
'\0' == *tmp)
{
timeout = config_timeout;
}
else if (FAIL == zbx_validate_item_timeout(tmp, &timeout, error, sizeof(error)))
{
zabbix_log(LOG_LEVEL_ERR, "failed to validate timeout \"%d\", error: %s ", timeout,
error);
continue;
}
if (SUCCEED != zbx_quote_key_param(&cmd, 0))
{
zabbix_log(LOG_LEVEL_WARNING, "Invalid command \"%s\"", cmd);
continue;
}
if (0 == atoi(tmp))
{
zbx_snprintf_alloc(&key, &key_alloc, &offset, "system.run[%s,nowait]",
cmd);
}
else
zbx_snprintf_alloc(&key, &key_alloc, &offset, "system.run[%s,wait]",cmd);
if (SUCCEED != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_ID, tmp, sizeof(tmp), NULL) ||
SUCCEED != zbx_is_uint64(tmp, &command_id))
{
zabbix_log(LOG_LEVEL_WARNING, "cannot retrieve value of tag \"%s\"", ZBX_PROTO_TAG_ID);
continue;
}
add_command(key, command_id, timeout);
}
}
ret = SUCCEED;
out:
zbx_free(key);
zbx_free(cmd);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/************************************************************************************************
* *
* Purpose: processes configuration item and sets its value to respective parameter *
* *
* Parameters: *
* json - [OUT] pointer to JSON structure where to put resulting value *
* config - [IN] pointer to configuration parameter *
* length - [IN] length of configuration parameter *
* proto - [IN] configuration parameter prototype *
* config_host_metadata_item - [IN] *
* *
************************************************************************************************/
static void process_config_item(struct zbx_json *json, const char *config, size_t length, const char *proto,
const char *config_host_metadata_item)
{
char **value;
AGENT_RESULT result;
const char *config_name, *config_type;
if (config_host_metadata_item == config)
{
config_name = "HostMetadataItem";
config_type = "metadata";
}
else /* config_host_interface_item */
{
config_name = "HostInterfaceItem";
config_type = "interface";
}
zbx_init_agent_result(&result);
if (SUCCEED == zbx_execute_agent_check(config, ZBX_PROCESS_LOCAL_COMMAND | ZBX_PROCESS_WITH_ALIAS, &result,
ZBX_CHECK_TIMEOUT_UNDEFINED) && NULL != (value = ZBX_GET_STR_RESULT(&result)) && NULL != *value)
{
if (SUCCEED != zbx_is_utf8(*value))
{
zabbix_log(LOG_LEVEL_WARNING, "cannot get host %s using \"%s\" item specified by"
" \"%s\" configuration parameter: returned value is not"
" a UTF-8 string",config_type, config, config_name);
}
else
{
if (length < zbx_strlen_utf8(*value))
{
size_t bytes;
zabbix_log(LOG_LEVEL_WARNING, "the returned value of \"%s\" item specified by"
" \"%s\" configuration parameter is too long,"
" using first %d characters", config, config_name, (int)length);
bytes = zbx_strlen_utf8_nchars(*value, length);
(*value)[bytes] = '\0';
}
zbx_json_addstring(json, proto, *value, ZBX_JSON_TYPE_STRING);
}
}
else
zabbix_log(LOG_LEVEL_WARNING, "cannot get host %s using \"%s\" item specified by"
" \"%s\" configuration parameter", config_type, config, config_name);
zbx_free_agent_result(&result);
}
/******************************************************************************
* *
* Purpose: retrieves list of active checks from Zabbix server *
* *
* Return value: returns SUCCEED on successful parsing, *
* FAIL on other cases *
* *
******************************************************************************/
static int refresh_active_checks(zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls,
zbx_uint32_t *config_revision_local, int config_timeout, const char *config_source_ip,
const char *config_listen_ip, int config_listen_port, const char *config_hostname,
const char *config_host_metadata, const char *config_host_metadata_item,
const char *config_host_interface, const char *config_host_interface_item, int config_buffer_send,
int config_buffer_size)
{
static ZBX_THREAD_LOCAL int last_ret = SUCCEED;
int ret, level;
struct zbx_json json;
char *data = NULL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() host:'%s' port:%hu", __func__, ((zbx_addr_t *)addrs->values[0])->ip,
((zbx_addr_t *)addrs->values[0])->port);
zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
zbx_json_addstring(&json, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_GET_ACTIVE_CHECKS, ZBX_JSON_TYPE_STRING);
zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, config_hostname, ZBX_JSON_TYPE_STRING);
if (NULL != config_host_metadata)
{
zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST_METADATA, config_host_metadata, ZBX_JSON_TYPE_STRING);
}
else if (NULL != config_host_metadata_item)
{
#define HOST_METADATA_LEN 65535 /* UTF-8 characters, not bytes */
process_config_item(&json, config_host_metadata_item, HOST_METADATA_LEN, ZBX_PROTO_TAG_HOST_METADATA,
config_host_metadata_item);
#undef HOST_METADATA_LEN
}
if (NULL != config_host_interface)
{
zbx_json_addstring(&json, ZBX_PROTO_TAG_INTERFACE, config_host_interface, ZBX_JSON_TYPE_STRING);
}
else if (NULL != config_host_interface_item)
{
process_config_item(&json, config_host_interface_item, HOST_INTERFACE_LEN, ZBX_PROTO_TAG_INTERFACE,
config_host_metadata_item);
}
if (NULL != config_listen_ip)
{
char *p;
if (NULL != (p = strchr(config_listen_ip, ',')))
*p = '\0';
zbx_json_addstring(&json, ZBX_PROTO_TAG_IP, config_listen_ip, ZBX_JSON_TYPE_STRING);
if (NULL != p)
*p = ',';
}
if (ZBX_DEFAULT_AGENT_PORT != config_listen_port)
zbx_json_adduint64(&json, ZBX_PROTO_TAG_PORT, (zbx_uint64_t)config_listen_port);
zbx_json_adduint64(&json, ZBX_PROTO_TAG_CONFIG_REVISION, (zbx_uint64_t)*config_revision_local);
zbx_json_addstring(&json, ZBX_PROTO_TAG_SESSION, session_token, ZBX_JSON_TYPE_STRING);
zbx_json_addstring(&json, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING);
zbx_json_addint64(&json, ZBX_PROTO_TAG_VARIANT, ZBX_PROGRAM_VARIANT_AGENT);
level = SUCCEED != last_ret ? LOG_LEVEL_DEBUG : LOG_LEVEL_WARNING;
ret = zbx_comms_exchange_with_redirect(config_source_ip, addrs, config_timeout, config_timeout, 0, level,
config_tls, json.buffer, NULL, NULL, &data, NULL);
if (SUCCEED == ret && '\0' == *data)
{
zbx_free(data);
zabbix_log(LOG_LEVEL_WARNING, "Received empty response from active check configuration update");
ret = FAIL;
zbx_addrs_failover(addrs);
}
if (SUCCEED == ret)
{
int rc;
if (SUCCEED != last_ret)
{
zabbix_log(LOG_LEVEL_WARNING, "Active check configuration update from [%s:%hu]"
" is working again", ((zbx_addr_t *)addrs->values[0])->ip,
((zbx_addr_t *)addrs->values[0])->port);
}
rc = parse_list_of_checks(data, ((zbx_addr_t *)addrs->values[0])->ip,
((zbx_addr_t *)addrs->values[0])->port, config_revision_local, config_timeout,
config_hostname, addrs, config_tls, config_source_ip, config_buffer_send,
config_buffer_size);
rc |= parse_list_of_commands(data, config_timeout);
if (SUCCEED != rc)
zbx_addrs_failover(addrs);
zbx_free(data);
}
else
{
if (RECV_ERROR == ret)
{
/* server is unaware if configuration is actually delivered and saves session */
*config_revision_local = 0;
}
}
if (SUCCEED != ret && SUCCEED == last_ret)
zabbix_log(LOG_LEVEL_WARNING, "Active check configuration update started to fail");
last_ret = ret;
zbx_json_free(&json);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: checks whether JSON response is SUCCEED *
* *
* Parameters: response - [IN] JSON response from Zabbix trapper *
* *
* Return value: SUCCEED - processed successfully *
* FAIL - error occurred *
* *
* Comments: zabbix_sender has almost the same function! *
* *
******************************************************************************/
static int check_response(const char *response)
{
struct zbx_json_parse jp;
char value[MAX_STRING_LEN], info[MAX_STRING_LEN];
int ret;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() response:'%s'", __func__, response);
ret = zbx_json_open(response, &jp);
if (SUCCEED == ret)
ret = zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_RESPONSE, value, sizeof(value), NULL);
if (SUCCEED == ret && 0 != strcmp(value, ZBX_PROTO_VALUE_SUCCESS))
ret = FAIL;
if (SUCCEED == ret && SUCCEED == zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_INFO, info, sizeof(info), NULL))
zabbix_log(LOG_LEVEL_DEBUG, "info from server: '%s'", info);
if (SUCCEED == ret && FAIL != zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_HISTORY_UPLOAD, value, sizeof(value),
NULL) && 0 == strcmp(value, ZBX_PROTO_VALUE_HISTORY_UPLOAD_DISABLED))
{
history_upload = ZBX_HISTORY_UPLOAD_DISABLED;
}
else
history_upload = ZBX_HISTORY_UPLOAD_ENABLED;
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
static int format_metric_results(struct zbx_json *json, int now, int config_buffer_send, int config_buffer_size)
{
active_buffer_element_t *el;
int i, ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (ZBX_HISTORY_UPLOAD_ENABLED != history_upload)
{
zabbix_log(LOG_LEVEL_DEBUG, "cannot send buffer: server has paused history upload");
goto ret;
}
if (config_buffer_size / 2 > buffer.pcount && config_buffer_size > buffer.count &&
config_buffer_send > now - buffer.lastsent)
{
zabbix_log(LOG_LEVEL_DEBUG, "%s() now:%d lastsent:%d now-lastsent:%d BufferSend:%d; will not send now",
__func__, now, buffer.lastsent, now - buffer.lastsent, config_buffer_send);
goto ret;
}
if (0 == buffer.count)
goto ret;
zbx_json_addarray(json, ZBX_PROTO_TAG_DATA);
for (i = 0; i < buffer.count; i++)
{
el = &buffer.data[i];
zbx_json_addobject(json, NULL);
zbx_json_adduint64(json, ZBX_PROTO_TAG_ITEMID, el->itemid);
if (NULL != el->value)
zbx_json_addstring(json, ZBX_PROTO_TAG_VALUE, el->value, ZBX_JSON_TYPE_STRING);
if (ITEM_STATE_NOTSUPPORTED == el->state)
{
zbx_json_adduint64(json, ZBX_PROTO_TAG_STATE, ITEM_STATE_NOTSUPPORTED);
}
else
{
/* add item meta information only for items in normal state */
if (0 != (ZBX_METRIC_FLAG_LOG & el->flags))
zbx_json_adduint64(json, ZBX_PROTO_TAG_LASTLOGSIZE, el->lastlogsize);
if (0 != (ZBX_METRIC_FLAG_LOG_LOGRT & el->flags))
zbx_json_addint64(json, ZBX_PROTO_TAG_MTIME, el->mtime);
}
if (0 != el->timestamp)
zbx_json_addint64(json, ZBX_PROTO_TAG_LOGTIMESTAMP, el->timestamp);
if (NULL != el->source)
zbx_json_addstring(json, ZBX_PROTO_TAG_LOGSOURCE, el->source, ZBX_JSON_TYPE_STRING);
if (0 != el->severity)
zbx_json_addint64(json, ZBX_PROTO_TAG_LOGSEVERITY, el->severity);
if (0 != el->logeventid)
zbx_json_addint64(json, ZBX_PROTO_TAG_LOGEVENTID, el->logeventid);
zbx_json_adduint64(json, ZBX_PROTO_TAG_ID, el->id);
zbx_json_addint64(json, ZBX_PROTO_TAG_CLOCK, el->ts.sec);
zbx_json_addint64(json, ZBX_PROTO_TAG_NS, el->ts.ns);
zbx_json_close(json);
}
zbx_json_close(json);
ret = SUCCEED;
ret:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
static int format_command_results(struct zbx_json *json)
{
int i;
zbx_command_result_t *result;
if (0 == command_results.values_num)
return FAIL;
zbx_json_addarray(json, ZBX_PROTO_TAG_COMMANDS);
for (i = 0; i < command_results.values_num; i++)
{
result = (zbx_command_result_t *)command_results.values[i];
if (NULL == result->value)
continue;
zbx_json_addobject(json, NULL);
zbx_json_adduint64(json, ZBX_PROTO_TAG_ID, result->id);
if (ITEM_STATE_NOTSUPPORTED == result->state)
zbx_json_addstring(json, ZBX_PROTO_TAG_ERROR, result->value, ZBX_JSON_TYPE_STRING);
else
zbx_json_addstring(json, ZBX_PROTO_TAG_VALUE, result->value, ZBX_JSON_TYPE_STRING);
zbx_json_close(json);
}
zbx_json_close(json);
return SUCCEED;
}
static void clear_metric_results(zbx_vector_addr_ptr_t *addrs, zbx_vector_pre_persistent_t *prep_vec, int now,
int ret)
{
int i;
active_buffer_element_t *el;
if (SUCCEED == ret)
{
#if !defined(_WINDOWS) && !defined(__MINGW32__)
zbx_write_persistent_files(prep_vec);
zbx_clean_pre_persistent_elements(prep_vec);
#else
ZBX_UNUSED(prep_vec);
#endif
/* free buffer */
for (i = 0; i < buffer.count; i++)
{
el = &buffer.data[i];
zbx_free(el->value);
zbx_free(el->source);
}
buffer.count = 0;
buffer.pcount = 0;
buffer.lastsent = now;
if (0 != buffer.first_error)
{
zabbix_log(LOG_LEVEL_WARNING, "active check data upload to [%s:%hu] is working again",
((zbx_addr_t *)addrs->values[0])->ip, ((zbx_addr_t *)addrs->values[0])->port);
buffer.first_error = 0;
}
}
else
{
if (0 == buffer.first_error)
{
zabbix_log(LOG_LEVEL_WARNING, "Active check data upload started to fail");
buffer.first_error = now;
}
}
}
static char *connect_callback(void *data)
{
zbx_json_t *json = (zbx_json_t *)data;
zbx_timespec_t ts;
zbx_timespec(&ts);
zbx_json_addint64(json, ZBX_PROTO_TAG_CLOCK, ts.sec);
zbx_json_addint64(json, ZBX_PROTO_TAG_NS, ts.ns);
return json->buffer;
}
/******************************************************************************
* *
* Purpose: sends value stored in buffer to Zabbix server *
* *
* Parameters: *
* addrs - [IN] vector with pair of Zabbix server IP or *
* Hostname and port number *
* prep_vec - [IN/OUT] vector with data for writing into *
* persistent files *
* config_tls - [IN] *
* config_timeout - [IN] *
* config_source_ip - [IN] *
* config_buffer_send - [IN] *
* config_buffer_size - [IN] *
* *
* Return value: SUCCEED if: *
* - no need to send data now (buffer empty or has enough *
* free elements, or recently sent) *
* - data successfully sent to server (proxy) *
* FAIL - error when sending data *
* *
******************************************************************************/
static int send_buffer(zbx_vector_addr_ptr_t *addrs, zbx_vector_pre_persistent_t *prep_vec,
const zbx_config_tls_t *config_tls, int config_timeout, const char *config_source_ip,
const char *config_hostname, int config_buffer_send, int config_buffer_size)
{
int ret = SUCCEED, ret_metrics, ret_commands, now, level;
char *data = NULL;
struct zbx_json json;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() host:'%s' port:%d entries:%d/%d",
__func__, ((zbx_addr_t *)addrs->values[0])->ip, ((zbx_addr_t *)addrs->values[0])->port,
buffer.count, config_buffer_size);
now = (int)time(NULL);
zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
zbx_json_addstring(&json, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_AGENT_DATA, ZBX_JSON_TYPE_STRING);
zbx_json_addstring(&json, ZBX_PROTO_TAG_SESSION, session_token, ZBX_JSON_TYPE_STRING);
zbx_json_addstring(&json, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING);
zbx_json_addint64(&json, ZBX_PROTO_TAG_VARIANT, ZBX_PROGRAM_VARIANT_AGENT);
zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, config_hostname, ZBX_JSON_TYPE_STRING);
ret_metrics = format_metric_results(&json, now, config_buffer_send, config_buffer_size);
ret_commands = format_command_results(&json);
if (FAIL == ret_metrics && FAIL == ret_commands)
goto ret;
level = 0 == buffer.first_error ? LOG_LEVEL_WARNING : LOG_LEVEL_DEBUG;
ret = zbx_comms_exchange_with_redirect(config_source_ip, addrs, MIN(buffer.count * config_timeout, 60),
config_timeout, 0, level, config_tls, json.buffer, connect_callback, &json, &data, NULL);
if (SUCCEED == ret)
{
if (NULL == data || SUCCEED != check_response(data))
{
ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "NOT OK");
zbx_addrs_failover(addrs);
}
else
zabbix_log(LOG_LEVEL_DEBUG, "OK");
zbx_free(data);
}
if (SUCCEED == ret_metrics)
clear_metric_results(addrs, prep_vec, now, ret);
if (SUCCEED == ret && SUCCEED == ret_commands)
zbx_vector_command_result_ptr_clear_ext(&command_results, free_command_result);
ret:
zbx_json_free(&json);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************************
* *
* Purpose: buffers new value or sends whole buffer to server *
* *
* Parameters: *
* addrs - [IN] In C agent - vector with a pair of Zabbix server IP or *
* Hostname and port number. In Agent2 it is not used (NULL). *
* agent2_result - [IN] NULL in C agent. In Agent2 it is used for passing *
* address of buffer where to store matching log *
* records. It is here to have the same function *
* prototype as in Agent2. *
* itemid - [IN] item identifier
* host - [IN] name of host in Zabbix database *
* key - [IN] name of metric *
* value - [IN] key value or error message why item became NOTSUPPORTED *
* state - [IN] ITEM_STATE_NORMAL or ITEM_STATE_NOTSUPPORTED *
* lastlogsize - [IN] size of read logfile *
* mtime - [IN] time of last file modification *
* timestamp - [IN] timestamp of read value *
* source - [IN] name of logged data source *
* severity - [IN] severity of logged data sources *
* logeventid - [IN] application-specific identifier for the event; used *
* for monitoring of Windows event logs *
* flags - [IN] metric flags *
* config_tls - [IN] *
* config_timeout - [IN] *
* config_source_ip - [IN] *
* config_buffer_send - [IN] *
* config_buffer_size - [IN] *
* *
* Return value: returns SUCCEED on successful parsing, *
* FAIL on other cases *
* *
* Comments: ATTENTION! This function's address and pointers to arguments *
* are described in Zabbix defined type "zbx_process_value_func_t" *
* and used when calling process_log(), process_logrt() and *
* zbx_read2(). If you ever change this process_value() arguments *
* or return value do not forget to synchronize changes with the *
* defined type "zbx_process_value_func_t" and implementations of *
* process_log(), process_logrt(), zbx_read2() and their callers. *
* *
******************************************************************************************/
static int process_value(zbx_vector_addr_ptr_t *addrs, zbx_vector_ptr_t *agent2_result, zbx_uint64_t itemid,
const char *host, const char *key, const char *value, unsigned char state, zbx_uint64_t *lastlogsize,
const int *mtime, const unsigned long *timestamp, const char *source,
const unsigned short *severity, const unsigned long *logeventid, unsigned char flags,
const zbx_config_tls_t *config_tls, int config_timeout, const char *config_source_ip,
int config_buffer_send, int config_buffer_size)
{
active_buffer_element_t *el = NULL;
int i, ret = FAIL;
size_t sz;
ZBX_UNUSED(agent2_result);
if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_DEBUG))
{
if (NULL != lastlogsize)
{
zabbix_log(LOG_LEVEL_DEBUG, "In %s() key:'%s:%s' lastlogsize:" ZBX_FS_UI64 " value:'%s'",
__func__, host, key, *lastlogsize, ZBX_NULL2STR(value));
}
else
{
/* log a dummy lastlogsize to keep the same record format for simpler parsing */
zabbix_log(LOG_LEVEL_DEBUG, "In %s() key:'%s:%s' lastlogsize:null value:'%s'",
__func__, host, key, ZBX_NULL2STR(value));
}
}
/* do not send data from buffer if host/key are the same as previous unless buffer is full already */
if (0 < buffer.count)
{
el = &buffer.data[buffer.count - 1];
if ((0 != (flags & ZBX_METRIC_FLAG_PERSISTENT) && config_buffer_size / 2 <= buffer.pcount) ||
config_buffer_size <= buffer.count || el->itemid != itemid)
{
send_buffer(addrs, &pre_persistent_vec, config_tls, config_timeout, config_source_ip,
host, config_buffer_send, config_buffer_size);
}
}
if (0 != (ZBX_METRIC_FLAG_PERSISTENT & flags) && config_buffer_size / 2 <= buffer.pcount)
{
zabbix_log(LOG_LEVEL_WARNING, "buffer is full, cannot store persistent value");
goto out;
}
if (config_buffer_size > buffer.count)
{
zabbix_log(LOG_LEVEL_DEBUG, "buffer: new element %d", buffer.count);
el = &buffer.data[buffer.count];
buffer.count++;
}
else
{
if (0 == (ZBX_METRIC_FLAG_PERSISTENT & flags))
{
for (i = 0; i < buffer.count; i++)
{
el = &buffer.data[i];
if (el->itemid == itemid)
break;
}
}
if (0 != (ZBX_METRIC_FLAG_PERSISTENT & flags) || i == buffer.count)
{
for (i = 0; i < buffer.count; i++)
{
el = &buffer.data[i];
if (0 == (ZBX_METRIC_FLAG_PERSISTENT & el->flags))
break;
}
}
if (NULL != el)
{
zabbix_log(LOG_LEVEL_DEBUG, "remove element [%d] itemid:" ZBX_FS_UI64, i, el->itemid);
zbx_free(el->value);
zbx_free(el->source);
}
sz = (size_t)(config_buffer_size - i - 1) * sizeof(active_buffer_element_t);
memmove(&buffer.data[i], &buffer.data[i + 1], sz);
zabbix_log(LOG_LEVEL_DEBUG, "buffer full: new element %d", buffer.count - 1);
el = &buffer.data[config_buffer_size - 1];
}
memset(el, 0, sizeof(active_buffer_element_t));
el->itemid = itemid;
if (NULL != value)
el->value = zbx_strdup(NULL, value);
el->state = state;
if (NULL != source)
el->source = strdup(source);
if (NULL != severity)
el->severity = *severity;
if (NULL != lastlogsize)
el->lastlogsize = *lastlogsize;
if (NULL != mtime)
el->mtime = *mtime;
if (NULL != timestamp)
el->timestamp = *timestamp;
if (NULL != logeventid)
el->logeventid = (int)*logeventid;
zbx_timespec(&el->ts);
el->flags = flags;
el->id = ++last_valueid;
if (0 != (ZBX_METRIC_FLAG_PERSISTENT & flags))
buffer.pcount++;
/* If conditions are met then send buffer now. It is necessary for synchronization */
/* between sending data to server and writing of persistent files. */
if ((0 != (flags & ZBX_METRIC_FLAG_PERSISTENT) && config_buffer_size / 2 <= buffer.pcount) ||
config_buffer_size <= buffer.count)
{
send_buffer(addrs, &pre_persistent_vec, config_tls, config_timeout, config_source_ip, host,
config_buffer_send, config_buffer_size);
}
ret = SUCCEED;
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
static void process_remote_command_value(const char *value, zbx_uint64_t id, unsigned char state)
{
zbx_command_result_t *result;
result = (zbx_command_result_t *)zbx_malloc(NULL, sizeof(zbx_command_result_t));
if (NULL != value)
result->value = zbx_strdup(NULL, value);
else
result->value = NULL;
result->id = id;
result->state = state;
zbx_vector_command_result_ptr_append(&command_results, result);
}
static int need_meta_update(zbx_active_metric_t *metric, zbx_uint64_t lastlogsize_sent, int mtime_sent,
unsigned char old_state, zbx_uint64_t lastlogsize_last, int mtime_last)
{
int ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() key:%s", __func__, metric->key);
if (0 != (ZBX_METRIC_FLAG_LOG & metric->flags))
{
/* meta information update is needed if: */
/* - lastlogsize or mtime changed since we last sent within this check */
/* - nothing was sent during this check and state changed from notsupported to normal */
/* - nothing was sent during this check and it's a new metric */
if (lastlogsize_sent != metric->lastlogsize || mtime_sent != metric->mtime ||
(lastlogsize_last == lastlogsize_sent && mtime_last == mtime_sent &&
(old_state != metric->state ||
0 != (ZBX_METRIC_FLAG_NEW & metric->flags))))
{
/* needs meta information update */
ret = SUCCEED;
}
}
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
#if !defined(_WINDOWS) && !defined(__MINGW32__)
static int process_eventlog_check(zbx_vector_addr_ptr_t *addrs, zbx_vector_ptr_t *agent2_result,
zbx_vector_expression_t *regular_expressions, zbx_active_metric_t *metric,
zbx_process_value_func_t process_value_cb, zbx_uint64_t *lastlogsize_sent,
const zbx_config_tls_t *config_tls, int config_timeout, const char *config_source_ip,
const char *config_hostname, int config_buffer_send, int config_buffer_size,
int config_eventlog_max_lines_per_second, char **error)
{
ZBX_UNUSED(addrs);
ZBX_UNUSED(agent2_result);
ZBX_UNUSED(regular_expressions);
ZBX_UNUSED(metric);
ZBX_UNUSED(process_value_cb);
ZBX_UNUSED(lastlogsize_sent);
ZBX_UNUSED(config_tls);
ZBX_UNUSED(config_timeout);
ZBX_UNUSED(config_source_ip);
ZBX_UNUSED(config_hostname);
ZBX_UNUSED(config_buffer_send);
ZBX_UNUSED(config_buffer_size);
ZBX_UNUSED(config_eventlog_max_lines_per_second);
ZBX_UNUSED(error);
return FAIL;
}
#else
int process_eventlog_check(zbx_vector_addr_ptr_t *addrs, zbx_vector_ptr_t *agent2_result,
zbx_vector_expression_t *regexps, zbx_active_metric_t *metric, zbx_process_value_func_t process_value_cb,
zbx_uint64_t *lastlogsize_sent, const zbx_config_tls_t *config_tls, int config_timeout,
const char *config_source_ip, const char *config_hostname, int config_buffer_send,
int config_buffer_size, int config_eventlog_max_lines_per_second, char **error);
#endif
static int process_common_check(zbx_vector_addr_ptr_t *addrs, zbx_active_metric_t *metric,
const zbx_config_tls_t *config_tls, int config_timeout, const char *config_source_ip,
const char *config_hostname, int config_buffer_send, int config_buffer_size, char **error)
{
int ret;
AGENT_RESULT result;
char **pvalue;
zbx_init_agent_result(&result);
if (ZBX_CHECK_TIMEOUT_UNDEFINED == metric->timeout)
{
SET_MSG_RESULT(&result, zbx_strdup(NULL, "Unsupported timeout value."));
*error = zbx_strdup(*error, *ZBX_GET_MSG_RESULT(&result));
ret = NOTSUPPORTED;
}
else if (SUCCEED != (ret = zbx_execute_agent_check(metric->key, 0, &result, metric->timeout)))
{
if (NULL != (pvalue = ZBX_GET_MSG_RESULT(&result)))
*error = zbx_strdup(*error, *pvalue);
goto out;
}
if (NULL != (pvalue = ZBX_GET_TEXT_RESULT(&result)))
{
zabbix_log(LOG_LEVEL_DEBUG, "for key [%s] received value [%s]", metric->key, *pvalue);
process_value(addrs, NULL, metric->itemid, config_hostname, metric->key, *pvalue, ITEM_STATE_NORMAL,
NULL, NULL, NULL, NULL, NULL, NULL, metric->flags, config_tls, config_timeout,
config_source_ip, config_buffer_send, config_buffer_size);
}
out:
zbx_free_agent_result(&result);
return ret;
}
static void process_command(zbx_active_command_t *command)
{
AGENT_RESULT result;
char **pvalue, *empty = "", *error = ZBX_NOTSUPPORTED_MSG;
unsigned char state = ITEM_STATE_NORMAL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() command:%s", __func__, command->key);
zbx_init_agent_result(&result);
if (SUCCEED != zbx_execute_agent_check(command->key, 0, &result, command->timeout))
{
state = ITEM_STATE_NOTSUPPORTED;
if (NULL == (pvalue = ZBX_GET_MSG_RESULT(&result)))
pvalue = &error;
}
else if (NULL == (pvalue = ZBX_GET_TEXT_RESULT(&result)))
pvalue = ∅
process_remote_command_value(*pvalue, command->command_id, state);
zbx_free_agent_result(&result);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s() state:%d result:%s", __func__, state, *pvalue);
}
#if !defined(_WINDOWS) && !defined(__MINGW32__)
/********************************************************************************
* *
* Purpose: initializes element of preparation vector with available data *
* *
* Parameters: *
* lastlogsize - [IN] lastlogize value to write into persistent data file *
* mtime - [IN] mtime value to write into persistent data file *
* prep_vec_elem - [IN/OUT] element of vector to initialize *
* *
* Comments: This is a minimal initialization for using before sending status *
* updates or meta-data. It initializes only 2 attributes to be *
* usable without any data about log files. *
* *
********************************************************************************/
static void zbx_minimal_init_prep_vec_data(zbx_uint64_t lastlogsize, int mtime, zbx_pre_persistent_t *prep_vec_elem)
{
if (NULL != prep_vec_elem->filename)
zbx_free(prep_vec_elem->filename); /* filename == NULL should be checked when preparing JSON */
/* for writing as most attributes are not initialized */
prep_vec_elem->processed_size = lastlogsize;
prep_vec_elem->mtime = mtime;
}
static void zbx_fill_prep_vec_element(zbx_vector_pre_persistent_t *prep_vec, zbx_uint64_t itemid,
const char *persistent_file_name, const struct st_logfile *logfile, const zbx_uint64_t lastlogsize,
const int mtime)
{
/* index in preparation vector */
int idx = zbx_find_or_create_prep_vec_element(prep_vec, itemid, persistent_file_name);
if (NULL != logfile)
{
zbx_init_prep_vec_data(logfile, prep_vec->values + idx);
zbx_update_prep_vec_data(logfile, logfile->processed_size, prep_vec->values + idx);
}
else
zbx_minimal_init_prep_vec_data(lastlogsize, mtime, prep_vec->values + idx);
}
#endif /* not WINDOWS, not __MINGW32__ */
static void process_active_commands(zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls,
int config_timeout, const char *config_source_ip, const char *config_hostname, int config_buffer_send,
int config_buffer_size)
{
int i;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() server:'%s' port:%hu", __func__, addrs->values[0]->ip,
addrs->values[0]->port);
for (i = 0; i < active_commands.values_num; i++)
process_command((zbx_active_command_t *)active_commands.values[i]);
zbx_vector_active_command_ptr_clear_ext(&active_commands, free_active_command);
send_buffer(addrs, &pre_persistent_vec, config_tls, config_timeout, config_source_ip, config_hostname,
config_buffer_send, config_buffer_size);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
static void process_active_checks(zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls,
int config_timeout, const char *config_source_ip, const char *config_hostname, int config_buffer_send,
int config_buffer_size, int config_eventlog_max_lines_per_second, int config_max_lines_per_second)
{
char *error = NULL;
int i, now;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() server:'%s' port:%hu", __func__, addrs->values[0]->ip,
addrs->values[0]->port);
now = (int)time(NULL);
for (i = 0; i < active_metrics.values_num; i++)
{
zbx_uint64_t lastlogsize_last, lastlogsize_sent;
int mtime_last, mtime_sent, ret, scheduling = FAIL;
zbx_active_metric_t *metric = active_metrics.values[i];
if (metric->nextcheck > now)
continue;
if (SUCCEED != zbx_get_agent_item_nextcheck(metric->itemid, metric->delay, now, &metric->nextcheck,
&scheduling, &error))
{
process_value(addrs, NULL, metric->itemid, config_hostname, metric->key, error,
ITEM_STATE_NOTSUPPORTED, &metric->lastlogsize, &metric->mtime, NULL, NULL, NULL,
NULL, metric->flags, config_tls, config_timeout, config_source_ip,
config_buffer_send, config_buffer_size);
metric->state = ITEM_STATE_NOTSUPPORTED;
metric->error_count = 0;
zbx_free(error);
continue;
}
/* for meta information update we need to know if something was sent at all during the check */
lastlogsize_last = metric->lastlogsize;
mtime_last = metric->mtime;
lastlogsize_sent = metric->lastlogsize;
mtime_sent = metric->mtime;
if (0 != ((ZBX_METRIC_FLAG_LOG_LOG | ZBX_METRIC_FLAG_LOG_LOGRT) & metric->flags))
{
ret = process_log_check(addrs, NULL, ®exps, metric, process_value, &lastlogsize_sent,
&mtime_sent, &error, &pre_persistent_vec, config_tls, config_timeout,
config_source_ip, config_hostname, config_buffer_send, config_buffer_size,
config_max_lines_per_second);
}
else if (0 != (ZBX_METRIC_FLAG_LOG_EVENTLOG & metric->flags))
{
ret = process_eventlog_check(addrs, NULL, ®exps, metric, process_value, &lastlogsize_sent,
config_tls, config_timeout, config_source_ip, config_hostname,
config_buffer_send, config_buffer_size, config_eventlog_max_lines_per_second,
&error);
}
else
ret = process_common_check(addrs, metric, config_tls, config_timeout, config_source_ip,
config_hostname, config_buffer_send, config_buffer_size, &error);
if (SUCCEED != ret)
{
const char *perror = (NULL != error ? error : ZBX_NOTSUPPORTED_MSG);
metric->state = ITEM_STATE_NOTSUPPORTED;
metric->error_count = 0;
metric->processed_bytes = 0;
zabbix_log(LOG_LEVEL_WARNING, "active check \"%s\" is not supported: %s", metric->key, perror);
#if !defined(_WINDOWS) && !defined(__MINGW32__)
/* only for log*[] items */
if (0 != ((ZBX_METRIC_FLAG_LOG_LOG | ZBX_METRIC_FLAG_LOG_LOGRT) & metric->flags) &&
NULL != metric->persistent_file_name)
{
const struct st_logfile *logfile = NULL;
if (0 < metric->logfiles_num)
{
logfile = find_last_processed_file_in_logfiles_list(metric->logfiles,
metric->logfiles_num);
}
zbx_fill_prep_vec_element(&pre_persistent_vec, metric->itemid,
metric->persistent_file_name, logfile, metric->lastlogsize,
metric->mtime);
}
#endif
process_value(addrs, NULL, metric->itemid, config_hostname, metric->key, perror,
ITEM_STATE_NOTSUPPORTED, &metric->lastlogsize, &metric->mtime, NULL, NULL, NULL,
NULL, metric->flags, config_tls, config_timeout, config_source_ip,
config_buffer_send, config_buffer_size);
zbx_free(error);
}
else
{
if (0 == metric->error_count)
{
unsigned char old_state = metric->state;
if (ITEM_STATE_NOTSUPPORTED == metric->state)
{
/* item became supported */
metric->state = ITEM_STATE_NORMAL;
}
if (SUCCEED == need_meta_update(metric, lastlogsize_sent, mtime_sent, old_state,
lastlogsize_last, mtime_last))
{
#if !defined(_WINDOWS) && !defined(__MINGW32__)
if (NULL != metric->persistent_file_name)
{
const struct st_logfile *logfile = NULL;
if (0 < metric->logfiles_num)
{
logfile = find_last_processed_file_in_logfiles_list(
metric->logfiles, metric->logfiles_num);
}
zbx_fill_prep_vec_element(&pre_persistent_vec, metric->itemid,
metric->persistent_file_name, logfile,
metric->lastlogsize, metric->mtime);
}
#endif
/* meta information update */
process_value(addrs, NULL, metric->itemid, config_hostname, metric->key, NULL,
metric->state, &metric->lastlogsize, &metric->mtime, NULL, NULL,
NULL, NULL, metric->flags, config_tls, config_timeout,
config_source_ip, config_buffer_send, config_buffer_size);
}
/* remove "new metric" flag */
metric->flags &= ~ZBX_METRIC_FLAG_NEW;
}
}
send_buffer(addrs, &pre_persistent_vec, config_tls, config_timeout, config_source_ip, config_hostname,
config_buffer_send, config_buffer_size);
if (metric->nextcheck <= (now = (int)time(NULL)))
{
/* reschedule metric if polling took it past is scheduled next poll */
if (SUCCEED != zbx_get_agent_item_nextcheck(metric->itemid, metric->delay, now,
&metric->nextcheck, &scheduling, &error))
{
/* while not likely that another nextcheck calculation with the same */
/* delay could result in an error - still it can be handled and reported */
process_value(addrs, NULL, metric->itemid, config_hostname, metric->key, error,
ITEM_STATE_NOTSUPPORTED, &metric->lastlogsize, &metric->mtime, NULL,
NULL, NULL, NULL, metric->flags, config_tls, config_timeout,
config_source_ip, config_buffer_send, config_buffer_size);
metric->state = ITEM_STATE_NOTSUPPORTED;
metric->error_count = 0;
zbx_free(error);
}
}
}
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
* *
* Purpose: updates schedules of active checks and buffer by specified time *
* delta *
* *
* Parameters: delta - [IN] time delta in seconds *
* *
* Comments: This function is used to update checking and sending schedules *
* if the system time was rolled back. *
* *
******************************************************************************/
static void update_schedule(int delta)
{
int i;
for (i = 0; i < active_metrics.values_num; i++)
{
zbx_active_metric_t *metric = active_metrics.values[i];
metric->nextcheck += delta;
}
buffer.lastsent += delta;
}
#ifndef _WINDOWS
static void zbx_active_checks_sigusr_handler(int flags)
{
if (ZBX_RTC_USER_PARAMETERS_RELOAD == ZBX_RTC_GET_MSG(flags))
need_update_userparam = 1;
}
#endif
static void send_heartbeat_msg(zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls, int config_timeout,
const char *config_source_ip, const char *config_hostname, int config_heartbeat_frequency)
{
static ZBX_THREAD_LOCAL int last_ret = SUCCEED;
int ret, level;
struct zbx_json json;
char *error = NULL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
zbx_json_addstring(&json, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_ACTIVE_CHECK_HEARTBEAT, ZBX_JSON_TYPE_STRING);
zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, config_hostname, ZBX_JSON_TYPE_STRING);
zbx_json_addint64(&json, ZBX_PROTO_TAG_HEARTBEAT_FREQ, config_heartbeat_frequency);
zbx_json_addstring(&json, ZBX_PROTO_TAG_VERSION, ZABBIX_VERSION, ZBX_JSON_TYPE_STRING);
zbx_json_addint64(&json, ZBX_PROTO_TAG_VARIANT, ZBX_PROGRAM_VARIANT_AGENT);
level = SUCCEED != last_ret ? LOG_LEVEL_DEBUG : LOG_LEVEL_WARNING;
ret = zbx_comms_exchange_with_redirect(config_source_ip, addrs, config_timeout, config_timeout, 0, level,
config_tls, json.buffer, NULL, NULL, NULL, &error);
if (SUCCEED == ret)
{
if (last_ret == FAIL)
{
zabbix_log(LOG_LEVEL_WARNING, "Successfully sent heartbeat message to [%s]:%d",
((zbx_addr_t *)addrs->values[0])->ip, ((zbx_addr_t *)addrs->values[0])->port);
}
}
else
{
zabbix_log(level, "Unable to send heartbeat message to [%s]:%d [%s]",
((zbx_addr_t *)addrs->values[0])->ip, ((zbx_addr_t *)addrs->values[0])->port, error);
zbx_free(error);
}
last_ret = ret;
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
ZBX_THREAD_ENTRY(active_checks_thread, args)
{
zbx_thread_activechk_args activechk_args, *activechks_args_in;
time_t nextcheck = 0, nextrefresh = 0, nextsend = 0, now, delta, lastcheck = 0,
heartbeat_nextcheck = 0, lash_cmd_hash_check = 0;
zbx_uint32_t config_revision_local = 0;
zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info;
unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type;
int server_num = ((zbx_thread_args_t *)args)->info.server_num,
process_num = ((zbx_thread_args_t *)args)->info.process_num;
activechks_args_in = (zbx_thread_activechk_args *)((((zbx_thread_args_t *)args))->args);
zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
server_num, get_process_type_string(process_type), process_num);
zbx_vector_addr_ptr_create(&activechk_args.addrs);
zbx_addr_copy(&activechk_args.addrs, &(activechks_args_in->addrs));
const char *config_hostname = zbx_strdup(NULL, activechks_args_in->config_hostname);
zbx_free(args);
session_token = zbx_create_token(0);
#if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
zbx_tls_init_child(activechks_args_in->zbx_config_tls, activechks_args_in->zbx_get_program_type_cb_arg, NULL);
#endif
init_active_metrics(activechks_args_in->config_buffer_size);
#ifndef _WINDOWS
zbx_set_sigusr_handler(zbx_active_checks_sigusr_handler);
#endif
if (0 != activechks_args_in->config_heartbeat_frequency)
heartbeat_nextcheck = time(NULL);
while (ZBX_IS_RUNNING())
{
#ifndef _WINDOWS
if (1 == need_update_userparam)
{
zbx_setproctitle("active checks #%d [reloading user parameters]", process_num);
reload_user_parameters(process_type, process_num, activechks_args_in->config_file);
need_update_userparam = 0;
}
#endif
zbx_update_env(get_process_type_string(process_type), zbx_time());
if ((now = time(NULL)) >= nextsend)
{
send_buffer(&activechk_args.addrs, &pre_persistent_vec, activechks_args_in->zbx_config_tls,
activechks_args_in->config_timeout, activechks_args_in->config_source_ip,
config_hostname, activechks_args_in->config_buffer_send,
activechks_args_in->config_buffer_size);
nextsend = time(NULL) + 1;
}
if (heartbeat_nextcheck != 0 && now >= heartbeat_nextcheck)
{
heartbeat_nextcheck = now + activechks_args_in->config_heartbeat_frequency;
send_heartbeat_msg(&activechk_args.addrs, activechks_args_in->zbx_config_tls,
activechks_args_in->config_timeout, activechks_args_in->config_source_ip,
config_hostname, activechks_args_in->config_heartbeat_frequency);
}
if (now >= nextrefresh)
{
zbx_setproctitle("active checks #%d [getting list of active checks]", process_num);
if (FAIL == refresh_active_checks(&activechk_args.addrs, activechks_args_in->zbx_config_tls,
&config_revision_local, activechks_args_in->config_timeout,
activechks_args_in->config_source_ip, activechks_args_in->config_listen_ip,
activechks_args_in->config_listen_port, config_hostname,
activechks_args_in->config_host_metadata,
activechks_args_in->config_host_metadata_item,
activechks_args_in->config_host_interface,
activechks_args_in->config_host_interface_item, activechks_args_in->config_buffer_send, activechks_args_in->config_buffer_size))
{
nextrefresh = time(NULL) + 60;
}
else
{
nextrefresh = time(NULL) + activechks_args_in->config_refresh_active_checks;
nextcheck = 0;
}
#if !defined(_WINDOWS) && !defined(__MINGW32__)
zbx_remove_inactive_persistent_files(&persistent_inactive_vec);
#endif
}
if (0 != active_commands.values_num)
{
process_active_commands(&activechk_args.addrs, activechks_args_in->zbx_config_tls,
activechks_args_in->config_timeout, activechks_args_in->config_source_ip,
config_hostname, activechks_args_in->config_buffer_send,
activechks_args_in->config_buffer_size);
}
if (now >= nextcheck && activechks_args_in->config_buffer_size / 2 > buffer.pcount)
{
zbx_setproctitle("active checks #%d [processing active checks]", process_num);
process_active_checks(&activechk_args.addrs, activechks_args_in->zbx_config_tls,
activechks_args_in->config_timeout, activechks_args_in->config_source_ip,
config_hostname, activechks_args_in->config_buffer_send,
activechks_args_in->config_buffer_size,
activechks_args_in->config_eventlog_max_lines_per_second,
activechks_args_in->config_max_lines_per_second);
if (activechks_args_in->config_buffer_size / 2 <= buffer.pcount)
{
/* failed to complete processing active checks */
continue;
}
nextcheck = get_min_nextcheck();
if (FAIL == nextcheck)
nextcheck = time(NULL) + 60;
}
else
{
if (0 > (delta = now - lastcheck))
{
zabbix_log(LOG_LEVEL_WARNING, "the system time has been pushed back,"
" adjusting active check schedule");
update_schedule((int)delta);
nextcheck += delta;
nextsend += delta;
nextrefresh += delta;
if (0 != heartbeat_nextcheck)
heartbeat_nextcheck += delta;
}
zbx_setproctitle("active checks #%d [idle 1 sec]", process_num);
zbx_sleep(1);
}
lastcheck = now;
if (now > (lash_cmd_hash_check + SEC_PER_HOUR))
{
clean_command_hash();
lash_cmd_hash_check = now;
}
}
zbx_free(session_token);
#ifdef _WINDOWS
zbx_vector_addr_ptr_clear_ext(&activechk_args.addrs, (zbx_clean_func_t)zbx_addr_free);
zbx_vector_addr_ptr_destroy(&activechk_args.addrs);
free_active_metrics();
ZBX_DO_EXIT();
zbx_thread_exit(EXIT_SUCCESS);
#else
zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
while (1)
zbx_sleep(SEC_PER_MIN);
#endif
}
static void send_back_unsupported_item(zbx_uint64_t itemid, const char *key, char *error, const char *config_hostname,
zbx_vector_addr_ptr_t *addrs, const zbx_config_tls_t *config_tls, int config_timeout,
const char *config_source_ip, int config_buffer_send, int config_buffer_size)
{
zabbix_log(LOG_LEVEL_WARNING, "active check \"%s\" is not supported: %s", key, error);
process_value(addrs, NULL, itemid, config_hostname, key, error, ITEM_STATE_NOTSUPPORTED,
NULL, NULL, NULL, NULL, NULL, NULL, 0, config_tls, config_timeout, config_source_ip,
config_buffer_send, config_buffer_size);
}