#include "zbxcommon.h"

#ifdef HAVE_IPCSERVICE

#ifdef HAVE_LIBEVENT
#	include <event.h>
#	include <event2/thread.h>
#endif

#include "zbxipcservice.h"
#include "zbxalgo.h"
#include "log.h"
#include "zbxstr.h"

#define ZBX_IPC_PATH_MAX	sizeof(((struct sockaddr_un *)0)->sun_path)

#define ZBX_IPC_DATA_DUMP_SIZE		128

/* buffer holding the IPC root directory followed by the last generated socket name */
static char	ipc_path[ZBX_IPC_PATH_MAX] = {0};
/* length of the root directory part stored at the start of ipc_path */
static size_t	ipc_path_root_len = 0;

#define ZBX_IPC_CLIENT_STATE_NONE	0
#define ZBX_IPC_CLIENT_STATE_QUEUED	1

#define ZBX_IPC_ASYNC_SOCKET_STATE_NONE		0
#define ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT	1
#define ZBX_IPC_ASYNC_SOCKET_STATE_ERROR	2

/* IPC client, providing nonblocking connections through socket */
struct zbx_ipc_client
{
	zbx_ipc_socket_t	csocket;
	zbx_ipc_service_t	*service;

	/* message currently being received: header, payload and bytes received so far */
	zbx_uint32_t		rx_header[2];
	unsigned char		*rx_data;
	zbx_uint32_t		rx_bytes;
	zbx_queue_ptr_t		rx_queue;
	struct event		*rx_event;

	/* message currently being sent: header, payload and bytes still to be sent */
	zbx_uint32_t		tx_header[2];
	unsigned char		*tx_data;
	zbx_uint32_t		tx_bytes;
	zbx_queue_ptr_t		tx_queue;
	struct event		*tx_event;

	zbx_uint64_t		id;
	unsigned char		state;

	void			*userdata;

	zbx_uint32_t		refcount;
};

/*
 * Private API
 */

#define ZBX_IPC_HEADER_SIZE	(int)(sizeof(zbx_uint32_t) * 2)

/* indexes of the message code and payload size inside the two-word header */
#define ZBX_IPC_MESSAGE_CODE	0
#define ZBX_IPC_MESSAGE_SIZE	1

#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x2000000
typedef int evutil_socket_t;

/* event_new() replacement built from event_set()/event_base_set() for libevent older than 2.0 */
static struct event	*event_new(struct event_base *ev, evutil_socket_t fd, short what,
		void(*cb_func)(int, short, void *), void *cb_arg)
{
	struct event	*event;

	event = zbx_malloc(NULL, sizeof(struct event));
	event_set(event, fd, what, cb_func, cb_arg);
	event_base_set(ev, event);

	return event;
}

/* event_free() replacement for libevent older than 2.0: unregisters the event and releases it */
static void	event_free(struct event *event)
{
	event_del(event);
	zbx_free(event);
}

#endif

static void	ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg);
static void	ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg);

/*
 * Purpose: returns the IPC root directory path
 *
 * Comments: The shared ipc_path buffer is cut back to its root part, which
 *           discards any socket name previously appended by ipc_make_path().
 */
static const char	*ipc_get_path(void)
{
	ipc_path[ipc_path_root_len] = '\0';

	return ipc_path;
}

#define ZBX_IPC_SOCKET_PREFIX	"/zabbix_"
#define ZBX_IPC_SOCKET_SUFFIX	".sock"

#define ZBX_IPC_CLASS_PREFIX_NONE	""
#define ZBX_IPC_CLASS_PREFIX_SERVER	"server_"
#define ZBX_IPC_CLASS_PREFIX_PROXY	"proxy_"
#define ZBX_IPC_CLASS_PREFIX_AGENT	"agent_"

static const char	*ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_NONE;
static size_t		ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE);

/*
 * Purpose: makes socket path from the service name
 *
 * Parameters: service_name - [IN] the service name
 *             error        - [OUT] the error message
 *
 * Return value: The created path or NULL if the path exceeds unix domain
 *               socket path maximum length
 *
 * Comments: The path is assembled inside the static ipc_path buffer, right
 *           after its root part, so every call overwrites the previous result.
 */
static const char	*ipc_make_path(const char *service_name, char **error)
{
	size_t	name_len, pos;

	name_len = strlen(service_name);

	if (ZBX_IPC_PATH_MAX < ipc_path_root_len + name_len + 1 + ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX) +
			ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + ipc_path_prefix_len)
	{
		/* print only the root part of ipc_path: the rest of the buffer may still */
		/* contain the socket name written by an earlier successful call          */
		*error = zbx_dsprintf(*error,
				"Socket path \"%.*s%s%s%s%s\" exceeds maximum length of unix domain socket path.",
				(int)ipc_path_root_len, ipc_path, ZBX_IPC_SOCKET_PREFIX, ipc_path_prefix,
				service_name, ZBX_IPC_SOCKET_SUFFIX);
		return NULL;
	}

	pos = ipc_path_root_len;

	memcpy(ipc_path + pos, ZBX_IPC_SOCKET_PREFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX));
	pos += ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_PREFIX);

	memcpy(ipc_path + pos, ipc_path_prefix, ipc_path_prefix_len);
	pos += ipc_path_prefix_len;

	memcpy(ipc_path + pos, service_name, name_len);
	pos += name_len;

	/* +1 copies the terminating zero of the suffix as well */
	memcpy(ipc_path + pos, ZBX_IPC_SOCKET_SUFFIX, ZBX_CONST_STRLEN(ZBX_IPC_SOCKET_SUFFIX) + 1);

	return ipc_path;
}

/*
 * Purpose: writes data to a socket
 *
 * Parameters: fd        - [IN] the socket file descriptor
 *             data      - [IN] the data
 *             size      - [IN] the data size
 *             size_sent -
[OUT] the actual size written to socket
 *
 * Return value: SUCCEED - no socket errors were detected. Either the data or
 *                         a part of it was written to socket or a write to
 *                         non-blocking socket would block
 *               FAIL    - otherwise
 */
static int	ipc_write_data(int fd, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *size_sent)
{
	zbx_uint32_t	written = 0;
	int		res = SUCCEED;

	while (written != size)
	{
		ssize_t	rc = write(fd, data + written, size - written);

		if (-1 != rc)
		{
			written += rc;
			continue;
		}

		/* interrupted by a signal - simply retry */
		if (EINTR == errno)
			continue;

		/* a full non-blocking socket is not an error, the caller checks size_sent */
		if (EWOULDBLOCK != errno && EAGAIN != errno)
		{
			zabbix_log(LOG_LEVEL_WARNING, "cannot write to IPC socket: %s", strerror(errno));
			res = FAIL;
		}

		break;
	}

	*size_sent = written;

	return res;
}

/*
 * Purpose: reads data from a socket
 *
 * Parameters: fd        - [IN] the socket file descriptor
 *             buffer    - [OUT] the buffer receiving the data
 *             size      - [IN] the maximum number of bytes to read
 *             read_size - [OUT] the actual size read from socket
 *
 * Return value: SUCCEED - the data was successfully read
 *               FAIL    - otherwise
 *
 * Comments: When reading data from non-blocking sockets SUCCEED will be
 *           returned also if there were no more data to read.
* * * ******************************************************************************/ static int ipc_read_data(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size) { int n; *read_size = 0; while (-1 == (n = read(fd, buffer + *read_size, size - *read_size))) { if (EINTR == errno) continue; if (EWOULDBLOCK == errno || EAGAIN == errno) return SUCCEED; return FAIL; } if (0 == n) return FAIL; *read_size += n; return SUCCEED; } /****************************************************************************** * * * Purpose: reads data from a socket until the requested data has been read * * * * Parameters: fd - [IN] the socket file descriptor * * buffer - [IN] the data * * size - [IN] the data size * * read_size - [IN] the actual size read from socket * * * * Return value: SUCCEED - the data was successfully read * * FAIL - otherwise * * * * Comments: When reading data from non-blocking sockets this function will * * return SUCCEED if there are no data to read, even if not all of * * the requested data has been read. * * * ******************************************************************************/ static int ipc_read_data_full(int fd, unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size) { int ret = FAIL; zbx_uint32_t offset = 0, chunk_size; *read_size = 0; while (offset < size) { if (FAIL == ipc_read_data(fd, buffer + offset, size - offset, &chunk_size)) goto out; if (0 == chunk_size) break; offset += chunk_size; } ret = SUCCEED; out: *read_size = offset; return ret; } /****************************************************************************** * * * Purpose: writes IPC message to socket * * * * Parameters: csocket - [IN] the IPC socket * * code - [IN] the message code * * data - [IN] the data * * size - [IN] the data size * * tx_size - [IN] the actual size written to socket * * * * Return value: SUCCEED - no socket errors were detected. 
Either the data or * * a part of it was written to socket or a write to * * non-blocking socket would block * * FAIL - otherwise * * * * Comments: When using non-blocking sockets the tx_size parameter must be * * checked in addition to return value to tell if the message was * * sent successfully. * * * ******************************************************************************/ static int ipc_socket_write_message(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size, zbx_uint32_t *tx_size) { int ret; zbx_uint32_t size_data, buffer[ZBX_IPC_SOCKET_BUFFER_SIZE / sizeof(zbx_uint32_t)]; buffer[0] = code; buffer[1] = size; if (ZBX_IPC_SOCKET_BUFFER_SIZE - ZBX_IPC_HEADER_SIZE >= size) { if (0 != size) memcpy(buffer + 2, data, size); return ipc_write_data(csocket->fd, (unsigned char *)buffer, size + ZBX_IPC_HEADER_SIZE, tx_size); } if (FAIL == ipc_write_data(csocket->fd, (unsigned char *)buffer, ZBX_IPC_HEADER_SIZE, tx_size)) return FAIL; /* in the case of non-blocking sockets only a part of the header might be sent */ if (ZBX_IPC_HEADER_SIZE != *tx_size) return SUCCEED; ret = ipc_write_data(csocket->fd, data, size, &size_data); *tx_size += size_data; return ret; } /****************************************************************************** * * * Purpose: reads message header and data from buffer * * * * Parameters: header - [IN/OUT] the message header * * data - [OUT] the message data * * rx_bytes - [IN] the number of bytes stored in message * * (including header) * * buffer - [IN] the buffer to parse * * size - [IN] the number of bytes to parse * * read_size - [OUT] the number of bytes read * * * * Return value: SUCCEED - message was successfully parsed * * FAIL - not enough data * * * ******************************************************************************/ static int ipc_read_buffer(zbx_uint32_t *header, unsigned char **data, zbx_uint32_t rx_bytes, const unsigned char *buffer, zbx_uint32_t size, zbx_uint32_t *read_size) 
{ zbx_uint32_t copy_size, data_size, data_offset; *read_size = 0; if (ZBX_IPC_HEADER_SIZE > rx_bytes) { copy_size = MIN(ZBX_IPC_HEADER_SIZE - rx_bytes, size); memcpy((char *)header + rx_bytes, buffer, copy_size); *read_size += copy_size; if (ZBX_IPC_HEADER_SIZE > rx_bytes + copy_size) return FAIL; data_size = header[ZBX_IPC_MESSAGE_SIZE]; if (0 == data_size) { *data = NULL; return SUCCEED; } *data = (unsigned char *)zbx_malloc(NULL, data_size); data_offset = 0; } else { data_size = header[ZBX_IPC_MESSAGE_SIZE]; data_offset = rx_bytes - ZBX_IPC_HEADER_SIZE; } copy_size = MIN(data_size - data_offset, size - *read_size); memcpy(*data + data_offset, buffer + *read_size, copy_size); *read_size += copy_size; return (rx_bytes + *read_size == data_size + ZBX_IPC_HEADER_SIZE ? SUCCEED : FAIL); } /****************************************************************************** * * * Purpose: checks if IPC message has been completed * * * * Parameters: header - [IN] the message header * * rx_bytes - [IN] the number of bytes set in message * * (including header) * * * * Return value: SUCCEED - message has been completed * * FAIL - otherwise * * * ******************************************************************************/ static int ipc_message_is_completed(const zbx_uint32_t *header, zbx_uint32_t rx_bytes) { if (ZBX_IPC_HEADER_SIZE > rx_bytes) return FAIL; if (header[ZBX_IPC_MESSAGE_SIZE] + ZBX_IPC_HEADER_SIZE != rx_bytes) return FAIL; return SUCCEED; } /****************************************************************************** * * * Purpose: reads IPC message from buffered client socket * * * * Parameters: csocket - [IN] the source socket * * header - [OUT] the header of the message * * data - [OUT] the data of the message * * rx_bytes - [IN/OUT] the total message size read (including * * header * * * * Return value: SUCCEED - data was read successfully, check rx_bytes to * * determine if the message was completed. 
* * FAIL - failed to read message (socket error or connection * * was closed). * * * ******************************************************************************/ static int ipc_socket_read_message(zbx_ipc_socket_t *csocket, zbx_uint32_t *header, unsigned char **data, zbx_uint32_t *rx_bytes) { zbx_uint32_t data_size, offset, read_size = 0; int ret = FAIL; /* try to read message from socket buffer */ if (csocket->rx_buffer_bytes > csocket->rx_buffer_offset) { ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer + csocket->rx_buffer_offset, csocket->rx_buffer_bytes - csocket->rx_buffer_offset, &read_size); csocket->rx_buffer_offset += read_size; *rx_bytes += read_size; if (SUCCEED == ret) goto out; } /* not enough data in socket buffer, try to read more until message is completed or no data to read */ while (SUCCEED != ret) { csocket->rx_buffer_offset = 0; csocket->rx_buffer_bytes = 0; if (ZBX_IPC_HEADER_SIZE < *rx_bytes) { offset = *rx_bytes - ZBX_IPC_HEADER_SIZE; data_size = header[ZBX_IPC_MESSAGE_SIZE] - offset; /* long messages will be read directly into message buffer */ if (ZBX_IPC_SOCKET_BUFFER_SIZE * 0.75 < data_size) { ret = ipc_read_data_full(csocket->fd, *data + offset, data_size, &read_size); *rx_bytes += read_size; goto out; } } if (FAIL == ipc_read_data(csocket->fd, csocket->rx_buffer, ZBX_IPC_SOCKET_BUFFER_SIZE, &read_size)) goto out; /* it's possible that nothing will be read on non-blocking sockets, return success */ if (0 == read_size) { ret = SUCCEED; goto out; } csocket->rx_buffer_bytes = read_size; ret = ipc_read_buffer(header, data, *rx_bytes, csocket->rx_buffer, csocket->rx_buffer_bytes, &read_size); csocket->rx_buffer_offset += read_size; *rx_bytes += read_size; } out: return ret; } /****************************************************************************** * * * Purpose: frees client's libevent event * * * * Parameters: client - [IN] the client * * * 
******************************************************************************/ static void ipc_client_free_events(zbx_ipc_client_t *client) { if (NULL != client->rx_event) { event_free(client->rx_event); client->rx_event = NULL; } if (NULL != client->tx_event) { event_free(client->tx_event); client->tx_event = NULL; } } /****************************************************************************** * * * Purpose: frees IPC service client * * * * Parameters: client - [IN] the client to free * * * ******************************************************************************/ static void ipc_client_free(zbx_ipc_client_t *client) { zbx_ipc_message_t *message; ipc_client_free_events(client); zbx_ipc_socket_close(&client->csocket); while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->rx_queue))) zbx_ipc_message_free(message); zbx_queue_ptr_destroy(&client->rx_queue); zbx_free(client->rx_data); while (NULL != (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue))) zbx_ipc_message_free(message); zbx_queue_ptr_destroy(&client->tx_queue); zbx_free(client->tx_data); ipc_client_free_events(client); zbx_free(client); } /****************************************************************************** * * * Purpose: adds message to received messages queue * * * * Parameters: client - [IN] the client to read * * * ******************************************************************************/ static void ipc_client_push_rx_message(zbx_ipc_client_t *client) { zbx_ipc_message_t *message; message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t)); message->code = client->rx_header[ZBX_IPC_MESSAGE_CODE]; message->size = client->rx_header[ZBX_IPC_MESSAGE_SIZE]; message->data = client->rx_data; zbx_queue_ptr_push(&client->rx_queue, message); client->rx_data = NULL; client->rx_bytes = 0; } /****************************************************************************** * * * Purpose: prepares to send the next message in send queue * * * 
* Parameters: client - [IN] the client * * * ******************************************************************************/ static void ipc_client_pop_tx_message(zbx_ipc_client_t *client) { zbx_ipc_message_t *message; zbx_free(client->tx_data); client->tx_bytes = 0; if (NULL == (message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&client->tx_queue))) return; client->tx_bytes = ZBX_IPC_HEADER_SIZE + message->size; client->tx_header[ZBX_IPC_MESSAGE_CODE] = message->code; client->tx_header[ZBX_IPC_MESSAGE_SIZE] = message->size; client->tx_data = message->data; zbx_free(message); } /****************************************************************************** * * * Purpose: reads data from IPC service client * * * * Parameters: client - [IN] the client to read * * * * Return value: FAIL - read error/connection was closed * * * * Comments: This function reads data from socket, parses it and adds * * parsed messages to received messages queue. * * * ******************************************************************************/ static int ipc_client_read(zbx_ipc_client_t *client) { int rc; do { if (FAIL == ipc_socket_read_message(&client->csocket, client->rx_header, &client->rx_data, &client->rx_bytes)) { zbx_free(client->rx_data); client->rx_bytes = 0; return FAIL; } if (SUCCEED == (rc = ipc_message_is_completed(client->rx_header, client->rx_bytes))) ipc_client_push_rx_message(client); } while (SUCCEED == rc); return SUCCEED; } /****************************************************************************** * * * Purpose: writes queued data to IPC service client * * * * Parameters: client - [IN] the client * * * * Return value: SUCCEED - the data was sent successfully * * FAIL - otherwise * * * ******************************************************************************/ static int ipc_client_write(zbx_ipc_client_t *client) { zbx_uint32_t data_size, write_size; data_size = client->tx_header[ZBX_IPC_MESSAGE_SIZE]; if (data_size < client->tx_bytes) { zbx_uint32_t size, 
offset; size = client->tx_bytes - data_size; offset = ZBX_IPC_HEADER_SIZE - size; if (SUCCEED != ipc_write_data(client->csocket.fd, (unsigned char *)client->tx_header + offset, size, &write_size)) { return FAIL; } client->tx_bytes -= write_size; if (data_size < client->tx_bytes) return SUCCEED; } while (0 < client->tx_bytes) { if (SUCCEED != ipc_write_data(client->csocket.fd, client->tx_data + data_size - client->tx_bytes, client->tx_bytes, &write_size)) { return FAIL; } if (0 == write_size) return SUCCEED; client->tx_bytes -= write_size; } if (0 == client->tx_bytes) ipc_client_pop_tx_message(client); return SUCCEED; } /****************************************************************************** * * * Purpose: gets the next client with messages/closed socket from recv queue * * * * Parameters: service - [IN] the IPC service * * * * Return value: The client with messages/closed socket * * * ******************************************************************************/ static zbx_ipc_client_t *ipc_service_pop_client(zbx_ipc_service_t *service) { zbx_ipc_client_t *client; if (NULL != (client = (zbx_ipc_client_t *)zbx_queue_ptr_pop(&service->clients_recv))) client->state = ZBX_IPC_CLIENT_STATE_NONE; return client; } /****************************************************************************** * * * Purpose: pushes client to the recv queue if needed * * * * Parameters: service - [IN] the IPC service * * client - [IN] the IPC client * * * * Comments: The client is pushed to the recv queue if it isn't already there * * and there is messages to return or the client connection was * * closed. 
* * * ******************************************************************************/ static void ipc_service_push_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client) { if (ZBX_IPC_CLIENT_STATE_QUEUED == client->state) return; if (0 == zbx_queue_ptr_values_num(&client->rx_queue) && NULL != client->rx_event) return; client->state = ZBX_IPC_CLIENT_STATE_QUEUED; zbx_queue_ptr_push(&service->clients_recv, client); } /****************************************************************************** * * * Purpose: adds a new IPC service client * * * * Parameters: service - [IN] the IPC service * * fd - [IN] the client socket descriptor * * * ******************************************************************************/ static void ipc_service_add_client(zbx_ipc_service_t *service, int fd) { static zbx_uint64_t next_clientid = 1; zbx_ipc_client_t *client; int flags; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t)); memset(client, 0, sizeof(zbx_ipc_client_t)); if (-1 == (flags = fcntl(fd, F_GETFL, 0))) { zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags"); exit(EXIT_FAILURE); } if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK)) { zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket"); exit(EXIT_FAILURE); } client->csocket.fd = fd; client->csocket.rx_buffer_bytes = 0; client->csocket.rx_buffer_offset = 0; client->id = next_clientid++; client->state = ZBX_IPC_CLIENT_STATE_NONE; client->refcount = 1; zbx_queue_ptr_create(&client->rx_queue); zbx_queue_ptr_create(&client->tx_queue); client->service = service; client->rx_event = event_new(service->ev, fd, EV_READ | EV_PERSIST, ipc_client_read_event_cb, (void *)client); client->tx_event = event_new(service->ev, fd, EV_WRITE | EV_PERSIST, ipc_client_write_event_cb, (void *)client); event_add(client->rx_event, NULL); zbx_vector_ptr_append(&service->clients, client); zabbix_log(LOG_LEVEL_DEBUG, "End of %s() 
clientid:" ZBX_FS_UI64, __func__, client->id); } /****************************************************************************** * * * Purpose: removes IPC service client * * * * Parameters: service - [IN] the IPC service * * client - [IN] the client to remove * * * ******************************************************************************/ static void ipc_service_remove_client(zbx_ipc_service_t *service, zbx_ipc_client_t *client) { int i; for (i = 0; i < service->clients.values_num; i++) { if (service->clients.values[i] == client) zbx_vector_ptr_remove_noorder(&service->clients, i); } } /****************************************************************************** * * * Purpose: to find connected client when only it's ID is known * * * * Parameters: service - [IN] the IPC service * * id - [IN] ID of client * * * * Return value: address of client or NULL if client has already disconnected * * * ******************************************************************************/ zbx_ipc_client_t *zbx_ipc_client_by_id(const zbx_ipc_service_t *service, zbx_uint64_t id) { int i; zbx_ipc_client_t *client; for (i = 0; i < service->clients.values_num; i++) { client = (zbx_ipc_client_t *) service->clients.values[i]; if (id == client->id) return client; } return NULL; } /****************************************************************************** * * * Purpose: service client read event libevent callback * * * ******************************************************************************/ static void ipc_client_read_event_cb(evutil_socket_t fd, short what, void *arg) { zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg; ZBX_UNUSED(fd); ZBX_UNUSED(what); if (SUCCEED != ipc_client_read(client)) { ipc_client_free_events(client); ipc_service_remove_client(client->service, client); } ipc_service_push_client(client->service, client); } /****************************************************************************** * * * Purpose: service client write event libevent callback * * 
* ******************************************************************************/ static void ipc_client_write_event_cb(evutil_socket_t fd, short what, void *arg) { zbx_ipc_client_t *client = (zbx_ipc_client_t *)arg; ZBX_UNUSED(fd); ZBX_UNUSED(what); if (SUCCEED != ipc_client_write(client)) { zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client"); zbx_ipc_client_close(client); return; } if (0 == client->tx_bytes) event_del(client->tx_event); } /****************************************************************************** * * * Purpose: asynchronous socket write event libevent callback * * * ******************************************************************************/ static void ipc_async_socket_write_event_cb(evutil_socket_t fd, short what, void *arg) { zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg; ZBX_UNUSED(fd); ZBX_UNUSED(what); if (SUCCEED != ipc_client_write(asocket->client)) { zabbix_log(LOG_LEVEL_CRIT, "cannot send data to IPC client"); ipc_client_free_events(asocket->client); zbx_ipc_socket_close(&asocket->client->csocket); asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR; return; } if (0 == asocket->client->tx_bytes) event_del(asocket->client->tx_event); } /****************************************************************************** * * * Purpose: asynchronous socket read event libevent callback * * * ******************************************************************************/ static void ipc_async_socket_read_event_cb(evutil_socket_t fd, short what, void *arg) { zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg; ZBX_UNUSED(fd); ZBX_UNUSED(what); if (SUCCEED != ipc_client_read(asocket->client)) { ipc_client_free_events(asocket->client); asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_ERROR; } } /****************************************************************************** * * * Purpose: timer callback * * * ******************************************************************************/ static void 
ipc_async_socket_timer_cb(evutil_socket_t fd, short what, void *arg) { zbx_ipc_async_socket_t *asocket = (zbx_ipc_async_socket_t *)arg; ZBX_UNUSED(fd); ZBX_UNUSED(what); asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT; } /****************************************************************************** * * * Purpose: accepts a new client connection * * * * Parameters: service - [IN] the IPC service * * * ******************************************************************************/ static void ipc_service_accept(zbx_ipc_service_t *service) { int fd; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); while (-1 == (fd = accept(service->fd, NULL, NULL))) { if (EINTR != errno) { /* If there is unaccepted connection libevent will call registered callback function over and */ /* over again. It is better to exit straight away and cause all other processes to stop. */ zabbix_log(LOG_LEVEL_CRIT, "cannot accept incoming IPC connection: %s", zbx_strerror(errno)); exit(EXIT_FAILURE); } } ipc_service_add_client(service, fd); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: creates IPC message * * * * Parameters: code - [IN] the message code * * data - [IN] the data * * size - [IN] the data size * * * * Return value: The created message. 
* * * ******************************************************************************/ static zbx_ipc_message_t *ipc_message_create(zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size) { zbx_ipc_message_t *message; message = (zbx_ipc_message_t *)zbx_malloc(NULL, sizeof(zbx_ipc_message_t)); message->code = code; message->size = size; if (0 != size) { message->data = (unsigned char *)zbx_malloc(NULL, size); memcpy(message->data, data, size); } else message->data = NULL; return message; } /****************************************************************************** * * * Purpose: libevent logging callback * * * ******************************************************************************/ static void ipc_service_event_log_cb(int severity, const char *msg) { int loglevel; switch (severity) { case _EVENT_LOG_DEBUG: loglevel = LOG_LEVEL_TRACE; break; case _EVENT_LOG_MSG: loglevel = LOG_LEVEL_DEBUG; break; case _EVENT_LOG_WARN: loglevel = LOG_LEVEL_WARNING; break; case _EVENT_LOG_ERR: loglevel = LOG_LEVEL_DEBUG; break; default: loglevel = LOG_LEVEL_DEBUG; break; } zabbix_log(loglevel, "IPC service: %s", msg); } /****************************************************************************** * * * Purpose: initialize libevent library * * * ******************************************************************************/ static void ipc_service_init_libevent(void) { event_set_log_callback(ipc_service_event_log_cb); } /****************************************************************************** * * * Purpose: uninitialize libevent library * * * ******************************************************************************/ static void ipc_service_free_libevent(void) { } /****************************************************************************** * * * Purpose: libevent listener callback * * * ******************************************************************************/ static void ipc_service_client_connected_cb(evutil_socket_t fd, short what, void *arg) { 
zbx_ipc_service_t *service = (zbx_ipc_service_t *)arg; ZBX_UNUSED(fd); ZBX_UNUSED(what); ipc_service_accept(service); } /****************************************************************************** * * * Purpose: timer callback * * * ******************************************************************************/ static void ipc_service_timer_cb(evutil_socket_t fd, short what, void *arg) { ZBX_UNUSED(fd); ZBX_UNUSED(what); ZBX_UNUSED(arg); } /****************************************************************************** * * * Purpose: checks if an IPC service is already running * * * * Parameters: service_name - [IN] * * * ******************************************************************************/ static int ipc_check_running_service(const char *service_name) { zbx_ipc_socket_t csocket; int ret; char *error = NULL; if (SUCCEED == (ret = zbx_ipc_socket_open(&csocket, service_name, 0, &error))) zbx_ipc_socket_close(&csocket); else zbx_free(error); return ret; } /* * Public client API */ /****************************************************************************** * * * Purpose: opens socket to an IPC service listening on the specified path * * * * Parameters: csocket - [OUT] the IPC socket to the service * * service_name - [IN] the IPC service name * * timeout - [IN] the connection timeout * * error - [OUT] the error message * * * * Return value: SUCCEED - the socket was successfully opened * * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_socket_open(zbx_ipc_socket_t *csocket, const char *service_name, int timeout, char **error) { struct sockaddr_un addr; time_t start; struct timespec ts = {0, 100000000}; const char *socket_path; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (NULL == (socket_path = ipc_make_path(service_name, error))) goto out; if (-1 == (csocket->fd = socket(AF_UNIX, SOCK_STREAM, 0))) { *error = zbx_dsprintf(*error, "Cannot create client socket: %s.", 
zbx_strerror(errno)); goto out; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path)); start = time(NULL); while (0 != connect(csocket->fd, (struct sockaddr*)&addr, sizeof(addr))) { if (0 == timeout || time(NULL) - start > timeout) { *error = zbx_dsprintf(*error, "Cannot connect to service \"%s\": %s.", service_name, zbx_strerror(errno)); close(csocket->fd); goto out; } nanosleep(&ts, NULL); } csocket->rx_buffer_bytes = 0; csocket->rx_buffer_offset = 0; ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: closes socket to an IPC service * * * * Parameters: csocket - [IN/OUT] the IPC socket to close * * * ******************************************************************************/ void zbx_ipc_socket_close(zbx_ipc_socket_t *csocket) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (-1 != csocket->fd) { close(csocket->fd); csocket->fd = -1; } zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: writes a message to IPC service * * * * Parameters: csocket - [IN] an opened IPC socket to the service * * code - [IN] the message code * * data - [IN] the data * * size - [IN] the data size * * * * Return value: SUCCEED - the message was successfully written * * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_socket_write(zbx_ipc_socket_t *csocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size) { int ret; zbx_uint32_t size_sent; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (SUCCEED == ipc_socket_write_message(csocket, code, data, size, &size_sent) && size_sent == size + ZBX_IPC_HEADER_SIZE) { ret = SUCCEED; } else ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "End of 
%s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: reads a message from IPC service * * * * Parameters: csocket - [IN] an opened IPC socket to the service * * message - [OUT] the received message * * * * Return value: SUCCEED - the message was successfully received * * FAIL - otherwise * * * * Comments: If this function succeeds the message must be cleaned/freed by * * the caller. * * * ******************************************************************************/ int zbx_ipc_socket_read(zbx_ipc_socket_t *csocket, zbx_ipc_message_t *message) { int ret = FAIL; zbx_uint32_t rx_bytes = 0, header[2]; unsigned char *data = NULL; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); if (SUCCEED != ipc_socket_read_message(csocket, header, &data, &rx_bytes)) goto out; if (SUCCEED != ipc_message_is_completed(header, rx_bytes)) { zbx_free(data); goto out; } message->code = header[ZBX_IPC_MESSAGE_CODE]; message->size = header[ZBX_IPC_MESSAGE_SIZE]; message->data = data; if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE)) { char *msg = NULL; zbx_ipc_message_format(message, &msg); zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, msg); zbx_free(msg); } ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: check if socket is opened * * * * Parameters: csocket - [OUT] the IPC socket to the service * * * * Return value: SUCCEED - the socket was successfully opened * * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_socket_connected(const zbx_ipc_socket_t *csocket) { return 0 < csocket->fd ? 
SUCCEED : FAIL; } /****************************************************************************** * * * Purpose: frees the resources allocated to store IPC message data * * * * Parameters: message - [IN] the message to free * * * ******************************************************************************/ void zbx_ipc_message_free(zbx_ipc_message_t *message) { if (NULL != message) { zbx_free(message->data); zbx_free(message); } } /****************************************************************************** * * * Purpose: frees the resources allocated to store IPC message data * * * * Parameters: message - [IN] the message to clean * * * ******************************************************************************/ void zbx_ipc_message_clean(zbx_ipc_message_t *message) { zbx_free(message->data); } /****************************************************************************** * * * Purpose: initializes IPC message * * * * Parameters: message - [IN] the message to initialize * * * ******************************************************************************/ void zbx_ipc_message_init(zbx_ipc_message_t *message) { memset(message, 0, sizeof(zbx_ipc_message_t)); } /****************************************************************************** * * * Purpose: formats message to readable format for debug messages * * * * Parameters: message - [IN] the message * * data - [OUT] the formatted message * * * ******************************************************************************/ void zbx_ipc_message_format(const zbx_ipc_message_t *message, char **data) { size_t data_alloc = ZBX_IPC_DATA_DUMP_SIZE * 4 + 32, data_offset = 0; zbx_uint32_t i, data_num; if (NULL == message) return; data_num = message->size; if (ZBX_IPC_DATA_DUMP_SIZE < data_num) data_num = ZBX_IPC_DATA_DUMP_SIZE; *data = (char *)zbx_malloc(*data, data_alloc); zbx_snprintf_alloc(data, &data_alloc, &data_offset, "code:%u size:%u data:", message->code, message->size); for (i = 0; i < data_num; i++) { if 
(0 != i) zbx_strcpy_alloc(data, &data_alloc, &data_offset, (0 == (i & 7) ? " | " : " ")); zbx_snprintf_alloc(data, &data_alloc, &data_offset, "%02x", (int)message->data[i]); } (*data)[data_offset] = '\0'; } #ifdef HAVE_OPENIPMI /****************************************************************************** * * * Purpose: copies ipc message * * * * Parameters: dst - [IN] the destination message * * src - [IN] the source message * * * ******************************************************************************/ void zbx_ipc_message_copy(zbx_ipc_message_t *dst, const zbx_ipc_message_t *src) { dst->code = src->code; dst->size = src->size; dst->data = (unsigned char *)zbx_malloc(NULL, src->size); memcpy(dst->data, src->data, src->size); } #endif /* HAVE_OPENIPMI */ static void ipc_service_user_cb(evutil_socket_t fd, short what, void *arg) { ZBX_UNUSED(fd); ZBX_UNUSED(what); ZBX_UNUSED(arg); } /* * Public service API */ /****************************************************************************** * * * Purpose: initializes IPC service environment * * * * Parameters: path - [IN] the service root path * * error - [OUT] the error message * * * * Return value: SUCCEED - the environment was initialized successfully. 
* * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_service_init_env(const char *path, char **error) { struct stat fs; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, path); if (0 != ipc_path_root_len) { *error = zbx_dsprintf(*error, "The IPC service environment has been already initialized with" " root directory at \"%s\".", ipc_get_path()); goto out; } if (0 != stat(path, &fs)) { *error = zbx_dsprintf(*error, "Failed to stat the specified path \"%s\": %s.", path, zbx_strerror(errno)); goto out; } if (0 == S_ISDIR(fs.st_mode)) { *error = zbx_dsprintf(*error, "The specified path \"%s\" is not a directory.", path); goto out; } if (0 != access(path, W_OK | R_OK)) { *error = zbx_dsprintf(*error, "Cannot access path \"%s\": %s.", path, zbx_strerror(errno)); goto out; } ipc_path_root_len = strlen(path); if (ZBX_IPC_PATH_MAX < ipc_path_root_len + 3) { *error = zbx_dsprintf(*error, "The IPC root path \"%s\" is too long.", path); goto out; } memcpy(ipc_path, path, ipc_path_root_len + 1); while (1 < ipc_path_root_len && '/' == ipc_path[ipc_path_root_len - 1]) ipc_path[--ipc_path_root_len] = '\0'; ipc_service_init_libevent(); if (0 != evthread_use_pthreads()) { *error = zbx_strdup(*error, "Cannot initialize libevent threading support"); goto out; } ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: frees IPC service environment * * * ******************************************************************************/ void zbx_ipc_service_free_env(void) { ipc_service_free_libevent(); } /****************************************************************************** * * * Purpose: starts IPC service on the specified path * * * * Parameters: service - [IN/OUT] the IPC service * * service_name - [IN] the unix domain socket path * * error 
- [OUT] the error message * * * * Return value: SUCCEED - the service was initialized successfully. * * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_service_start(zbx_ipc_service_t *service, const char *service_name, char **error) { struct sockaddr_un addr; const char *socket_path; int ret = FAIL; mode_t mode; zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:%s", __func__, service_name); mode = umask(077); if (NULL == (socket_path = ipc_make_path(service_name, error))) goto out; if (0 == access(socket_path, F_OK)) { if (0 != access(socket_path, W_OK)) { *error = zbx_dsprintf(*error, "The file \"%s\" is used by another process.", socket_path); goto out; } if (SUCCEED == ipc_check_running_service(service_name)) { *error = zbx_dsprintf(*error, "\"%s\" service is already running.", service_name); goto out; } unlink(socket_path); } if (-1 == (service->fd = socket(AF_UNIX, SOCK_STREAM, 0))) { *error = zbx_dsprintf(*error, "Cannot create socket: %s.", zbx_strerror(errno)); goto out; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; memcpy(addr.sun_path, socket_path, sizeof(addr.sun_path)); if (0 != bind(service->fd, (struct sockaddr*)&addr, sizeof(addr))) { *error = zbx_dsprintf(*error, "Cannot bind socket to \"%s\": %s.", socket_path, zbx_strerror(errno)); goto out; } if (0 != listen(service->fd, SOMAXCONN)) { *error = zbx_dsprintf(*error, "Cannot listen socket: %s.", zbx_strerror(errno)); goto out; } service->path = zbx_strdup(NULL, service_name); zbx_vector_ptr_create(&service->clients); zbx_queue_ptr_create(&service->clients_recv); service->ev = event_base_new(); service->ev_listener = event_new(service->ev, service->fd, EV_READ | EV_PERSIST, ipc_service_client_connected_cb, service); event_add(service->ev_listener, NULL); service->ev_timer = event_new(service->ev, -1, 0, ipc_service_timer_cb, service); service->ev_alert = event_new(service->ev, -1, 0, ipc_service_user_cb, NULL); ret = 
SUCCEED; out: umask(mode); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: closes IPC service and frees the resources allocated by it * * * * Parameters: service - [IN/OUT] the IPC service * * * ******************************************************************************/ void zbx_ipc_service_close(zbx_ipc_service_t *service) { int i; zabbix_log(LOG_LEVEL_DEBUG, "In %s() path:%s", __func__, service->path); close(service->fd); for (i = 0; i < service->clients.values_num; i++) ipc_client_free((zbx_ipc_client_t *)service->clients.values[i]); zbx_free(service->path); zbx_vector_ptr_destroy(&service->clients); zbx_queue_ptr_destroy(&service->clients_recv); event_free(service->ev_alert); event_free(service->ev_timer); event_free(service->ev_listener); event_base_free(service->ev); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: receives ipc message from a connected client * * * * Parameters: service - [IN] the IPC service * * timeout - [IN] the timeout. (0,0) is used for nonblocking call * * and (ZBX_IPC_WAIT_FOREVER, *) is * * used for blocking call without timeout * * client - [OUT] the client that sent the message or * * NULL if there are no messages and the * * specified timeout passed. * * The client must be released by caller with * * zbx_ipc_client_release() function. * * message - [OUT] the received message or NULL if the client * * connection was closed. * * The message must be freed by caller with * * ipc_message_free() function. 
* * * * Return value: ZBX_IPC_RECV_IMMEDIATE - returned immediately without * * waiting for socket events * * (pending events are processed) * * ZBX_IPC_RECV_WAIT - returned after receiving socket * * event * * ZBX_IPC_RECV_TIMEOUT - returned after timeout expired * * * ******************************************************************************/ int zbx_ipc_service_recv(zbx_ipc_service_t *service, const zbx_timespec_t *timeout, zbx_ipc_client_t **client, zbx_ipc_message_t **message) { int ret, flags; zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d.%03d", __func__, timeout->sec, timeout->ns / 1000000); if ((0 != timeout->sec || 0 != timeout->ns) && SUCCEED == zbx_queue_ptr_empty(&service->clients_recv)) { if (ZBX_IPC_WAIT_FOREVER != timeout->sec) { struct timeval tv = {timeout->sec, timeout->ns / 1000}; evtimer_add(service->ev_timer, &tv); } flags = EVLOOP_ONCE; } else flags = EVLOOP_NONBLOCK; event_base_loop(service->ev, flags); if (NULL != (*client = ipc_service_pop_client(service))) { if (NULL != (*message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&(*client)->rx_queue))) { if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE)) { char *data = NULL; zbx_ipc_message_format(*message, &data); zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data); zbx_free(data); } ipc_service_push_client(service, *client); zbx_ipc_client_addref(*client); } ret = (EVLOOP_NONBLOCK == flags ? 
ZBX_IPC_RECV_IMMEDIATE : ZBX_IPC_RECV_WAIT); } else { ret = ZBX_IPC_RECV_TIMEOUT; *client = NULL; *message = NULL; } evtimer_del(service->ev_timer); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret); return ret; } /****************************************************************************** * * * Purpose: interrupt IPC service recv loop from another thread * * * ******************************************************************************/ void zbx_ipc_service_alert(zbx_ipc_service_t *service) { event_active(service->ev_alert, 0, 0); } /****************************************************************************** * * * Purpose: Sends IPC message to client * * * * Parameters: client - [IN] the IPC client * * code - [IN] the message code * * data - [IN] the data * * size - [IN] the data size * * * * Comments: If data can't be written directly to socket (buffer full) then * * the message is queued and sent during zbx_ipc_service_recv() * * messaging loop whenever socket becomes ready. 
* * * ******************************************************************************/ int zbx_ipc_client_send(zbx_ipc_client_t *client, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size) { zbx_uint32_t tx_size = 0; zbx_ipc_message_t *message; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, client->id); if (0 != client->tx_bytes) { message = ipc_message_create(code, data, size); zbx_queue_ptr_push(&client->tx_queue, message); ret = SUCCEED; goto out; } if (FAIL == ipc_socket_write_message(&client->csocket, code, data, size, &tx_size)) goto out; if (tx_size != ZBX_IPC_HEADER_SIZE + size) { client->tx_header[ZBX_IPC_MESSAGE_CODE] = code; client->tx_header[ZBX_IPC_MESSAGE_SIZE] = size; client->tx_data = (unsigned char *)zbx_malloc(NULL, size); memcpy(client->tx_data, data, size); client->tx_bytes = ZBX_IPC_HEADER_SIZE + size - tx_size; event_add(client->tx_event, NULL); } ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: closes client socket and frees resources allocated for client * * * * Parameters: client - [IN] the IPC client * * * ******************************************************************************/ void zbx_ipc_client_close(zbx_ipc_client_t *client) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); ipc_client_free_events(client); zbx_ipc_socket_close(&client->csocket); ipc_service_remove_client(client->service, client); zbx_queue_ptr_remove_value(&client->service->clients_recv, client); zbx_ipc_client_release(client); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } void zbx_ipc_client_addref(zbx_ipc_client_t *client) { client->refcount++; } void zbx_ipc_client_release(zbx_ipc_client_t *client) { if (0 == --client->refcount) ipc_client_free(client); } int zbx_ipc_client_connected(zbx_ipc_client_t *client) { return (NULL == 
client->rx_event ? FAIL : SUCCEED); } zbx_uint64_t zbx_ipc_client_id(const zbx_ipc_client_t *client) { return client->id; } void zbx_ipc_client_set_userdata(zbx_ipc_client_t *client, void *userdata) { client->userdata = userdata; } void *zbx_ipc_client_get_userdata(zbx_ipc_client_t *client) { return client->userdata; } /****************************************************************************** * * * Purpose: opens asynchronous socket to IPC service client * * * * Parameters: client - [OUT] the IPC service client * * service_name - [IN] the IPC service name * * timeout - [IN] the connection timeout * * error - [OUT] the error message * * * * Return value: SUCCEED - the socket was successfully opened * * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_async_socket_open(zbx_ipc_async_socket_t *asocket, const char *service_name, int timeout, char **error) { int ret = FAIL, flags; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); memset(asocket, 0, sizeof(zbx_ipc_async_socket_t)); asocket->client = (zbx_ipc_client_t *)zbx_malloc(NULL, sizeof(zbx_ipc_client_t)); memset(asocket->client, 0, sizeof(zbx_ipc_client_t)); if (SUCCEED != zbx_ipc_socket_open(&asocket->client->csocket, service_name, timeout, error)) { zbx_free(asocket->client); goto out; } if (-1 == (flags = fcntl(asocket->client->csocket.fd, F_GETFL, 0))) { zabbix_log(LOG_LEVEL_CRIT, "cannot get IPC client socket flags"); exit(EXIT_FAILURE); } if (-1 == fcntl(asocket->client->csocket.fd, F_SETFL, flags | O_NONBLOCK)) { zabbix_log(LOG_LEVEL_CRIT, "cannot set non-blocking mode for IPC client socket"); exit(EXIT_FAILURE); } asocket->ev = event_base_new(); asocket->ev_timer = event_new(asocket->ev, -1, 0, ipc_async_socket_timer_cb, asocket); asocket->client->rx_event = event_new(asocket->ev, asocket->client->csocket.fd, EV_READ | EV_PERSIST, ipc_async_socket_read_event_cb, (void *)asocket); asocket->client->tx_event = event_new(asocket->ev, 
asocket->client->csocket.fd, EV_WRITE | EV_PERSIST, ipc_async_socket_write_event_cb, (void *)asocket); event_add(asocket->client->rx_event, NULL); asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE; ret = SUCCEED; out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: closes asynchronous IPC socket and frees allocated resources * * * * Parameters: asocket - [IN] the asynchronous IPC socket * * * ******************************************************************************/ void zbx_ipc_async_socket_close(zbx_ipc_async_socket_t *asocket) { zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); ipc_client_free(asocket->client); asocket->client = NULL; event_free(asocket->ev_timer); event_base_free(asocket->ev); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); } /****************************************************************************** * * * Purpose: Sends message through asynchronous IPC socket * * * * Parameters: asocket - [IN] the asynchronous IPC socket * * code - [IN] the message code * * data - [IN] the data * * size - [IN] the data size * * * * Comments: If data can't be written directly to socket (buffer full) then * * the message is queued and sent during zbx_ipc_async_socket_recv()* * or zbx_ipc_async_socket_flush() functions whenever socket becomes* * ready. 
* * * ******************************************************************************/ int zbx_ipc_async_socket_send(zbx_ipc_async_socket_t *asocket, zbx_uint32_t code, const unsigned char *data, zbx_uint32_t size) { int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); ret = zbx_ipc_client_send(asocket->client, code, data, size); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } /****************************************************************************** * * * Purpose: receives message through asynchronous IPC socket * * * * Parameters: asocket - [IN] the asynchronous IPC socket * * timeout - [IN] the timeout in seconds, 0 is used for * * nonblocking call and ZBX_IPC_WAIT_FOREVER is * * used for blocking call without timeout * * message - [OUT] the received message or NULL if the client * * connection was closed. * * The message must be freed by caller with * * ipc_message_free() function. * * * * Return value: SUCCEED - the message was read successfully or timeout * * occurred * * FAIL - otherwise * * * * Comments: After socket has been closed (or connection error has occurred) * * calls to zbx_ipc_client_read() will return success with buffered * * messages, until all buffered messages are retrieved. * * * ******************************************************************************/ int zbx_ipc_async_socket_recv(zbx_ipc_async_socket_t *asocket, int timeout, zbx_ipc_message_t **message) { int ret, flags; zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout); if (timeout != 0 && SUCCEED == zbx_queue_ptr_empty(&asocket->client->rx_queue)) { if (ZBX_IPC_WAIT_FOREVER != timeout) { struct timeval tv = {timeout, 0}; evtimer_add(asocket->ev_timer, &tv); } flags = EVLOOP_ONCE; } else flags = EVLOOP_NONBLOCK; /* do only single event base loop if timeout is not set */ asocket->state = (0 != timeout ? 
ZBX_IPC_ASYNC_SOCKET_STATE_NONE : ZBX_IPC_ASYNC_SOCKET_STATE_TIMEOUT); do { event_base_loop(asocket->ev, flags); *message = (zbx_ipc_message_t *)zbx_queue_ptr_pop(&asocket->client->rx_queue); } while (NULL == *message && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state); if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_TRACE) && NULL != *message) { char *data = NULL; zbx_ipc_message_format(*message, &data); zabbix_log(LOG_LEVEL_DEBUG, "%s() %s", __func__, data); zbx_free(data); } if (NULL != *message || ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state) ret = SUCCEED; else ret = FAIL; evtimer_del(asocket->ev_timer); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret); return ret; } /****************************************************************************** * * * Purpose: flushes unsent through asynchronous IPC socket * * * * Parameters: asocket - [IN] the asynchronous IPC service socket * * timeout - [IN] the timeout in seconds, 0 is used for * * nonblocking call and ZBX_IPC_WAIT_FOREVER is * * used for blocking call without timeout * * * * Return value: SUCCEED - the data was flushed successfully or timeout * * occurred. Use zbx_ipc_client_unsent_data() to * * check if all data was sent. * * FAIL - failed to send data (connection was closed or an * * error occurred). 
* * * ******************************************************************************/ int zbx_ipc_async_socket_flush(zbx_ipc_async_socket_t *asocket, int timeout) { int ret = FAIL, flags; zabbix_log(LOG_LEVEL_DEBUG, "In %s() timeout:%d", __func__, timeout); if (0 == asocket->client->tx_bytes) { ret = SUCCEED; goto out; } if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR == asocket->state) goto out; asocket->state = ZBX_IPC_ASYNC_SOCKET_STATE_NONE; if (0 != timeout) { if (ZBX_IPC_WAIT_FOREVER != timeout) { struct timeval tv = {timeout, 0}; evtimer_add(asocket->ev_timer, &tv); } flags = EVLOOP_ONCE; } else flags = EVLOOP_NONBLOCK; do { event_base_loop(asocket->ev, flags); if (SUCCEED != zbx_ipc_client_connected(asocket->client)) goto out; } while (0 != timeout && 0 != asocket->client->tx_bytes && ZBX_IPC_ASYNC_SOCKET_STATE_NONE == asocket->state); if (ZBX_IPC_ASYNC_SOCKET_STATE_ERROR != asocket->state) { ret = SUCCEED; asocket->state = ZBX_IPC_CLIENT_STATE_NONE; } out: evtimer_del(asocket->ev_timer); zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d", __func__, ret); return ret; } /****************************************************************************** * * * Purpose: check if there are data to be sent * * * * Parameters: asocket - [IN] the asynchronous IPC service socket * * * * Return value: SUCCEED - there are messages queued to be sent * * FAIL - all data has been sent * * * ******************************************************************************/ int zbx_ipc_async_socket_check_unsent(zbx_ipc_async_socket_t *asocket) { return (0 == asocket->client->tx_bytes ? 
FAIL : SUCCEED); } /****************************************************************************** * * * Purpose: check if socket is connected * * * * Parameters: asocket - [IN] the asynchronous IPC service socket * * * * Return value: SUCCEED - socket is connected * * FAIL - otherwise * * * ******************************************************************************/ int zbx_ipc_async_socket_connected(zbx_ipc_async_socket_t *asocket) { if (NULL == asocket->client) return FAIL; return zbx_ipc_client_connected(asocket->client); } /****************************************************************************** * * * Purpose: connect, send message and receive response in a given timeout * * * * Parameters: service_name - [IN] the IPC service name * * code - [IN] the message code * * timeout - [IN] time allowed to be spent on receive, note * * that this does not include open, send and * * flush that have their own timeouts * * data - [IN] the data * * size - [IN] the data size * * out - [OUT] the received message or NULL on error * * The message must be freed by zbx_free() * * error - [OUT] the error message * * * * Return value: SUCCEED - successfully sent message and received response * * FAIL - error occurred * * * ******************************************************************************/ int zbx_ipc_async_exchange(const char *service_name, zbx_uint32_t code, int timeout, const unsigned char *data, zbx_uint32_t size, unsigned char **out, char **error) { zbx_ipc_message_t *message; zbx_ipc_async_socket_t asocket; int ret = FAIL; zabbix_log(LOG_LEVEL_DEBUG, "In %s() service:'%s' code:%u timeout:%d", __func__, service_name, code, timeout); if (FAIL == zbx_ipc_async_socket_open(&asocket, service_name, timeout, error)) goto out; if (FAIL == zbx_ipc_async_socket_send(&asocket, code, data, size)) { *error = zbx_strdup(NULL, "Cannot send request"); goto fail; } if (FAIL == zbx_ipc_async_socket_flush(&asocket, timeout)) { *error = zbx_strdup(NULL, "Cannot flush 
request"); goto fail; } if (FAIL == zbx_ipc_async_socket_recv(&asocket, timeout, &message)) { *error = zbx_strdup(NULL, "Cannot receive response"); goto fail; } if (NULL == message) { *error = zbx_strdup(NULL, "Timeout while waiting for response"); goto fail; } *out = message->data; message->data = NULL; zbx_ipc_message_free(message); ret = SUCCEED; fail: zbx_ipc_async_socket_close(&asocket); out: zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); return ret; } void zbx_init_library_ipcservice(unsigned char program_type) { switch (program_type) { case ZBX_PROGRAM_TYPE_SERVER: ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_SERVER; ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_SERVER); break; case ZBX_PROGRAM_TYPE_PROXY_ACTIVE: case ZBX_PROGRAM_TYPE_PROXY_PASSIVE: ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_PROXY; ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_PROXY); break; case ZBX_PROGRAM_TYPE_AGENTD: ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_AGENT; ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_AGENT); break; default: ipc_path_prefix = ZBX_IPC_CLASS_PREFIX_NONE; ipc_path_prefix_len = ZBX_CONST_STRLEN(ZBX_IPC_CLASS_PREFIX_NONE); break; } } #endif