/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**
**/

/*
** We use the library Eclipse Paho (eclipse/paho.mqtt.golang), which is
** distributed under the terms of the Eclipse Distribution License 1.0 (The 3-Clause BSD License)
** available at https://www.eclipse.org/org/documents/edl-v10.php
**/

package mqtt

import (
	"crypto/rand"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"strings"
	"time"

	"git.zabbix.com/ap/plugin-support/metric"
	"git.zabbix.com/ap/plugin-support/plugin"
	"git.zabbix.com/ap/plugin-support/tlsconfig"
	"git.zabbix.com/ap/plugin-support/zbxerr"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/version"
	"zabbix.com/pkg/watch"
)

const (
	pluginName = "MQTT"
)

type mqttClient struct {
	client    mqtt.Client
	broker    broker
	subs      map[string]*mqttSub
	opts      *mqtt.ClientOptions
	connected bool
}

type mqttSub struct {
	broker   broker
	topic    string
	wildCard bool
}

type broker struct {
	url      string
	username string
	password string
}
type Plugin struct {
	plugin.Base
	options     Options
	manager     *watch.Manager
	mqttClients map[broker]*mqttClient
}

var impl Plugin

func (p *Plugin) createOptions(
	clientid, username, password string, b broker, details tlsconfig.Details) (*mqtt.ClientOptions, error) {
	opts := mqtt.NewClientOptions().AddBroker(b.url).SetClientID(clientid).SetCleanSession(true).SetConnectTimeout(
		time.Duration(impl.options.Timeout) * time.Second)
	if username != "" {
		opts.SetUsername(username)
		if password != "" {
			opts.SetPassword(password)
		}
	}

	opts.OnConnectionLost = func(client mqtt.Client, reason error) {
		impl.Warningf("connection lost to [%s]: %s", b.url, reason.Error())
	}

	opts.OnConnect = func(client mqtt.Client) {
		impl.Debugf("connected to [%s]", b.url)

		impl.manager.Lock()
		defer impl.manager.Unlock()

		mc, ok := p.mqttClients[b]
		if !ok || mc == nil || mc.client == nil {
			impl.Warningf("cannot subscribe to [%s]: broker is not connected", b.url)
			return
		}

		mc.connected = true
		for _, ms := range mc.subs {
			if err := ms.subscribe(mc); err != nil {
				impl.Warningf("cannot subscribe topic '%s' to [%s]: %s", ms.topic, b.url, err)
				impl.manager.Notify(ms, err)
			}
		}
	}

	t, err := getTlsConfig(details)
	if err != nil {
		return nil, err
	}

	opts.SetTLSConfig(t)

	return opts, nil
}

func getTlsConfig(d tlsconfig.Details) (*tls.Config, error) {
	if d.TlsCaFile == "" && d.TlsCertFile == "" && d.TlsKeyFile == "" {
		return nil, nil
	}

	return tlsconfig.CreateConfig(
		tlsconfig.Details{
			TlsCaFile:   d.TlsCaFile,
			TlsCertFile: d.TlsCertFile,
			TlsKeyFile:  d.TlsKeyFile,
			RawUri:      d.RawUri,
		},
		false,
	)
}

func newClient(options *mqtt.ClientOptions) (mqtt.Client, error) {
	c := mqtt.NewClient(options)
	token := c.Connect()
	if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) {
		c.Disconnect(200)
		return nil, fmt.Errorf("timed out while connecting")
	}

	if token.Error() != nil {
		return nil, token.Error()
	}

	return c, nil
}

func (ms *mqttSub) handler(client mqtt.Client, msg mqtt.Message) {
	impl.manager.Lock()
	impl.Tracef("received publish from [%s] on topic '%s' got: %s", ms.broker.url, msg.Topic(), string(msg.Payload()))
	impl.manager.Notify(ms, msg)
	impl.manager.Unlock()
}

func (ms *mqttSub) subscribe(mc *mqttClient) error {
	impl.Tracef("subscribing '%s' to [%s]", ms.topic, ms.broker.url)

	token := mc.client.Subscribe(ms.topic, 0, ms.handler)
	if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) {
		return fmt.Errorf("timed out while subscribing")
	}

	if token.Error() != nil {
		return token.Error()
	}

	impl.Tracef("subscribed '%s' to [%s]", ms.topic, ms.broker.url)
	return nil
}

// Watch MQTT plugin
func (p *Plugin) Watch(items []*plugin.Item, ctx plugin.ContextProvider) {
	impl.manager.Lock()
	impl.manager.Update(ctx.ClientID(), ctx.Output(), items)
	impl.manager.Unlock()
}

func (ms *mqttSub) Initialize() (err error) {
	mc, ok := impl.mqttClients[ms.broker]
	if !ok || mc == nil {
		return fmt.Errorf("Cannot connect to [%s]: broker could not be initialized", ms.broker.url)
	}

	if mc.client == nil {
		impl.Debugf("establishing connection to [%s]", ms.broker.url)
		mc.client, err = newClient(mc.opts)
		if err != nil {
			impl.Warningf("cannot establish connection to [%s]: %s", ms.broker.url, err)
			return
		}

		impl.Debugf("established connection to [%s]", ms.broker.url)
		return
	}

	if mc.connected {
		return ms.subscribe(mc)
	}

	return
}

func (ms *mqttSub) Release() {
	mc, ok := impl.mqttClients[ms.broker]
	if !ok || mc == nil || mc.client == nil {
		impl.Errf("cannot release [%s]: broker was not initialized", ms.broker.url)
		return
	}

	impl.Tracef("unsubscribing topic '%s' from [%s]", ms.topic, ms.broker.url)
	token := mc.client.Unsubscribe(ms.topic)
	if !token.WaitTimeout(time.Duration(impl.options.Timeout) * time.Second) {
		impl.Errf("cannot unsubscribe topic '%s' from [%s]: timed out", ms.topic, ms.broker.url)
	}

	if token.Error() != nil {
		impl.Errf("cannot unsubscribe topic '%s' from [%s]: %s", ms.topic, ms.broker.url, token.Error())
	}

	delete(mc.subs, ms.topic)
	impl.Tracef("unsubscribed topic '%s' from [%s]", ms.topic, ms.broker.url)
	if len(mc.subs) == 0 {
		impl.Debugf("disconnecting from [%s]", ms.broker.url)
		mc.client.Disconnect(200)
		delete(impl.mqttClients, mc.broker)
	}
}

type respFilter struct {
	wildcard bool
}

func (f *respFilter) Process(v interface{}) (s *string, err error) {
	m, ok := v.(mqtt.Message)
	if !ok {
		if err, ok = v.(error); !ok {
			err = fmt.Errorf("unexpected input type %T", v)
		}
		return
	}

	var value string
	if f.wildcard {
		j, err := json.Marshal(map[string]string{m.Topic(): string(m.Payload())})
		if err != nil {
			return nil, err
		}
		value = string(j)
	} else {
		value = string(m.Payload())
	}

	return &value, nil
}

func (ms *mqttSub) NewFilter(key string) (filter watch.EventFilter, err error) {
	return &respFilter{ms.wildCard}, nil
}

func (p *Plugin) EventSourceByKey(rawKey string) (es watch.EventSource, err error) {
	var key string
	var raw []string
	if key, raw, err = itemutil.ParseKey(rawKey); err != nil {
		return
	}

	params, _, hc, err := metrics[key].EvalParams(raw, p.options.Sessions)
	if err != nil {
		return nil, err
	}

	err = metric.SetDefaults(params, hc, p.options.Default)
	if err != nil {
		return nil, err
	}

	if err != nil {
		return nil, err
	}

	topic := params["Topic"]
	username := params["User"]
	password := params["Password"]
	url, err := parseURL(params["URL"])
	if err != nil {
		return nil, err
	}

	if topic == "" {
		return nil, zbxerr.ErrorTooFewParameters.Wrap(errors.New("second parameter \"Topic\" is required."))
	}

	broker := broker{url.String(), username, password}
	var client *mqttClient
	var ok bool

	opt, err := p.createOptions(getClientID(),
		username,
		password,
		broker,
		tlsconfig.Details{
			TlsCaFile:   params["TLSCAFile"],
			TlsCertFile: params["TLSCertFile"],
			TlsKeyFile:  params["TLSKeyFile"],
			RawUri:      url.String(),
		},
	)

	if err != nil {
		return nil, err
	}

	if client, ok = p.mqttClients[broker]; !ok {
		impl.Tracef("creating client for [%s]", broker.url)
		client = &mqttClient{
			nil,
			broker,
			make(map[string]*mqttSub),
			opt,
			false,
		}
		p.mqttClients[broker] = client
	}

	var sub *mqttSub
	if sub, ok = client.subs[topic]; !ok {
		impl.Tracef("creating new subscriber on topic '%s' for [%s]", topic, broker.url)

		sub = &mqttSub{broker, topic, hasWildCards(topic)}
		client.subs[topic] = sub
	}

	return sub, nil
}

func getClientID() string {
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		impl.Errf("failed to generate a uuid for mqtt Client ID: %s", err.Error)
		return "Zabbix agent 2 " + version.Long()
	}
	return fmt.Sprintf("Zabbix agent 2 %s %x-%x-%x-%x-%x", version.Long(), b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}

func hasWildCards(topic string) bool {
	return strings.HasSuffix(topic, "#") || strings.Contains(topic, "+")
}

func parseURL(rawUrl string) (out *url.URL, err error) {
	if !strings.Contains(rawUrl, "://") {
		rawUrl = "tcp://" + rawUrl
	}

	out, err = url.Parse(rawUrl)
	if err != nil {
		return
	}

	if out.Port() != "" && out.Hostname() == "" {
		return nil, errors.New("Host is required.")
	}

	if out.Port() == "" {
		out.Host = fmt.Sprintf("%s:1883", out.Host)
	}

	if len(out.Query()) > 0 {
		return nil, errors.New("URL should not contain query parameters.")
	}

	return
}