/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/

#include "send_buffer.h"

#include "zbxalgo.h"
#include "zbxjson.h"

typedef struct
{
	char		*host;
	int		values_num;
	struct zbx_json	*json;
}
zbx_send_batch_t;

/* hashset hash callback: batches are hashed by their host name */
static zbx_hash_t	send_batch_hash(const void *d)
{
	const char	*host = ((const zbx_send_batch_t *)d)->host;

	return ZBX_DEFAULT_STRING_HASH_ALGO(host, strlen(host), ZBX_DEFAULT_HASH_SEED);
}

/* hashset compare callback: batches are ordered by their host name */
static int	send_batch_compare(const void *d1, const void *d2)
{
	const char	*lhs = ((const zbx_send_batch_t *)d1)->host;
	const char	*rhs = ((const zbx_send_batch_t *)d2)->host;

	return strcmp(lhs, rhs);
}

/******************************************************************************
 *                                                                            *
 * Purpose: initialize send buffer                                            *
 *                                                                            *
 * Parameters: buf        - [OUT] send buffer to initialize                   *
 *             group_mode - [IN] batch grouping mode                          *
 *             host       - [IN] default host name, copied (can be NULL)      *
 *             with_clock - [IN] 1 - input lines contain timestamp            *
 *             with_ns    - [IN] 1 - input lines contain nanoseconds          *
 *                                                                            *
 ******************************************************************************/
void	sb_init(zbx_send_buffer_t *buf, int group_mode, const char *host, int with_clock, int with_ns)
{
	zbx_hashset_create(&buf->batches, 100, send_batch_hash, send_batch_compare);

	/* key/value scratch buffers are allocated lazily when the first line is parsed */
	buf->kv_alloc = 0;
	buf->key = NULL;
	buf->value = NULL;

	if (NULL != host)
		buf->host = zbx_strdup(NULL, host);
	else
		buf->host = NULL;

	buf->group_mode = group_mode;
	buf->with_clock = with_clock;
	buf->with_ns = with_ns;
}

/******************************************************************************
 *                                                                            *
 * Purpose: release all resources held by send buffer                         *
 *                                                                            *
 * Comments: The buffer structure itself is not freed.                        *
 *                                                                            *
 ******************************************************************************/
void	sb_destroy(zbx_send_buffer_t *buf)
{
	zbx_hashset_iter_t	iter;
	zbx_send_batch_t	*entry;

	zbx_free(buf->host);
	zbx_free(buf->value);
	zbx_free(buf->key);

	/* release host names and unsent requests before dropping the hashset itself */
	zbx_hashset_iter_reset(&buf->batches, &iter);

	while (NULL != (entry = (zbx_send_batch_t *)zbx_hashset_iter_next(&iter)))
	{
		if (NULL != entry->json)
		{
			zbx_json_free(entry->json);
			zbx_free(entry->json);
		}

		zbx_free(entry->host);
	}

	zbx_hashset_destroy(&buf->batches);
}

/******************************************************************************
 *                                                                            *
 * Purpose: append one parsed value to the matching batch of the send buffer  *
 *                                                                            *
 * Parameters: buf   - [IN/OUT] send buffer                                   *
 *             host  - [IN] host name                                         *
 *             key   - [IN] item key                                          *
 *             value - [IN] value                                             *
 *             clock - [IN] value timestamp seconds (can be 0)                *
 *             ns    - [IN] value timestamp nanoseconds (can be 0)            *
 *                                                                            *
 * Return value: The batch the value was added to.                            *
 *                                                                            *
 * Comments: Batches are keyed by host name, except in ZBX_SEND_GROUP_NONE    *
 *           mode where all values share a single batch.                      *
 *                                                                            *
 ******************************************************************************/
static zbx_send_batch_t	*sb_add_value(zbx_send_buffer_t *buf, const char *host, const char *key, const char *value,
		int clock, int ns)
{
	zbx_send_batch_t	lookup, *target;
	struct zbx_json		*json;

	/* the lookup key is only used for searching here, const is dropped just to fit the field type */
	lookup.host = (ZBX_SEND_GROUP_NONE == buf->group_mode ? "" : (char *)(uintptr_t)host);

	target = (zbx_send_batch_t *)zbx_hashset_search(&buf->batches, &lookup);

	if (NULL == target)
	{
		/* the stored batch gets its own copy of the host name */
		lookup.host = zbx_strdup(NULL, lookup.host);

		target = (zbx_send_batch_t *)zbx_hashset_insert(&buf->batches, &lookup, sizeof(lookup));
		target->json = NULL;
		target->values_num = 0;
	}

	if (0 == target->values_num)
	{
		/* first value of the batch: start a new request with an open data array */
		json = (struct zbx_json *)zbx_malloc(NULL, sizeof(struct zbx_json));

		zbx_json_init(json, ZBX_JSON_STAT_BUF_LEN);
		zbx_json_addstring(json, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_SENDER_DATA, ZBX_JSON_TYPE_STRING);
		zbx_json_addarray(json, ZBX_PROTO_TAG_DATA);

		target->json = json;
	}
	else
		json = target->json;

	zbx_json_addobject(json, NULL);
	zbx_json_addstring(json, ZBX_PROTO_TAG_HOST, host, ZBX_JSON_TYPE_STRING);
	zbx_json_addstring(json, ZBX_PROTO_TAG_KEY, key, ZBX_JSON_TYPE_STRING);
	zbx_json_addstring(json, ZBX_PROTO_TAG_VALUE, value, ZBX_JSON_TYPE_STRING);

	/* nanoseconds are sent only together with the timestamp */
	if (1 == buf->with_clock)
	{
		zbx_json_addint64(json, ZBX_PROTO_TAG_CLOCK, clock);

		if (1 == buf->with_ns)
			zbx_json_addint64(json, ZBX_PROTO_TAG_NS, ns);
	}

	zbx_json_close(json);

	target->values_num++;

	return target;
}

/******************************************************************************
 *                                                                            *
 * Purpose: parse input line and cache parsed data                            *
 *                                                                            *
 * Parameters: buf        - [IN/OUT] send buffer                              *
 *             line       - [IN] input line                                   *
 *             line_alloc - [IN] number of bytes allocated for line buffer    *
 *             send_mode  - [IN] ZBX_SEND_BATCHED - cache the parsed data to  *
 *                                                  send in batches           *
 *                               ZBX_SEND_IMMEDIATE - prepare the batch with  *
 *                                                    added value for sending *
 *             out        - [OUT] data to send (NULL - nothing to send)       *
 *             error      - [OUT] error message                               *
 *                                                                            *
 * Return value: SUCCEED - line was parsed successfully                       *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 * Comments: line format: <hostname> <key> [<timestamp>] [<ns>] <value>       *
 *                                                                            *
 ******************************************************************************/
int	sb_parse_line(zbx_send_buffer_t *buf, const char *line, size_t line_alloc, int send_mode, struct zbx_json **out,
		char **error)
{
	const char	*p = line;
	char		hostname[MAX_STRING_LEN], tmp[32];
	int		clock = 0, ns = 0;

	if ('\0' == *p || NULL == (p = get_string(p, hostname, sizeof(hostname))) || '\0' == *hostname)
	{
		*error = zbx_strdup(NULL, "'Hostname' required");
		return FAIL;
	}

	if (0 == strcmp(hostname, "-"))
	{
		if (NULL == buf->host)
		{
			*error = zbx_strdup(NULL, "'-' encountered as 'Hostname', "
					"but no default hostname was specified");
			return FAIL;
		}
		else
			zbx_strlcpy(hostname, buf->host, sizeof(hostname));
	}

	if (buf->kv_alloc != line_alloc)
	{
		buf->kv_alloc = line_alloc;
		buf->key = (char *)zbx_realloc(buf->key, buf->kv_alloc);
		buf->value = (char *)zbx_realloc(buf->value, buf->kv_alloc);
	}

	if ('\0' == *p || NULL == (p = get_string(p, buf->key, buf->kv_alloc)) || '\0' == *buf->key)
	{
		*error = zbx_strdup(NULL, "'Key' required");
		return FAIL;
	}

	if (1 == buf->with_clock)
	{
		if ('\0' == *p || NULL == (p = get_string(p, tmp, sizeof(tmp))) || '\0' == *tmp)
		{
			*error = zbx_strdup(NULL, "'Timestamp' required");
			return FAIL;
		}

		if (FAIL == zbx_is_uint31(tmp, &clock))
		{
			*error = zbx_strdup(NULL, "invalid 'Timestamp' value detected");
			return FAIL;
		}

		if (1 == buf->with_ns)
		{
			if ('\0' == *p || NULL == (p = get_string(p, tmp, sizeof(tmp))) || '\0' == *tmp)
			{
				*error = zbx_strdup(NULL, "'Nanoseconds' required");
				return FAIL;

			}

			if (FAIL == zbx_is_uint_n_range(tmp, sizeof(tmp), &ns, sizeof(ns), 0LL, 999999999LL))
			{
				*error = zbx_strdup(NULL, "invalid 'Nanoseconds' value detected");
				return FAIL;
			}
		}
	}

	if ('\0' != *p && '"' != *p)
	{
		zbx_strlcpy(buf->value, p, buf->kv_alloc);
	}
	else if ('\0' == *p || NULL == (p = get_string(p, buf->value, buf->kv_alloc)))
	{
		*error = zbx_strdup(NULL, "'Key value' required");
		return FAIL;
	}
	else if ('\0' != *p)
	{
		*error = zbx_strdup(NULL, "too many parameters");
		return FAIL;
	}

	zbx_send_batch_t	*batch;

	batch = sb_add_value(buf, hostname, buf->key, buf->value, clock, ns);

	if (ZBX_SEND_IMMEDIATE == send_mode || VALUES_MAX <= batch->values_num)
	{
		zbx_json_close(batch->json);
		*out = batch->json;

		batch->json = NULL;
		batch->values_num = 0;
	}
	else
		*out = NULL;

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Purpose: pop batch with cached values from buffer                          *
 *                                                                            *
 * Parameters: buf - [IN/OUT] send buffer                                     *
 *                                                                            *
 * Return value: request with cached values or NULL if the buffer holds no    *
 *               more values                                                  *
 *                                                                            *
 * Comments: Visited batches are removed from the buffer, including the ones  *
 *           that hold no values.                                             *
 *                                                                            *
 ******************************************************************************/
struct zbx_json	*sb_pop(zbx_send_buffer_t *buf)
{
	zbx_hashset_iter_t	iter;
	zbx_send_batch_t	*entry;

	zbx_hashset_iter_reset(&buf->batches, &iter);

	while (NULL != (entry = (zbx_send_batch_t *)zbx_hashset_iter_next(&iter)))
	{
		/* copy what is needed before the entry is removed from the hashset */
		int		cached_num = entry->values_num;
		struct zbx_json	*json = entry->json;

		zbx_free(entry->host);
		zbx_hashset_iter_remove(&iter);

		if (0 == cached_num)
			continue;

		zbx_json_close(json);

		return json;
	}

	return NULL;
}

/******************************************************************************
 *                                                                            *
 * Purpose: get current string from the quoted or unquoted string list,       *
 *          delimited by blanks                                               *
 *                                                                            *
 * Parameters:                                                                *
 *      p       - [IN] parameter list, delimited by blanks (' ' or '\t')      *
 *      buf     - [OUT] output buffer                                         *
 *      bufsize - [IN] output buffer size                                     *
 *                                                                            *
 * Return value: pointer to the next string or NULL on syntax error           *
 *               (unterminated quote, garbage after closing quote)            *
 *                                                                            *
 * Comments: Inside quotes \" \\ and \n are unescaped. Output that does not   *
 *           fit in the buffer is silently truncated. With bufsize 0 nothing  *
 *           is written to buf. On NULL return buf is not terminated.         *
 *                                                                            *
 ******************************************************************************/
const char	*get_string(const char *p, char *buf, size_t bufsize)
{
/* 0 - init, 1 - inside quoted param, 2 - inside unquoted param */
	int		state;
	size_t		buf_i = 0;
	/* characters that fit besides the terminating '\0'; must not wrap around when bufsize is 0 */
	const size_t	buf_max = (0 == bufsize ? 0 : bufsize - 1);

	for (state = 0; '\0' != *p; p++)
	{
		switch (state)
		{
			/* init state */
			case 0:
				if (' ' == *p || '\t' == *p)
				{
					/* skipping the leading spaces */
				}
				else if ('"' == *p)
				{
					state = 1;
				}
				else
				{
					/* first character of an unquoted string: store it right away instead of */
					/* stepping back, which would move p before the start of the input       */
					state = 2;

					if (buf_i < buf_max)
						buf[buf_i++] = *p;
				}
				break;
			/* quoted */
			case 1:
				if ('"' == *p)
				{
					/* closing quote must be followed by a blank or the end of input */
					if (' ' != p[1] && '\t' != p[1] && '\0' != p[1])
						return NULL;	/* incorrect syntax */

					while (' ' == p[1] || '\t' == p[1])
						p++;

					if (0 != bufsize)
						buf[buf_i] = '\0';

					return ++p;
				}
				else if ('\\' == *p && ('"' == p[1] || '\\' == p[1]))
				{
					p++;
					if (buf_i < buf_max)
						buf[buf_i++] = *p;
				}
				else if ('\\' == *p && 'n' == p[1])
				{
					p++;
					if (buf_i < buf_max)
						buf[buf_i++] = '\n';
				}
				else if (buf_i < buf_max)
				{
					buf[buf_i++] = *p;
				}
				break;
			/* unquoted */
			case 2:
				if (' ' == *p || '\t' == *p)
				{
					while (' ' == *p || '\t' == *p)
						p++;

					if (0 != bufsize)
						buf[buf_i] = '\0';

					return p;
				}
				else if (buf_i < buf_max)
				{
					buf[buf_i++] = *p;
				}
				break;
		}
	}

	/* missing terminating '"' character */
	if (1 == state)
		return NULL;

	if (0 != bufsize)
		buf[buf_i] = '\0';

	return p;
}