/*
** Zabbix
** Copyright 2001-2024 Zabbix SIA
**
** Permission is hereby granted, free of charge, to any person obtaining a copy of
** this software and associated documentation files (the "Software"), to deal in
** the Software without restriction, including without limitation the rights to
** use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
** of the Software, and to permit persons to whom the Software is furnished to do
** so, subject to the following conditions:
**
** The above copyright notice and this permission notice shall be included in all
** copies or substantial portions of the Software.
**
** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
** IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
** FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
** AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
** LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
** OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
** SOFTWARE.
**/

package kafka

import (
	"fmt"
	"time"

	"git.zabbix.com/ap/plugin-support/errs"
	"git.zabbix.com/ap/plugin-support/log"
	"git.zabbix.com/ap/plugin-support/tlsconfig"
	"github.com/IBM/sarama"
)

const (
	clientID = "zabbix"
)

// Producer produces data to Kafka broker.
type Producer struct {
	eventsTopic string // topic that ProduceEvent publishes to
	itemsTopic  string // topic that ProduceItem publishes to
	async       sarama.AsyncProducer // underlying asynchronous Kafka producer
	timeout     time.Duration // max time produce() waits to enqueue a message before dropping it
}

// Configuration holds Kafka connection settings, populated from the Zabbix
// configuration file via the plugin-support conf package tags.
type Configuration struct {
	URL            string `conf:"optional,default=localhost"` // broker host
	Port           string `conf:"optional,default=9093"` // broker port
	Events         string `conf:"optional,default=events"` // topic for event messages
	Items          string `conf:"optional,default=items"` // topic for item messages
	KeepAlive      int    `conf:"optional,range=60:300,default=300"` // TCP keep-alive, seconds
	Username       string `conf:"optional"` // SASL user; SASL is enabled only when non-empty
	Password       string `conf:"optional"` // SASL password
	Oauth          string `conf:"optional"` // NOTE(review): not referenced by NewProducer/newConfig in this file
	CaFile         string `conf:"optional"` // TLS CA certificate path
	ClientCertFile string `conf:"optional"` // TLS client certificate path
	ClientKeyFile  string `conf:"optional"` // TLS client key path
	Retry          int    `conf:"optional,default=0"` // max producer send retries
	Timeout        int    `conf:"optional,default=1"` // dial/read/write timeout, seconds
	TLSAuth        bool   `conf:"optional,default=false"` // enable mutual TLS using the cert/key files above
	EnableTLS      bool   `conf:"optional"` // enable TLS transport without client authentication
}

// ProduceItem produces Kafka message to the item topic
// in the broker provided in the async producer.
func (p *Producer) ProduceItem(key, message string) {
	p.produce(&sarama.ProducerMessage{
		Topic: p.itemsTopic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(message),
	})
}

// Close closes the underlying async producer.
func (p *Producer) Close() error {
	if err := p.async.Close(); err != nil {
		return errs.Wrap(err, "failed to close Kafka async producer")
	}

	return nil
}

// ProduceEvent produces Kafka message to the event topic
// in the broker provided in the async producer.
func (p *Producer) ProduceEvent(key, message string) {
	p.produce(&sarama.ProducerMessage{
		Topic: p.eventsTopic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(message),
	})
}

// NewProducer creates a Kafka producer with the provided configuration.
func NewProducer(c *Configuration) (*Producer, error) {
	// Broker address in host:port form, as expected by sarama.
	addr := fmt.Sprintf("%s:%s", c.URL, c.Port)

	conf, err := newConfig(
		addr,
		c.Username,
		c.Password,
		c.CaFile,
		c.ClientCertFile,
		c.ClientKeyFile,
		c.Retry,
		c.TLSAuth,
		c.EnableTLS,
		time.Duration(c.Timeout)*time.Second,
		time.Duration(c.KeepAlive)*time.Second,
	)
	if err != nil {
		return nil, errs.Wrap(err, "failed to create kafka configuration")
	}

	p, err := newProducer(conf, addr, c.Events, c.Items)
	if err != nil {
		return nil, errs.Wrap(err, "failed to create new kafka producer")
	}

	return p, nil
}

// newProducer returns a new producer initialized
// and ready to produce messages to Kafka.
func newProducer(config *sarama.Config, url, eventsTopic, itemsTopic string) (*Producer, error) {
	async, err := sarama.NewAsyncProducer([]string{url}, config)
	if err != nil {
		return nil, errs.Wrap(err, "async producer init failed")
	}

	prod := &Producer{
		eventsTopic: eventsTopic,
		itemsTopic:  itemsTopic,
		async:       async,
		timeout:     3 * time.Second,
	}

	// Drain the producer's error channel for the lifetime of the producer.
	go prod.errorListener()

	return prod, nil
}

// newConfig assembles a sarama client configuration from the provided
// connection, retry, timeout and TLS/SASL parameters.
//
//nolint:revive // configuration requires a lot of parameters
func newConfig(
	url,
	username,
	password,
	tlsCa,
	tlsCert,
	tlsKey string,
	retries int,
	tlsAuth,
	enableTLS bool,
	timeout,
	keepAlive time.Duration,
) (*sarama.Config, error) {
	c := sarama.NewConfig()

	c.ClientID = clientID
	c.Net.KeepAlive = keepAlive
	c.Net.DialTimeout = timeout
	c.Net.ReadTimeout = timeout
	c.Net.WriteTimeout = timeout
	c.Net.TLS.Enable = enableTLS
	c.Producer.Retry.Max = retries
	c.Metadata.AllowAutoTopicCreation = false

	// SASL authentication is enabled only when a username is configured.
	if username != "" {
		c.Net.SASL.Enable = true
		c.Net.SASL.User = username
		c.Net.SASL.Password = password
	}

	// Mutual TLS: build a client TLS config from the supplied certificate files.
	if tlsAuth {
		c.Net.TLS.Enable = true

		details := tlsconfig.Details{
			RawUri:      url,
			TlsCaFile:   tlsCa,
			TlsCertFile: tlsCert,
			TlsKeyFile:  tlsKey,
		}

		tlsConf, err := details.GetTLSConfig(false)
		if err != nil {
			return nil, errs.Wrap(err, "failed to create TLS config")
		}

		c.Net.TLS.Config = tlsConf
	}

	return c, nil
}

// errorListener drains the async producer's error channel, logging every
// failed delivery. It returns when the channel is closed (see Close),
// terminating the goroutine started in newProducer.
func (p *Producer) errorListener() {
	for perr := range p.async.Errors() {
		// The format string has three %s verbs; the original call passed only
		// two arguments, so each log line ended with "%!s(MISSING)". Pass the
		// failed message's key as the third argument.
		log.Errf(
			"kafka producer error: %s, for topic %s, with key %s",
			perr.Err.Error(), perr.Msg.Topic, perr.Msg.Key,
		)
	}
}

// produce enqueues m on the async producer's input channel, giving up after
// p.timeout so callers are never blocked indefinitely when the channel is
// full (e.g. broker unreachable). A timed-out message is dropped; only a
// warning is logged.
func (p *Producer) produce(m *sarama.ProducerMessage) {
	// A one-shot Timer is the correct primitive for a single timeout; the
	// original used a Ticker, which keeps firing periodically even though
	// only the first expiry can ever be observed by this select.
	timer := time.NewTimer(p.timeout)
	defer timer.Stop()

	select {
	case p.async.Input() <- m:
		log.Debugf("New Message produced")
	case <-timer.C:
		log.Warningf("Message send timeout")
	}
}