/*
** Copyright (C) 2001-2024 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see .
**/
#include "zbxcommon.h"
#ifdef HAVE_IPCSERVICE
#ifdef HAVE_LIBEVENT
# include
# include
#endif
#include "zbxipcservice.h"
#include "zbxalgo.h"
#include "zbxstr.h"
#define ZBX_IPC_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
#define ZBX_IPC_DATA_DUMP_SIZE 128
static char ipc_path[ZBX_IPC_PATH_MAX] = {0};
static size_t ipc_path_root_len = 0;
#define ZBX_IPC_CLIENT_STATE_NONE 0
#define ZBX_IPC_CLIENT_STATE_QUEUED 1
#define ZBX_IPC_ASYNC_SOCKET_STATE_NONE 0
#define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT 1
#define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR 2
/* IPC client, providing nonblocking connections through socket */
struct zbx_ipc_client
{
zbx_ipc_socket_t csocket;
zbx_ipc_service_t *service;
zbx_uint32_t rx_header[2];
unsigned char *rx_data;
zbx_uint32_t rx_bytes;
zbx_queue_ptr_t rx_queue;
struct event *rx_event;
zbx_uint32_t tx_header[2];
unsigned char *tx_data;
zbx_uint32_t tx_bytes;
zbx_queue_ptr_t tx_queue;
struct event *tx_event;
zbx_uint64_t id;
unsigned char state;
void *userdata;
zbx_uint32_t refcount;
};
ZBX_PTR_VECTOR_IMPL(ipc_client_ptr, zbx_ipc_client_t *)
/*
* Private API
*/
#define ZBX_IPC_HEADER_SIZE (int)(sizeof(zbx_uint32_t) * 2)
#define ZBX_IPC_MESSAGE_CODE 0
#define ZBX_IPC_MESSAGE_SIZE 1
#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
typedef int evutil_socket_t;
static struct event *event_new(struct event_base *ev, evutil_socket_t fd, short what,
void(*cb_func)(int, short, void *), void *cb_arg)
{
struct event *event;
event = zbx_malloc(NULL, sizeof(struct event));
event_set(event, fd, what, cb_func, cb_arg);
event_base_set(ev, event);
return event;
}
static void event_free(struct event *event)
{
event_del(event);
zbx_free(event);
}
#endif
static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);
static const char *ipc_get_path(void)
{
ipc_path[ipc_path_root_len] = '\0';
return ipc_path;
}
#define ZBX_IPC_SOCKET_PREFIX "/zabbix_"
#define ZBX_IPC_SOCKET_SUFFIX ".sock"
#define ZBX_IPC_CLASS_PREFIX_NONE ""
#define ZBX_IPC_CLASS_PREFIX_SERVER "server_"
#define ZBX_IPC_CLASS_PREFIX_PROXY "proxy_"
#define ZBX_IPC_CLASS_PREFIX_AGENT "agent_"
static const char *ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_NONE;
static size_t ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
/******************************************************************************
* *
* Purpose: makes socket path from the service name *
* *
* Parameters: service_name - [IN] the service name *
* error - [OUT] the error message *
* *
* Return value: The created path or NULL if the path exceeds unix domain *
* socket path maximum length *
* *
******************************************************************************/
static const char *ipc_make_path(const char *service_name, char **error)
{
size_t path_len, offset;
static ZBX_THREAD_LOCAL char ipc_path_full[ZBX_IPC_PATH_MAX];
path_len = strlen(service_name);
if (ZBX_IPC_PATH_MAX < ipc_path_root_len + path_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + ipc_path_prefix_len)
{
*error = zbx_dsprintf(*error,
"Socket path \"%s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
ipc_path, ZBX_IPC_SOCKET_PREFIX, ipc_path_prefix, service_name, ZBX_IPC_SOCKET_SUFFIX);
return NULL;
}
memcpy(ipc_path_full, ipc_path, ipc_path_root_len);
offset = ipc_path_root_len;
memcpy(ipc_path_full + offset, ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
offset += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);
memcpy(ipc_path_full + offset, ipc_path_prefix, ipc_path_prefix_len);
offset += ipc_path_prefix_len;
memcpy(ipc_path_full + offset, service_name, path_len);
offset += path_len;
memcpy(ipc_path_full + offset, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);
return ipc_path_full;
}
/******************************************************************************
* *
* Purpose: writes data to a socket *
* *
* Parameters: fd - [IN] the socket file descriptor *
* data - [IN] the data *
* size - [IN] the data size *
* size_sent - [IN] the actual size written to socket *
* *
* Return value: SUCCEED - no socket errors were detected. Either the data or *
* a part of it was written to socket or a write to *
* non-blocking socket would block *
* FAIL - otherwise *
* *
******************************************************************************/
static int ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
{
zbx_uint32_t offset = 0;
int ret = SUCCEED;
ssize_t n;
while (offset != size)
{
n = write(fd, data + offset, size - offset);
if (-1 == n)
{
if (EINTR == errno)
continue;
if (EWOULDBLOCK == errno || EAGAIN == errno)
break;
zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
ret = FAIL;
break;
}
offset += n;
}
*size_sent = offset;
return ret;
}
/******************************************************************************
* *
* Purpose: reads data from a socket *
* *
* Parameters: fd - [IN] the socket file descriptor *
* data - [IN] the data *
* size - [IN] the data size *
* size_sent - [IN] the actual size read from socket *
* *
* Return value: SUCCEED - the data was successfully read *
* FAIL - otherwise *
* *
* Comments: When reading data from non-blocking sockets SUCCEED will be *
* returned also if there were no more data to read. *
* *
******************************************************************************/
static int ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
{
int n;
*read_size = 0;
while (-1 == (n = read(fd, buffer + *read_size, size - *read_size)))
{
if (EINTR == errno)
continue;
if (EWOULDBLOCK == errno || EAGAIN == errno)
return SUCCEED;
return FAIL;
}
if (0 == n)
return FAIL;
*read_size += n;
return SUCCEED;
}
/******************************************************************************
* *
* Purpose: reads data from a socket until the requested data has been read *
* *
* Parameters: fd - [IN] the socket file descriptor *
* buffer - [IN] the data *
* size - [IN] the data size *
* read_size - [IN] the actual size read from socket *
* *
* Return value: SUCCEED - the data was successfully read *
* FAIL - otherwise *
* *
* Comments: When reading data from non-blocking sockets this function will *
* return SUCCEED if there are no data to read, even if not all of *
* the requested data has been read. *
* *
******************************************************************************/
static int ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
{
int ret = FAIL;
zbx_uint32_t offset = 0, chunk_size;
*read_size = 0;
while (offset < size)
{
if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size))
goto out;
if (0 == chunk_size)
break;
offset += chunk_size;
}
ret = SUCCEED;
out:
*read_size = offset;
return ret;
}
/******************************************************************************
* *
* Purpose: writes IPC message to socket *
* *
* Parameters: csocket - [IN] the IPC socket *
* code - [IN] the message code *
* data - [IN] the data *
* size - [IN] the data size *
* tx_size - [IN] the actual size written to socket *
* *
* Return value: SUCCEED - no socket errors were detected. Either the data or *
* a part of it was written to socket or a write to *
* non-blocking socket would block *
* FAIL - otherwise *
* *
* Comments: When using non-blocking sockets the tx_size parameter must be *
* checked in addition to return value to tell if the message was *
* sent successfully. *
* *
******************************************************************************/
static int ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data,
zbx_uint32_t size, zbx_uint32_t *tx_size)
{
int ret;
zbx_uint32_t size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)];
buffer[0] = code;
buffer[1] = size;
if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size)
{
if (0 != size)
memcpy(buffer + 2, data, size);
return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size);
}
if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size))
return FAIL;
/* in the case of non-blocking sockets only a part of the header might be sent */
if (ZBX_IPC_HEADER_SIZE != *tx_size)
return SUCCEED;
ret = ipc_write_data(csocket->fd, data, size, &size_data);
*tx_size += size_data;
return ret;
}
/******************************************************************************
* *
* Purpose: reads message header and data from buffer *
* *
* Parameters: header - [IN/OUT] the message header *
* data - [OUT] the message data *
* rx_bytes - [IN] the number of bytes stored in message *
* (including header) *
* buffer - [IN] the buffer to parse *
* size - [IN] the number of bytes to parse *
* read_size - [OUT] the number of bytes read *
* *
* Return value: SUCCEED - message was successfully parsed *
* FAIL - not enough data *
* *
******************************************************************************/
static int ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes,
const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size)
{
zbx_uint32_t copy_size, data_size, data_offset;
*read_size = 0;
if (ZBX_IPC_HEADER_SIZE > rx_bytes)
{
copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size);
memcpy((char *)header + rx_bytes, buffer, copy_size);
*read_size += copy_size;
if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size)
return FAIL;
data_size = header[ZBX_IPC_MESSAGE_SIZE];
if (0 == data_size)
{
*data = NULL;
return SUCCEED;
}
*data = (unsigned char *)zbx_malloc(NULL, data_size);
data_offset = 0;
}
else
{
data_size = header[ZBX_IPC_MESSAGE_SIZE];
data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE;
}
copy_size = MIN(data_size - data_offset, size - *read_size);
memcpy(*data + data_offset, buffer + *read_size, copy_size);
*read_size += copy_size;
return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL);
}
/******************************************************************************
* *
* Purpose: checks if IPC message has been completed *
* *
* Parameters: header - [IN] the message header *
* rx_bytes - [IN] the number of bytes set in message *
* (including header) *
* *
* Return value: SUCCEED - message has been completed *
* FAIL - otherwise *
* *
******************************************************************************/
static int ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes)
{
if (ZBX_IPC_HEADER_SIZE > rx_bytes)
return FAIL;
if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes)
return FAIL;
return SUCCEED;
}
/******************************************************************************
* *
* Purpose: reads IPC message from buffered client socket *
* *
* Parameters: csocket - [IN] the source socket *
* header - [OUT] the header of the message *
* data - [OUT] the data of the message *
* rx_bytes - [IN/OUT] the total message size read (including *
* header *
* *
* Return value: SUCCEED - data was read successfully, check rx_bytes to *
* determine if the message was completed. *
* FAIL - failed to read message (socket error or connection *
* was closed). *
* *
******************************************************************************/
static int ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data,
zbx_uint32_t *rx_bytes)
{
zbx_uint32_t data_size, offset, read_size = 0;
int ret = FAIL;
/* try to read message from socket buffer */
if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset)
{
ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset,
csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size);
csocket->rx_buffer_offset += read_size;
*rx_bytes += read_size;
if (SUCCEED == ret)
goto out;
}
/* not enough data in socket buffer, try to read more until message is completed or no data to read */
while (SUCCEED != ret)
{
csocket->rx_buffer_offset = 0;
csocket->rx_buffer_bytes = 0;
if (ZBX_IPC_HEADER_SIZE < *rx_bytes)
{
offset = *rx_bytes - ZBX_IPC_HEADER_SIZE;
data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset;
/* long messages will be read directly into message buffer */
if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size)
{
ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size);
*rx_bytes += read_size;
goto out;
}
}
if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size))
goto out;
/* it's possible that nothing will be read on non-blocking sockets, return success */
if (0 == read_size)
{
ret = SUCCEED;
goto out;
}
csocket->rx_buffer_bytes = read_size;
ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes,
&read_size);
csocket->rx_buffer_offset += read_size;
*rx_bytes += read_size;
}
out:
return ret;
}
/******************************************************************************
* *
* Purpose: frees client's libevent event *
* *
* Parameters: client - [IN] the client *
* *
******************************************************************************/
static void ipc_client_free_events(zbx_ipc_client_t *client)
{
if (NULL != client->rx_event)
{
event_free(client->rx_event);
client->rx_event = NULL;
}
if (NULL != client->tx_event)
{
event_free(client->tx_event);
client->tx_event = NULL;
}
}
/******************************************************************************
* *
* Purpose: frees IPC service client *
* *
* Parameters: client - [IN] the client to free *
* *
******************************************************************************/
static void ipc_client_free(zbx_ipc_client_t *client)
{
zbx_ipc_message_t *message;
ipc_client_free_events(client);
zbx_ipc_socket_close(&client->csocket);
while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue)))
zbx_ipc_message_free(message);
zbx_queue_ptr_destroy(&client->rx_queue);
zbx_free(client->rx_data);
while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
zbx_ipc_message_free(message);
zbx_queue_ptr_destroy(&client->tx_queue);
zbx_free(client->tx_data);
ipc_client_free_events(client);
zbx_free(client);
}
/******************************************************************************
* *
* Purpose: adds message to received messages queue *
* *
* Parameters: client - [IN] the client to read *
* *
******************************************************************************/
static void ipc_client_push_rx_message(zbx_ipc_client_t *client)
{
zbx_ipc_message_t *message;
message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE];
message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE];
message->data = client->rx_data;
zbx_queue_ptr_push(&client->rx_queue, message);
client->rx_data = NULL;
client->rx_bytes = 0;
}
/******************************************************************************
* *
* Purpose: prepares to send the next message in send queue *
* *
* Parameters: client - [IN] the client *
* *
******************************************************************************/
static void ipc_client_pop_tx_message(zbx_ipc_client_t *client)
{
zbx_ipc_message_t *message;
zbx_free(client->tx_data);
client->tx_bytes = 0;
if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue)))
return;
client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size;
client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code;
client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size;
client->tx_data = message->data;
zbx_free(message);
}
/******************************************************************************
* *
* Purpose: reads data from IPC service client *
* *
* Parameters: client - [IN] the client to read *
* *
* Return value: FAIL - read error/connection was closed *
* *
* Comments: This function reads data from socket, parses it and adds *
* parsed messages to received messages queue. *
* *
******************************************************************************/
static int ipc_client_read(zbx_ipc_client_t *client)
{
int rc;
do
{
if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data,
&client->rx_bytes))
{
zbx_free(client->rx_data);
client->rx_bytes = 0;
return FAIL;
}
if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes)))
ipc_client_push_rx_message(client);
}
while (SUCCEED == rc);
return SUCCEED;
}
/******************************************************************************
* *
* Purpose: writes queued data to IPC service client *
* *
* Parameters: client - [IN] the client *
* *
* Return value: SUCCEED - the data was sent successfully *
* FAIL - otherwise *
* *
******************************************************************************/
static int ipc_client_write(zbx_ipc_client_t *client)
{
zbx_uint32_t data_size, write_size;
data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE];
if (data_size < client->tx_bytes)
{
zbx_uint32_t size, offset;
size = client->tx_bytes - data_size;
offset = ZBX_IPC_HEADER_SIZE - size;
if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size,
&write_size))
{
return FAIL;
}
client->tx_bytes -= write_size;
if (data_size < client->tx_bytes)
return SUCCEED;
}
while (0 < client->tx_bytes)
{
if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes,
client->tx_bytes, &write_size))
{
return FAIL;
}
if (0 == write_size)
return SUCCEED;
client->tx_bytes -= write_size;
}
if (0 == client->tx_bytes)
ipc_client_pop_tx_message(client);
return SUCCEED;
}
/******************************************************************************
* *
* Purpose: gets the next client with messages/closed socket from recv queue *
* *
* Parameters: service - [IN] the IPC service *
* *
* Return value: The client with messages/closed socket *
* *
******************************************************************************/
static zbx_ipc_client_t *ipc_service_pop_client(zbx_ipc_service_t *service)
{
zbx_ipc_client_t *client;
if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv)))
client->state = ZBX_IPC_CLIENT_STATE_NONE;
return client;
}
/******************************************************************************
* *
* Purpose: pushes client to the recv queue if needed *
* *
* Parameters: service - [IN] the IPC service *
* client - [IN] the IPC client *
* *
* Comments: The client is pushed to the recv queue if it isn't already there *
* and there is messages to return or the client connection was *
* closed. *
* *
******************************************************************************/
static void ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
{
if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state)
return;
if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event)
return;
client->state = ZBX_IPC_CLIENT_STATE_QUEUED;
zbx_queue_ptr_push(&service->clients_recv, client);
}
/******************************************************************************
* *
* Purpose: adds a new IPC service client *
* *
* Parameters: service - [IN] the IPC service *
* fd - [IN] the client socket descriptor *
* *
******************************************************************************/
static void ipc_service_add_client(zbx_ipc_service_t *service, int fd)
{
static zbx_uint64_t next_clientid = 1;
zbx_ipc_client_t *client;
int flags;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
memset(client, 0, sizeof(zbx_ipc_client_t));
if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
exit(EXIT_FAILURE);
}
if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
exit(EXIT_FAILURE);
}
client->csocket.fd = fd;
client->csocket.rx_buffer_bytes = 0;
client->csocket.rx_buffer_offset = 0;
client->id = next_clientid++;
client->state = ZBX_IPC_CLIENT_STATE_NONE;
client->refcount = 1;
zbx_queue_ptr_create(&client->rx_queue);
zbx_queue_ptr_create(&client->tx_queue);
client->service = service;
client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client);
client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client);
event_add(client->rx_event, NULL);
zbx_vector_ipc_client_ptr_append(&service->clients, client);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s() clientid:" ZBX_FS_UI64, __func__, client->id);
}
/******************************************************************************
* *
* Purpose: removes IPC service client *
* *
* Parameters: service - [IN] the IPC service *
* client - [IN] the client to remove *
* *
******************************************************************************/
static void ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client)
{
for (int i = 0; i < service->clients.values_num; i++)
{
if (service->clients.values[i] == client)
zbx_vector_ipc_client_ptr_remove_noorder(&service->clients, i);
}
}
/******************************************************************************
* *
* Purpose: to find connected client when only it's ID is known *
* *
* Parameters: service - [IN] the IPC service *
* id - [IN] ID of client *
* *
* Return value: address of client or NULL if client has already disconnected *
* *
******************************************************************************/
zbx_ipc_client_t *zbx_ipc_client_by_id(const zbx_ipc_service_t *service, zbx_uint64_t id)
{
zbx_ipc_client_t *client;
for (int i = 0; i < service->clients.values_num; i++)
{
client = service->clients.values[i];
if (id == client->id)
return client;
}
return NULL;
}
/******************************************************************************
* *
* Purpose: service client read event libevent callback *
* *
******************************************************************************/
static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg)
{
zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
if (SUCCEED != ipc_client_read(client))
{
ipc_client_free_events(client);
ipc_service_remove_client(client->service, client);
}
ipc_service_push_client(client->service, client);
}
/******************************************************************************
* *
* Purpose: service client write event libevent callback *
* *
******************************************************************************/
static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg)
{
zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg;
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
if (SUCCEED != ipc_client_write(client))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
zbx_ipc_client_close(client);
return;
}
if (0 == client->tx_bytes)
event_del(client->tx_event);
}
/******************************************************************************
* *
* Purpose: asynchronous socket write event libevent callback *
* *
******************************************************************************/
static void ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg)
{
zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
if (SUCCEED != ipc_client_write(asocket->client))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client");
ipc_client_free_events(asocket->client);
zbx_ipc_socket_close(&asocket->client->csocket);
asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
return;
}
if (0 == asocket->client->tx_bytes)
event_del(asocket->client->tx_event);
}
/******************************************************************************
* *
* Purpose: asynchronous socket read event libevent callback *
* *
******************************************************************************/
static void ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg)
{
zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
if (SUCCEED != ipc_client_read(asocket->client))
{
ipc_client_free_events(asocket->client);
asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR;
}
}
/******************************************************************************
* *
* Purpose: timer callback *
* *
******************************************************************************/
static void ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg)
{
zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg;
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT;
}
/******************************************************************************
* *
* Purpose: accepts a new client connection *
* *
* Parameters: service - [IN] the IPC service *
* *
******************************************************************************/
static void ipc_service_accept(zbx_ipc_service_t *service)
{
int fd;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
while (-1 == (fd = accept(service->fd, NULL, NULL)))
{
if (EINTR != errno)
{
/* If there is unaccepted connection libevent will call registered callback function over and */
/* over again. It is better to exit straight away and cause all other processes to stop. */
zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno));
exit(EXIT_FAILURE);
}
}
ipc_service_add_client(service, fd);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
* *
* Purpose: creates IPC message *
* *
* Parameters: code - [IN] the message code *
* data - [IN] the data *
* size - [IN] the data size *
* *
* Return value: The created message. *
* *
******************************************************************************/
static zbx_ipc_message_t *ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
{
zbx_ipc_message_t *message;
message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t));
message->code = code;
message->size = size;
if (0 != size)
{
message->data = (unsigned char *)zbx_malloc(NULL, size);
memcpy(message->data, data, size);
}
else
message->data = NULL;
return message;
}
/******************************************************************************
* *
* Purpose: libevent logging callback *
* *
******************************************************************************/
static void ipc_service_event_log_cb(int severity, const char *msg)
{
int loglevel;
switch (severity)
{
case _EVENT_LOG_DEBUG:
loglevel = LOG_LEVEL_TRACE;
break;
case _EVENT_LOG_MSG:
loglevel = LOG_LEVEL_DEBUG;
break;
case _EVENT_LOG_WARN:
loglevel = LOG_LEVEL_WARNING;
break;
case _EVENT_LOG_ERR:
loglevel = LOG_LEVEL_DEBUG;
break;
default:
loglevel = LOG_LEVEL_DEBUG;
break;
}
zabbix_log(loglevel, "IPC service: %s", msg);
}
/******************************************************************************
* *
* Purpose: initialize libevent library *
* *
******************************************************************************/
static void ipc_service_init_libevent(void)
{
event_set_log_callback(ipc_service_event_log_cb);
}
/******************************************************************************
* *
* Purpose: uninitialize libevent library *
* *
******************************************************************************/
static void ipc_service_free_libevent(void)
{
}
/******************************************************************************
* *
* Purpose: libevent listener callback *
* *
******************************************************************************/
static void ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg)
{
zbx_ipc_service_t *service = (zbx_ipc_service_t *)arg;
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
ipc_service_accept(service);
}
/******************************************************************************
* *
* Purpose: timer callback *
* *
******************************************************************************/
static void ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg)
{
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
ZBX_UNUSED(arg);
}
/******************************************************************************
* *
* Purpose: checks if an IPC service is already running *
* *
* Parameters: service_name - [IN] *
* *
******************************************************************************/
static int ipc_check_running_service(const char *service_name)
{
zbx_ipc_socket_t csocket;
int ret;
char *error = NULL;
if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error)))
zbx_ipc_socket_close(&csocket);
else
zbx_free(error);
return ret;
}
/*
* Public client API
*/
/******************************************************************************
* *
* Purpose: opens socket to an IPC service listening on the specified path *
* *
* Parameters: csocket - [OUT] the IPC socket to the service *
* service_name - [IN] the IPC service name *
* timeout - [IN] the connection timeout *
* error - [OUT] the error message *
* *
* Return value: SUCCEED - the socket was successfully opened *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error)
{
struct sockaddr_un addr;
time_t start;
struct timespec ts = {0, 100000000};
const char *socket_path;
int ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (NULL == (socket_path = ipc_make_path(service_name, error)))
goto out;
if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
{
*error = zbx_dsprintf(*error, "Cannot create client socket: %s.", zbx_strerror(errno));
goto out;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
start = time(NULL);
while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr)))
{
if (0 == timeout || time(NULL) - start > timeout)
{
*error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name,
zbx_strerror(errno));
close(csocket->fd);
goto out;
}
nanosleep(&ts, NULL);
}
csocket->rx_buffer_bytes = 0;
csocket->rx_buffer_offset = 0;
ret = SUCCEED;
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: closes socket to an IPC service *
* *
* Parameters: csocket - [IN/OUT] the IPC socket to close *
* *
******************************************************************************/
void zbx_ipc_socket_close(zbx_ipc_socket_t *csocket)
{
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (-1 != csocket->fd)
{
close(csocket->fd);
csocket->fd = -1;
}
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
* *
* Purpose: writes a message to IPC service *
* *
* Parameters: csocket - [IN] an opened IPC socket to the service *
* code - [IN] the message code *
* data - [IN] the data *
* size - [IN] the data size *
* *
* Return value: SUCCEED - the message was successfully written *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
{
int ret;
zbx_uint32_t size_sent;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) &&
size_sent == size + ZBX_IPC_HEADER_SIZE)
{
ret = SUCCEED;
}
else
ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: reads a message from IPC service *
* *
* Parameters: csocket - [IN] an opened IPC socket to the service *
* message - [OUT] the received message *
* *
* Return value: SUCCEED - the message was successfully received *
* FAIL - otherwise *
* *
* Comments: If this function succeeds the message must be cleaned/freed by *
* the caller. *
* *
******************************************************************************/
int zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message)
{
int ret = FAIL;
zbx_uint32_t rx_bytes = 0, header[2];
unsigned char *data = NULL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes))
goto out;
if (SUCCEED != ipc_message_is_completed(header, rx_bytes))
{
zbx_free(data);
goto out;
}
message->code = header[ZBX_IPC_MESSAGE_CODE];
message->size = header[ZBX_IPC_MESSAGE_SIZE];
message->data = data;
if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
{
char *msg = NULL;
zbx_ipc_message_format(message, &msg);
zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, msg);
zbx_free(msg);
}
ret = SUCCEED;
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: check if socket is opened *
* *
* Parameters: csocket - [OUT] the IPC socket to the service *
* *
* Return value: SUCCEED - the socket was successfully opened *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_socket_connected(const zbx_ipc_socket_t *csocket)
{
return 0 < csocket->fd ? SUCCEED : FAIL;
}
/******************************************************************************
* *
* Purpose: frees the resources allocated to store IPC message data *
* *
* Parameters: message - [IN] the message to free *
* *
******************************************************************************/
void zbx_ipc_message_free(zbx_ipc_message_t *message)
{
if (NULL != message)
{
zbx_free(message->data);
zbx_free(message);
}
}
/******************************************************************************
* *
* Purpose: frees the resources allocated to store IPC message data *
* *
* Parameters: message - [IN] the message to clean *
* *
******************************************************************************/
void zbx_ipc_message_clean(zbx_ipc_message_t *message)
{
zbx_free(message->data);
}
/******************************************************************************
* *
* Purpose: initializes IPC message *
* *
* Parameters: message - [IN] the message to initialize *
* *
******************************************************************************/
void zbx_ipc_message_init(zbx_ipc_message_t *message)
{
memset(message, 0, sizeof(zbx_ipc_message_t));
}
/******************************************************************************
* *
* Purpose: formats message to readable format for debug messages *
* *
* Parameters: message - [IN] the message *
* data - [OUT] the formatted message *
* *
******************************************************************************/
void zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data)
{
size_t data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0;
zbx_uint32_t i, data_num;
if (NULL == message)
return;
data_num = message->size;
if (ZBX_IPC_DATA_DUMP_SIZE < data_num)
data_num = ZBX_IPC_DATA_DUMP_SIZE;
*data = (char *)zbx_malloc(*data, data_alloc);
zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size);
for (i = 0; i < data_num; i++)
{
if (0 != i)
zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " "));
zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]);
}
(*data)[data_offset] = '\0';
}
#ifdef HAVE_OPENIPMI
/******************************************************************************
* *
* Purpose: copies ipc message *
* *
* Parameters: dst - [IN] the destination message *
* src - [IN] the source message *
* *
******************************************************************************/
void zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src)
{
dst->code = src->code;
dst->size = src->size;
dst->data = (unsigned char *)zbx_malloc(NULL, src->size);
memcpy(dst->data, src->data, src->size);
}
#endif /* HAVE_OPENIPMI */
static void ipc_service_user_cb(evutil_socket_t fd, short what, void *arg)
{
ZBX_UNUSED(fd);
ZBX_UNUSED(what);
ZBX_UNUSED(arg);
}
/*
* Public service API
*/
/******************************************************************************
* *
* Purpose: initializes IPC service environment *
* *
* Parameters: path - [IN] the service root path *
* error - [OUT] the error message *
* *
* Return value: SUCCEED - the environment was initialized successfully. *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_service_init_env(const char *path, char **error)
{
struct stat fs;
int ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, path);
if (0 != ipc_path_root_len)
{
*error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with"
" root directory at \"%s\".", ipc_get_path());
goto out;
}
if (0 != stat(path, &fs))
{
*error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path,
zbx_strerror(errno));
goto out;
}
if (0 == S_ISDIR(fs.st_mode))
{
*error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path);
goto out;
}
if (0 != access(path, W_OK | R_OK))
{
*error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno));
goto out;
}
ipc_path_root_len = strlen(path);
if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3)
{
*error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path);
goto out;
}
memcpy(ipc_path, path, ipc_path_root_len + 1);
while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1])
ipc_path[--ipc_path_root_len] = '\0';
ipc_service_init_libevent();
if (0 != evthread_use_pthreads())
{
*error = zbx_strdup(*error, "Cannot initialize libevent threading support");
goto out;
}
ret = SUCCEED;
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: frees IPC service environment *
* *
******************************************************************************/
void zbx_ipc_service_free_env(void)
{
ipc_service_free_libevent();
}
/******************************************************************************
* *
* Purpose: starts IPC service on the specified path *
* *
* Parameters: service - [IN/OUT] the IPC service *
* service_name - [IN] the unix domain socket path *
* error - [OUT] the error message *
* *
* Return value: SUCCEED - the service was initialized successfully. *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error)
{
struct sockaddr_un addr;
const char *socket_path;
int ret = FAIL;
mode_t mode;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __func__, service_name);
mode = umask(077);
if (NULL == (socket_path = ipc_make_path(service_name, error)))
goto out;
if (0 == access(socket_path, F_OK))
{
if (0 != access(socket_path, W_OK))
{
*error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path);
goto out;
}
if (SUCCEED == ipc_check_running_service(service_name))
{
*error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name);
goto out;
}
unlink(socket_path);
}
if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0)))
{
*error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno));
goto out;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr)))
{
*error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno));
goto out;
}
if (0 != listen(service->fd, SOMAXCONN))
{
*error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno));
goto out;
}
service->path = zbx_strdup(NULL, socket_path);
zbx_vector_ipc_client_ptr_create(&service->clients);
zbx_queue_ptr_create(&service->clients_recv);
service->ev = event_base_new();
service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST,
ipc_service_client_connected_cb, service);
event_add(service->ev_listener, NULL);
service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service);
service->ev_alert = event_new(service->ev, -1, 0, ipc_service_user_cb, NULL);
ret = SUCCEED;
out:
umask(mode);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: closes IPC service and frees the resources allocated by it *
* *
* Parameters: service - [IN/OUT] the IPC service *
* *
******************************************************************************/
void zbx_ipc_service_close(zbx_ipc_service_t *service)
{
zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, service->path);
if (0 != close(service->fd))
zabbix_log(LOG_LEVEL_DEBUG, "Cannot close path \"%s\": %s", service->path, zbx_strerror(errno));
if (-1 == unlink(service->path))
zabbix_log(LOG_LEVEL_WARNING, "cannot remove socket at %s: %s.", service->path, zbx_strerror(errno));
for (int i = 0; i < service->clients.values_num; i++)
ipc_client_free(service->clients.values[i]);
zbx_free(service->path);
zbx_vector_ipc_client_ptr_destroy(&service->clients);
zbx_queue_ptr_destroy(&service->clients_recv);
event_free(service->ev_alert);
event_free(service->ev_timer);
event_free(service->ev_listener);
event_base_free(service->ev);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
* *
* Purpose: receives ipc message from a connected client *
* *
* Parameters: service - [IN] the IPC service *
* timeout - [IN] the timeout. (0,0) is used for nonblocking call *
* and (ZBX_IPC_WAIT_FOREVER, *) is *
* used for blocking call without timeout *
* client - [OUT] the client that sent the message or *
* NULL if there are no messages and the *
* specified timeout passed. *
* The client must be released by caller with *
* zbx_ipc_client_release() function. *
* message - [OUT] the received message or NULL if the client *
* connection was closed. *
* The message must be freed by caller with *
* ipc_message_free() function. *
* *
* Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without *
* waiting for socket events *
* (pending events are processed) *
* ZBX_IPC_RECV_WAIT - returned after receiving socket *
* event *
* ZBX_IPC_RECV_TIMEOUT - returned after timeout expired *
* *
******************************************************************************/
int zbx_ipc_service_recv(zbx_ipc_service_t *service, const zbx_timespec_t *timeout, zbx_ipc_client_t **client,
zbx_ipc_message_t **message)
{
int ret, flags;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d.%03d", __func__, timeout->sec, timeout->ns / 1000000);
if ((0 != timeout->sec || 0 != timeout->ns) && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv))
{
if (ZBX_IPC_WAIT_FOREVER != timeout->sec)
{
struct timeval tv = {timeout->sec, timeout->ns / 1000};
evtimer_add(service->ev_timer, &tv);
}
flags = EVLOOP_ONCE;
}
else
flags = EVLOOP_NONBLOCK;
event_base_loop(service->ev, flags);
if (NULL != (*client = ipc_service_pop_client(service)))
{
if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue)))
{
if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE))
{
char *data = NULL;
zbx_ipc_message_format(*message, &data);
zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
zbx_free(data);
}
ipc_service_push_client(service, *client);
zbx_ipc_client_addref(*client);
}
ret = (EVLOOP_NONBLOCK == flags ? ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT);
}
else
{ ret = ZBX_IPC_RECV_TIMEOUT;
*client = NULL;
*message = NULL;
}
evtimer_del(service->ev_timer);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
return ret;
}
/******************************************************************************
* *
* Purpose: interrupt IPC service recv loop from another thread *
* *
******************************************************************************/
void zbx_ipc_service_alert(zbx_ipc_service_t *service)
{
event_active(service->ev_alert, 0, 0);
}
/******************************************************************************
* *
* Purpose: Sends IPC message to client *
* *
* Parameters: client - [IN] the IPC client *
* code - [IN] the message code *
* data - [IN] the data *
* size - [IN] the data size *
* *
* Comments: If data can't be written directly to socket (buffer full) then *
* the message is queued and sent during zbx_ipc_service_recv() *
* messaging loop whenever socket becomes ready. *
* *
******************************************************************************/
int zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size)
{
zbx_uint32_t tx_size = 0;
zbx_ipc_message_t *message;
int ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, client->id);
if (0 != client->tx_bytes)
{
message = ipc_message_create(code, data, size);
zbx_queue_ptr_push(&client->tx_queue, message);
ret = SUCCEED;
goto out;
}
if (FAIL == ipc_socket_write_message(&client->csocket, code, data, size, &tx_size))
goto out;
if (tx_size != ZBX_IPC_HEADER_SIZE + size)
{
client->tx_header[ZBX_IPC_MESSAGE_CODE] = code;
client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size;
client->tx_data = (unsigned char *)zbx_malloc(NULL, size);
memcpy(client->tx_data, data, size);
client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - tx_size;
event_add(client->tx_event, NULL);
}
ret = SUCCEED;
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: closes client socket and frees resources allocated for client *
* *
* Parameters: client - [IN] the IPC client *
* *
******************************************************************************/
void zbx_ipc_client_close(zbx_ipc_client_t *client)
{
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
ipc_client_free_events(client);
zbx_ipc_socket_close(&client->csocket);
ipc_service_remove_client(client->service, client);
zbx_queue_ptr_remove_value(&client->service->clients_recv, client);
zbx_ipc_client_release(client);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
int zbx_ipc_client_get_fd(zbx_ipc_client_t *client)
{
return client->csocket.fd;
}
void zbx_ipc_client_addref(zbx_ipc_client_t *client)
{
client->refcount++;
}
void zbx_ipc_client_release(zbx_ipc_client_t *client)
{
if (0 == --client->refcount)
ipc_client_free(client);
}
int zbx_ipc_client_connected(zbx_ipc_client_t *client)
{
return (NULL == client->rx_event ? FAIL : SUCCEED);
}
zbx_uint64_t zbx_ipc_client_id(const zbx_ipc_client_t *client)
{
return client->id;
}
void zbx_ipc_client_set_userdata(zbx_ipc_client_t *client, void *userdata)
{
client->userdata = userdata;
}
void *zbx_ipc_client_get_userdata(zbx_ipc_client_t *client)
{
return client->userdata;
}
/******************************************************************************
* *
* Purpose: opens asynchronous socket to IPC service client *
* *
* Parameters: client - [OUT] the IPC service client *
* service_name - [IN] the IPC service name *
* timeout - [IN] the connection timeout *
* error - [OUT] the error message *
* *
* Return value: SUCCEED - the socket was successfully opened *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error)
{
int ret = FAIL, flags;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
memset(asocket, 0, sizeof(zbx_ipc_async_socket_t));
asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t));
memset(asocket->client, 0, sizeof(zbx_ipc_client_t));
if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error))
{
zbx_free(asocket->client);
goto out;
}
if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0)))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags");
exit(EXIT_FAILURE);
}
if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket");
exit(EXIT_FAILURE);
}
asocket->ev = event_base_new();
asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket);
asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST,
ipc_async_socket_read_event_cb, (void *)asocket);
asocket->client->tx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_WRITE | EV_PERSIST,
ipc_async_socket_write_event_cb, (void *)asocket);
event_add(asocket->client->rx_event, NULL);
asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
ret = SUCCEED;
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: closes asynchronous IPC socket and frees allocated resources *
* *
* Parameters: asocket - [IN] the asynchronous IPC socket *
* *
******************************************************************************/
void zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket)
{
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
ipc_client_free(asocket->client);
asocket->client = NULL;
event_free(asocket->ev_timer);
event_base_free(asocket->ev);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
/******************************************************************************
* *
* Purpose: Sends message through asynchronous IPC socket *
* *
* Parameters: asocket - [IN] the asynchronous IPC socket *
* code - [IN] the message code *
* data - [IN] the data *
* size - [IN] the data size *
* *
* Comments: If data can't be written directly to socket (buffer full) then *
* the message is queued and sent during zbx_ipc_async_socket_recv()*
* or zbx_ipc_async_socket_flush() functions whenever socket becomes*
* ready. *
* *
******************************************************************************/
int zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data,
zbx_uint32_t size)
{
int ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
ret = zbx_ipc_client_send(asocket->client, code, data, size);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
/******************************************************************************
* *
* Purpose: receives message through asynchronous IPC socket *
* *
* Parameters: asocket - [IN] the asynchronous IPC socket *
* timeout - [IN] the timeout in seconds, 0 is used for *
* nonblocking call and ZBX_IPC_WAIT_FOREVER is *
* used for blocking call without timeout *
* message - [OUT] the received message or NULL if the client *
* connection was closed. *
* The message must be freed by caller with *
* ipc_message_free() function. *
* *
* Return value: SUCCEED - the message was read successfully or timeout *
* occurred *
* FAIL - otherwise *
* *
* Comments: After socket has been closed (or connection error has occurred) *
* calls to zbx_ipc_client_read() will return success with buffered *
* messages, until all buffered messages are retrieved. *
* *
******************************************************************************/
int zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message)
{
int ret, flags;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue))
{
if (ZBX_IPC_WAIT_FOREVER != timeout)
{
struct timeval tv = {timeout, 0};
evtimer_add(asocket->ev_timer, &tv);
}
flags = EVLOOP_ONCE;
}
else
flags = EVLOOP_NONBLOCK;
/* do only single event base loop if timeout is not set */
asocket->state = (0 != timeout ? ZBX_IPC_ASYNC_SOCKET_STATE_NONE : ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT);
do
{
event_base_loop(asocket->ev, flags);
*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue);
}
while (NULL == *message && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && NULL != *message)
{
char *data = NULL;
zbx_ipc_message_format(*message, &data);
zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data);
zbx_free(data);
}
if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
ret = SUCCEED;
else
ret = FAIL;
evtimer_del(asocket->ev_timer);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
return ret;
}
/******************************************************************************
* *
* Purpose: flushes unsent through asynchronous IPC socket *
* *
* Parameters: asocket - [IN] the asynchronous IPC service socket *
* timeout - [IN] the timeout in seconds, 0 is used for *
* nonblocking call and ZBX_IPC_WAIT_FOREVER is *
* used for blocking call without timeout *
* *
* Return value: SUCCEED - the data was flushed successfully or timeout *
* occurred. Use zbx_ipc_client_unsent_data() to *
* check if all data was sent. *
* FAIL - failed to send data (connection was closed or an *
* error occurred). *
* *
******************************************************************************/
int zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout)
{
int ret = FAIL, flags;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout);
if (0 == asocket->client->tx_bytes)
{
ret = SUCCEED;
goto out;
}
if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state)
goto out;
asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE;
if (0 != timeout)
{
if (ZBX_IPC_WAIT_FOREVER != timeout)
{
struct timeval tv = {timeout, 0};
evtimer_add(asocket->ev_timer, &tv);
}
flags = EVLOOP_ONCE;
}
else
flags = EVLOOP_NONBLOCK;
do
{
event_base_loop(asocket->ev, flags);
if (SUCCEED != zbx_ipc_client_connected(asocket->client))
goto out;
}
while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state);
if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state)
{
ret = SUCCEED;
asocket->state = ZBX_IPC_CLIENT_STATE_NONE;
}
out:
evtimer_del(asocket->ev_timer);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret);
return ret;
}
/******************************************************************************
* *
* Purpose: check if there are data to be sent *
* *
* Parameters: asocket - [IN] the asynchronous IPC service socket *
* *
* Return value: SUCCEED - there are messages queued to be sent *
* FAIL - all data has been sent *
* *
******************************************************************************/
int zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket)
{
return (0 == asocket->client->tx_bytes ? FAIL : SUCCEED);
}
/******************************************************************************
* *
* Purpose: check if socket is connected *
* *
* Parameters: asocket - [IN] the asynchronous IPC service socket *
* *
* Return value: SUCCEED - socket is connected *
* FAIL - otherwise *
* *
******************************************************************************/
int zbx_ipc_async_socket_connected(zbx_ipc_async_socket_t *asocket)
{
if (NULL == asocket->client)
return FAIL;
return zbx_ipc_client_connected(asocket->client);
}
/******************************************************************************
* *
* Purpose: connect, send message and receive response in a given timeout *
* *
* Parameters: service_name - [IN] the IPC service name *
* code - [IN] the message code *
* timeout - [IN] time allowed to be spent on receive, note *
* that this does not include open, send and *
* flush that have their own timeouts *
* data - [IN] the data *
* size - [IN] the data size *
* out - [OUT] the received message or NULL on error *
* The message must be freed by zbx_free() *
* error - [OUT] the error message *
* *
* Return value: SUCCEED - successfully sent message and received response *
* FAIL - error occurred *
* *
******************************************************************************/
int zbx_ipc_async_exchange(const char *service_name, zbx_uint32_t code, int timeout, const unsigned char *data,
zbx_uint32_t size, unsigned char **out, char **error)
{
zbx_ipc_message_t *message;
zbx_ipc_async_socket_t asocket;
int ret = FAIL;
zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:'%s' code:%u timeout:%d", __func__, service_name, code, timeout);
if (FAIL == zbx_ipc_async_socket_open(&asocket, service_name, timeout, error))
goto out;
if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size))
{
*error = zbx_strdup(NULL, "Cannot send request");
goto fail;
}
if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout))
{
*error = zbx_strdup(NULL, "Cannot flush request");
goto fail;
}
if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message))
{
*error = zbx_strdup(NULL, "Cannot receive response");
goto fail;
}
if (NULL == message)
{
*error = zbx_strdup(NULL, "Timeout while waiting for response");
goto fail;
}
*out = message->data;
message->data = NULL;
zbx_ipc_message_free(message);
ret = SUCCEED;
fail:
zbx_ipc_async_socket_close(&asocket);
out:
zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
return ret;
}
void zbx_init_library_ipcservice(unsigned char program_type)
{
switch (program_type)
{
case ZBX_PROGRAM_TYPE_SERVER:
ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_SERVER;
ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER);
break;
case ZBX_PROGRAM_TYPE_PROXY_ACTIVE:
case ZBX_PROGRAM_TYPE_PROXY_PASSIVE:
ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_PROXY;
ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY);
break;
case ZBX_PROGRAM_TYPE_AGENTD:
ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_AGENT;
ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT);
break;
default:
ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_NONE;
ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);
break;
}
}
#endif