/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "ha.h"
#include "zbxha.h"

#include "zbxdbhigh.h"
#include "zbxipcservice.h"
#include "zbxserialize.h"
#include "zbxthreads.h"
#include "zbxmutexs.h"
#include "audit/zbxaudit.h"
#include "audit/zbxaudit_ha.h"
#include "audit/zbxaudit_settings.h"
#include "zbxnum.h"
#include "zbxtime.h"
#include "zbxip.h"
#include "zbxcomms.h"
#include "../rtc/rtc_server.h"

#define ZBX_HA_POLL_PERIOD	5

#define ZBX_HA_NODE_LOCK	1

static pid_t			ha_pid = ZBX_THREAD_ERROR;

extern zbx_cuid_t	ha_sessionid;

typedef struct
{
	zbx_cuid_t	ha_nodeid;

	/* HA status */
	int		ha_status;

	/* database connection status */
	int		db_status;

	/* timestamp in database time */
	int		db_time;

	int		failover_delay;

	/* last access time of active node */
	int		lastaccess_active;

	/* number of ticks active node has not been updated its lastaccess */
	int		offline_ticks_active;

	/* 0 if auditlog is disabled */
	int		auditlog;

	const char	*name;
	char		*error;
}
zbx_ha_info_t;

ZBX_THREAD_ENTRY(ha_manager_thread, args);

typedef struct
{
	zbx_cuid_t	ha_nodeid;
	zbx_cuid_t	ha_sessionid;
	char		*name;
	char		*address;
	unsigned short	port;
	int		status;
	int		lastaccess;
}
zbx_ha_node_t;

ZBX_PTR_VECTOR_DECL(ha_node, zbx_ha_node_t *)
ZBX_PTR_VECTOR_IMPL(ha_node, zbx_ha_node_t *)

static void	zbx_ha_node_free(zbx_ha_node_t *node)
{
	zbx_free(node->name);
	zbx_free(node->address);
	zbx_free(node);
}

static void	ha_set_error(zbx_ha_info_t *info, const char *fmt, ...) __zbx_attr_format_printf(2, 3);
static DB_RESULT	ha_db_select(zbx_ha_info_t *info, const char *sql, ...) __zbx_attr_format_printf(2, 3);
static int	ha_db_execute(zbx_ha_info_t *info, const char *sql, ...) __zbx_attr_format_printf(2, 3);

/******************************************************************************
 *                                                                            *
 * Purpose: check if server is a part of HA cluster                           *
 *                                                                            *
 ******************************************************************************/
static int is_ha_cluster(const char *ha_node_name)
{
	return (NULL != ha_node_name && '\0' != *ha_node_name) ? 1 : 0;
}

/******************************************************************************
 *                                                                            *
 * Purpose: connect, send message and receive response in a given timeout     *
 *                                                                            *
 * Parameters: service_name - [IN] the IPC service name                       *
 *             code         - [IN] the message code                           *
 *             timeout      - [IN] time allowed to be spent on receive, note  *
 *                                 that this does not include open, send and  *
 *                                 flush that have their own timeouts         *
 *             data         - [IN] the data                                   *
 *             size         - [IN] the data size                              *
 *             out          - [OUT] the received message or NULL on error     *
 *                                  The message must be freed by zbx_free()   *
 *             error        - [OUT] the error message                         *
 *                                                                            *
 * Return value: SUCCEED - successfully sent message and received response    *
 *                         or timeout occurred while waiting for response     *
 *               FAIL    - error occurred                                     *
 *                                                                            *
 ******************************************************************************/
static int	ha_manager_send_message(zbx_uint32_t code, int timeout, const unsigned char *data, zbx_uint32_t size,
		unsigned char **out, char **error)
{
	zbx_ipc_message_t	*message;
	zbx_ipc_async_socket_t	asocket;
	int			ret = FAIL;

	if (FAIL == zbx_ipc_async_socket_open(&asocket, ZBX_IPC_SERVICE_HA, timeout, error))
		return FAIL;

	if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size))
	{
		*error = zbx_strdup(NULL, "Cannot send request");
		goto out;
	}

	if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout))
	{
		*error = zbx_strdup(NULL, "Cannot flush request");
		goto out;
	}

	if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message))
	{
		*error = zbx_strdup(NULL, "Cannot receive response");
		goto out;
	}

	if (NULL != message)
	{
		*out = message->data;
		message->data = NULL;
		zbx_ipc_message_free(message);
	}
	else
		*out = NULL;
	ret = SUCCEED;
out:
	zbx_ipc_async_socket_close(&asocket);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: update parent process with ha_status and failover delay           *
 *                                                                            *
 ******************************************************************************/
static void	ha_update_parent(zbx_ipc_async_socket_t *rtc_socket, zbx_ha_info_t *info)
{
	zbx_uint32_t	len = 0, error_len;
	unsigned char	*ptr, *data;
	const char	*error = info->error;
	int		ret;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() ha_status:%s failover_delay:%d info:%s", __func__,
			zbx_ha_status_str(info->ha_status),  info->failover_delay, ZBX_NULL2EMPTY_STR(info->error));

	zbx_serialize_prepare_value(len, info->ha_status);
	zbx_serialize_prepare_value(len, info->failover_delay);
	zbx_serialize_prepare_str(len, error);

	ptr = data = (unsigned char *)zbx_malloc(NULL, len);
	ptr += zbx_serialize_value(ptr, info->ha_status);
	ptr += zbx_serialize_value(ptr, info->failover_delay);
	(void)zbx_serialize_str(ptr, error, error_len);

	if (SUCCEED == (ret = zbx_ipc_async_socket_send(rtc_socket, ZBX_IPC_SERVICE_HA_STATUS_UPDATE, data, len)))
		ret = zbx_ipc_async_socket_flush(rtc_socket, ZBX_HA_SERVICE_TIMEOUT);

	zbx_free(data);

	if (SUCCEED != ret)
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot send HA notification to main process");
		exit(EXIT_FAILURE);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: send heartbeat message to main process                            *
 *                                                                            *
 ******************************************************************************/
static void	ha_send_heartbeat(zbx_ipc_async_socket_t *rtc_socket)
{
	if (SUCCEED != zbx_ipc_async_socket_send(rtc_socket, ZBX_IPC_SERVICE_HA_HEARTBEAT, NULL, 0) ||
			SUCCEED != zbx_ipc_async_socket_flush(rtc_socket, ZBX_HA_SERVICE_TIMEOUT))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot send HA heartbeat to main process");
		exit(EXIT_FAILURE);
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: set HA manager error                                              *
 *                                                                            *
 ******************************************************************************/
static void	ha_set_error(zbx_ha_info_t *info, const char *fmt, ...)
{
	va_list	args;
	size_t	len;

	/* don't override errors */
	if (ZBX_NODE_STATUS_ERROR == info->ha_status)
		return;

	va_start(args, fmt);
	len = (size_t)vsnprintf(NULL, 0, fmt, args) + 1;
	va_end(args);

	info->error = (char *)zbx_malloc(info->error, len);

	va_start(args, fmt);
	vsnprintf(info->error, len, fmt, args);
	va_end(args);

	info->ha_status = ZBX_NODE_STATUS_ERROR;
}

/******************************************************************************
 *                                                                            *
 * Purpose: start database transaction                                        *
 *                                                                            *
 * Comments: Sets error status on non-recoverable database error              *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_begin(zbx_ha_info_t *info)
{
	if (ZBX_DB_DOWN == info->db_status)
		info->db_status = zbx_db_connect(ZBX_DB_CONNECT_ONCE);

	if (ZBX_DB_OK <= info->db_status)
		info->db_status = zbx_db_begin_basic();

	if (ZBX_DB_FAIL == info->db_status)
		ha_set_error(info, "database error");
	else if (ZBX_DB_DOWN == info->db_status)
		zbx_db_close();

	return info->db_status;
}

/******************************************************************************
 *                                                                            *
 * Purpose: roll back database transaction                                    *
 *                                                                            *
 * Comments: Sets error status on non-recoverable database error              *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_rollback(zbx_ha_info_t *info)
{
	if (ZBX_DB_OK > (info->db_status = zbx_db_rollback_basic()))
	{
		if (ZBX_DB_DOWN == info->db_status)
			zbx_db_close();
	}

	if (ZBX_DB_FAIL == info->db_status)
		ha_set_error(info, "database error");
	else if (ZBX_DB_DOWN == info->db_status)
		zbx_db_close();

	return info->db_status;
}

/******************************************************************************
 *                                                                            *
 * Purpose: commit/rollback database transaction depending on commit result   *
 *                                                                            *
 * Comments: Sets error status on non-recoverable database error              *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_commit(zbx_ha_info_t *info)
{
	if (ZBX_DB_OK <= info->db_status)
		info->db_status = zbx_db_commit_basic();

	if (ZBX_DB_OK > info->db_status)
	{
		zbx_db_rollback_basic();

		if (ZBX_DB_FAIL == info->db_status)
			ha_set_error(info, "database error");
		else
			zbx_db_close();
	}

	return info->db_status;
}

/******************************************************************************
 *                                                                            *
 * Purpose: perform database select sql query based on current database       *
 *          connection status                                                 *
 *                                                                            *
 ******************************************************************************/
static DB_RESULT	ha_db_select(zbx_ha_info_t *info, const char *sql, ...)
{
	va_list		args;
	DB_RESULT	result;

	if (ZBX_DB_OK > info->db_status)
		return NULL;

	va_start(args, sql);
	result = zbx_db_vselect(sql, args);
	va_end(args);

	if (NULL == result)
	{
		info->db_status = ZBX_DB_FAIL;
	}
	else if (ZBX_DB_DOWN == (intptr_t)result)
	{
		info->db_status = ZBX_DB_DOWN;
		result = NULL;
	}

	return result;
}

/******************************************************************************
 *                                                                            *
 * Purpose: perform database sql query based on current database              *
 *          connection status                                                 *
 *                                                                            *
 ******************************************************************************/

static int	ha_db_execute(zbx_ha_info_t *info, const char *sql, ...)
{
	va_list	args;

	if (ZBX_DB_OK > info->db_status)
		return FAIL;

	va_start(args, sql);
	info->db_status = zbx_db_vexecute(sql, args);
	va_end(args);

	return ZBX_DB_OK <= info->db_status ? SUCCEED : FAIL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: update HA configuration from database                             *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_update_config(zbx_ha_info_t *info)
{
	DB_RESULT	result;
	DB_ROW		row;

	if (NULL == (result = ha_db_select(info, "select ha_failover_delay,auditlog_enabled from config")))
		return FAIL;

	if (NULL != (row = zbx_db_fetch(result)))
	{
		if (SUCCEED != zbx_is_time_suffix(row[0], &info->failover_delay, ZBX_LENGTH_UNLIMITED))
			THIS_SHOULD_NEVER_HAPPEN;

		info->auditlog = atoi(row[1]);
	}
	else
		THIS_SHOULD_NEVER_HAPPEN;

	zbx_db_free_result(result);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get all nodes from database                                       *
 *                                                                            *
 * Return value: SUCCEED - the nodes were retrieved from database             *
 *               FAIL    - database/connection error                          *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_get_nodes(zbx_ha_info_t *info, zbx_vector_ha_node_t *nodes, int lock)
{
	DB_RESULT	result;
	DB_ROW		row;

	if (NULL == (result = ha_db_select(info, "select ha_nodeid,name,status,lastaccess,address,port,ha_sessionid"
			" from ha_node order by ha_nodeid%s",
			(0 == lock ? "" : ZBX_FOR_UPDATE))))
	{
		return FAIL;
	}

	while (NULL != (row = zbx_db_fetch(result)))
	{
		zbx_ha_node_t	*node;

		node = (zbx_ha_node_t *)zbx_malloc(NULL, sizeof(zbx_ha_node_t));
		zbx_strlcpy(node->ha_nodeid.str, row[0], sizeof(node->ha_nodeid));
		node->name = zbx_strdup(NULL, row[1]);
		node->status = atoi(row[2]);
		node->lastaccess = atoi(row[3]);
		node->address = zbx_strdup(NULL, row[4]);

		if (SUCCEED != zbx_is_ushort(row[5], &node->port))
		{
			zabbix_log(LOG_LEVEL_WARNING, "node \"%s\" has invalid port value \"%s\"", row[1], row[5]);
			node->port = 0;
		}

		zbx_strlcpy(node->ha_sessionid.str, row[6], sizeof(node->ha_sessionid));
		zbx_vector_ha_node_append(nodes, node);
	}

	zbx_db_free_result(result);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if the node is registered in node table and get ID          *
 *                                                                            *
 ******************************************************************************/
static zbx_ha_node_t	*ha_find_node_by_name(zbx_vector_ha_node_t *nodes, const char *name)
{
	int	i;

	for (i = 0; i < nodes->values_num; i++)
	{
		if (0 == strcmp(nodes->values[i]->name, name))
			return nodes->values[i];
	}

	return NULL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get server external address and port from configuration           *
 *                                                                            *
 ******************************************************************************/
static void	ha_get_external_address(char **address, unsigned short *port, zbx_ha_config_t *ha_config)
{
	if (NULL != ha_config->ha_node_address)
	{
		(void)zbx_parse_serveractive_element(ha_config->ha_node_address, address, port, 0);
	}
	else if (NULL != ha_config->default_node_ip)
	{
		char	*tmp;

		zbx_strsplit_first(ha_config->default_node_ip, ',', address, &tmp);
		zbx_free(tmp);
	}

	if (NULL == *address || 0 == strcmp(*address, "0.0.0.0") || 0 == strcmp(*address, "::"))
		*address = zbx_strdup(*address, "localhost");

	if (0 == *port)
	{
		if (0 != ha_config->default_node_port)
			*port = (unsigned short)ha_config->default_node_port;
		else
			*port = ZBX_DEFAULT_SERVER_PORT;
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: lock nodes in database                                            *
 *                                                                            *
 * Comments: To lock ha_node table it must have at least one node             *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_lock_nodes(zbx_ha_info_t *info)
{
	DB_RESULT	result;

	if (NULL == (result = ha_db_select(info, "select null from ha_node order by ha_nodeid" ZBX_FOR_UPDATE)))
		return FAIL;

	zbx_db_free_result(result);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check availability based on lastaccess timestamp, database time   *
 *          and failover delay                                                *
 *                                                                            *
 * Return value: SUCCEED - server can be started in active mode               *
 *               FAIL    - server cannot be started based on node registry    *
 *                                                                            *
 ******************************************************************************/
static int	ha_is_available(const zbx_ha_info_t *info, int lastaccess, int db_time)
{
	if (lastaccess + info->failover_delay <= db_time)
		return FAIL;

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if server can be started in standalone configuration        *
 *                                                                            *
 * Return value: SUCCEED - server can be started in active mode               *
 *               FAIL    - server cannot be started based on node registry    *
 *                                                                            *
 * Comments: Sets error status on configuration errors.                       *
 *                                                                            *
 ******************************************************************************/
static int	ha_check_standalone_config(zbx_ha_info_t *info, zbx_vector_ha_node_t *nodes, int db_time)
{
	int	i;

	for (i = 0; i < nodes->values_num; i++)
	{
		if ('\0' == *nodes->values[i]->name)
			continue;

		if (ZBX_NODE_STATUS_STOPPED != nodes->values[i]->status &&
				SUCCEED == ha_is_available(info, nodes->values[i]->lastaccess, db_time))
		{
			ha_set_error(info, "cannot change mode to standalone while HA node \"%s\" is %s",
					nodes->values[i]->name, zbx_ha_status_str(nodes->values[i]->status));
			return FAIL;
		}
	}

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if server can be started in cluster configuration           *
 *                                                                            *
 * Parameters: info     - [IN] - the HA node information                      *
 *             nodes    - [IN] - the cluster nodes                            *
 *             db_time  - [IN] - the current database timestamp               *
 *             activate - [OUT] SUCCEED - start in active mode                *
 *                              FAIL    - start in standby mode               *
 *                                                                            *
 * Return value: SUCCEED - server can be started in returned mode             *
 *               FAIL    - server cannot be started based on node registry    *
 *                                                                            *
 * Comments: Sets error status on configuration errors.                       *
 *                                                                            *
 ******************************************************************************/
static int	ha_check_cluster_config(zbx_ha_info_t *info, zbx_vector_ha_node_t *nodes, int db_time, int *activate)
{
	int	i;

	*activate = SUCCEED;

	for (i = 0; i < nodes->values_num; i++)
	{
		if (ZBX_NODE_STATUS_STOPPED == nodes->values[i]->status ||
				SUCCEED != ha_is_available(info, nodes->values[i]->lastaccess, db_time))
		{
			continue;
		}

		if ('\0' == *nodes->values[i]->name)
		{
			ha_set_error(info, "cannot change mode to HA while standalone node is %s",
					zbx_ha_status_str(nodes->values[i]->status));
			return FAIL;
		}

		if (0 == strcmp(info->name, nodes->values[i]->name))
		{
			ha_set_error(info, "found %s duplicate \"%s\" node",
					zbx_ha_status_str(nodes->values[i]->status), info->name);
			return FAIL;
		}

		/* immediately switch to active mode if there is no other node that can take over */
		if (ZBX_NODE_STATUS_ACTIVE == nodes->values[i]->status ||
				ZBX_NODE_STATUS_STANDBY == nodes->values[i]->status)
		{
			*activate = FAIL;
		}
	}

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get current database time                                         *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_get_time(zbx_ha_info_t *info, int *db_time)
{
	DB_ROW		row;
	DB_RESULT	result;
	int		ret = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (NULL == (result = ha_db_select(info, "select " ZBX_DB_TIMESTAMP() " from config")))
		goto out;

	if (NULL != (row = zbx_db_fetch(result)))
		*db_time = atoi(row[0]);
	else
		*db_time = 0;

	zbx_db_free_result(result);

	ret = SUCCEED;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s db_time:%d", __func__, zbx_result_string(ret),
			SUCCEED == ret ? *db_time : -1);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: flush audit taking in account database connection status          *
 *                                                                            *
 ******************************************************************************/
static void	ha_flush_audit(zbx_ha_info_t *info)
{
	if (ZBX_DB_OK > info->db_status)
	{
		zbx_audit_clean();
		return;
	}

	info->db_status = zbx_audit_flush_once();
}

/******************************************************************************
 *                                                                            *
 * Purpose: add new node record in ha_node table if necessary                 *
 *                                                                            *
 * Return value: SUCCEED - node exists, was created or database is offline    *
 *               FAIL    - node configuration or database error               *
 *                                                                            *
 ******************************************************************************/
static void	ha_db_create_node(zbx_ha_info_t *info, zbx_ha_config_t *ha_config)
{
	zbx_vector_ha_node_t	nodes;
	int			i, activate, db_time;
	zbx_cuid_t		nodeid;
	char			*name_esc;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ha_node_create(&nodes);

	if (ZBX_DB_OK > ha_db_begin(info))
		goto finish;

	if (SUCCEED != ha_db_get_nodes(info, &nodes, 0))
		goto out;

	if (FAIL == ha_db_update_config(info))
		goto out;

	for (i = 0; i < nodes.values_num; i++)
	{
		if (0 == strcmp(info->name, nodes.values[i]->name))
		{
			nodeid = nodes.values[i]->ha_nodeid;
			goto out;
		}
	}

	if (SUCCEED != ha_db_get_time(info, &db_time))
		goto out;

	if (0 != is_ha_cluster(ha_config->ha_node_name))
	{
		if (SUCCEED != ha_check_cluster_config(info, &nodes, db_time, &activate))
			goto out;
	}
	else
	{
		if (SUCCEED != ha_check_standalone_config(info, &nodes, db_time))
			goto out;
	}

	zbx_new_cuid(nodeid.str);
	name_esc = zbx_db_dyn_escape_string(info->name);

	if (SUCCEED == ha_db_execute(info, "insert into ha_node (ha_nodeid,name,status,lastaccess)"
			" values ('%s','%s',%d," ZBX_DB_TIMESTAMP() ")",
			nodeid.str, name_esc, ZBX_NODE_STATUS_STOPPED))
	{
		zbx_audit_init(info->auditlog);
		zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_ADD, nodeid.str, info->name);
		zbx_audit_ha_add_create_fields(nodeid.str, info->name, ZBX_NODE_STATUS_STOPPED);
		ha_flush_audit(info);
	}

	zbx_free(name_esc);
out:
	if (ZBX_NODE_STATUS_ERROR != info->ha_status)
		ha_db_commit(info);
	else
		ha_db_rollback(info);

	if (ZBX_NODE_STATUS_ERROR != info->ha_status)
	{
		if (ZBX_DB_OK <= info->db_status)
			info->ha_nodeid = nodeid;
	}
finish:
	zbx_vector_ha_node_clear_ext(&nodes, zbx_ha_node_free);
	zbx_vector_ha_node_destroy(&nodes);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: check for active and standby node availability and update         *
 *          unavailable nodes accordingly                                     *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_check_unavailable_nodes(zbx_ha_info_t *info, zbx_vector_ha_node_t *nodes, int db_time)
{
	int	i, ret = SUCCEED;

	zbx_vector_str_t	unavailable_nodes;

	zbx_vector_str_create(&unavailable_nodes);

	for (i = 0; i < nodes->values_num; i++)
	{
		if (SUCCEED == zbx_cuid_compare(nodes->values[i]->ha_nodeid, info->ha_nodeid))
			continue;

		if (ZBX_NODE_STATUS_STANDBY != nodes->values[i]->status &&
				ZBX_NODE_STATUS_ACTIVE != nodes->values[i]->status)
		{
			continue;
		}

		if (db_time >= nodes->values[i]->lastaccess + info->failover_delay)
		{
			zbx_vector_str_append(&unavailable_nodes, nodes->values[i]->ha_nodeid.str);

			zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_UPDATE, nodes->values[i]->ha_nodeid.str,
					nodes->values[i]->name);
			zbx_audit_ha_update_field_int(nodes->values[i]->ha_nodeid.str, ZBX_AUDIT_HA_STATUS,
					nodes->values[i]->status, ZBX_NODE_STATUS_UNAVAILABLE);
		}
	}

	if (0 != unavailable_nodes.values_num)
	{
		char	*sql = NULL;
		size_t	sql_alloc = 0, sql_offset = 0;

		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update ha_node set status=%d where",
				ZBX_NODE_STATUS_UNAVAILABLE);

		zbx_db_add_str_condition_alloc(&sql, &sql_alloc, &sql_offset, "ha_nodeid",
				(const char **)unavailable_nodes.values, unavailable_nodes.values_num);

		ret = ha_db_execute(info, "%s", sql);
		zbx_free(sql);
	}

	zbx_vector_str_destroy(&unavailable_nodes);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: register server node                                              *
 *                                                                            *
 * Return value: SUCCEED - node was registered or database was offline        *
 *               FAIL    - fatal error                                        *
 *                                                                            *
 * Comments: If registration was successful the status will be set to either  *
 *           active or standby. If database connection was lost the status    *
 *           will stay unknown until another registration attempt succeeds.   *
 *                                                                            *
 *           In the case of critical error the error status will be set.      *
 *                                                                            *
 ******************************************************************************/
static void	ha_db_register_node(zbx_ha_info_t *info, zbx_ha_config_t *ha_config)
{
	zbx_vector_ha_node_t	nodes;
	int			ha_status = ZBX_NODE_STATUS_UNKNOWN, activate = SUCCEED, db_time;
	char			*address = NULL, *sql = NULL;
	size_t			sql_alloc = 0, sql_offset = 0;
	unsigned short		port = 0;
	zbx_ha_node_t		*node;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ha_node_create(&nodes);

	ha_db_create_node(info, ha_config);

	if (SUCCEED == zbx_cuid_empty(info->ha_nodeid))
		goto finish;

	if (ZBX_DB_OK > ha_db_begin(info))
		goto finish;

	if (SUCCEED != ha_db_get_nodes(info, &nodes, ZBX_HA_NODE_LOCK))
		goto out;

	if (SUCCEED != ha_db_get_time(info, &db_time))
		goto out;

	if (0 != is_ha_cluster(ha_config->ha_node_name))
	{
		if (SUCCEED != ha_check_cluster_config(info, &nodes, db_time, &activate))
			goto out;
	}
	else
	{
		if (SUCCEED != ha_check_standalone_config(info, &nodes, db_time))
			goto out;
	}

	if (NULL == (node = ha_find_node_by_name(&nodes, info->name)))
	{
		ha_set_error(info, "cannot find server node \"%s\" in registry", info->name);
		goto out;
	}

	ha_status = SUCCEED == activate ? ZBX_NODE_STATUS_ACTIVE : ZBX_NODE_STATUS_STANDBY;
	ha_get_external_address(&address, &port, ha_config);

	zbx_audit_init(info->auditlog);
	zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_UPDATE, info->ha_nodeid.str, info->name);

	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update ha_node set lastaccess="
				ZBX_DB_TIMESTAMP() ",ha_sessionid='%s'", ha_sessionid.str);

	if (ha_status != node->status)
	{
		zbx_audit_ha_update_field_int(info->ha_nodeid.str, ZBX_AUDIT_HA_STATUS, node->status, ha_status);
		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, ",status=%d", ha_status);
	}

	if (0 != strcmp(address, node->address))
	{
		char	*address_esc;

		address_esc = zbx_db_dyn_escape_string(address);
		zbx_audit_ha_update_field_string(node->ha_nodeid.str, ZBX_AUDIT_HA_ADDRESS, node->address, address);
		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, ",address='%s'", address_esc);
		zbx_free(address_esc);
	}

	if (port != node->port)
	{
		zbx_audit_ha_update_field_int(info->ha_nodeid.str, ZBX_AUDIT_HA_PORT, node->port, port);
		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, ",port=%d", port);
	}

	if (SUCCEED == ha_db_execute(info, "%s where ha_nodeid='%s'", sql, info->ha_nodeid.str))
	{
		if (0 != is_ha_cluster(ha_config->ha_node_name))
			ha_db_execute(info, "delete from ha_node where name=''");
		else
			ha_db_execute(info, "delete from ha_node where name<>''");
	}

	if (0 != is_ha_cluster(ha_config->ha_node_name) && ZBX_NODE_STATUS_ERROR != info->ha_status &&
			ZBX_NODE_STATUS_ACTIVE == ha_status)
	{
		ha_db_check_unavailable_nodes(info, &nodes, db_time);
	}

	ha_flush_audit(info);

	zbx_free(sql);
	zbx_free(address);
out:
	if (ZBX_NODE_STATUS_ERROR != info->ha_status)
		ha_db_commit(info);
	else
		ha_db_rollback(info);

	if (ZBX_NODE_STATUS_ERROR != info->ha_status)
	{
		if (ZBX_DB_OK <= info->db_status)
			info->ha_status = ha_status;
	}
finish:
	zbx_vector_ha_node_clear_ext(&nodes, zbx_ha_node_free);
	zbx_vector_ha_node_destroy(&nodes);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() nodeid:%s ha_status:%s db_status:%d", __func__,
			info->ha_nodeid.str, zbx_ha_status_str(info->ha_status), info->db_status);
}

/******************************************************************************
 *                                                                            *
 * Purpose: check for standby nodes being unavailable for failrover_delay     *
 *          seconds and mark them unavailable                                 *
 *                                                                            *
 ******************************************************************************/
static int	ha_check_standby_nodes(zbx_ha_info_t *info, zbx_vector_ha_node_t *nodes, int db_time)
{
	int	ret;

	zbx_audit_init(info->auditlog);

	if (SUCCEED == (ret = ha_db_check_unavailable_nodes(info, nodes, db_time)))
		ha_flush_audit(info);
	else
		zbx_audit_clean();

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check for active nodes being unavailable for failover_delay       *
 *          seconds, mark them unavailable and set own status to active       *
 *                                                                            *
 ******************************************************************************/
static int	ha_check_active_node(zbx_ha_info_t *info, zbx_vector_ha_node_t *nodes, int *unavailable_index,
		int *ha_status)
{
	int	i, ret = SUCCEED;

	for (i = 0; i < nodes->values_num; i++)
	{
		if (ZBX_NODE_STATUS_ACTIVE == nodes->values[i]->status)
		{
			if ('\0' == *nodes->values[i]->name)
			{
				ha_set_error(info, "found active standalone node in HA mode");
				return FAIL;
			}

			break;
		}
	}

	/* 1) No active nodes - set this node as active.                */
	/* 2) This node is active - update its status as it might have  */
	/*    switched itself to standby mode in the case of prolonged  */
	/*    database connection loss.                                 */
	if (i == nodes->values_num || SUCCEED == zbx_cuid_compare(nodes->values[i]->ha_nodeid, info->ha_nodeid))
	{
		*ha_status = ZBX_NODE_STATUS_ACTIVE;
	}
	else
	{
		if (nodes->values[i]->lastaccess != info->lastaccess_active)
		{
			info->lastaccess_active = nodes->values[i]->lastaccess;
			info->offline_ticks_active = 0;
		}
		else
			info->offline_ticks_active++;

		if (info->failover_delay / ZBX_HA_POLL_PERIOD < info->offline_ticks_active)
		{
			*unavailable_index = i;
			*ha_status = ZBX_NODE_STATUS_ACTIVE;
		}
	}

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: check HA status based on nodes                                    *
 *                                                                            *
 * Comments: Sets error status on critical errors forcing manager to exit     *
 *                                                                            *
 ******************************************************************************/
static void	ha_check_nodes(zbx_ha_info_t *info, zbx_ha_config_t *ha_config)
{
	zbx_vector_ha_node_t	nodes;
	zbx_ha_node_t		*node;
	int			ha_status, db_time, unavailable_index = FAIL;
	char			*sql = NULL;
	size_t			sql_alloc = 0, sql_offset = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() ha_status:%s db_status:%d", __func__, zbx_ha_status_str(info->ha_status),
			info->db_status);

	zbx_vector_ha_node_create(&nodes);

	if (ZBX_DB_OK > ha_db_begin(info))
		goto finish;

	ha_status = info->ha_status;

	if (SUCCEED != ha_db_get_nodes(info, &nodes, ZBX_HA_NODE_LOCK))
		goto out;

	if (NULL == (node = ha_find_node_by_name(&nodes, info->name)))
	{
		ha_set_error(info, "cannot find server node \"%s\" in registry", info->name);
		goto out;
	}

	if (SUCCEED != zbx_cuid_compare(ha_sessionid, node->ha_sessionid))
	{
		if ('\0' == *info->name)
		{
			ha_set_error(info, "multiple servers have been started without configuring \"HANodeName\" "
					"parameter");
		}
		else
			ha_set_error(info, "the server HA registry record has changed ownership");
		goto out;
	}

	/* update nodeid after manager restart */
	if (SUCCEED == zbx_cuid_empty(info->ha_nodeid))
		info->ha_nodeid = node->ha_nodeid;

	if (SUCCEED != ha_db_update_config(info))
		goto out;

	if (SUCCEED != ha_db_get_time(info, &db_time))
		goto out;

	if (0 != is_ha_cluster(ha_config->ha_node_name))
	{
		if (ZBX_NODE_STATUS_ACTIVE == info->ha_status)
		{
			if (SUCCEED != ha_check_standby_nodes(info, &nodes, db_time))
				goto out;
		}
		else /* passive status */
		{
			if (SUCCEED != ha_check_active_node(info, &nodes, &unavailable_index, &ha_status))
				goto out;
		}
	}

	zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "update ha_node set lastaccess=" ZBX_DB_TIMESTAMP());

	zbx_audit_init(info->auditlog);

	if (ha_status != node->status)
	{
		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, ",status=%d", ha_status);

		zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_UPDATE, node->ha_nodeid.str, node->name);
		zbx_audit_ha_update_field_int(node->ha_nodeid.str, ZBX_AUDIT_HA_STATUS, node->status,
				ha_status);
	}

	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " where ha_nodeid='%s'", info->ha_nodeid.str);

	if (SUCCEED == ha_db_execute(info, "%s", sql) && FAIL != unavailable_index)
	{
		zbx_ha_node_t	*last_active = nodes.values[unavailable_index];

		ha_db_execute(info, "update ha_node set status=%d where ha_nodeid='%s'",
				ZBX_NODE_STATUS_UNAVAILABLE, last_active->ha_nodeid.str);

		zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_UPDATE, last_active->ha_nodeid.str, last_active->name);
		zbx_audit_ha_update_field_int(last_active->ha_nodeid.str, ZBX_AUDIT_HA_STATUS, last_active->status,
				ZBX_NODE_STATUS_UNAVAILABLE);
	}

	ha_flush_audit(info);

	zbx_free(sql);
out:
	if (ZBX_NODE_STATUS_ERROR != info->ha_status)
		ha_db_commit(info);
	else
		ha_db_rollback(info);

	if (ZBX_NODE_STATUS_ERROR != info->ha_status)
	{
		if (ZBX_DB_OK <= info->db_status)
			info->ha_status = ha_status;
	}
finish:
	zbx_vector_ha_node_clear_ext(&nodes, zbx_ha_node_free);
	zbx_vector_ha_node_destroy(&nodes);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() nodeid:%s ha_status:%s db_status:%d", __func__,
			info->ha_nodeid.str, zbx_ha_status_str(info->ha_status), info->db_status);
}

/******************************************************************************
 *                                                                            *
 * Purpose: update node lastaccess                                            *
 *                                                                            *
 ******************************************************************************/
static void	ha_db_update_lastaccess(zbx_ha_info_t *info)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s() ha_status:%s", __func__, zbx_ha_status_str(info->ha_status));

	if (ZBX_DB_OK > ha_db_begin(info))
		goto out;

	if (SUCCEED == ha_db_lock_nodes(info) &&
			SUCCEED == ha_db_execute(info, "update ha_node set lastaccess=" ZBX_DB_TIMESTAMP()
					" where ha_nodeid='%s'", info->ha_nodeid.str))
	{
		ha_db_commit(info);
	}
	else
		ha_db_rollback(info);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get cluster status in lld compatible json format                  *
 *                                                                            *
 ******************************************************************************/
static int	ha_db_get_nodes_json(zbx_ha_info_t *info, char **nodes_json, char **error, zbx_ha_config_t *ha_config)
{
	zbx_vector_ha_node_t	nodes;
	int			i, db_time, ret = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (ZBX_DB_OK > info->db_status)
		goto out;

	if (0 == is_ha_cluster(ha_config->ha_node_name))
	{
		/* return empty json array in standalone mode */
		*nodes_json = zbx_strdup(NULL, "[]");
		ret = SUCCEED;
		goto out;
	}

	if (SUCCEED != ha_db_get_time(info, &db_time))
		goto out;

	zbx_vector_ha_node_create(&nodes);

	if (SUCCEED == ha_db_get_nodes(info, &nodes, 0))
	{
		struct zbx_json	j;
		char		address[512];

		zbx_json_initarray(&j, 1024);

		for (i = 0; i < nodes.values_num; i++)
		{
			zbx_snprintf(address, sizeof(address), "%s:%hu", nodes.values[i]->address,
					nodes.values[i]->port);
			zbx_json_addobject(&j, NULL);

			zbx_json_addstring(&j, ZBX_PROTO_TAG_ID, nodes.values[i]->ha_nodeid.str, ZBX_JSON_TYPE_STRING);
			zbx_json_addstring(&j, ZBX_PROTO_TAG_NAME, nodes.values[i]->name, ZBX_JSON_TYPE_STRING);
			zbx_json_addint64(&j, ZBX_PROTO_TAG_STATUS, (zbx_int64_t)nodes.values[i]->status);
			zbx_json_addint64(&j, ZBX_PROTO_TAG_LASTACCESS, (zbx_int64_t)nodes.values[i]->lastaccess);
			zbx_json_addstring(&j, ZBX_PROTO_TAG_ADDRESS, address, ZBX_JSON_TYPE_STRING);
			zbx_json_addint64(&j, ZBX_PROTO_TAG_DB_TIMESTAMP, (zbx_int64_t)db_time);
			zbx_json_addint64(&j, ZBX_PROTO_TAG_LASTACCESS_AGE,
					(zbx_int64_t)(db_time -nodes.values[i]->lastaccess));

			zbx_json_close(&j);
		}

		*nodes_json = zbx_strdup(NULL, j.buffer);
		zbx_json_free(&j);

		ret = SUCCEED;
	}

	zbx_vector_ha_node_clear_ext(&nodes, zbx_ha_node_free);
	zbx_vector_ha_node_destroy(&nodes);
out:
	if (SUCCEED != ret)
		*error = zbx_strdup(NULL, "database error");

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: remove node by its cuid or name                                   *
 *                                                                            *
 ******************************************************************************/
static int	ha_remove_node_impl(zbx_ha_info_t *info, const char *node, char **result, char **error)
{
	zbx_vector_ha_node_t	nodes;
	int			i, ret = FAIL;

	if (ZBX_DB_OK > ha_db_begin(info))
	{
		*error = zbx_strdup(NULL, "database connection problem");
		return FAIL;
	}

	zbx_vector_ha_node_create(&nodes);

	if (SUCCEED != ha_db_get_nodes(info, &nodes, 0))
	{
		*error = zbx_strdup(NULL, "database connection problem");
		goto out;
	}

	for (i = 0; i < nodes.values_num; i++)
	{
		if (0 == strcmp(node, nodes.values[i]->ha_nodeid.str))
			break;
	}

	if (i == nodes.values_num)
	{
		for (i = 0; i < nodes.values_num; i++)
		{
			if (0 == strcmp(node, nodes.values[i]->name))
				break;
		}
	}

	if (i == nodes.values_num)
	{
		*error = zbx_dsprintf(NULL, "unknown node \"%s\"", node);
		goto out;
	}

	if (ZBX_NODE_STATUS_ACTIVE == nodes.values[i]->status || ZBX_NODE_STATUS_STANDBY == nodes.values[i]->status)
	{
		*error = zbx_dsprintf(NULL, "node \"%s\" is %s", nodes.values[i]->name,
				zbx_ha_status_str(nodes.values[i]->status));
		goto out;
	}

	if (SUCCEED != ha_db_execute(info, "delete from ha_node where ha_nodeid='%s'", nodes.values[i]->ha_nodeid.str))
	{
		*error = zbx_strdup(NULL, "database connection problem");
		goto out;
	}
	else
	{
		zbx_audit_init(info->auditlog);
		zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_DELETE, nodes.values[i]->ha_nodeid.str,
				nodes.values[i]->name);
		ha_flush_audit(info);
	}

	ret = SUCCEED;
out:
	if (SUCCEED == ret)
	{
		if (ZBX_DB_OK <= ha_db_commit(info))
		{
			size_t	result_alloc = 0, result_offset = 0;

			zbx_strlog_alloc(LOG_LEVEL_WARNING, result, &result_alloc, &result_offset,
					"removed node \"%s\" with ID \"%s\"", nodes.values[i]->name,
					nodes.values[i]->ha_nodeid.str);
		}
	}
	else
		ha_db_rollback(info);

	zbx_vector_ha_node_clear_ext(&nodes, zbx_ha_node_free);
	zbx_vector_ha_node_destroy(&nodes);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: report cluster status in log file                                 *
 *                                                                            *
 ******************************************************************************/
static void	ha_remove_node(zbx_ha_info_t *info, zbx_ipc_client_t *client, const zbx_ipc_message_t *message)
{
	char		*error = NULL, *result = NULL;
	zbx_uint32_t	len = 0, error_len, result_len;
	unsigned char	*data, *ptr;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	ha_remove_node_impl(info, (const char *)message->data, &result, &error);

	zbx_serialize_prepare_str(len, result);
	zbx_serialize_prepare_str(len, error);

	ptr = data = zbx_malloc(NULL, len);
	ptr += zbx_serialize_str(ptr, result, result_len);
	zbx_serialize_str(ptr, error, error_len);

	zbx_free(error);
	zbx_free(result);

	zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_REMOVE_NODE, data, len);
	zbx_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: reply to ha_status request                                        *
 *                                                                            *
 ******************************************************************************/
static void	ha_send_status(zbx_ha_info_t *info, zbx_ipc_client_t *client)
{
	zbx_uint32_t	len = 0, error_len;
	unsigned char	*ptr, *data;
	const char	*error = info->error;
	int		ret;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() ha_status:%s info:%s", __func__, zbx_ha_status_str(info->ha_status),
			ZBX_NULL2EMPTY_STR(info->error));

	zbx_serialize_prepare_value(len, info->ha_status);
	zbx_serialize_prepare_value(len, info->failover_delay);
	zbx_serialize_prepare_str(len, error);

	ptr = data = (unsigned char *)zbx_malloc(NULL, len);
	ptr += zbx_serialize_value(ptr, info->ha_status);
	ptr += zbx_serialize_value(ptr, info->failover_delay);
	(void)zbx_serialize_str(ptr, error, error_len);

	ret = zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_STATUS, data, len);
	zbx_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_sysinfo_ret_string(ret));
}

/******************************************************************************
 *                                                                            *
 * Purpose: set failover delay                                                *
 *                                                                            *
 ******************************************************************************/
static void	ha_set_failover_delay(zbx_ha_info_t *info, zbx_ipc_client_t *client, const zbx_ipc_message_t *message)
{
	int		delay;
	const char	*error = NULL;
	zbx_uint32_t	len = 0, error_len;
	unsigned char	*data;
	DB_RESULT	result;
	DB_ROW		row;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (NULL == (result = ha_db_select(info, "select configid,ha_failover_delay from config")))
	{
		error = "database error";
		goto out;
	}

	memcpy(&delay, message->data, sizeof(delay));

	if (NULL != (row = zbx_db_fetch(result)) &&
		SUCCEED == ha_db_execute(info, "update config set ha_failover_delay=%d", delay))
	{
		zbx_uint64_t	configid;

		info->failover_delay = delay;
		zabbix_log(LOG_LEVEL_WARNING, "HA failover delay set to %ds", delay);

		ZBX_STR2UINT64(configid, row[0]);
		zbx_audit_init(info->auditlog);
		zbx_audit_settings_create_entry(ZBX_AUDIT_ACTION_UPDATE, configid);
		zbx_audit_settings_update_field_int(configid, "settings.ha_failover_delay", atoi(row[1]), delay);
		ha_flush_audit(info);
	}
	else
		error = "database error";

	zbx_db_free_result(result);
out:
	zbx_serialize_prepare_str(len, error);

	data = zbx_malloc(NULL, len);
	zbx_serialize_str(data, error, error_len);

	zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_SET_FAILOVER_DELAY, data, len);
	zbx_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: get failover delay                                                *
 *                                                                            *
 ******************************************************************************/
static void	ha_get_failover_delay(zbx_ha_info_t *info, zbx_ipc_client_t *client)
{

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_GET_FAILOVER_DELAY, (const unsigned char *)&info->failover_delay,
			(zbx_uint32_t)sizeof(info->failover_delay));

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
 *                                                                            *
 * Purpose: reply to get nodes request                                        *
 *                                                                            *
 ******************************************************************************/
static void	ha_send_node_list(zbx_ha_info_t *info, zbx_ipc_client_t *client, zbx_ha_config_t *ha_config)
{
	int		ret;
	char		*error = NULL, *nodes_json = NULL, *str;
	zbx_uint32_t	len = 0, str_len;
	unsigned char	*data, *ptr;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED == (ret = ha_db_get_nodes_json(info, &nodes_json, &error, ha_config)))
		str = nodes_json;
	else
		str = error;

	zbx_serialize_prepare_value(len, ret);
	zbx_serialize_prepare_str(len, str);

	ptr = data = zbx_malloc(NULL, len);
	ptr += zbx_serialize_value(ptr, ret);
	(void)zbx_serialize_str(ptr, str, str_len);
	zbx_free(str);

	zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_GET_NODES, data, len);
	zbx_free(data);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Purpose: update node status in database on shutdown                        *
 *                                                                            *
 ******************************************************************************/
static void	ha_db_update_exit_status(zbx_ha_info_t *info)
{
	if (ZBX_NODE_STATUS_ACTIVE != info->ha_status && ZBX_NODE_STATUS_STANDBY != info->ha_status)
		return;

	if (ZBX_DB_OK > ha_db_begin(info))
		return;

	if (SUCCEED != ha_db_lock_nodes(info))
		goto out;

	if (SUCCEED == ha_db_execute(info, "update ha_node set status=%d where ha_nodeid='%s'",
			ZBX_NODE_STATUS_STOPPED, info->ha_nodeid.str))
	{
		zbx_audit_init(info->auditlog);
		zbx_audit_ha_create_entry(ZBX_AUDIT_ACTION_UPDATE, info->ha_nodeid.str, info->name);
		zbx_audit_ha_update_field_int(info->ha_nodeid.str, ZBX_AUDIT_HA_STATUS, info->ha_status, ZBX_NODE_STATUS_STOPPED);
		ha_flush_audit(info);
	}
out:
	ha_db_commit(info);
}

/*
 * public API
 */

/******************************************************************************
 *                                                                            *
 * Purpose: get HA manager status                                             *
 *                                                                            *
 * Comments: In the case of timeout the ha_status will be force to:           *
 *   standby - for cluster setup                                              *
 *   active  - for standalone setup                                           *
 *                                                                            *
 ******************************************************************************/
int	zbx_ha_get_status(const char *ha_node_name, int *ha_status, int *ha_failover_delay, char **error)
{
	int		ret;
	unsigned char	*result = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (SUCCEED == (ret = ha_manager_send_message(ZBX_IPC_SERVICE_HA_STATUS, ZBX_HA_SERVICE_TIMEOUT, NULL, 0,
			&result, error)))
	{
		if (NULL != result)
		{
			unsigned char	*ptr = result;
			zbx_uint32_t	len;

			ptr += zbx_deserialize_value(ptr, ha_status);
			ptr += zbx_deserialize_value(ptr, ha_failover_delay);
			(void)zbx_deserialize_str(ptr, error, len);

			zbx_free(result);

			if (ZBX_NODE_STATUS_ERROR == *ha_status)
				ret = FAIL;
		}
		else
		{
			if (0 != is_ha_cluster(ha_node_name))
				*ha_status = ZBX_NODE_STATUS_STANDBY;
			else
				*ha_status = ZBX_NODE_STATUS_ACTIVE;
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: handle HA manager notifications                                   *
 *                                                                            *
 * Comments: This function also monitors heartbeat notifications and          *
 *           returns standby status if no heartbeats are received for         *
 *           failover delay - poll period seconds. This would make main       *
 *           process to switch to standby mode and initiate teardown process  *
 *                                                                            *
 ******************************************************************************/
int	zbx_ha_dispatch_message(const char *ha_node_name, zbx_ipc_message_t *message, int *ha_status,
		int *ha_failover_delay, char **error)
{
	static time_t	last_hb;
	int		ret = SUCCEED, ha_status_old;
	time_t		now;
	unsigned char	*ptr;
	zbx_uint32_t	len;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	now = time(NULL);

	if (NULL != message)
	{
		switch (message->code)
		{
			case ZBX_IPC_SERVICE_HA_STATUS_UPDATE:
				ha_status_old = *ha_status;

				ptr = message->data;
				ptr += zbx_deserialize_value(ptr, ha_status);
				ptr += zbx_deserialize_value(ptr, ha_failover_delay);
				(void)zbx_deserialize_str(ptr, error, len);

				if (ZBX_NODE_STATUS_ERROR == *ha_status)
				{
					ret = FAIL;
					goto out;
				}

				/* reset heartbeat on status change */
				if (ha_status_old != *ha_status)
					last_hb = now;

				break;
			case ZBX_IPC_SERVICE_HA_HEARTBEAT:
				last_hb = now;
				break;
		}
	}

	if (is_ha_cluster(ha_node_name) && 0 != last_hb)
	{
		if (last_hb + *ha_failover_delay - ZBX_HA_POLL_PERIOD <= now || now < last_hb)
		{
			last_hb = 0;

			if (ZBX_NODE_STATUS_ACTIVE == *ha_status)
				*ha_status = ZBX_NODE_STATUS_STANDBY;
			else
				*ha_status = ZBX_NODE_STATUS_HATIMEOUT;
		}
	}
out:
	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: start HA manager                                                  *
 *                                                                            *
 ******************************************************************************/
int	zbx_ha_start(zbx_rtc_t *rtc, zbx_ha_config_t *ha_config, char **error)
{
	int			ret = FAIL, status;
	zbx_uint32_t		code = 0;
	zbx_thread_args_t	args;
	zbx_ipc_client_t	*client;
	zbx_ipc_message_t	*message;
	zbx_timespec_t		rtc_timeout = {1, 0};
	time_t			now, start;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	args.args = (void *)ha_config;
	zbx_thread_start(ha_manager_thread, &args, &ha_pid);

	if (ZBX_THREAD_ERROR == ha_pid)
	{
		*error = zbx_dsprintf(NULL, "cannot create HA manager process: %s", zbx_strerror(errno));
		goto out;
	}

	start = now = time(NULL);

	/* Add few seconds to allow HA manager to terminate by its own in the case of RTC timeout. */
	/* Otherwise it will get killed before logging timeout error.                              */
	while (start + ZBX_HA_SERVICE_TIMEOUT + 5 > now)
	{
		(void)zbx_ipc_service_recv(&rtc->service, &rtc_timeout, &client, &message);

		if (NULL != client)
			zbx_ipc_client_release(client);

		if (NULL != message)
		{
			code = message->code;
			zbx_ipc_message_free(message);

			if (ZBX_IPC_SERVICE_HA_REGISTER == code)
				break;
		}

		if (0 < waitpid(ha_pid, &status, WNOHANG))
		{
			ha_pid = ZBX_THREAD_ERROR;
			*error = zbx_strdup(NULL, "HA manager has stopped during startup registration");
			goto out;
		}

		now = time(NULL);
	}

	if (ZBX_IPC_SERVICE_HA_REGISTER != code)
	{
		*error = zbx_strdup(NULL, "timeout while waiting for HA manager registration");
		goto out;
	}

	ret = SUCCEED;
out:
	if (SUCCEED != ret)
	{
#ifdef HAVE_PTHREAD_PROCESS_SHARED
		zbx_locks_disable();
#endif
		zbx_ha_kill();
	}

	zbx_free(ha_config);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: pause HA manager                                                  *
 *                                                                            *
 * Comments: HA manager must be paused before stopping it normally            *
 *                                                                            *
 ******************************************************************************/
int	zbx_ha_pause(char **error)
{
	int		ret;
	unsigned char	*result = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	ret = zbx_ipc_async_exchange(ZBX_IPC_SERVICE_HA, ZBX_IPC_SERVICE_HA_PAUSE, ZBX_HA_SERVICE_TIMEOUT, NULL, 0,
			&result, error);
	zbx_free(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: stop  HA manager                                                  *
 *                                                                            *
 * Comments: This function is used to stop HA manager on normal shutdown      *
 *                                                                            *
 ******************************************************************************/
int	zbx_ha_stop(char **error)
{
	int		ret = FAIL;
	unsigned char	*result = NULL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (ZBX_THREAD_ERROR == ha_pid || 0 != kill(ha_pid, 0))
	{
		ret = SUCCEED;
		goto out;
	}

	if (SUCCEED == zbx_ipc_async_exchange(ZBX_IPC_SERVICE_HA, ZBX_IPC_SERVICE_HA_STOP, ZBX_HA_SERVICE_TIMEOUT,
			NULL, 0, &result, error))
	{
		zbx_free(result);

		if (ZBX_THREAD_ERROR == zbx_thread_wait(ha_pid))
		{
			*error = zbx_dsprintf(NULL, "failed to wait for HA manager to exit: %s", zbx_strerror(errno));
			goto out;
		}

		ret = SUCCEED;
	}
out:
	if (SUCCEED == ret)
		ha_pid = ZBX_THREAD_ERROR;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Purpose: kill HA manager                                                   *
 *                                                                            *
 ******************************************************************************/
void	zbx_ha_kill(void)
{
	if (ZBX_THREAD_ERROR != ha_pid)
	{
		kill(ha_pid, SIGKILL);
		zbx_thread_wait(ha_pid);
		ha_pid = ZBX_THREAD_ERROR;
	}
}

/******************************************************************************
 *                                                                            *
 * Purpose: check if the pid is HA manager pid                                *
 *                                                                            *
 ******************************************************************************/
int	zbx_ha_check_pid(pid_t pid)
{
	return pid == ha_pid ? SUCCEED : FAIL;
}

/*
 * main process loop
 */
ZBX_THREAD_ENTRY(ha_manager_thread, args)
{
	zbx_ipc_service_t	service;
	char			*error = NULL;
	zbx_ipc_client_t	*client;
	zbx_ipc_async_socket_t	rtc_socket;
	zbx_ipc_message_t	*message;
	int			pause = FAIL, stop = FAIL, ticks_num = 0, nextcheck;
	double			now, tick;
	zbx_ha_info_t		info;
	zbx_timespec_t		timeout;
	zbx_ha_config_t		ha_config;

	zbx_setproctitle("ha manager");

	zabbix_log(LOG_LEVEL_INFORMATION, "starting HA manager");

	ha_config = *(zbx_ha_config_t *)((zbx_thread_args_t *)args)->args;

	if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_HA, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start HA manager service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	if (FAIL == rtc_open(&rtc_socket, ZBX_HA_SERVICE_TIMEOUT, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot start HA manager service: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	if (FAIL == zbx_ipc_async_socket_send(&rtc_socket, ZBX_IPC_SERVICE_HA_REGISTER, NULL, 0) ||
			FAIL == zbx_ipc_async_socket_flush(&rtc_socket, ZBX_HA_SERVICE_TIMEOUT))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot register HA manager to runtime control service");
		exit(EXIT_FAILURE);
	}

	zbx_cuid_clear(info.ha_nodeid);
	info.name = ZBX_NULL2EMPTY_STR(ha_config.ha_node_name);
	info.ha_status = ha_config.ha_status;
	info.error = NULL;
	info.db_status = ZBX_DB_DOWN;
	info.offline_ticks_active = 0;
	info.lastaccess_active = 0;
	info.failover_delay = ZBX_HA_DEFAULT_FAILOVER_DELAY;
	info.auditlog = 0;

	tick = zbx_time();

	if (ZBX_NODE_STATUS_UNKNOWN == info.ha_status)
	{
		ha_db_register_node(&info, &ha_config);

		if (ZBX_NODE_STATUS_ERROR == info.ha_status)
			goto pause;
	}

	nextcheck = ZBX_HA_POLL_PERIOD;

	/* triple the initial database check delay in standby mode to avoid the same node becoming active */
	/* immediately after switching to standby mode or crashing and being restarted                    */
	if (ZBX_NODE_STATUS_STANDBY == info.ha_status)
		nextcheck *= 3;

	zabbix_log(LOG_LEVEL_INFORMATION, "HA manager started in %s mode", zbx_ha_status_str(info.ha_status));

	while (SUCCEED != pause && ZBX_NODE_STATUS_ERROR != info.ha_status)
	{
		if (tick <= (now = zbx_time()))
		{
			ticks_num++;

			if (nextcheck <= ticks_num)
			{
				int	old_status = info.ha_status, delay;

				if (ZBX_NODE_STATUS_UNKNOWN == info.ha_status)
					ha_db_register_node(&info, &ha_config);
				else
					ha_check_nodes(&info, &ha_config);

				if (old_status != info.ha_status && ZBX_NODE_STATUS_UNKNOWN != info.ha_status)
					ha_update_parent(&rtc_socket, &info);

				if (ZBX_NODE_STATUS_ERROR == info.ha_status)
					break;

				/* in offline mode try connecting to database every second otherwise */
				/* with small failover delay (10s) it might switch to standby mode   */
				/* despite connection being restored shortly                         */
				delay = ZBX_DB_OK <= info.db_status ? ZBX_HA_POLL_PERIOD : 1;

				while (nextcheck <= ticks_num)
					nextcheck += delay;
			}

			if (ZBX_DB_OK <= info.db_status || ZBX_NODE_STATUS_ACTIVE != info.ha_status)
				ha_send_heartbeat(&rtc_socket);

			while (tick <= now)
				tick++;
		}

		timeout.sec = (int)(tick - now);
		timeout.ns = (int)((tick - now) * 1000000000) % 1000000000;

		(void)zbx_ipc_service_recv(&service, &timeout, &client, &message);

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_SERVICE_HA_STATUS:
					ha_send_status(&info, client);
					break;
				case ZBX_IPC_SERVICE_HA_STOP:
					zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_STOP, NULL, 0);
					pause = stop = SUCCEED;
					break;
				case ZBX_IPC_SERVICE_HA_PAUSE:
					zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_PAUSE, NULL, 0);
					pause = SUCCEED;
					break;
				case ZBX_IPC_SERVICE_HA_GET_NODES:
					ha_send_node_list(&info, client, &ha_config);
					break;
				case ZBX_IPC_SERVICE_HA_REMOVE_NODE:
					ha_remove_node(&info, client, message);
					break;
				case ZBX_IPC_SERVICE_HA_SET_FAILOVER_DELAY:
					ha_set_failover_delay(&info, client, message);
					ha_update_parent(&rtc_socket, &info);
					break;
				case ZBX_IPC_SERVICE_HA_GET_FAILOVER_DELAY:
					ha_get_failover_delay(&info, client);
					break;
				case ZBX_IPC_SERVICE_HA_LOGLEVEL_INCREASE:
					zabbix_increase_log_level();
					zabbix_report_log_level_change();
					zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_LOGLEVEL_INCREASE, NULL, 0);
					break;
				case ZBX_IPC_SERVICE_HA_LOGLEVEL_DECREASE:
					zabbix_decrease_log_level();
					zabbix_report_log_level_change();
					zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_LOGLEVEL_DECREASE, NULL, 0);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);
	}

	zabbix_log(LOG_LEVEL_INFORMATION, "HA manager has been paused");
pause:
	timeout.sec = ZBX_HA_POLL_PERIOD;
	timeout.ns = 0;

	while (SUCCEED != stop)
	{
		(void)zbx_ipc_service_recv(&service, &timeout, &client, &message);

		if (ZBX_NODE_STATUS_STANDBY == info.ha_status || ZBX_NODE_STATUS_ACTIVE == info.ha_status)
			ha_db_update_lastaccess(&info);

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_SERVICE_HA_STATUS:
					ha_send_status(&info, client);
					break;
				case ZBX_IPC_SERVICE_HA_STOP:
					zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_STOP, NULL, 0);
					stop = SUCCEED;
					break;
				case ZBX_IPC_SERVICE_HA_PAUSE:
					zbx_ipc_client_send(client, ZBX_IPC_SERVICE_HA_PAUSE, NULL, 0);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);
	}

	zbx_free(info.error);

	ha_db_update_exit_status(&info);

	zbx_db_close();

	zbx_ipc_async_socket_close(&rtc_socket);
	zbx_ipc_service_close(&service);

	zabbix_log(LOG_LEVEL_INFORMATION, "HA manager has been stopped");

	exit(EXIT_SUCCESS);

	return 0;
}