/*
** Zabbix
** Copyright 2001-2024 Zabbix SIA
**
** Permission is hereby granted, free of charge, to any person obtaining a copy of
** this software and associated documentation files (the "Software"), to deal in
** the Software without restriction, including without limitation the rights to
** use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
** of the Software, and to permit persons to whom the Software is furnished to do
** so, subject to the following conditions:
**
** The above copyright notice and this permission notice shall be included in all
** copies or substantial portions of the Software.
**
** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
** IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
** FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
** AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
** LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
** OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
** SOFTWARE.
**/

package kafka

import (
	"fmt"
	"time"

	"git.zabbix.com/ap/plugin-support/errs"
	"git.zabbix.com/ap/plugin-support/log"
	"git.zabbix.com/ap/plugin-support/tlsconfig"
	"github.com/IBM/sarama"
)

const (
	// clientID is the client identifier assigned to the sarama configuration.
	clientID = "zabbix"
)

// Producer produces data to Kafka broker.
type Producer struct {
	// eventsTopic is the topic used by ProduceEvent.
	eventsTopic string
	// itemsTopic is the topic used by ProduceItem.
	itemsTopic string
	// async is the underlying sarama producer that messages are handed to.
	async sarama.AsyncProducer
	// timeout bounds how long a message may wait to be accepted by async.
	timeout time.Duration
}

// Configuration holds Kafka configuration tags based on the Zabbix configuration package from plugin support.
type Configuration struct { URL string `conf:"optional,default=localhost"` Port string `conf:"optional,default=9093"` Events string `conf:"optional,default=events"` Items string `conf:"optional,default=items"` KeepAlive int `conf:"optional,range=60:300,default=300"` Username string `conf:"optional"` Password string `conf:"optional"` Oauth string `conf:"optional"` CaFile string `conf:"optional"` ClientCertFile string `conf:"optional"` ClientKeyFile string `conf:"optional"` Retry int `conf:"optional,default=0"` Timeout int `conf:"optional,default=1"` TLSAuth bool `conf:"optional,default=false"` EnableTLS bool `conf:"optional"` } // ProduceItem produces Kafka message to the item topic // in the broker provided in the async producer. func (p *Producer) ProduceItem(key, message string) { m := &sarama.ProducerMessage{ Topic: p.itemsTopic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(message), } p.produce(m) } // Close closes the underlying async producer. func (p *Producer) Close() error { err := p.async.Close() if err != nil { return errs.Wrap(err, "failed to close Kafka async producer") } return nil } // ProduceEvent produces Kafka message to the event topic // in the broker provided in the async producer. func (p *Producer) ProduceEvent(key, message string) { m := &sarama.ProducerMessage{ Topic: p.eventsTopic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(message), } p.produce(m) } // NewProducer creates Kafka producers from with provided configuration. 
func NewProducer(c *Configuration) (*Producer, error) { kafkaURL := fmt.Sprintf("%s:%s", c.URL, c.Port) kconf, err := newConfig( kafkaURL, c.Username, c.Password, c.CaFile, c.ClientCertFile, c.ClientKeyFile, c.Retry, c.TLSAuth, c.EnableTLS, time.Duration(c.Timeout)*time.Second, time.Duration(c.KeepAlive)*time.Second, ) if err != nil { return nil, errs.Wrap(err, "failed to create kafka configuration") } producer, err := newProducer( kconf, kafkaURL, c.Events, c.Items, ) if err != nil { return nil, errs.Wrap(err, "failed to create new kafka producer") } return producer, nil } // NewProducer returns a new producer initialized // and ready to produce messages to Kafka. func newProducer(config *sarama.Config, url, eventsTopic, itemsTopic string) (*Producer, error) { p, err := sarama.NewAsyncProducer([]string{url}, config) if err != nil { return nil, errs.Wrap(err, "async producer init failed") } prod := &Producer{async: p, eventsTopic: eventsTopic, itemsTopic: itemsTopic, timeout: 3 * time.Second} go prod.errorListener() return prod, nil } //nolint:revive // configuration requires a lot of parameters func newConfig( url, username, password, tlsCa, tlsCert, tlsKey string, retries int, tlsAuth, enableTLS bool, timeout, keepAlive time.Duration, ) (*sarama.Config, error) { config := sarama.NewConfig() config.ClientID = clientID config.Net.KeepAlive = keepAlive config.Net.DialTimeout = timeout config.Net.ReadTimeout = timeout config.Net.WriteTimeout = timeout config.Producer.Retry.Max = retries config.Net.TLS.Enable = enableTLS config.Metadata.AllowAutoTopicCreation = false if username != "" { config.Net.SASL.Enable = true config.Net.SASL.User = username config.Net.SASL.Password = password } if tlsAuth { var err error config.Net.TLS.Enable = tlsAuth d := tlsconfig.Details{ RawUri: url, TlsCaFile: tlsCa, TlsCertFile: tlsCert, TlsKeyFile: tlsKey, } config.Net.TLS.Config, err = d.GetTLSConfig(false) if err != nil { return nil, errs.Wrap(err, "failed to create TLS config") } } 
return config, nil } func (p *Producer) errorListener() { for perr := range p.async.Errors() { log.Errf( "kafka producer error: %s, for topic %s, with key %s", perr.Err.Error(), perr.Msg.Topic) } } func (p *Producer) produce(m *sarama.ProducerMessage) { ticker := time.NewTicker(p.timeout) defer ticker.Stop() select { case p.async.Input() <- m: log.Debugf("New Message produced") case <-ticker.C: log.Warningf("Message send timeout") } }