/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ #include "zbxalerter.h" #include "alerter_defs.h" #include "alerter_protocol.h" #include "zbxlog.h" #include "zbxalgo.h" #include "zbxdb.h" #include "zbxdbhigh.h" #include "zbxembed.h" #include "zbxipcservice.h" #include "zbxmedia.h" #include "zbxnix.h" #include "zbxself.h" #include "zbxserialize.h" #include "zbxstr.h" #include "zbxthreads.h" #include "zbxtime.h" #include "zbxtypes.h" #include "zbxxml.h" #include "zbxjson.h" #define ZBX_AM_LOCATION_NOWHERE 0 #define ZBX_AM_LOCATION_QUEUE 1 #define ZBX_UPDATE_STR(dst, src) \ do \ { \ if (NULL == src) \ zbx_free(dst); \ else if (NULL == dst || 0 != strcmp(dst, src)) \ dst = zbx_strdup(dst, src); \ } \ while(0) #define ALERT_SOURCE_EXTERNAL 0xffff #define ZBX_ALERTPOOL_SOURCE(id) (id >> 48) #define ZBX_ALERTPOOL_OBJECT(id) ((id >> 32) & 0xffff) #define ZBX_AM_MEDIATYPE_FLAG_NONE 0x00 #define ZBX_AM_MEDIATYPE_FLAG_REMOVE 0x01 #define ZBX_MEDIA_MESSAGE_FORMAT_DEFAULT 255 /* * The alert queue is implemented as a nested queue. * * At the bottom layer is media type queue, enforcing the media type maxsessions setting. * At the next layer is alert pool queue. Alert pool is an artificial construct to group * alerts generated by the same source (event source, object and objectid) so that they * are executed sequentially. * At the top layer is alert queue. * * mediatypes * alertpools * alerts * * Media type queue is sorted by the timestamp of the minimum item of its alertpool queue. * Alert pool queue is sorted by the timestamp of the minimum item of its alerts queue. * Alerts queue is sorted by the alert scheduled send timestamp. * * When taking the next alert to send the following actions are done: * 1) take the next media type object from media type queue * 2) take the next alert pool object from media type alertpool queue * 3) take the next alert from alert pool alerts queue * 4) if media type maxsessions limit has not reached, put the media type object back in queue * * When processing alert response the following actions are done: * 1) find alerts media type and alert pool objects * 2) cache alert status update to be flushed into database later * 3) if alert failed and can be retried put it back into its alert pool queue, * otherwise free the alert object * 4) release alert pool object, put it back into media type alertpools queue if the alert pool * was not removed * 5) release media type object, put it back into media types queue if the media type object * was not removed */ typedef char * zbx_shared_str_t; /* alert data */ typedef struct { zbx_uint64_t alertid; zbx_uint64_t mediatypeid; zbx_uint64_t alertpoolid; zbx_uint64_t eventid; /* problem event id for recovery events */ zbx_uint64_t p_eventid; int nextsend; /* alert data */ char *sendto; char *subject; zbx_shared_str_t message; char *params; unsigned char message_format; int status; int retries; char *expression; char *recovery_expression; zbx_uint64_t objectid; int object; int source; } zbx_am_alert_t; /* Alert pool data. */ /* Alerts are assigned to pools based on event source, object and objectid. */ /* While alert pools can be processed in parallel, alerts inside alert pool */ /* are processed sequentially. */ typedef struct { zbx_uint64_t id; zbx_uint64_t mediatypeid; /* alert queue */ zbx_binary_heap_t queue; int location; /* number of currently processing alerts */ int alerts_num; /* number of alert objects for this alert pool */ int refcount; } zbx_am_alertpool_t; /* report data */ typedef struct { char *subject; char *message; char *content; char *content_name; char *message_format; zbx_uint32_t content_size; } zbx_am_dispatch_t; /* alerter data */ typedef struct { /* connected alerter client */ zbx_ipc_client_t *client; zbx_am_alert_t *alert; } zbx_am_alerter_t; ZBX_PTR_VECTOR_DECL(am_alerter_ptr, zbx_am_alerter_t *) ZBX_PTR_VECTOR_IMPL(am_alerter_ptr, zbx_am_alerter_t *) /* alert manager data */ typedef struct { /* number of queued alerts */ zbx_uint64_t alerts_num; /* alerter vector, created during manager initialization */ zbx_vector_am_alerter_ptr_t alerters; zbx_queue_ptr_t free_alerters; /* alerters indexed by IPC service clients */ zbx_hashset_t alerters_client; /* next alerter index to be assigned to new IPC service clients */ int next_alerter_index; zbx_hashset_t mediatypes; zbx_hashset_t alertpools; /* alert status update cache */ zbx_hashset_t results; /* watchdog alert recipients */ zbx_hashset_t watchdog; /* mediatype queue */ zbx_binary_heap_t queue; int dbstatus; zbx_es_t es; zbx_ipc_service_t ipc; zbx_ipc_client_t *syncer_client; } zbx_am_t; /* alerters client index hashset support */ static zbx_hash_t alerter_hash_func(const void *d) { const zbx_am_alerter_t *alerter = *(const zbx_am_alerter_t * const *)d; zbx_hash_t hash = ZBX_DEFAULT_PTR_HASH_FUNC(&alerter->client); return hash; } static int alerter_compare_func(const void *d1, const void *d2) { const zbx_am_alerter_t *p1 = *(const zbx_am_alerter_t * const *)d1; const zbx_am_alerter_t *p2 = *(const zbx_am_alerter_t * const *)d2; ZBX_RETURN_IF_NOT_EQUAL(p1->client, p2->client); return 0; } /* alert pool hashset support */ static zbx_hash_t am_alertpool_hash_func(const void *data) { const zbx_am_alertpool_t *pool = (const zbx_am_alertpool_t *)data; zbx_hash_t hash; hash = ZBX_DEFAULT_UINT64_HASH_FUNC(&pool->id); hash = ZBX_DEFAULT_UINT64_HASH_ALGO(&pool->mediatypeid, sizeof(pool->mediatypeid), hash); return hash; } static int am_alertpool_compare_func(const void *d1, const void *d2) { const zbx_am_alertpool_t *pool1 = (const zbx_am_alertpool_t *)d1; const zbx_am_alertpool_t *pool2 = (const zbx_am_alertpool_t *)d2; ZBX_RETURN_IF_NOT_EQUAL(pool1->id, pool2->id); ZBX_RETURN_IF_NOT_EQUAL(pool1->mediatypeid, pool2->mediatypeid); return 0; } /* queue support */ static int am_alert_compare(const zbx_am_alert_t *alert1, const zbx_am_alert_t *alert2) { ZBX_RETURN_IF_NOT_EQUAL(alert1->nextsend, alert2->nextsend); ZBX_RETURN_IF_NOT_EQUAL(alert1->alertid, alert2->alertid); return 0; } static int am_alert_queue_compare(const void *d1, const void *d2) { const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1; const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2; return am_alert_compare((const zbx_am_alert_t *)e1->data, (const zbx_am_alert_t *)e2->data); } static int am_alertpool_compare(const zbx_am_alertpool_t *pool1, const zbx_am_alertpool_t *pool2) { const zbx_binary_heap_elem_t *e1 = zbx_binary_heap_find_min(&pool1->queue); const zbx_binary_heap_elem_t *e2 = zbx_binary_heap_find_min(&pool2->queue); return am_alert_compare((const zbx_am_alert_t *)e1->data, (const zbx_am_alert_t *)e2->data); } static int am_alertpool_queue_compare(const void *d1, const void *d2) { const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1; const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2; return am_alertpool_compare((const zbx_am_alertpool_t *)e1->data, (const zbx_am_alertpool_t *)e2->data); } static int am_mediatype_compare(const zbx_am_mediatype_t *media1, const zbx_am_mediatype_t *media2) { const zbx_binary_heap_elem_t *e1 = zbx_binary_heap_find_min(&media1->queue); const zbx_binary_heap_elem_t *e2 = zbx_binary_heap_find_min(&media2->queue); return am_alertpool_compare((const zbx_am_alertpool_t *)e1->data, (const zbx_am_alertpool_t *)e2->data); } static int am_mediatype_queue_compare(const void *d1, const void *d2) { const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1; const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2; return am_mediatype_compare((const zbx_am_mediatype_t *)e1->data, (const zbx_am_mediatype_t *)e2->data); } /****************************************************************************** * * * reference counted strings * * * ******************************************************************************/ static zbx_shared_str_t shared_str_new(const char *src) { size_t len; char *ptr; if (NULL == src) return NULL; len = strlen(src); ptr = zbx_malloc(NULL, len + sizeof(zbx_uint32_t) + 1); *((zbx_uint32_t *)ptr) = 0; memcpy(ptr + sizeof(zbx_uint32_t), src, len + 1); return ptr + 4; } static zbx_shared_str_t shared_str_addref(zbx_shared_str_t str) { if (NULL != str) { zbx_uint32_t *refcount = (zbx_uint32_t *)(str - sizeof(zbx_uint32_t)); (*refcount)++; } return str; } static void shared_str_release(zbx_shared_str_t str) { if (NULL != str) { zbx_uint32_t *refcount = (zbx_uint32_t *)(str - sizeof(zbx_uint32_t)); if (0 == --(*refcount)) zbx_free(refcount); } } static void am_dispatch_free(zbx_am_dispatch_t *dispatch) { zbx_free(dispatch->subject); zbx_free(dispatch->message); zbx_free(dispatch->content); zbx_free(dispatch->content_name); zbx_free(dispatch->message_format); zbx_free(dispatch); } /****************************************************************************** * * * Purpose: gets media type object * * * * Parameters: manager - [IN] * * mediatypeid - [IN] * * * * Return value: media type object or NULL if not found * * * ******************************************************************************/ static zbx_am_mediatype_t *am_get_mediatype(zbx_am_t *manager, zbx_uint64_t mediatypeid) { return (zbx_am_mediatype_t *)zbx_hashset_search(&manager->mediatypes, &mediatypeid); } /****************************************************************************** * * * Purpose: updates additional webhook media type fields * * * ******************************************************************************/ static void zbx_am_update_webhook(zbx_am_t *manager, zbx_am_mediatype_t *mediatype, const char *script, const char *timeout, const char *config_source_ip) { if (FAIL == zbx_is_time_suffix(timeout, &mediatype->timeout, ZBX_LENGTH_UNLIMITED)) { mediatype->error = zbx_strdup(mediatype->error, "Invalid timeout value in media type configuration."); return; } if (NULL == mediatype->script || 0 != strcmp(mediatype->script, script)) { zbx_free(mediatype->script_bin); zbx_free(mediatype->script); if (SUCCEED != zbx_es_is_env_initialized(&manager->es)) { if (SUCCEED != zbx_es_init_env(&manager->es, config_source_ip, &mediatype->error)) return; } if (SUCCEED != zbx_es_compile(&manager->es, script, &mediatype->script_bin, &mediatype->script_bin_sz, &mediatype->error)) { return; } mediatype->script = zbx_strdup(mediatype->script, script); } } /****************************************************************************** * * * Purpose: updates media type object, creating one if necessary * * * * Parameters: manager - [IN] * * ... - [IN] media type properties * * config_source_ip - [IN] * * * ******************************************************************************/ static void am_update_mediatype(zbx_am_t *manager, zbx_uint64_t mediatypeid, unsigned char type, const char *smtp_server, const char *smtp_helo, const char *smtp_email, const char *exec_path, const char *gsm_modem, const char *username, const char *passwd, unsigned short smtp_port, unsigned char smtp_security, unsigned char smtp_verify_peer, unsigned char smtp_verify_host, unsigned char smtp_authentication, int maxsessions, int maxattempts, const char *attempt_interval, unsigned char message_format, const char *script, const char *timeout, unsigned char flags, const char *config_source_ip) { zbx_am_mediatype_t *mediatype; if (NULL == (mediatype = am_get_mediatype(manager, mediatypeid))) { zbx_am_mediatype_t mediatype_local = { .mediatypeid = mediatypeid, .location = ZBX_AM_LOCATION_NOWHERE, .flags = flags }; mediatype = (zbx_am_mediatype_t *)zbx_hashset_insert(&manager->mediatypes, &mediatype_local, sizeof(mediatype_local)); zbx_binary_heap_create(&mediatype->queue, am_alertpool_queue_compare, ZBX_BINARY_HEAP_OPTION_DIRECT); } else { /* reset remove flag if normal media type is being added */ if (ZBX_AM_MEDIATYPE_FLAG_NONE == flags) mediatype->flags = ZBX_AM_MEDIATYPE_FLAG_NONE; } mediatype->type = type; zbx_free(mediatype->error); ZBX_UPDATE_STR(mediatype->smtp_server, smtp_server); ZBX_UPDATE_STR(mediatype->smtp_helo, smtp_helo); ZBX_UPDATE_STR(mediatype->smtp_email, smtp_email); ZBX_UPDATE_STR(mediatype->exec_path, exec_path); ZBX_UPDATE_STR(mediatype->gsm_modem, gsm_modem); ZBX_UPDATE_STR(mediatype->username, username); ZBX_UPDATE_STR(mediatype->passwd, passwd); mediatype->smtp_port = smtp_port; mediatype->smtp_security = smtp_security; mediatype->smtp_verify_peer = smtp_verify_peer; mediatype->smtp_verify_host = smtp_verify_host; mediatype->smtp_authentication = smtp_authentication; mediatype->maxsessions = maxsessions; mediatype->maxattempts = maxattempts; mediatype->message_format = message_format; if (FAIL == zbx_is_time_suffix(attempt_interval, &mediatype->attempt_interval, ZBX_LENGTH_UNLIMITED)) { mediatype->error = zbx_strdup(mediatype->error, "Invalid media type attempt interval."); return; } if (MEDIA_TYPE_WEBHOOK == mediatype->type) zbx_am_update_webhook(manager, mediatype, script, timeout, config_source_ip); } /****************************************************************************** * * * Purpose: pushes media type into manager media type queue * * * * Parameters: manager - [IN] * * mediatype - [IN] * * * * Comments: The media type is inserted into queue only if it was not already * * queued and if the number of media type alerts being processed * * not reached the limit. * * If media type is already queued only its location in the queue * * is updated. * * * ******************************************************************************/ static void am_push_mediatype(zbx_am_t *manager, zbx_am_mediatype_t *mediatype) { zbx_binary_heap_elem_t elem = {mediatype->mediatypeid, mediatype}; if (SUCCEED == zbx_binary_heap_empty(&mediatype->queue)) return; if (ZBX_AM_LOCATION_NOWHERE == mediatype->location) { if (0 == mediatype->maxsessions || mediatype->alerts_num < mediatype->maxsessions) { zbx_binary_heap_insert(&manager->queue, &elem); mediatype->location = ZBX_AM_LOCATION_QUEUE; } } else zbx_binary_heap_update_direct(&manager->queue, &elem); } /****************************************************************************** * * * Purpose: gets the next media type from queue * * * * Parameters: manager - [IN] * * * * Return value: media type object. * * * ******************************************************************************/ static zbx_am_mediatype_t *am_pop_mediatype(zbx_am_t *manager) { zbx_binary_heap_elem_t *elem; zbx_am_mediatype_t *mediatype; if (FAIL != zbx_binary_heap_empty(&manager->queue)) return NULL; elem = zbx_binary_heap_find_min(&manager->queue); mediatype = (zbx_am_mediatype_t *)elem->data; mediatype->location = ZBX_AM_LOCATION_NOWHERE; zbx_binary_heap_remove_min(&manager->queue); return mediatype; } static void am_remove_mediatype(zbx_am_t *manager, zbx_am_mediatype_t *mediatype) { zabbix_log(LOG_LEVEL_DEBUG, "%s() mediatypeid:" ZBX_FS_UI64, __func__, mediatype->mediatypeid); zbx_free(mediatype->smtp_server); zbx_free(mediatype->smtp_helo); zbx_free(mediatype->smtp_email); zbx_free(mediatype->exec_path); zbx_free(mediatype->gsm_modem); zbx_free(mediatype->username); zbx_free(mediatype->passwd); zbx_free(mediatype->script); zbx_free(mediatype->script_bin); zbx_free(mediatype->error); zbx_binary_heap_destroy(&mediatype->queue); zbx_hashset_remove_direct(&manager->mediatypes, mediatype); } static int am_release_mediatype(zbx_am_t *manager, zbx_am_mediatype_t *mediatype) { if (0 != --mediatype->refcount) return FAIL; if (0 != (mediatype->flags & ZBX_AM_MEDIATYPE_FLAG_REMOVE)) am_remove_mediatype(manager, mediatype); return SUCCEED; } /****************************************************************************** * * * Purpose: calculates alert pool id from event source, object and objectid * * * * Parameters: source - [IN] event source * * object - [IN] event object type * * objectid - [IN] event objectid * * * * Return value: alert pool id. * * * ******************************************************************************/ static zbx_uint64_t am_calc_alertpoolid(int source, int object, zbx_uint64_t objectid) { zbx_uint64_t alertpoolid; if (source < 0 || source > 0xffff) THIS_SHOULD_NEVER_HAPPEN; if (object < 0 || object > 0xffff) THIS_SHOULD_NEVER_HAPPEN; alertpoolid = source & 0xffff; alertpoolid <<= 16; alertpoolid |= object & 0xffff; alertpoolid <<= 32; alertpoolid |= ZBX_DEFAULT_UINT64_HASH_FUNC(&objectid); return alertpoolid; } /****************************************************************************** * * * Purpose: gets alert pool object, creating one if the object with specified * * identifiers was not found * * * * Parameters: manager - [IN] * * mediatypeid - [IN] * * alertpoolid - [IN] * * * * Return value: The alert pool object. * * * ******************************************************************************/ static zbx_am_alertpool_t *am_get_alertpool(zbx_am_t *manager, zbx_uint64_t mediatypeid, zbx_uint64_t alertpoolid) { zbx_am_alertpool_t *alertpool, alertpool_local; alertpool_local.id = alertpoolid; alertpool_local.mediatypeid = mediatypeid; if (NULL == (alertpool = (zbx_am_alertpool_t *)zbx_hashset_search(&manager->alertpools, &alertpool_local))) { zbx_binary_heap_create(&(alertpool_local.queue), am_alert_queue_compare, ZBX_BINARY_HEAP_OPTION_EMPTY); alertpool_local.location = ZBX_AM_LOCATION_NOWHERE; alertpool_local.refcount = 0; alertpool_local.alerts_num = 0; alertpool = (zbx_am_alertpool_t *)zbx_hashset_insert(&manager->alertpools, &alertpool_local, sizeof(alertpool_local)); } return alertpool; } /****************************************************************************** * * * Purpose: pushes alert pool into media type alert pool queue * * * * Parameters: mediatype - [IN] * * alertpool - [IN] * * * * Comments: The alert pool is inserted into queue only if it was not already * * queued. Otherwise its position in the queue is updated. * * * ******************************************************************************/ static void am_push_alertpool(zbx_am_mediatype_t *mediatype, zbx_am_alertpool_t *alertpool) { zbx_binary_heap_elem_t elem = {alertpool->id, alertpool}; if (ZBX_AM_LOCATION_NOWHERE == alertpool->location) { if (0 == alertpool->alerts_num) { zbx_binary_heap_insert(&mediatype->queue, &elem); alertpool->location = ZBX_AM_LOCATION_QUEUE; } } else zbx_binary_heap_update_direct(&mediatype->queue, &elem); } /****************************************************************************** * * * Purpose: gets the next alert pool from queue * * * * Parameters: mediatype - [IN] * * * * Return value: The alert pool object. * * * ******************************************************************************/ static zbx_am_alertpool_t *am_pop_alertpool(zbx_am_mediatype_t *mediatype) { zbx_binary_heap_elem_t *elem; zbx_am_alertpool_t *alertpool; if (FAIL != zbx_binary_heap_empty(&mediatype->queue)) return NULL; elem = zbx_binary_heap_find_min(&mediatype->queue); alertpool = (zbx_am_alertpool_t *)elem->data; alertpool->location = ZBX_AM_LOCATION_NOWHERE; zbx_binary_heap_remove_min(&mediatype->queue); return alertpool; } /****************************************************************************** * * * Purpose: removes alert pool * * * * Parameters: manager - [IN] * * alertpool - [IN] * * * * Return value: SUCCEED - the object was removed * * FAIL - otherwise * * * ******************************************************************************/ static int am_release_alertpool(zbx_am_t *manager, zbx_am_alertpool_t *alertpool) { if (0 != -- alertpool->refcount) return FAIL; zbx_binary_heap_destroy(&alertpool->queue); zbx_hashset_remove_direct(&manager->alertpools, alertpool); return SUCCEED; } /****************************************************************************** * * * Purpose: creates new alert object * * * * Parameters: ... - [IN] alert data * * * * Return value: alert object. * * * ******************************************************************************/ static zbx_am_alert_t *am_create_alert(zbx_uint64_t alertid, zbx_uint64_t mediatypeid, int source, int object, zbx_uint64_t objectid, const char *sendto, const char *subject, zbx_shared_str_t message, const char *params, unsigned char message_format, int status, int retries, int nextsend) { zbx_am_alert_t *alert; alert = (zbx_am_alert_t *)zbx_malloc(NULL, sizeof(zbx_am_alert_t)); alert->alertid = alertid; alert->mediatypeid = mediatypeid; alert->alertpoolid = am_calc_alertpoolid(source, object, objectid); alert->objectid = objectid; alert->message_format = message_format; alert->eventid = 0; alert->p_eventid = 0; if (NULL != sendto) alert->sendto = zbx_strdup(NULL, sendto); else alert->sendto = NULL; if (NULL != subject) alert->subject = zbx_strdup(NULL, subject); else alert->subject = NULL; alert->message = shared_str_addref(message); if (NULL != params) alert->params = zbx_strdup(NULL, params); else alert->params = NULL; alert->status = status; alert->retries = retries; alert->nextsend = nextsend; alert->expression = NULL; alert->recovery_expression = NULL; alert->object = object; alert->source = source; return alert; } /****************************************************************************** * * * Purpose: creates new alert object from db alert * * * * Parameters: db_alert - [IN] * * * * Return value: alert object. * * * * Comments: The db_alert is destroyed during copying process and should not * * be accessed/freed afterwards. * * * ******************************************************************************/ static zbx_am_alert_t *am_copy_db_alert(zbx_am_db_alert_t *db_alert) { zbx_am_alert_t *alert; alert = (zbx_am_alert_t *)zbx_malloc(NULL, sizeof(zbx_am_alert_t)); alert->alertid = db_alert->alertid; alert->mediatypeid = db_alert->mediatypeid; alert->alertpoolid = am_calc_alertpoolid(db_alert->source, db_alert->object, db_alert->objectid); alert->objectid = db_alert->objectid; alert->eventid = db_alert->eventid; alert->p_eventid = db_alert->p_eventid; alert->objectid = db_alert->objectid; alert->object = db_alert->object; alert->source = db_alert->source; alert->message_format = ZBX_MEDIA_MESSAGE_FORMAT_DEFAULT; alert->sendto = db_alert->sendto; alert->subject = db_alert->subject; alert->message = shared_str_addref(shared_str_new(db_alert->message)); zbx_free(db_alert->message); alert->params = db_alert->params; alert->expression = db_alert->expression; alert->recovery_expression = db_alert->recovery_expression; alert->status = db_alert->status; alert->retries = db_alert->retries; alert->nextsend = 0; zbx_free(db_alert); return alert; } /****************************************************************************** * * * Purpose: frees the alert object * * * * Parameters: alert - [IN] * * * ******************************************************************************/ static void am_alert_free(zbx_am_alert_t *alert) { zbx_free(alert->sendto); zbx_free(alert->subject); shared_str_release(alert->message); zbx_free(alert->params); zbx_free(alert->expression); zbx_free(alert->recovery_expression); zbx_free(alert); } /****************************************************************************** * * * Purpose: pushes alert into alert pool alert queue * * * * Parameters: alertpool - [IN] * * alert - [IN] * * * ******************************************************************************/ static void am_push_alert(zbx_am_alertpool_t *alertpool, zbx_am_alert_t *alert) { zbx_binary_heap_elem_t elem = {0, alert}; zbx_binary_heap_insert(&alertpool->queue, &elem); } /****************************************************************************** * * * Purpose: gets the next alert from queue * * * * Parameters: manager - [IN] * * * * Return value: The alert object. * * * ******************************************************************************/ static zbx_am_alert_t *am_pop_alert(zbx_am_t *manager) { zbx_am_mediatype_t *mediatype; zbx_am_alertpool_t *alertpool; zbx_am_alert_t *alert; zbx_binary_heap_elem_t *elem; if (NULL == (mediatype = am_pop_mediatype(manager))) return NULL; if (NULL == (alertpool = am_pop_alertpool(mediatype))) return NULL; elem = zbx_binary_heap_find_min(&alertpool->queue); alert = (zbx_am_alert_t *)elem->data; zbx_binary_heap_remove_min(&alertpool->queue); /* requeue media type if the number of parallel alerts has not yet reached */ mediatype->alerts_num++; alertpool->alerts_num++; if (0 == mediatype->maxsessions || mediatype->alerts_num < mediatype->maxsessions) am_push_mediatype(manager, mediatype); return alert; } /****************************************************************************** * * * Purpose: removes alert and requeues associated alert pool and media type * * * * Parameters: manager - [IN] * * alert - [IN] * * * ******************************************************************************/ static void am_remove_alert(zbx_am_t *manager, zbx_am_alert_t *alert) { zbx_am_mediatype_t *mediatype; if (NULL != (mediatype = am_get_mediatype(manager, alert->mediatypeid))) { zbx_am_alertpool_t *alertpool; mediatype->alerts_num--; if (NULL != (alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid))) { alertpool->alerts_num--; if (SUCCEED != am_release_alertpool(manager, alertpool)) am_push_alertpool(mediatype, alertpool); } if (SUCCEED != am_release_mediatype(manager, mediatype)) am_push_mediatype(manager, mediatype); } am_alert_free(alert); manager->alerts_num--; } /****************************************************************************** * * * Purpose: retries alert if there are attempts left or removes it * * * * Parameters: manager - [IN] * * alert - [IN] * * * * Return value: SUCCEED - the alert was queued to be sent again * * FAIL - the alert retries value exceeded the mediatype * * maxattempts limit and alert was removed as failed. * * * ******************************************************************************/ static int am_retry_alert(zbx_am_t *manager, zbx_am_alert_t *alert) { zbx_am_alertpool_t *alertpool; zbx_am_mediatype_t *mediatype; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s() alertid:" ZBX_FS_UI64, __func__, alert->alertid); if (NULL == (mediatype = am_get_mediatype(manager, alert->mediatypeid))) goto out; if (++alert->retries >= mediatype->maxattempts) goto out; alert->nextsend = time(NULL) + mediatype->attempt_interval; alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid); mediatype->alerts_num--; alertpool->alerts_num--; am_push_alert(alertpool, alert); am_push_alertpool(mediatype, alertpool); am_push_mediatype(manager, mediatype); ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: registers alerter * * * * Parameters: manager - [IN] * * client - [IN] connected alerter * * message - [IN] received message * * * ******************************************************************************/ static void am_register_alerter(zbx_am_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message) { zbx_am_alerter_t *alerter = NULL; /* if 'alerter' type changes do not forget to change sizeof() */ /* (see comment below) */ pid_t ppid; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); memcpy(&ppid, message->data, sizeof(ppid)); if (ppid != getppid()) { zbx_ipc_client_close(client); zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process"); } else { if (manager->next_alerter_index == manager->alerters.values_num) { THIS_SHOULD_NEVER_HAPPEN; exit(EXIT_FAILURE); } alerter = (zbx_am_alerter_t *)manager->alerters.values[manager->next_alerter_index++]; alerter->client = client; /* sizeof(zbx_am_alerter_t *) in the following line returns size of 'alerter' pointer. */ /* sizeof(alerter) is not used to avoid analyzer warning */ zbx_hashset_insert(&manager->alerters_client, &alerter, sizeof(zbx_am_alerter_t *)); zbx_queue_ptr_push(&manager->free_alerters, alerter); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static void am_register_alert_syncer(zbx_am_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message) { pid_t ppid; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); memcpy(&ppid, message->data, sizeof(ppid)); if (ppid != getppid()) { zbx_ipc_client_close(client); zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process"); } else manager->syncer_client = client; zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: returns alerter by connected client * * * * Parameters: manager - [IN] * * client - [IN] connected alerter * * * ******************************************************************************/ static zbx_am_alerter_t *am_get_alerter_by_client(zbx_am_t *manager, zbx_ipc_client_t *client) { zbx_am_alerter_t **alerter, alerter_local, *plocal = &alerter_local; plocal->client = client; alerter = (zbx_am_alerter_t **)zbx_hashset_search(&manager->alerters_client, &plocal); if (NULL == alerter) { THIS_SHOULD_NEVER_HAPPEN; exit(EXIT_FAILURE); } return *alerter; } #if defined(HAVE_MYSQL) # define ZBX_DATABASE_TYPE "MySQL" #elif defined(HAVE_ORACLE) # define ZBX_DATABASE_TYPE "Oracle" #elif defined(HAVE_POSTGRESQL) # define ZBX_DATABASE_TYPE "PostgreSQL" #else # define ZBX_DATABASE_TYPE "Unknown" #endif /****************************************************************************** * * * Purpose: gets and formats error message from database when it is * * unavailable * * * * Return value: full database error message is allocated. * * * ******************************************************************************/ static char *am_create_db_alert_message(const zbx_config_dbhigh_t *config_dbhigh) { const char *error; char *alert_message = NULL; size_t alert_message_alloc = 0, alert_message_offset = 0; zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, "%s database \"%s\"", ZBX_DATABASE_TYPE, config_dbhigh->config_dbname); if ('\0' != *config_dbhigh->config_dbhost) { zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, " on \"%s", config_dbhigh->config_dbhost); if (0 != config_dbhigh->config_dbport) { zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, ":%d\"", config_dbhigh->config_dbport); } else zbx_chrcpy_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, '\"'); } zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, " is not available"); if (NULL != (error = zbx_db_last_strerr()) && '\0' != *error) zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, ": %s", error); return alert_message; } #undef ZBX_DATABASE_TYPE /****************************************************************************** * * * Purpose: queues 'database down' watchdog alerts * * * ******************************************************************************/ static void am_queue_watchdog_alerts(zbx_am_t *manager, const zbx_config_dbhigh_t *config_dbhigh) { zbx_am_media_t *media; zbx_hashset_iter_t iter; zabbix_log(LOG_LEVEL_DEBUG, "%s() recipients:%d", __func__, manager->watchdog.num_data); zbx_hashset_iter_reset(&manager->watchdog, &iter); while (NULL != (media = (zbx_am_media_t *)zbx_hashset_iter_next(&iter))) { zbx_am_mediatype_t *mediatype; zbx_am_alertpool_t *alertpool; zbx_am_alert_t *alert; const char *alert_subject = "Zabbix database is not available."; char *alert_message; if (NULL == (mediatype = am_get_mediatype(manager, media->mediatypeid))) { zabbix_log(LOG_LEVEL_DEBUG, "cannot find media type with id " ZBX_FS_UI64, media->mediatypeid); continue; } mediatype->refcount++; alert_message = am_create_db_alert_message(config_dbhigh); if (ZBX_MEDIA_MESSAGE_FORMAT_HTML == mediatype->message_format) { char *am_esc; am_esc = zbx_xml_escape_dyn(alert_message); alert_message = zbx_dsprintf(alert_message, "<html><pre>%s</pre></html>", am_esc); zbx_free(am_esc); } alert = am_create_alert(0, media->mediatypeid, 0, 0, 0, media->sendto, alert_subject, shared_str_new(alert_message), NULL, mediatype->message_format, 0, 0, 0); alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid); alertpool->refcount++; am_push_alert(alertpool, alert); am_push_alertpool(mediatype, alertpool); am_push_mediatype(manager, mediatype); zbx_free(alert_message); } } /****************************************************************************** * * * Purpose: initializes alert manager * * * * Parameters: manager - [IN] * * get_forks_cb - [IN] process fork count callback * * error - [OUT] * * * * Return value: SUCCEED - the alert manager was initialized successfully * * FAIL - otherwise * * * ******************************************************************************/ static int am_init(zbx_am_t *manager, zbx_get_config_forks_f get_forks_cb, char **error) { int ret; zabbix_log(LOG_LEVEL_DEBUG, "In %s() alerters:%d", __func__, get_forks_cb(ZBX_PROCESS_TYPE_ALERTER)); if (FAIL == (ret = zbx_ipc_service_start(&manager->ipc, ZBX_IPC_SERVICE_ALERTER, error))) goto out; manager->alerts_num = 0; zbx_vector_am_alerter_ptr_create(&manager->alerters); zbx_queue_ptr_create(&manager->free_alerters); zbx_hashset_create(&manager->alerters_client, 0, alerter_hash_func, alerter_compare_func); manager->next_alerter_index = 0; for (int i = 0; i < get_forks_cb(ZBX_PROCESS_TYPE_ALERTER); i++) { zbx_am_alerter_t *alerter = (zbx_am_alerter_t *)zbx_malloc(NULL, sizeof(zbx_am_alerter_t)); alerter->client = NULL; zbx_vector_am_alerter_ptr_append(&manager->alerters, alerter); } zbx_hashset_create(&manager->mediatypes, 5, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_hashset_create(&manager->alertpools, 100, am_alertpool_hash_func, am_alertpool_compare_func); zbx_hashset_create(&manager->results, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_hashset_create(&manager->watchdog, 5, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_binary_heap_create(&manager->queue, am_mediatype_queue_compare, ZBX_BINARY_HEAP_OPTION_DIRECT); zbx_es_init(&manager->es); manager->syncer_client = NULL; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: update alert status in local cache to be flushed after reading * * new alerts from database * * * * Parameters: manager - [IN] * * alert - [IN] * * status - [IN] alert status * * retries - [IN] number of attempted sending retries * * value - [IN] * * error - [IN] error message * * * ******************************************************************************/ static void am_db_update_alert(zbx_am_t *manager, zbx_am_alert_t *alert, int status, int retries, const char *value, const char *error) { zabbix_log(LOG_LEVEL_DEBUG, "In %s() alertid:" ZBX_FS_UI64 " status:%d retries:%d value:%s error:%s", __func__, alert->alertid, status, retries, ZBX_NULL2EMPTY_STR(value), ZBX_NULL2EMPTY_STR(error)); /* alerts with 0 alertid are runtime alerts generated by alert manager when database is down */ if (0 != alert->alertid) { zbx_am_result_t *result; if (NULL == (result = (zbx_am_result_t *)zbx_hashset_search(&manager->results, &alert->alertid))) { zbx_am_result_t update_local = { .alertid = alert->alertid, .eventid = alert->eventid, .mediatypeid = alert->mediatypeid, .source = ZBX_ALERTPOOL_SOURCE(alert->alertpoolid) }; result = (zbx_am_result_t *)zbx_hashset_insert(&manager->results, &update_local, sizeof(update_local)); } result->retries = retries; result->status = status; ZBX_UPDATE_STR(result->value, value); ZBX_UPDATE_STR(result->error, error); } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: sends response to external alert request * * * * Parameters: alerter_service - [IN] * * alert - [IN] * * value - [IN] * * errcode - [IN] * * error - [IN] error message * * debug - [IN] debug message * * * ******************************************************************************/ static void am_external_alert_send_response(const zbx_ipc_service_t *alerter_service, const zbx_am_alert_t *alert, const char *value, int errcode, const char *error, const char *debug) { zbx_ipc_client_t *client; if (NULL != (client = zbx_ipc_client_by_id(alerter_service, alert->alertid))) { unsigned char *data; zbx_uint32_t data_len; data_len = zbx_alerter_serialize_result_ext(&data, alert->sendto, value, errcode, error, debug); zbx_ipc_client_send(client, ZBX_IPC_ALERTER_SEND_ALERT, data, data_len); zbx_free(data); } else zabbix_log(LOG_LEVEL_DEBUG, "client has disconnected"); } /****************************************************************************** * * * Purpose: synchronizes watchdog alert recipients * * * * Parameters: manager - [IN] * * medias - [IN] new watchdog media list * * medias_num - [IN] number of watchdog medias * * * ******************************************************************************/ static void am_sync_watchdog(zbx_am_t *manager, zbx_am_media_t **medias, int medias_num) { zbx_hashset_t mediaids; zbx_am_media_t *media; zbx_hashset_iter_t iter; zbx_vector_am_media_ptr_t media_new; static int old_count = -1; zabbix_log(LOG_LEVEL_DEBUG, "In %s() recipients:%d", __func__, medias_num); zbx_hashset_create(&mediaids, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); zbx_vector_am_media_ptr_create(&media_new); for (int i = 0; i < medias_num; i++) { if (NULL == (media = (zbx_am_media_t *)zbx_hashset_search(&manager->watchdog, &medias[i]->mediaid))) { zbx_am_media_t media_local; media_local.mediaid = medias[i]->mediaid; media_local.mediatypeid = 0; media_local.sendto = NULL; media = (zbx_am_media_t *)zbx_hashset_insert(&manager->watchdog, &media_local, sizeof(media_local)); media->sendto = NULL; zbx_vector_am_media_ptr_append(&media_new, media); } media->mediatypeid = medias[i]->mediatypeid; ZBX_UPDATE_STR(media->sendto, medias[i]->sendto); zbx_hashset_insert(&mediaids, &media->mediaid, sizeof(media->mediaid)); } /* drop removed watchdog alert recipients from cache */ zbx_hashset_iter_reset(&manager->watchdog, &iter); while (NULL != (media = (zbx_am_media_t *)zbx_hashset_iter_next(&iter))) { if (NULL != zbx_hashset_search(&mediaids, &media->mediaid)) continue; zbx_free(media->sendto); zbx_hashset_iter_remove(&iter); } zbx_vector_am_media_ptr_destroy(&media_new); zbx_hashset_destroy(&mediaids); if (0 < old_count && 0 == manager->watchdog.num_data) { zabbix_log(LOG_LEVEL_WARNING, "watchdog: no recipients found for database down messages"); } else if (0 == old_count && 0 < manager->watchdog.num_data) { zabbix_log(LOG_LEVEL_WARNING, "watchdog: %d recipient(s) found for database down messages", manager->watchdog.num_data); } old_count = manager->watchdog.num_data; zabbix_log(LOG_LEVEL_DEBUG, "End of %s() recipients:%d", __func__, manager->watchdog.num_data); } static int check_allowed_path(const char *allowed, const char *path, char **error) { char *absolute_path = NULL, *absolute_allowed = NULL, *directory_name; int absolute_path_len, absolute_allowed_len, ret = FAIL; directory_name = zbx_strdup(NULL, path); #if defined(HAVE_LIBGEN_H) absolute_path = realpath(dirname(directory_name), NULL); #else absolute_path = realpath(directory_name, NULL); #endif if (NULL == absolute_path) { *error = zbx_dsprintf(*error, "cannot resolve path %s", zbx_strerror(errno)); goto out; } if (NULL == (absolute_allowed = realpath(allowed, NULL))) { *error = zbx_dsprintf(*error, "cannot resolve allowed path %s", zbx_strerror(errno)); goto out; } absolute_path_len = strlen(absolute_path); if (absolute_path_len < (absolute_allowed_len = strlen(absolute_allowed))) { *error = zbx_dsprintf(*error, "absolute path '%s' is not in allowed path '%s'", absolute_path, absolute_allowed); goto out; } if (0 != memcmp(absolute_allowed, absolute_path, absolute_allowed_len)) { *error = zbx_dsprintf(*error, "absolute path '%s' is not in allowed path '%s'", absolute_path, absolute_allowed); goto out; } ret = SUCCEED; out: zbx_free(absolute_path); zbx_free(absolute_allowed); zbx_free(directory_name); return ret; } /****************************************************************************** * * * Purpose: gets script media type parameters with expanded macros * * * * Parameters: mediatype - [IN] * * alert - [IN] * * scripts_path - [IN] * * cmd - [OUT] command to execute * * error - [OUT] * * * * Return value: SUCCEED - the command was prepared successfully * * FAIL - otherwise * * * ******************************************************************************/ static int am_prepare_mediatype_exec_command(zbx_am_mediatype_t *mediatype, zbx_am_alert_t *alert, const char *scripts_path, char **cmd, char **error) { size_t cmd_alloc = ZBX_KIBIBYTE, cmd_offset = 0; int ret = FAIL; char *error_path = NULL; *cmd = (char *)zbx_malloc(NULL, cmd_alloc); zbx_snprintf_alloc(cmd, &cmd_alloc, &cmd_offset, "%s/%s", scripts_path, mediatype->exec_path); if (FAIL == check_allowed_path(scripts_path, *cmd, &error_path)) { *error = zbx_dsprintf(*error, "Cannot execute command \"%s\": %s", *cmd, error_path); goto out; } if (0 == access(*cmd, X_OK)) { const char *p; struct zbx_json_parse jp; char *buf = NULL; size_t buf_alloc = 0; if (SUCCEED != zbx_json_open(alert->params, &jp)) { *error = zbx_dsprintf(*error, "Cannot parse parameters: %s", zbx_json_strerror()); goto out; } for (p = NULL; NULL != (p = zbx_json_next_value_dyn(&jp, p, &buf, &buf_alloc, NULL));) { char *param_esc; param_esc = zbx_dyn_escape_shell_single_quote(buf); zbx_snprintf_alloc(cmd, &cmd_alloc, &cmd_offset, " '%s'", param_esc); zbx_free(param_esc); } zbx_free(buf); ret = SUCCEED; } else { *error = zbx_dsprintf(*error, "Cannot execute command \"%s\": %s", *cmd, zbx_strerror(errno)); } out: if (SUCCEED != ret) zbx_free(*cmd); zbx_free(error_path); return ret; } /****************************************************************************** * * * Purpose: sends alert to the alerter * * * * Parameters: manager - [IN] * * alerter - [IN] * * alert - [IN] * * scripts_path - [IN] * * * * Return value: SUCCEED - the alert was successfully sent to alerter * * FAIL - otherwise * * * ******************************************************************************/ static int am_process_alert(zbx_am_t *manager, zbx_am_alerter_t *alerter, zbx_am_alert_t *alert, const char *scripts_path) { zbx_am_mediatype_t *mediatype; unsigned char *data = NULL, debug; size_t data_len; zbx_uint64_t command, p_eventid; char *cmd = NULL, *error = NULL; int ret = FAIL; unsigned char message_format; zabbix_log(LOG_LEVEL_DEBUG, "%s() alertid:" ZBX_FS_UI64 " mediatypeid:" ZBX_FS_UI64 " alertpoolid:0x" ZBX_FS_UX64, __func__, alert->alertid, alert->mediatypeid, alert->alertpoolid); if (NULL == (mediatype = am_get_mediatype(manager, alert->mediatypeid))) { am_alert_free(alert); goto out; } if (NULL != mediatype->error) { if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid)) am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, mediatype->error, NULL); else am_db_update_alert(manager, alert, ALERT_STATUS_FAILED, 0, NULL, mediatype->error); am_remove_alert(manager, alert); goto out; } switch (mediatype->type) { case MEDIA_TYPE_EMAIL: command = ZBX_IPC_ALERTER_EMAIL; p_eventid = (0 == alert->p_eventid ? alert->eventid : alert->p_eventid); if (ZBX_MEDIA_MESSAGE_FORMAT_DEFAULT == (message_format = alert->message_format)) message_format = mediatype->message_format; data_len = zbx_alerter_serialize_email(&data, alert->alertid, alert->mediatypeid, p_eventid, alert->source, alert->object, alert->objectid, alert->sendto, alert->subject, alert->message, mediatype->smtp_server, mediatype->smtp_port, mediatype->smtp_helo, mediatype->smtp_email, mediatype->smtp_security, mediatype->smtp_verify_peer, mediatype->smtp_verify_host, mediatype->smtp_authentication, mediatype->username, mediatype->passwd, message_format, alert->expression, alert->recovery_expression); break; case MEDIA_TYPE_SMS: command = ZBX_IPC_ALERTER_SMS; data_len = zbx_alerter_serialize_sms(&data, alert->alertid, alert->sendto, alert->message, mediatype->gsm_modem); break; case MEDIA_TYPE_EXEC: command = ZBX_IPC_ALERTER_EXEC; if (FAIL == am_prepare_mediatype_exec_command(mediatype, alert, scripts_path, &cmd, &error)) { if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid)) am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, error, NULL); else am_db_update_alert(manager, alert, ALERT_STATUS_FAILED, 0, NULL, error); am_remove_alert(manager, alert); zbx_free(error); goto out; } data_len = zbx_alerter_serialize_exec(&data, alert->alertid, cmd); zbx_free(cmd); break; case MEDIA_TYPE_WEBHOOK: command = ZBX_IPC_ALERTER_WEBHOOK; if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid)) debug = ZBX_ALERT_DEBUG; else debug = ZBX_ALERT_NO_DEBUG; data_len = zbx_alerter_serialize_webhook(&data, mediatype->script_bin, mediatype->script_bin_sz, mediatype->timeout, alert->params, debug); break; default: error = "unsupported media type"; if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid)) am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, error, NULL); else am_db_update_alert(manager, alert, ALERT_STATUS_FAILED, 0, NULL, error); zabbix_log(LOG_LEVEL_ERR, "cannot process alertid:" ZBX_FS_UI64 ": unsupported media type: %d", alert->alertid, mediatype->type); am_remove_alert(manager, alert); goto out; } alerter->alert = alert; zbx_ipc_client_send(alerter->client, command, data, data_len); zbx_free(data); ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: processes alerter result * * * * Parameters: manager - [IN] * * client - [IN] connected alerter * * message - [IN] received message * * * * Return value: SUCCEED - the alert was sent successfully * * FAIL - otherwise * * * ******************************************************************************/ static int am_process_result(zbx_am_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message) { int ret = FAIL; zbx_am_alerter_t *alerter; char *value, *errmsg, *debug; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (NULL == (alerter = am_get_alerter_by_client(manager, client))) { THIS_SHOULD_NEVER_HAPPEN; goto out; } if (NULL == alerter->alert) { THIS_SHOULD_NEVER_HAPPEN; goto out; } zabbix_log(LOG_LEVEL_DEBUG, "%s() alertid:" ZBX_FS_UI64 " mediatypeid:" ZBX_FS_UI64 " alertpoolid:0x" ZBX_FS_UX64, __func__, alerter->alert->alertid, alerter->alert->mediatypeid, alerter->alert->alertpoolid); zbx_alerter_deserialize_result(message->data, &value, &ret, &errmsg, &debug); if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alerter->alert->alertpoolid)) { am_external_alert_send_response(&manager->ipc, alerter->alert, value, ret, errmsg, debug); am_remove_alert(manager, alerter->alert); } else { int status; if (SUCCEED == ret) { status = ALERT_STATUS_SENT; } else { if (SUCCEED == am_retry_alert(manager, alerter->alert)) status = ALERT_STATUS_NOT_SENT; else status = ALERT_STATUS_FAILED; } am_db_update_alert(manager, alerter->alert, status, alerter->alert->retries, value, errmsg); if (ALERT_STATUS_NOT_SENT != status) am_remove_alert(manager, alerter->alert); } zbx_free(value); zbx_free(errmsg); zbx_free(debug); alerter->alert = NULL; zbx_queue_ptr_push(&manager->free_alerters, alerter); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); return ret; } /****************************************************************************** * * * Purpose: checks alert queue if there is an alert that should be sent now * * * * Parameters: manager - [IN] * * now - [IN] current timestamp * * * * Return value: SUCCEED - an alert can be sent * * FAIL - there are no alerts to be sent at this time * * * ******************************************************************************/ static int am_check_queue(zbx_am_t *manager, int now) { const zbx_binary_heap_elem_t *elem; const zbx_am_mediatype_t *mediatype; const zbx_am_alertpool_t *alertpool; const zbx_am_alert_t *alert; if (SUCCEED == zbx_binary_heap_empty(&manager->queue)) return FAIL; elem = zbx_binary_heap_find_min(&manager->queue); mediatype = (const zbx_am_mediatype_t *)elem->data; if (SUCCEED == zbx_binary_heap_empty(&mediatype->queue)) return FAIL; elem = zbx_binary_heap_find_min(&mediatype->queue); alertpool = (const zbx_am_alertpool_t *)elem->data; if (SUCCEED == zbx_binary_heap_empty(&alertpool->queue)) return FAIL; elem = zbx_binary_heap_find_min(&alertpool->queue); alert = (const zbx_am_alert_t *)elem->data; if (alert->nextsend > now) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: updates cached media types * * * ******************************************************************************/ static void am_update_mediatypes(zbx_am_t *manager, zbx_ipc_message_t *message, const char *config_source_ip) { zbx_am_db_mediatype_t **mediatypes; int mediatypes_num; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_alerter_deserialize_mediatypes(message->data, &mediatypes, &mediatypes_num); zabbix_log(LOG_LEVEL_DEBUG, "update %d media types", mediatypes_num); for (int i = 0; i < mediatypes_num; i++) { zbx_am_db_mediatype_t *mt = mediatypes[i]; am_update_mediatype(manager, mt->mediatypeid, mt->type, mt->smtp_server, mt->smtp_helo, mt->smtp_email, mt->exec_path, mt->gsm_modem, mt->username, mt->passwd, mt->smtp_port, mt->smtp_security, mt->smtp_verify_peer, mt->smtp_verify_host, mt->smtp_authentication, mt->maxsessions, mt->maxattempts, mt->attempt_interval, mt->message_format, mt->script, mt->timeout, ZBX_AM_MEDIATYPE_FLAG_NONE, config_source_ip); zbx_am_db_mediatype_clear(mt); zbx_free(mt); } zbx_free(mediatypes); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: queues new alerts * * * * Parameters: manager - [IN] * * alert - [IN] * * now - [IN] current timestamp * * * * Return value: SUCCEED - alert is queued * * FAIL - otherwise * * * ******************************************************************************/ static int am_queue_alert(zbx_am_t *manager, zbx_am_alert_t *alert, int now) { zbx_am_mediatype_t *mediatype; zbx_am_alertpool_t *alertpool; alert->nextsend = now; if (NULL == (mediatype = am_get_mediatype(manager, alert->mediatypeid))) return FAIL; alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid); alertpool->refcount++; mediatype->refcount++; am_push_alert(alertpool, alert); am_push_alertpool(mediatype, alertpool); am_push_mediatype(manager, mediatype); manager->alerts_num++; return SUCCEED; } /****************************************************************************** * * * Purpose: queues new alerts * * * ******************************************************************************/ static void am_queue_alerts(zbx_am_t *manager, zbx_ipc_message_t *message, int now) { zbx_am_db_alert_t **alerts; int alerts_num; now = time(NULL); zbx_alerter_deserialize_alerts(message->data, &alerts, &alerts_num); for (int i = 0; i < alerts_num; i++) { zbx_am_alert_t *alert = am_copy_db_alert(alerts[i]); if (FAIL == am_queue_alert(manager, alert, now)) { am_alert_free(alert); continue; } } zbx_free(alerts); } /****************************************************************************** * * * Purpose: updates 'database down' watchdog alert recipients * * * ******************************************************************************/ static void am_update_watchdog(zbx_am_t *manager, zbx_ipc_message_t *message) { zbx_am_media_t **medias; int medias_num; zbx_alerter_deserialize_medias(message->data, &medias, &medias_num); am_sync_watchdog(manager, medias, medias_num); for (int i = 0; i < medias_num; i++) zbx_am_media_free(medias[i]); zbx_free(medias); } /****************************************************************************** * * * Purpose: removes unused mediatypes * * * ******************************************************************************/ static void am_drop_mediatypes(zbx_am_t *manager, zbx_ipc_message_t *message) { zbx_uint64_t *ids; int ids_num; zbx_alerter_deserialize_ids(message->data, &ids, &ids_num); for (int i = 0; i < ids_num; i++) { zbx_am_mediatype_t *mediatype; if (NULL == (mediatype = (zbx_am_mediatype_t *)zbx_hashset_search(&manager->mediatypes, &ids[i]))) continue; if (0 == mediatype->refcount) am_remove_mediatype(manager, mediatype); else mediatype->flags = ZBX_AM_MEDIATYPE_FLAG_REMOVE; } zbx_free(ids); } /****************************************************************************** * * * Purpose: returns alert sending results * * * ******************************************************************************/ static void am_flush_results(zbx_am_t *manager, zbx_ipc_client_t *client) { zbx_vector_am_result_ptr_t results; zbx_hashset_iter_t iter; zbx_am_result_t *result; zbx_uint32_t data_len; unsigned char *data; zabbix_log(LOG_LEVEL_DEBUG, "In %s() results:%d", __func__, manager->results.num_data); if (0 == manager->results.num_data) { int results_num = 0; unsigned char buf[sizeof(results_num)]; (void)zbx_serialize_value(buf, results_num); zbx_ipc_client_send(client, ZBX_IPC_ALERTER_RESULTS, buf, sizeof(results_num)); goto out; } zbx_vector_am_result_ptr_create(&results); zbx_hashset_iter_reset(&manager->results, &iter); while (NULL != (result = (zbx_am_result_t *)zbx_hashset_iter_next(&iter))) zbx_vector_am_result_ptr_append(&results, result); zbx_vector_am_result_ptr_sort(&results, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); data_len = zbx_alerter_serialize_results(&data, (zbx_am_result_t **)results.values, results.values_num); zbx_ipc_client_send(client, ZBX_IPC_ALERTER_RESULTS, data, data_len); zbx_free(data); zbx_hashset_iter_reset(&manager->results, &iter); while (NULL != (result = (zbx_am_result_t *)zbx_hashset_iter_next(&iter))) { zbx_free(result->value); zbx_free(result->error); } zbx_hashset_clear(&manager->results); zbx_vector_am_result_ptr_destroy(&results); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /********************************************************************************** * * * Purpose: processes external alert request * * * * Parameters: manager - [IN] * * id - [IN] client id that sent external alert request * * data - [IN] received message * * config_source_ip - [IN] * * * **********************************************************************************/ static void am_process_external_alert_request(zbx_am_t *manager, zbx_uint64_t id, const unsigned char *data, const char *config_source_ip) { zbx_uint64_t mediatypeid; char *sendto, *subject, *message, *params, *smtp_server, *smtp_helo, *smtp_email, *exec_path, *gsm_modem, *username, *passwd, *attempt_interval, *script, *timeout; unsigned short smtp_port; int maxsessions, maxattempts; unsigned char type, smtp_security, smtp_verify_peer, smtp_verify_host, smtp_authentication, message_format; zbx_am_alert_t *alert; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_alerter_deserialize_alert_send(data, &mediatypeid, &type, &smtp_server, &smtp_helo, &smtp_email, &exec_path, &gsm_modem, &username, &passwd, &smtp_port, &smtp_security, &smtp_verify_peer, &smtp_verify_host, &smtp_authentication, &maxsessions, &maxattempts, &attempt_interval, &message_format, &script, &timeout, &sendto, &subject, &message, ¶ms); /* update with initial 'remove' flag so the mediatype is removed if it's not used by other alerts */ am_update_mediatype(manager, mediatypeid, type, smtp_server, smtp_helo, smtp_email, exec_path, gsm_modem, username, passwd, smtp_port, smtp_security, smtp_verify_peer, smtp_verify_host, smtp_authentication, maxsessions, maxattempts, attempt_interval, message_format, script, timeout, ZBX_AM_MEDIATYPE_FLAG_REMOVE, config_source_ip); alert = am_create_alert(id, mediatypeid, ALERT_SOURCE_EXTERNAL, 0, id, sendto, subject, shared_str_new(message), params, message_format, 0, 0, 0); if (FAIL == am_queue_alert(manager, alert, 0)) { am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, "Media type unavailable", NULL); am_alert_free(alert); } zbx_free(params); zbx_free(smtp_server); zbx_free(smtp_helo); zbx_free(smtp_email); zbx_free(exec_path); zbx_free(gsm_modem); zbx_free(username); zbx_free(passwd); zbx_free(attempt_interval); zbx_free(script); zbx_free(timeout); zbx_free(message); zbx_free(subject); zbx_free(sendto); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: begins file dispatch * * * * Parameters: client - [IN] connected worker interprocess communication * * client * * data - [IN] * * * ******************************************************************************/ static void am_process_begin_dispatch(zbx_ipc_client_t *client, const unsigned char *data) { zbx_am_dispatch_t *dispatch; zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, zbx_ipc_client_id(client)); dispatch = (zbx_am_dispatch_t*) zbx_malloc(NULL, sizeof(zbx_am_dispatch_t)); zbx_alerter_deserialize_begin_dispatch(data, &dispatch->subject, &dispatch->message, &dispatch->content_name, &dispatch->message_format, &dispatch->content, &dispatch->content_size); zbx_ipc_client_set_userdata(client, dispatch); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() name:%s message_format:%s size:%u", __func__, dispatch->content_name, dispatch->message_format, dispatch->content_size); } /****************************************************************************** * * * Purpose: prepares message to dispatch by attaching dispatch contents for * * supported media types * * * * Parameters: dispatch - [IN] * * mt - [IN] media type * * message - [OUT] message to send * * message_format - [OUT] message content type * * * ******************************************************************************/ static void am_prepare_dispatch_message(zbx_am_dispatch_t *dispatch, zbx_db_mediatype *mt, zbx_shared_str_t *message, unsigned char *message_format) { char *body = NULL; if (0 != dispatch->content_size) { if (MEDIA_TYPE_EMAIL == mt->type) { body = zbx_email_make_body(dispatch->message, mt->message_format, dispatch->content_name, dispatch->message_format, dispatch->content, dispatch->content_size); *message_format = ZBX_MEDIA_MESSAGE_FORMAT_MULTI; } } if (NULL == body) { *message = shared_str_new(dispatch->message); *message_format = mt->message_format; } else { *message = shared_str_new(body); zbx_free(body); } } /************************************************************************************ * * * Purpose: sends dispatch to the specified media type users * * * * Parameters: manager - [IN] * * client - [IN] connected worker interprocess communication * * client * * data - [IN] * * config_source_ip - [IN] * * * ************************************************************************************/ static void am_process_send_dispatch(zbx_am_t *manager, zbx_ipc_client_t *client, const unsigned char *data, const char *config_source_ip) { zbx_vector_str_t recipients; zbx_db_mediatype mt; zbx_shared_str_t message; unsigned char message_format; zbx_uint64_t id; zbx_am_dispatch_t *dispatch; zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, zbx_ipc_client_id(client)); if (NULL == (dispatch = (zbx_am_dispatch_t *)zbx_ipc_client_get_userdata(client))) { THIS_SHOULD_NEVER_HAPPEN; zbx_ipc_client_send(client, ZBX_IPC_ALERTER_ABORT_DISPATCH, NULL, 0); goto out; } id = zbx_ipc_client_id(client); zbx_vector_str_create(&recipients); zbx_alerter_deserialize_send_dispatch(data, &mt, &recipients); /* update with initial 'remove' flag so the mediatype is removed */ /* if it's not used by other test alerts/dispatches */ am_update_mediatype(manager, mt.mediatypeid, mt.type, mt.smtp_server, mt.smtp_helo, mt.smtp_email, mt.exec_path, mt.gsm_modem, mt.username, mt.passwd, mt.smtp_port, mt.smtp_security, mt.smtp_verify_peer, mt.smtp_verify_host, mt.smtp_authentication, mt.maxsessions, mt.maxattempts, mt.attempt_interval, mt.message_format, mt.script, mt.timeout, ZBX_AM_MEDIATYPE_FLAG_REMOVE, config_source_ip); am_prepare_dispatch_message(dispatch, &mt, &message, &message_format); for (int i = 0; i < recipients.values_num; i++) { zbx_am_alert_t *alert = am_create_alert(id, mt.mediatypeid, ALERT_SOURCE_EXTERNAL, 0, id, recipients.values[i], dispatch->subject, message, NULL, message_format, 0, 0, 0); if (FAIL == am_queue_alert(manager, alert, 0)) { am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, "Media type unavailable", NULL); am_alert_free(alert); } } zbx_db_mediatype_clean(&mt); zbx_vector_str_clear_ext(&recipients, zbx_str_free); zbx_vector_str_destroy(&recipients); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: finishes sending dispatches * * * ******************************************************************************/ static void am_process_end_dispatch(zbx_ipc_client_t *client) { zbx_am_dispatch_t *dispatch; zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, zbx_ipc_client_id(client)); dispatch = (zbx_am_dispatch_t *)zbx_ipc_client_get_userdata(client); zbx_ipc_client_set_userdata(client, NULL); am_dispatch_free(dispatch); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: removes unused media types * * * ******************************************************************************/ static void am_remove_unused_mediatypes(zbx_am_t *manager) { zbx_hashset_iter_t iter; zbx_am_mediatype_t *mediatype; zbx_hashset_iter_reset(&manager->mediatypes, &iter); while (NULL != (mediatype = (zbx_am_mediatype_t *)zbx_hashset_iter_next(&iter))) { if (0 != (mediatype->flags & ZBX_AM_MEDIATYPE_FLAG_REMOVE) && 0 == mediatype->refcount) am_remove_mediatype(manager, mediatype); } } /****************************************************************************** * * * Purpose: processes diagnostic statistics request * * * ******************************************************************************/ static void am_process_diag_stats(zbx_am_t *manager, zbx_ipc_client_t *client) { unsigned char *data; zbx_uint32_t data_len; data_len = zbx_alerter_serialize_diag_stats(&data, manager->alerts_num); zbx_ipc_client_send(client, ZBX_IPC_ALERTER_DIAG_STATS_RESULT, data, data_len); zbx_free(data); } /****************************************************************************** * * * Purpose: compares mediatypes by total queued alerts * * * * Return value: respective difference of total amounts. * * * ******************************************************************************/ static int am_compare_mediatype_by_alerts_desc(const void *d1, const void *d2) { zbx_am_mediatype_t *m1 = *(zbx_am_mediatype_t * const *)d1; zbx_am_mediatype_t *m2 = *(zbx_am_mediatype_t * const *)d2; return m2->refcount - m1->refcount; } /****************************************************************************** * * * Purpose: processes top mediatypes by queued alerts * * * * Parameters: manager - [IN] * * client - [IN] connected worker IPC client data * * message - [IN] received message * * * ******************************************************************************/ static void am_process_diag_top_mediatypes(zbx_am_t *manager, zbx_ipc_client_t *client, const zbx_ipc_message_t *message) { int limit; unsigned char *data; zbx_uint32_t data_len; zbx_vector_am_mediatype_ptr_t view; zbx_hashset_iter_t iter; zbx_am_mediatype_t *mediatype; int mediatypes_num; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_alerter_deserialize_top_request(message->data, &limit); zbx_vector_am_mediatype_ptr_create(&view); zbx_hashset_iter_reset(&manager->mediatypes, &iter); while (NULL != (mediatype = (zbx_am_mediatype_t *)zbx_hashset_iter_next(&iter))) { if (0 != mediatype->refcount) zbx_vector_am_mediatype_ptr_append(&view, mediatype); } zbx_vector_am_mediatype_ptr_sort(&view, am_compare_mediatype_by_alerts_desc); mediatypes_num = MIN(limit, view.values_num); data_len = zbx_alerter_serialize_top_mediatypes_result(&data, (zbx_am_mediatype_t **)view.values, mediatypes_num); zbx_ipc_client_send(client, ZBX_IPC_ALERTER_DIAG_TOP_MEDIATYPES_RESULT, data, data_len); zbx_free(data); zbx_vector_am_mediatype_ptr_destroy(&view); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /* alert source hashset support */ static zbx_hash_t am_source_hash_func(const void *data) { const zbx_am_source_stats_t *source = (const zbx_am_source_stats_t *)data; zbx_uint64_t source_object = (zbx_uint64_t)source->source << 32 | source->object; zbx_hash_t hash = ZBX_DEFAULT_UINT64_HASH_FUNC(&source_object); hash = ZBX_DEFAULT_UINT64_HASH_ALGO(&source->objectid, sizeof(source->objectid), hash); return hash; } static int am_source_compare_func(const void *d1, const void *d2) { const zbx_am_source_stats_t *s1 = (const zbx_am_source_stats_t *)d1; const zbx_am_source_stats_t *s2 = (const zbx_am_source_stats_t *)d2; ZBX_RETURN_IF_NOT_EQUAL(s1->source, s2->source); ZBX_RETURN_IF_NOT_EQUAL(s1->object, s2->object); ZBX_RETURN_IF_NOT_EQUAL(s1->objectid, s2->objectid); return 0; } /****************************************************************************** * * * Purpose: processes top alert sources by queued alerts * * * * Parameters: manager - [IN] * * client - [IN] connected worker IPC client data * * message - [IN] received message * * * ******************************************************************************/ static void am_process_diag_top_sources(zbx_am_t *manager, zbx_ipc_client_t *client, const zbx_ipc_message_t *message) { int limit; unsigned char *data; zbx_uint32_t data_len; zbx_vector_am_source_stats_ptr_t view; zbx_am_alertpool_t *alertpool; int sources_num; zbx_hashset_t sources; zbx_hashset_iter_t iter; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zbx_alerter_deserialize_top_request(message->data, &limit); zbx_hashset_create(&sources, 1024, am_source_hash_func, am_source_compare_func); zbx_vector_am_source_stats_ptr_create(&view); zbx_hashset_iter_reset(&manager->alertpools, &iter); while (NULL != (alertpool = (zbx_am_alertpool_t *)zbx_hashset_iter_next(&iter))) { for (int i = 0; i < alertpool->queue.elems_num; i++) { zbx_am_source_stats_t *source, source_local; const zbx_am_alert_t *alert = (const zbx_am_alert_t *)alertpool->queue.elems[i].data; source_local.source = ZBX_ALERTPOOL_SOURCE(alert->alertpoolid); source_local.object = ZBX_ALERTPOOL_OBJECT(alert->alertpoolid); source_local.objectid = alert->objectid; if (NULL == (source = zbx_hashset_search(&sources, &source_local))) { source_local.alerts_num = 0; source = zbx_hashset_insert(&sources, &source_local, sizeof(source_local)); zbx_vector_am_source_stats_ptr_append(&view, source); } source->alerts_num++; } } zbx_vector_am_source_stats_ptr_sort(&view, am_compare_mediatype_by_alerts_desc); sources_num = MIN(limit, view.values_num); data_len = zbx_alerter_serialize_top_sources_result(&data, (zbx_am_source_stats_t **)view.values, sources_num); zbx_ipc_client_send(client, ZBX_IPC_ALERTER_DIAG_TOP_SOURCES_RESULT, data, data_len); zbx_free(data); zbx_vector_am_source_stats_ptr_destroy(&view); zbx_hashset_destroy(&sources); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } static int am_check_dbstatus(void) { int ret = ZBX_DB_OK; zbx_db_result_t res; #ifdef HAVE_ORACLE res = zbx_db_select_once("select null from dual"); #else res = zbx_db_select_once("select null"); #endif if ((zbx_db_result_t)ZBX_DB_DOWN == res || NULL == res) { zbx_db_close(); ret = zbx_db_connect(ZBX_DB_CONNECT_ONCE); } else zbx_db_free_result(res); return ret; } ZBX_THREAD_ENTRY(zbx_alert_manager_thread, args) { #define ZBX_DB_PING_FREQUENCY SEC_PER_MIN #define ZBX_AM_MEDIATYPE_CLEANUP_FREQUENCY SEC_PER_HOUR zbx_thread_alert_manager_args *alert_manager_args_in = (zbx_thread_alert_manager_args *) (((zbx_thread_args_t *)args)->args); zbx_am_t manager; char *error = NULL; double time_stat, time_idle = 0; int server_num = ((zbx_thread_args_t *)args)->info.server_num, process_num = ((zbx_thread_args_t *)args)->info.process_num, time_ping = 0, time_watchdog = 0, time_mediatype = 0, syncer_is_ready = 0; unsigned char process_type = ((zbx_thread_args_t *)args)->info.process_type; const zbx_thread_info_t *info = &((zbx_thread_args_t *)args)->info; zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num); zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type), server_num, get_process_type_string(process_type), process_num); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); if (FAIL == am_init(&manager, alert_manager_args_in->get_process_forks_cb_arg, &error)) { zabbix_log(LOG_LEVEL_CRIT, "cannot initialize alert manager: %s", error); zbx_free(error); exit(EXIT_FAILURE); } manager.dbstatus = zbx_db_connect(ZBX_DB_CONNECT_ONCE); /* initialize statistics */ time_stat = zbx_time(); zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num); while (ZBX_IS_RUNNING()) { zbx_ipc_client_t *client; zbx_ipc_message_t *message; zbx_am_alerter_t *alerter; int ret, sent_num = 0, failed_num = 0, now; double time_now, sec; zbx_timespec_t timeout = {1, 0}; const char *scripts_path = alert_manager_args_in->get_scripts_path_cb_arg(); time_now = zbx_time(); now = (int)time_now; if ((time_ping + ZBX_DB_PING_FREQUENCY) < now) { manager.dbstatus = am_check_dbstatus(); time_ping = now; } if (ZBX_DB_DOWN == manager.dbstatus) { if (0 == time_watchdog) zabbix_log(LOG_LEVEL_ERR, "database connection lost"); if (time_watchdog + ZBX_WATCHDOG_ALERT_FREQUENCY <= now) { am_queue_watchdog_alerts(&manager, alert_manager_args_in->config_dbhigh); time_watchdog = now; } } else if (0 != time_watchdog) { zabbix_log(LOG_LEVEL_ERR, "database connection re-established"); time_watchdog = 0; } #define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */ /* once in STAT_INTERVAL seconds */ if (STAT_INTERVAL < time_now - time_stat) { zbx_setproctitle("%s #%d [sent %d, failed %d alerts, idle " ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num, sent_num, failed_num, time_idle, time_now - time_stat); time_stat = time_now; time_idle = 0; sent_num = 0; failed_num = 0; } #undef STAT_INTERVAL now = time(NULL); while (SUCCEED == am_check_queue(&manager, now)) { if (NULL == (alerter = (zbx_am_alerter_t *)zbx_queue_ptr_pop(&manager.free_alerters))) break; zbx_am_alert_t *alert; if (NULL == (alert = am_pop_alert(&manager))) break; if (FAIL == am_process_alert(&manager, alerter, alert, scripts_path)) zbx_queue_ptr_push(&manager.free_alerters, alerter); } if (time_mediatype + ZBX_AM_MEDIATYPE_CLEANUP_FREQUENCY < now) { am_remove_unused_mediatypes(&manager); time_mediatype = now; } zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE); ret = zbx_ipc_service_recv(&manager.ipc, &timeout, &client, &message); zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY); sec = zbx_time(); zbx_update_env(get_process_type_string(process_type), sec); if (ZBX_IPC_RECV_IMMEDIATE != ret) time_idle += sec - time_now; if (NULL != message) { switch (message->code) { case ZBX_IPC_ALERTER_REGISTER: am_register_alerter(&manager, client, message); break; case ZBX_IPC_ALERT_SYNCER_REGISTER: am_register_alert_syncer(&manager, client, message); syncer_is_ready = 1; ZBX_FALLTHROUGH; case ZBX_IPC_ALERTER_SYNC_ALERTS: if (1 == syncer_is_ready && FAIL == zbx_ipc_client_send(manager.syncer_client, ZBX_IPC_ALERTER_SYNC_ALERTS, NULL, 0)) { zabbix_log(LOG_LEVEL_ERR, "failed to send message to sync alerts"); } break; case ZBX_IPC_ALERTER_RESULT: if (SUCCEED == am_process_result(&manager, client, message)) sent_num++; else failed_num++; break; case ZBX_IPC_ALERTER_SEND_ALERT: am_process_external_alert_request(&manager, zbx_ipc_client_id(client), message->data, alert_manager_args_in->config_source_ip); break; case ZBX_IPC_ALERTER_MEDIATYPES: am_update_mediatypes(&manager, message, alert_manager_args_in->config_source_ip); break; case ZBX_IPC_ALERTER_ALERTS: am_queue_alerts(&manager, message, now); break; case ZBX_IPC_ALERTER_WATCHDOG: am_update_watchdog(&manager, message); break; case ZBX_IPC_ALERTER_RESULTS: am_flush_results(&manager, client); break; case ZBX_IPC_ALERTER_DROP_MEDIATYPES: am_drop_mediatypes(&manager, message); break; case ZBX_IPC_ALERTER_DIAG_STATS: am_process_diag_stats(&manager, client); break; case ZBX_IPC_ALERTER_DIAG_TOP_MEDIATYPES: am_process_diag_top_mediatypes(&manager, client, message); break; case ZBX_IPC_ALERTER_DIAG_TOP_SOURCES: am_process_diag_top_sources(&manager, client, message); break; case ZBX_IPC_ALERTER_BEGIN_DISPATCH: am_process_begin_dispatch(client, message->data); break; case ZBX_IPC_ALERTER_SEND_DISPATCH: am_process_send_dispatch(&manager, client, message->data, alert_manager_args_in->config_source_ip); break; case ZBX_IPC_ALERTER_END_DISPATCH: am_process_end_dispatch(client); break; } zbx_ipc_message_free(message); } if (NULL != client) zbx_ipc_client_release(client); } zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num); while (1) zbx_sleep(SEC_PER_MIN); #undef ZBX_DB_PING_FREQUENCY #undef ZBX_AM_MEDIATYPE_CLEANUP_FREQUENCY }