/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "zbxalerter.h"
#include "alerter_defs.h"

#include "alerter_protocol.h"
#include "zbxlog.h"
#include "zbxalgo.h"
#include "zbxdb.h"
#include "zbxdbhigh.h"
#include "zbxipcservice.h"
#include "zbxjson.h"
#include "zbxnix.h"
#include "zbxnum.h"
#include "zbxself.h"
#include "zbxservice.h"
#include "zbxstr.h"
#include "zbxthreads.h"
#include "zbxtime.h"
#include "zbxtypes.h"
#include "zbxmedia.h"
#include "zbxcacheconfig.h"

typedef struct
{
	zbx_hashset_t		mediatypes;
	zbx_ipc_async_socket_t	am;
}
zbx_am_db_t;

/******************************************************************************
 *                                                                            *
 * Purpose: creates new alert object                                          *
 *                                                                            *
 * Parameters: ...           - [IN] alert data                                *
 *                                                                            *
 * Return value: alert object.                                                *
 *                                                                            *
 ******************************************************************************/
static zbx_am_db_alert_t	*am_db_create_alert(zbx_uint64_t alertid, zbx_uint64_t mediatypeid, int source,
		int object, zbx_uint64_t objectid, zbx_uint64_t eventid, zbx_uint64_t p_eventid, const char *sendto,
		const char *subject, const char *message, const char *params, int status, int retries)
{
	zbx_am_db_alert_t	*alert;

	alert = (zbx_am_db_alert_t *)zbx_malloc(NULL, sizeof(zbx_am_db_alert_t));
	alert->alertid = alertid;
	alert->mediatypeid = mediatypeid;
	alert->source = source;
	alert->object = object;
	alert->objectid = objectid;
	alert->eventid = eventid;
	alert->p_eventid = p_eventid;

	alert->sendto = zbx_strdup(NULL, sendto);
	alert->subject = zbx_strdup(NULL, subject);
	alert->message = zbx_strdup(NULL, message);
	alert->params = zbx_strdup(NULL, params);

	alert->status = status;
	alert->retries = retries;

	alert->expression = NULL;
	alert->recovery_expression = NULL;

	return alert;
}

static int 	am_db_init(zbx_am_db_t *amdb, char **error)
{
	zbx_hashset_create(&amdb->mediatypes, 5, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	if (SUCCEED != zbx_ipc_async_socket_open(&amdb->am, ZBX_IPC_SERVICE_ALERTER, SEC_PER_MIN, error))
		return FAIL;

	return SUCCEED;
}

static void	am_db_clear(zbx_am_db_t *amdb)
{
	zbx_hashset_iter_t	iter;
	zbx_am_db_mediatype_t	*mediatype;

	zbx_hashset_iter_reset(&amdb->mediatypes, &iter);
	while (NULL != (mediatype = (zbx_am_db_mediatype_t *)zbx_hashset_iter_next(&iter)))
		zbx_am_db_mediatype_clear(mediatype);

	zbx_hashset_destroy(&amdb->mediatypes);

	zbx_ipc_async_socket_close(&amdb->am);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reads the new alerts from database                                *
 *                                                                            *
 * Parameters: alerts - [OUT] new alerts                                      *
 *                                                                            *
 * Comments: On the first call this function will return new and not sent     *
 *           alerts. After that only new alerts are returned.                 *
 *                                                                            *
 * Return value: SUCCEED - the alerts were read successfully                  *
 *               FAIL    - database connection error                          *
 *                                                                            *
 ******************************************************************************/
static int	am_db_get_alerts(zbx_vector_am_db_alert_ptr_t *alerts)
{
	static int		status_limit = 2;
	zbx_uint64_t		status_filter[] = {ALERT_STATUS_NEW, ALERT_STATUS_NOT_SENT};
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	char			*sql = NULL;
	size_t			sql_alloc = 0, sql_offset = 0;
	int			ret = SUCCEED;
	zbx_am_db_alert_t	*alert;
	zbx_vector_uint64_t	alertids;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_uint64_create(&alertids);

	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
			"select a.alertid,a.mediatypeid,a.sendto,a.subject,a.message,a.status,a.retries,"
				"e.source,e.object,e.objectid,a.parameters,a.eventid,a.p_eventid"
			" from alerts a"
			" left join events e"
				" on a.eventid=e.eventid"
			" where alerttype=%d"
			" and",
			ALERT_TYPE_MESSAGE);

	zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "a.status", status_filter, status_limit);
	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " order by a.alertid");

	zbx_db_begin();
	result = zbx_db_select("%s", sql);
	sql_offset = 0;
	zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t	alertid, mediatypeid, objectid, eventid, p_eventid;
		int		status, attempts, source, object;

		ZBX_STR2UINT64(alertid, row[0]);
		ZBX_STR2UINT64(mediatypeid, row[1]);
		ZBX_STR2UINT64(eventid, row[11]);
		ZBX_DBROW2UINT64(p_eventid, row[12]);
		status = atoi(row[5]);
		attempts = atoi(row[6]);

		if (SUCCEED == zbx_db_is_null(row[7]))
		{
			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
					"update alerts set status=%d,retries=0,error='Related event was removed.';\n",
					ALERT_STATUS_FAILED);
			if (FAIL == (ret = zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset)))
				break;
			continue;
		}

		source = atoi(row[7]);
		object = atoi(row[8]);
		ZBX_STR2UINT64(objectid, row[9]);

		alert = am_db_create_alert(alertid, mediatypeid, source, object, objectid, eventid, p_eventid, row[2],
				row[3], row[4], row[10], status, attempts);

		zbx_vector_am_db_alert_ptr_append(alerts, alert);

		if (ALERT_STATUS_NEW == alert->status)
			zbx_vector_uint64_append(&alertids, alert->alertid);
	}
	zbx_db_free_result(result);

	if (SUCCEED == ret)
	{
		if (0 != alertids.values_num)
		{
			sql_offset = 0;
			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update alerts set status=%d where",
					ALERT_STATUS_NOT_SENT);
			zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "alertid", alertids.values,
					alertids.values_num);

		}
		if (16 < sql_offset)
			ret = (ZBX_DB_OK <= zbx_db_execute("%s", sql) ? SUCCEED : FAIL);
	}
	if (SUCCEED == ret)
	{
		if (ZBX_DB_OK != zbx_db_commit())
			ret = FAIL;
	}
	else
		zbx_db_rollback();

	zbx_vector_uint64_destroy(&alertids);
	zbx_free(sql);

	if (SUCCEED != ret)
		zbx_vector_am_db_alert_ptr_clear_ext(alerts, zbx_am_db_alert_free);
	else
		status_limit = 1;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s alerts:%d", __func__, zbx_result_string(ret), alerts->values_num);

	return ret;
}

static void	am_db_get_trigger_expressions(zbx_vector_uint64_t *auth_email_mediatypeids,
		zbx_vector_am_db_alert_ptr_t *alerts)
{
	int			i;
	char			*sql = NULL;
	size_t			sql_alloc = 0, sql_offset = 0;
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	zbx_am_db_alert_t	*alert;
	zbx_vector_uint64_t	triggerids;

	if (0 == auth_email_mediatypeids->values_num)
		return;

	zbx_vector_uint64_create(&triggerids);
	zbx_vector_uint64_sort(auth_email_mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	for (i = 0; i < alerts->values_num; i++)
	{
		alert = (zbx_am_db_alert_t *)alerts->values[i];

		if (((EVENT_SOURCE_INTERNAL == alert->source && EVENT_OBJECT_TRIGGER == alert->object) ||
				EVENT_SOURCE_TRIGGERS == alert->source) &&
				FAIL != zbx_vector_uint64_bsearch(auth_email_mediatypeids, alert->mediatypeid,
				ZBX_DEFAULT_UINT64_COMPARE_FUNC))
		{
			zbx_vector_uint64_append(&triggerids, alert->objectid);
		}
	}

	if (0 == triggerids.values_num)
	{
		zbx_vector_uint64_destroy(&triggerids);
		return;
	}

	zbx_vector_uint64_sort(&triggerids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_uint64_uniq(&triggerids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "triggerid", triggerids.values,
			triggerids.values_num);

	result = zbx_db_select(
			"select triggerid,expression,recovery_expression"
			" from triggers"
			" where%s",
			sql);

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_uint64_t	triggerid;

		ZBX_STR2UINT64(triggerid, row[0]);

		for (i = 0; i < alerts->values_num; i++)
		{
			alert = (zbx_am_db_alert_t *)alerts->values[i];

			if (((EVENT_SOURCE_INTERNAL == alert->source && EVENT_OBJECT_TRIGGER == alert->object) ||
					EVENT_SOURCE_TRIGGERS == alert->source) && triggerid == alert->objectid)
			{
				alert->expression = zbx_strdup(alert->expression, row[1]);
				alert->recovery_expression = zbx_strdup(alert->recovery_expression, row[2]);
			}
		}
	}
	zbx_db_free_result(result);

	zbx_free(sql);
	zbx_vector_uint64_destroy(&triggerids);
}

#define ZBX_UPDATE_STR(dst, src, ret)				\
	do							\
	{							\
		if (NULL == dst || 0 != strcmp(dst, src))	\
		{						\
			dst = zbx_strdup(dst, src);		\
			ret = SUCCEED;				\
		}						\
	}							\
	while(0)

#define ZBX_UPDATE_VALUE(dst, src, ret)	\
	do				\
	{				\
		if (dst != src)		\
		{			\
			dst = src;	\
			ret = SUCCEED;	\
		}			\
	}				\
	while(0)

/******************************************************************************
 *                                                                            *
 * Purpose: updates media type object, creating one if necessary              *
 *                                                                            *
 * Parameters: amdb - [IN] alert manager cache                                *
 *             ...  - [IN] mediatype data                                     *
 *                                                                            *
 * Return value: Updated mediatype or NULL, if the cached media was up to     *
 *               date.                                                        *
 *                                                                            *
 ******************************************************************************/
static zbx_am_db_mediatype_t	*am_db_update_mediatype(zbx_am_db_t *amdb, time_t now, zbx_uint64_t mediatypeid,
		int type, const char *smtp_server, const char *smtp_helo, const char *smtp_email,
		const char *exec_path, const char *gsm_modem, const char *username, const char *passwd,
		unsigned short smtp_port, unsigned char smtp_security, unsigned char smtp_verify_peer,
		unsigned char smtp_verify_host, unsigned char smtp_authentication, int maxsessions, int maxattempts,
		const char *attempt_interval, unsigned char message_format, const char *script, const char *timeout,
		int process_tags)
{
	zbx_am_db_mediatype_t	*mediatype;
	int			ret = FAIL;

	if (NULL == (mediatype = (zbx_am_db_mediatype_t *)zbx_hashset_search(&amdb->mediatypes, &mediatypeid)))
	{
		zbx_am_db_mediatype_t	mediatype_local = {
				.mediatypeid = mediatypeid
		};

		mediatype = (zbx_am_db_mediatype_t *)zbx_hashset_insert(&amdb->mediatypes, &mediatype_local,
				sizeof(mediatype_local));
		ret = SUCCEED;
	}

	mediatype->last_access = now;
	ZBX_UPDATE_VALUE(mediatype->type, type, ret);
	ZBX_UPDATE_STR(mediatype->smtp_server, smtp_server, ret);
	ZBX_UPDATE_STR(mediatype->smtp_helo, smtp_helo, ret);
	ZBX_UPDATE_STR(mediatype->smtp_email, smtp_email, ret);
	ZBX_UPDATE_STR(mediatype->exec_path, exec_path, ret);
	ZBX_UPDATE_STR(mediatype->gsm_modem, gsm_modem, ret);
	ZBX_UPDATE_STR(mediatype->username, username, ret);
	ZBX_UPDATE_STR(mediatype->passwd, passwd, ret);
	ZBX_UPDATE_STR(mediatype->script, script, ret);
	ZBX_UPDATE_STR(mediatype->timeout, timeout, ret);
	ZBX_UPDATE_STR(mediatype->attempt_interval, attempt_interval, ret);

	ZBX_UPDATE_VALUE(mediatype->smtp_port, smtp_port, ret);
	ZBX_UPDATE_VALUE(mediatype->smtp_security, smtp_security, ret);
	ZBX_UPDATE_VALUE(mediatype->smtp_verify_peer, smtp_verify_peer, ret);
	ZBX_UPDATE_VALUE(mediatype->smtp_verify_host, smtp_verify_host, ret);
	ZBX_UPDATE_VALUE(mediatype->smtp_authentication, smtp_authentication, ret);

	ZBX_UPDATE_VALUE(mediatype->maxsessions, maxsessions, ret);
	ZBX_UPDATE_VALUE(mediatype->maxattempts, maxattempts, ret);
	ZBX_UPDATE_VALUE(mediatype->message_format, message_format, ret);

	ZBX_UPDATE_VALUE(mediatype->process_tags, process_tags, ret);

	return SUCCEED == ret ? mediatype : NULL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: updates alert manager media types                                 *
 *                                                                            *
 * Parameters: amdb                     - [IN] the alert manager cache        *
 *             mediatypeids             - [IN]                                *
 *             medatypeids_num          - [IN]                                *
 *             mediatypes               - [OUT]                               *
 *             auth_email_mediatypeids  - [OUT] email media types ids with    *
 *                                              authentication enabled        *
 *                                                                            *
 ******************************************************************************/
static void	am_db_update_mediatypes(zbx_am_db_t *amdb, const zbx_uint64_t *mediatypeids, int mediatypeids_num,
		zbx_vector_am_db_mediatype_ptr_t *mediatypes, zbx_vector_uint64_t *auth_email_mediatypeids)
{
	zbx_db_result_t	result;
	zbx_db_row_t	row;
	char		*sql = NULL;
	size_t		sql_alloc = 0, sql_offset = 0;
	time_t		now;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset,
			"select mediatypeid,type,smtp_server,smtp_helo,smtp_email,exec_path,gsm_modem,username,"
				"passwd,smtp_port,smtp_security,smtp_verify_peer,smtp_verify_host,smtp_authentication,"
				"maxsessions,maxattempts,attempt_interval,message_format,script,timeout,process_tags"
			" from media_type"
			" where");

	zbx_db_add_condition_alloc(&sql, &sql_alloc, &sql_offset, "mediatypeid", mediatypeids, mediatypeids_num);

	result = zbx_db_select("%s", sql);
	zbx_free(sql);

	now = time(NULL);
	while (NULL != (row = zbx_db_fetch(result)))
	{
		int			type, maxsessions, maxattempts;
		zbx_uint64_t		mediatypeid;
		unsigned short		smtp_port;
		unsigned char		smtp_security, smtp_verify_peer, smtp_verify_host, smtp_authentication,
					message_format;
		zbx_am_db_mediatype_t	*mediatype;

		if (FAIL == zbx_is_ushort(row[9], &smtp_port))
		{
			THIS_SHOULD_NEVER_HAPPEN;
			continue;
		}

		ZBX_STR2UINT64(mediatypeid, row[0]);
		type = atoi(row[1]);
		ZBX_STR2UCHAR(smtp_security, row[10]);
		ZBX_STR2UCHAR(smtp_verify_peer, row[11]);
		ZBX_STR2UCHAR(smtp_verify_host, row[12]);
		ZBX_STR2UCHAR(smtp_authentication, row[13]);
		maxsessions = atoi(row[14]);
		maxattempts = atoi(row[15]);
		ZBX_STR2UCHAR(message_format, row[17]);

		mediatype = am_db_update_mediatype(amdb, now, mediatypeid, type,row[2], row[3], row[4], row[5],
				row[6], row[7], row[8], smtp_port, smtp_security, smtp_verify_peer, smtp_verify_host,
				smtp_authentication, maxsessions, maxattempts, row[16], message_format, row[18], row[19],
				atoi(row[20]));

		if (NULL != mediatype)
			zbx_vector_am_db_mediatype_ptr_append(mediatypes, mediatype);

		if (NULL != auth_email_mediatypeids && MEDIA_TYPE_EMAIL == type &&
				SMTP_AUTHENTICATION_NORMAL_PASSWORD == smtp_authentication)
		{
			zbx_vector_uint64_append(auth_email_mediatypeids, mediatypeid);
		}
	}
	zbx_db_free_result(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() mediatypes:%d/%d", __func__, mediatypes->values_num, mediatypeids_num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reads alerts/mediatypes from database and queues them in alert    *
 *          manager                                                           *
 *                                                                            *
 * Parameters: amdb - [IN] alert manager cache                                *
 *                                                                            *
 * Return value: count of alerts                                              *
 *                                                                            *
 ******************************************************************************/
static int	am_db_queue_alerts(zbx_am_db_t *amdb)
{
	zbx_vector_am_db_mediatype_ptr_t	mediatypes;
	zbx_vector_am_db_alert_ptr_t		alerts;
	int					alerts_num;
	zbx_am_db_alert_t			*alert;
	zbx_vector_uint64_t			mediatypeids, auth_email_mediatypeids;

	zbx_vector_am_db_alert_ptr_create(&alerts);
	zbx_vector_uint64_create(&mediatypeids);
	zbx_vector_uint64_create(&auth_email_mediatypeids);
	zbx_vector_am_db_mediatype_ptr_create(&mediatypes);

	if (FAIL == am_db_get_alerts(&alerts) || 0 == alerts.values_num)
		goto out;

	for (int i = 0; i < alerts.values_num; i++)
	{
		alert = (zbx_am_db_alert_t *)alerts.values[i];
		zbx_vector_uint64_append(&mediatypeids, alert->mediatypeid);
	}

	zbx_vector_uint64_sort(&mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_uint64_uniq(&mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	am_db_update_mediatypes(amdb, mediatypeids.values, mediatypeids.values_num, &mediatypes,
			&auth_email_mediatypeids);

	am_db_get_trigger_expressions(&auth_email_mediatypeids, &alerts);

	if (0 != mediatypes.values_num)
	{
		unsigned char	*data;
		zbx_uint32_t	data_len;

		data_len = zbx_alerter_serialize_mediatypes(&data, (zbx_am_db_mediatype_t **)mediatypes.values,
				mediatypes.values_num);
		if (FAIL == zbx_ipc_async_socket_send(&amdb->am, ZBX_IPC_ALERTER_MEDIATYPES, data, data_len))
			zabbix_log(LOG_LEVEL_ERR, "failed to queue mediatypes in alerter");
		zbx_free(data);
	}

#define ZBX_ALERT_BATCH_SIZE		1000
	for (int i = 0; i < alerts.values_num; i += ZBX_ALERT_BATCH_SIZE)
	{
		unsigned char	*data;
		zbx_uint32_t	data_len;
		int		to = i + ZBX_ALERT_BATCH_SIZE;

		if (to >= alerts.values_num)
			to = alerts.values_num;

		data_len = zbx_alerter_serialize_alerts(&data, (zbx_am_db_alert_t **)&alerts.values[i], to - i);
		if (FAIL == zbx_ipc_async_socket_send(&amdb->am, ZBX_IPC_ALERTER_ALERTS, data, data_len))
			zabbix_log(LOG_LEVEL_ERR, "failed to queue alerts in alerter");
		zbx_free(data);
	}
#undef ZBX_ALERT_BATCH_SIZE

out:
	zbx_vector_am_db_mediatype_ptr_destroy(&mediatypes);
	zbx_vector_uint64_destroy(&mediatypeids);
	zbx_vector_uint64_destroy(&auth_email_mediatypeids);
	alerts_num = alerts.values_num;
	zbx_vector_am_db_alert_ptr_clear_ext(&alerts, zbx_am_db_alert_free);
	zbx_vector_am_db_alert_ptr_destroy(&alerts);

	return alerts_num;
}

typedef struct
{
	zbx_uint64_t		eventid;
	zbx_vector_tags_ptr_t	tags;
	int			need_to_add_problem_tag;
}
zbx_event_tags_t;

ZBX_PTR_VECTOR_DECL(events_tags, zbx_event_tags_t*)
ZBX_PTR_VECTOR_IMPL(events_tags, zbx_event_tags_t*)

static int	zbx_event_tags_compare_func(const void *d1, const void *d2)
{
	const zbx_event_tags_t	*event_tags_1 = *(const zbx_event_tags_t **)d1;
	const zbx_event_tags_t	*event_tags_2 = *(const zbx_event_tags_t **)d2;

	ZBX_RETURN_IF_NOT_EQUAL(event_tags_1->eventid, event_tags_2->eventid);

	return 0;
}

static void	event_tags_free(zbx_event_tags_t *event_tags)
{
	zbx_vector_tags_ptr_clear_ext(&event_tags->tags, zbx_free_tag);
	zbx_vector_tags_ptr_destroy(&event_tags->tags);
	zbx_free(event_tags);
}

/******************************************************************************
 *                                                                            *
 * Purpose: adds event tags to sql query                                      *
 *                                                                            *
 * Parameters: eventid     - [IN]  problem_tag update db event                *
 *             params      - [IN]  values to process                          *
 *             events_tags - [OUT] vector of events with tags                 *
 *                                                                            *
 * Comments: The event tags are in json object format.                        *
 *                                                                            *
 ******************************************************************************/
static void	am_db_update_event_tags(zbx_uint64_t eventid, const char *params, zbx_vector_events_tags_t *events_tags)
{
	zbx_db_result_t		result;
	zbx_db_row_t		row;
	struct zbx_json_parse	jp, jp_tags;
	const char		*pnext = NULL;
	char			key[ZBX_DB_TAG_NAME_LEN * 4 + 1], value[ZBX_DB_TAG_VALUE_LEN * 4 + 1];
	int			event_tag_index, need_to_add_problem_tag = 0;
	zbx_event_tags_t	*event_tags, local_event_tags;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() eventid:" ZBX_FS_UI64 " tags:%s", __func__, eventid, params);

	result = zbx_db_select("select p.eventid"
			" from events e left join problem p"
				" on p.eventid=e.eventid"
			" where e.eventid=" ZBX_FS_UI64, eventid);

	if (NULL == (row = zbx_db_fetch(result)))
	{
		zabbix_log(LOG_LEVEL_DEBUG, "cannot add event tags: event " ZBX_FS_UI64 " was removed", eventid);
		goto out;
	}

	if (SUCCEED != zbx_db_is_null(row[0]))
		need_to_add_problem_tag = 1;

	if (FAIL == zbx_json_open(params, &jp))
	{
		zabbix_log(LOG_LEVEL_WARNING, "cannot process returned result: %s", zbx_json_strerror());
		goto out;
	}

	if (FAIL == zbx_json_brackets_by_name(&jp, ZBX_PROTO_TAG_TAGS, &jp_tags))
	{
		zabbix_log(LOG_LEVEL_WARNING, "cannot process returned result: missing tags field");
		goto out;
	}

	local_event_tags.eventid = eventid;

	event_tag_index = zbx_vector_events_tags_search(events_tags, &local_event_tags, zbx_event_tags_compare_func);

	if (FAIL == event_tag_index)
	{
		event_tags = (zbx_event_tags_t*) zbx_malloc(NULL, sizeof(zbx_event_tags_t));
		event_tags->eventid = eventid;
		zbx_vector_tags_ptr_create(&(event_tags->tags));
		event_tags->need_to_add_problem_tag = need_to_add_problem_tag;
		zbx_vector_events_tags_append(events_tags, event_tags);
	}
	else
		event_tags = events_tags->values[event_tag_index];

	while (NULL != (pnext = zbx_json_pair_next(&jp_tags, pnext, key, sizeof(key))))
	{
		zbx_tag_t	*tag, tag_local = {.tag = key, .value = value};

		if (NULL == zbx_json_decodevalue(pnext, value, sizeof(value), NULL))
		{
			zabbix_log(LOG_LEVEL_DEBUG, "invalid tag value starting with %s", pnext);
			continue;
		}

		zbx_ltrim(key, ZBX_WHITESPACE);
		zbx_ltrim(value, ZBX_WHITESPACE);

		if (ZBX_DB_TAG_NAME_LEN < zbx_strlen_utf8(key))
			key[zbx_strlen_utf8_nchars(key, ZBX_DB_TAG_NAME_LEN)] = '\0';
		if (ZBX_DB_TAG_VALUE_LEN < zbx_strlen_utf8(value))
			value[zbx_strlen_utf8_nchars(value, ZBX_DB_TAG_VALUE_LEN)] = '\0';

		zbx_rtrim(key, ZBX_WHITESPACE);
		zbx_rtrim(value, ZBX_WHITESPACE);

		if (FAIL == zbx_vector_tags_ptr_search(&(event_tags->tags), &tag_local, zbx_compare_tags_and_values))
		{
			tag = (zbx_tag_t *)zbx_malloc(NULL, sizeof(zbx_tag_t));
			tag->tag = zbx_strdup(NULL, key);
			tag->value = zbx_strdup(NULL, value);
			zbx_vector_tags_ptr_append(&(event_tags->tags), tag);
		}
	}
out:
	zbx_db_free_result(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: removes duplicate event tags and checks if problem tags need to   *
 *          be updated                                                        *
 *                                                                            *
 * Parameters: update_event_tags - [IN/OUT] vector of pointers to events with *
 *                                          tags                              *
 *             db_event          - [IN/OUT] event_tag update db event         *
 *             db_problem        - [IN/OUT] problem_tag update db event       *
 *                                                                            *
 ******************************************************************************/
static void	am_db_validate_tags_for_update(zbx_vector_events_tags_t *update_events_tags, zbx_db_insert_t *db_event,
		zbx_db_insert_t *db_problem)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	for (int i = 0; i < update_events_tags->values_num; i++)
	{
		zbx_tag_t		tag_local, *tag;
		zbx_db_result_t		result;
		zbx_db_row_t		row;
		zbx_event_tags_t	*local_event_tags = update_events_tags->values[i];

		/* remove duplicate tags */
		if (0 != local_event_tags->tags.values_num)
		{
			result = zbx_db_select("select tag,value from event_tag where eventid=" ZBX_FS_UI64,
					local_event_tags->eventid);

			while (NULL != (row = zbx_db_fetch(result)))
			{
				int	index;

				tag_local.tag = row[0];
				tag_local.value = row[1];

				if (FAIL != (index = zbx_vector_tags_ptr_search(&(local_event_tags->tags), &tag_local,
						zbx_compare_tags_and_values)))
				{
					zbx_free_tag(local_event_tags->tags.values[index]);
					zbx_vector_tags_ptr_remove_noorder(&(local_event_tags->tags), index);
				}
			}

			zbx_db_free_result(result);
		}

		for (int j = 0; j < local_event_tags->tags.values_num; j++)
		{
			tag = local_event_tags->tags.values[j];
			zbx_db_insert_add_values(db_event, __UINT64_C(0), local_event_tags->eventid, tag->tag,
					tag->value);

			if (0 != local_event_tags->need_to_add_problem_tag)
			{
				zbx_db_insert_add_values(db_problem, __UINT64_C(0), local_event_tags->eventid,
						tag->tag, tag->value);
			}
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

static void	am_service_add_event_tags(zbx_vector_events_tags_t *events_tags)
{
	unsigned char	*data = NULL;
	size_t		data_alloc = 0, data_offset = 0;

	for (int i = 0; i < events_tags->values_num; i++)
	{
		zbx_event_tags_t	*event_tag = events_tags->values[i];

		zbx_service_serialize_problem_tags(&data, &data_alloc, &data_offset, event_tag->eventid,
				&event_tag->tags);
	}

	if (NULL == data)
		return;

	if (0 != zbx_dc_get_itservices_num())
		zbx_service_flush(ZBX_IPC_SERVICE_SERVICE_PROBLEMS_TAGS, data, data_offset);
	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Purpose: flushes alert results to database                                 *
 *                                                                            *
 * Parameters: mediatypes - [IN]                                              *
 *             data       - [IN] serialized alert results                     *
 *                                                                            *
 * Return value: count of results                                             *
 *                                                                            *
 ******************************************************************************/
static int	am_db_flush_results(zbx_hashset_t *mediatypes, const unsigned char *data)
{
	int				results_num;
	zbx_vector_events_tags_t	update_events_tags;
	zbx_am_result_t			**results;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_events_tags_create(&update_events_tags);

	zbx_alerter_deserialize_results(data, &results, &results_num);

	if (0 != results_num)
	{
		int 		ret;
		char		*sql;
		size_t		sql_alloc = results_num * 128, sql_offset;
		zbx_db_insert_t	db_event, db_problem;

		sql = (char *)zbx_malloc(NULL, sql_alloc);

		do
		{
			zbx_vector_events_tags_clear_ext(&update_events_tags, event_tags_free);
			sql_offset = 0;

			zbx_db_begin();
			zbx_db_begin_multiple_update(&sql, &sql_alloc, &sql_offset);
			zbx_db_insert_prepare(&db_event, "event_tag", "eventtagid", "eventid", "tag", "value",
					(char *)NULL);
			zbx_db_insert_prepare(&db_problem, "problem_tag", "problemtagid", "eventid", "tag", "value",
					(char *)NULL);

			for (int i = 0; i < results_num; i++)
			{
				zbx_am_db_mediatype_t	*mediatype;
				zbx_am_result_t		*result = results[i];

				zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
						"update alerts set status=%d,retries=%d",
						result->status, result->retries);

				if (NULL != result->error)
				{
					char	*error_esc;

					error_esc = zbx_db_dyn_escape_field("alerts", "error", result->error);
					zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, ",error='%s'", error_esc);
					zbx_free(error_esc);
				}
				else
					zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",error=''");

				zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " where alertid=" ZBX_FS_UI64 ";\n",
						result->alertid);

				if ((EVENT_SOURCE_TRIGGERS == result->source ||
						EVENT_SOURCE_INTERNAL == result->source ||
						EVENT_SOURCE_SERVICE == result->source) && NULL != result->value)
				{
					mediatype = zbx_hashset_search(mediatypes, &result->mediatypeid);
					if (NULL != mediatype && 0 != mediatype->process_tags)
					{
						am_db_update_event_tags(result->eventid, result->value,
								&update_events_tags);
					}
				}
				zbx_db_execute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
			}

			am_db_validate_tags_for_update(&update_events_tags, &db_event, &db_problem);

			zbx_db_end_multiple_update(&sql, &sql_alloc, &sql_offset);
			if (16 < sql_offset)
				zbx_db_execute("%s", sql);

			zbx_db_insert_autoincrement(&db_event, "eventtagid");
			zbx_db_insert_execute(&db_event);
			zbx_db_insert_clean(&db_event);

			zbx_db_insert_autoincrement(&db_problem, "problemtagid");
			zbx_db_insert_execute(&db_problem);
			zbx_db_insert_clean(&db_problem);
		}
		while (ZBX_DB_DOWN == (ret = zbx_db_commit()));

		if (ZBX_DB_OK == ret)
			am_service_add_event_tags(&update_events_tags);

		for (int i = 0; i < results_num; i++)
		{
			zbx_am_result_t	*result = results[i];

			zbx_free(result->value);
			zbx_free(result->error);
			zbx_free(result);
		}

		zbx_free(sql);
	}

	zbx_vector_events_tags_clear_ext(&update_events_tags, event_tags_free);
	zbx_vector_events_tags_destroy(&update_events_tags);
	zbx_free(results);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() flushed:%d", __func__, results_num);

	return results_num;
}

/******************************************************************************
 *                                                                            *
 * Purpose: removes cached media types used more than a day ago               *
 *                                                                            *
 * Parameters: amdb - [IN] alert manager cache                                *
 *                                                                            *
 ******************************************************************************/
static void	am_db_remove_expired_mediatypes(zbx_am_db_t *amdb)
{
	zbx_hashset_iter_t	iter;
	zbx_am_db_mediatype_t	*mediatype;
	time_t			now;
	zbx_vector_uint64_t	dropids;
	int			num;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_uint64_create(&dropids);
	now = time(NULL);
	zbx_hashset_iter_reset(&amdb->mediatypes, &iter);

#define ZBX_MEDIATYPE_CACHE_TTL	SEC_PER_DAY
	while (NULL != (mediatype = (zbx_am_db_mediatype_t *)zbx_hashset_iter_next(&iter)))
	{
		if (mediatype->last_access + ZBX_MEDIATYPE_CACHE_TTL <= now)
		{
			zbx_vector_uint64_append(&dropids, mediatype->mediatypeid);
			zbx_am_db_mediatype_clear(mediatype);
			zbx_hashset_iter_remove(&iter);
		}
	}
#undef ZBX_MEDIATYPE_CACHE_TTL

	if (0 != dropids.values_num)
	{
		unsigned char	*data;
		zbx_uint32_t	data_len;

		data_len = zbx_alerter_serialize_ids(&data, dropids.values, dropids.values_num);
		if (FAIL == zbx_ipc_async_socket_send(&amdb->am, ZBX_IPC_ALERTER_DROP_MEDIATYPES, data, data_len))
			zabbix_log(LOG_LEVEL_ERR, "failed to send request to drop old media types");
		zbx_free(data);
	}

	num = dropids.values_num;
	zbx_vector_uint64_destroy(&dropids);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() removed:%d", __func__, num);
}

/******************************************************************************
 *                                                                            *
 * Purpose: updates watchdog recipients                                       *
 *                                                                            *
 * Parameters: amdb - [IN] alert manager cache                                *
 *                                                                            *
 ******************************************************************************/
static void	am_db_update_watchdog(zbx_am_db_t *amdb)
{
	zbx_db_result_t				result;
	zbx_db_row_t				row;
	int					medias_num = 0;
	zbx_vector_uint64_t			mediatypeids;
	zbx_vector_am_db_mediatype_ptr_t	mediatypes;
	zbx_vector_am_media_ptr_t		medias;
	unsigned char				*data;
	zbx_uint32_t				data_len;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	result = zbx_db_select(
			"select m.mediaid,m.mediatypeid,m.sendto"
			" from media m,users_groups u,config c,media_type mt"
			" where m.userid=u.userid"
				" and u.usrgrpid=c.alert_usrgrpid"
				" and m.mediatypeid=mt.mediatypeid"
				" and m.active=%d"
				" and mt.status=%d"
				" and mt.type<>%d",
				MEDIA_STATUS_ACTIVE,
				MEDIA_TYPE_STATUS_ACTIVE,
				MEDIA_TYPE_WEBHOOK);

	zbx_vector_uint64_create(&mediatypeids);
	zbx_vector_am_media_ptr_create(&medias);
	zbx_vector_am_db_mediatype_ptr_create(&mediatypes);

	/* read watchdog alert recipients */
	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_am_media_t	*media = (zbx_am_media_t *)zbx_malloc(NULL, sizeof(zbx_am_media_t));

		ZBX_STR2UINT64(media->mediaid, row[0]);
		ZBX_STR2UINT64(media->mediatypeid, row[1]);
		media->sendto = zbx_strdup(NULL, row[2]);
		zbx_vector_am_media_ptr_append(&medias, media);
		zbx_vector_uint64_append(&mediatypeids, media->mediatypeid);
	}
	zbx_db_free_result(result);

	/* update media types used for watchdog alerts */

	if (0 != mediatypeids.values_num)
	{
		zbx_vector_uint64_sort(&mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
		zbx_vector_uint64_uniq(&mediatypeids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
		am_db_update_mediatypes(amdb, mediatypeids.values, mediatypeids.values_num, &mediatypes, NULL);

		if (0 != mediatypes.values_num)
		{
			data_len = zbx_alerter_serialize_mediatypes(&data, (zbx_am_db_mediatype_t **)mediatypes.values,
					mediatypes.values_num);
			if (FAIL == zbx_ipc_async_socket_send(&amdb->am, ZBX_IPC_ALERTER_MEDIATYPES, data, data_len))
				zabbix_log(LOG_LEVEL_ERR, "failed to send watchdog media types");
			zbx_free(data);
		}
	}

	data_len = zbx_alerter_serialize_medias(&data, (zbx_am_media_t **)medias.values, medias.values_num);
	if (FAIL == zbx_ipc_async_socket_send(&amdb->am, ZBX_IPC_ALERTER_WATCHDOG, data, data_len))
		zabbix_log(LOG_LEVEL_ERR, "failed to update watchdog recipients");
	zbx_free(data);

	medias_num = medias.values_num;

	zbx_vector_am_media_ptr_clear_ext(&medias, zbx_am_media_free);
	zbx_vector_am_db_mediatype_ptr_destroy(&mediatypes);
	zbx_vector_uint64_destroy(&mediatypeids);
	zbx_vector_am_media_ptr_destroy(&medias);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() recipients:%d", __func__, medias_num);
}

static void	alert_syncer_register(zbx_ipc_async_socket_t *socket)
{
	pid_t	ppid;

	ppid = getppid();

	if (FAIL == zbx_ipc_async_socket_send(socket, ZBX_IPC_ALERT_SYNCER_REGISTER, (unsigned char *)&ppid,
			sizeof(ppid)))
	{
		zabbix_log(LOG_LEVEL_ERR, "failed to send syncer register message");
	}
}

ZBX_THREAD_ENTRY(zbx_alert_syncer_thread, args)
{
#define ZBX_POLL_INTERVAL		1
	zbx_thread_alert_syncer_args	*alert_syncer_args_in = (zbx_thread_alert_syncer_args *)
							(((zbx_thread_args_t *)args)->args);
	int				sleeptime, freq_watchdog, alerts_num;
	zbx_am_db_t			amdb;
	char				*error = NULL;
	const zbx_thread_info_t		*info = &((zbx_thread_args_t *)args)->info;
	int				server_num = ((zbx_thread_args_t *)args)->info.server_num;
	int				process_num = ((zbx_thread_args_t *)args)->info.process_num;
	unsigned char			process_type = ((zbx_thread_args_t *)args)->info.process_type;
	double				time_cleanup = 0,  time_watchdog = 0, time_results = 0, sec1;

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(info->program_type),
			server_num, get_process_type_string(process_type), process_num);

	if (SUCCEED != am_db_init(&amdb, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot initialize alert loader: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type));
	zbx_db_connect(ZBX_DB_CONNECT_NORMAL);

	alert_syncer_register(&amdb.am);

	sleeptime = ZBX_POLL_INTERVAL;

	if (ZBX_WATCHDOG_ALERT_FREQUENCY < (freq_watchdog = alert_syncer_args_in->confsyncer_frequency))
		freq_watchdog = ZBX_WATCHDOG_ALERT_FREQUENCY;

	zbx_setproctitle("%s [queuing alerts]", get_process_type_string(process_type));

	sec1 = zbx_time();
	alerts_num = am_db_queue_alerts(&amdb);

	zbx_setproctitle("%s [queued %d alerts(s) in " ZBX_FS_DBL " sec]",
			get_process_type_string(process_type), alerts_num, zbx_time() - sec1);


	while (ZBX_IS_RUNNING())
	{
		int			results_num = 0;
		zbx_ipc_message_t	*message = NULL;

		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_IDLE);
		if (SUCCEED != zbx_ipc_async_socket_recv(&amdb.am, sleeptime, &message))
		{
			zabbix_log(LOG_LEVEL_CRIT, "cannot read alert syncer request");
			exit(EXIT_FAILURE);
		}
		zbx_update_selfmon_counter(info, ZBX_PROCESS_STATE_BUSY);

		sec1 = zbx_time();

		zbx_update_env(get_process_type_string(process_type), sec1);

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_ALERTER_SYNC_ALERTS:
					zbx_setproctitle("%s [queuing alerts]", get_process_type_string(process_type));
					alerts_num = am_db_queue_alerts(&amdb);
					break;
				case ZBX_IPC_ALERTER_RESULTS:
					results_num = am_db_flush_results(&amdb.mediatypes, message->data);
					break;
				default:
					zabbix_log(LOG_LEVEL_WARNING, "unrecognized message in alert syncer %d",
							message->code);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (time_results + ZBX_POLL_INTERVAL < sec1)
		{
			if (FAIL == zbx_ipc_async_socket_send(&amdb.am, ZBX_IPC_ALERTER_RESULTS, NULL, 0))
				zabbix_log(LOG_LEVEL_ERR, "failed to request alert results");

			time_results = sec1;
		}

		if (time_cleanup + SEC_PER_HOUR < sec1)
		{
			am_db_remove_expired_mediatypes(&amdb);
			time_cleanup = sec1;
		}

		if (time_watchdog + freq_watchdog < sec1)
		{
			am_db_update_watchdog(&amdb);
			time_watchdog = sec1;
		}

		double	sec2 = zbx_time();

		time_t	nextcheck = (time_t)sec1 + ZBX_POLL_INTERVAL;

		sleeptime = (int)((nextcheck > (time_t)sec2) ? nextcheck - (time_t)sec2 : 0);

		zbx_setproctitle("%s [queued %d alerts(s), flushed %d result(s) in " ZBX_FS_DBL " sec, idle %d sec]",
				get_process_type_string(process_type), alerts_num, results_num, sec2 - sec1, sleeptime);
	}

	am_db_clear(&amdb);

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	while (1)
		zbx_sleep(SEC_PER_MIN);
#undef ZBX_POLL_INTERVAL
}