/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

package scheduler

import (
	"container/heap"
	"errors"
	"fmt"
	"math"
	"sort"
	"time"

	"git.zabbix.com/ap/plugin-support/conf"
	"git.zabbix.com/ap/plugin-support/log"
	"git.zabbix.com/ap/plugin-support/plugin"
	"git.zabbix.com/ap/plugin-support/plugin/comms"
	"zabbix.com/internal/agent"
	"zabbix.com/internal/agent/alias"
	"zabbix.com/internal/agent/keyaccess"
	"zabbix.com/internal/monitor"
	"zabbix.com/pkg/glexpr"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/plugins/external"
)

const (
	// number of seconds to wait for plugins to finish during scheduler shutdown
	shutdownTimeout = 5
	// shutdownSeconds value indicating that no shutdown is in progress
	shutdownInactive = -1
)

// Manager implements Scheduler interface and manages plugin interface usage.
type Manager struct {
	input       chan interface{}
	plugins     map[string]*pluginAgent
	pluginQueue pluginHeap
	clients     map[uint64]*client
	aliases     *alias.Manager
	// number of active tasks (running in their own goroutines)
	activeTasksNum int
	// number of seconds left on shutdown timer
	shutdownSeconds int
}

// updateRequest contains the list of metrics monitored by a client and additional client configuration data.
type updateRequest struct {
	clientID                   uint64
	sink                       plugin.ResultWriter
	firstActiveChecksRefreshed bool
	requests                   []*plugin.Request
	expressions                []*glexpr.Expression
}

// queryRequest contains status/debug query request.
type queryRequest struct {
	command string
	sink    chan string
}

// queryRequestUserParams contains status user parameters query request.
type queryRequestUserParams struct {
	sink chan string
}

// Scheduler provides the interface for updating scheduled tasks, performing
// single direct checks and querying scheduler status.
type Scheduler interface {
	UpdateTasks(clientID uint64, writer plugin.ResultWriter, firstActiveChecksRefreshed bool,
		expressions []*glexpr.Expression, requests []*plugin.Request)
	FinishTask(task performer)
	PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error)
	Query(command string) (status string)
	QueryUserParams() (status string)
}
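
// A minimal usage sketch (hypothetical caller; the command string is
// illustrative, see processQuery for the commands actually supported):
//
//	var s Scheduler = m // assuming m is a started *Manager
//	status := s.Query("metrics")
//	log.Debugf("scheduler status: %s", status)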

// cleanupClient deactivates plugins the client is no longer using. It is
// called after every client update and once per hour for the client associated
// with single passive checks.
func (m *Manager) cleanupClient(c *client, now time.Time) {
	// get a list of plugins the client stopped using
	released := c.cleanup(m.plugins, now)
	for _, p := range released {
		// check if the plugin is used by other clients
		if p.refcount != 0 {
			continue
		}
		log.Debugf("[%d] deactivate unused plugin %s", c.id, p.name())

		// deactivate recurring tasks
		for deactivate := true; deactivate; {
			deactivate = false
			for _, t := range p.tasks {
				if t.isActive() && t.isRecurring() {
					t.deactivate()
					// deactivation can change task ordering, so restart the iteration whenever a task is deactivated
					deactivate = true
					break
				}
			}
		}

		// queue stopper task if plugin has Runner interface
		if _, ok := p.impl.(plugin.Runner); ok {
			task := &stopperTask{
				taskBase: taskBase{plugin: p, active: true},
			}
			if err := task.reschedule(now); err != nil {
				log.Debugf("[%d] cannot schedule stopper task for plugin %s", c.id, p.name())
				continue
			}
			p.enqueueTask(task)
			log.Debugf("[%d] created stopper task for plugin %s", c.id, p.name())

			if p.queued() {
				m.pluginQueue.Update(p)
			}
		}

		// queue plugin if there are still some tasks left to be finished before deactivating
		if len(p.tasks) != 0 {
			if !p.queued() {
				heap.Push(&m.pluginQueue, p)
			}
		}
	}
}

// processUpdateRequest processes a client update request. It is used both for
// bulk requests (active checks on a server) and for direct requests (single
// passive and internal checks).
func (m *Manager) processUpdateRequest(update *updateRequest, now time.Time) {
	log.Debugf("[%d] processing update request (%d requests)", update.clientID, len(update.requests))

	// immediately fail direct checks and ignore bulk requests when shutting down
	if m.shutdownSeconds != shutdownInactive {
		if update.clientID <= agent.MaxBuiltinClientID {
			if len(update.requests) == 1 {
				update.sink.Write(&plugin.Result{
					Itemid: update.requests[0].Itemid,
					Error:  errors.New("Cannot obtain item value during shutdown process."),
					Ts:     now,
				})
			} else {
				log.Warningf("[%d] direct checks can contain only single request while received %d requests",
					update.clientID, len(update.requests))
			}
		}
		return
	}

	var c *client
	var ok bool
	if c, ok = m.clients[update.clientID]; !ok {
		if len(update.requests) == 0 {
			log.Debugf("[%d] skipping empty update for unregistered client", update.clientID)
			return
		}
		log.Debugf("[%d] registering new client", update.clientID)
		c = newClient(update.clientID, update.sink)
		m.clients[update.clientID] = c
	}

	c.updateExpressions(update.expressions)

	for _, r := range update.requests {
		var key string
		var params []string
		var err error
		var p *pluginAgent

		r.Key = m.aliases.Get(r.Key)
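		// item keys follow the Zabbix key[param1,param2,...] syntax, e.g.
		// vfs.fs.size[/,free] parses into key "vfs.fs.size" and params
		// {"/", "free"}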
		if key, params, err = itemutil.ParseKey(r.Key); err == nil {
			p, ok = m.plugins[key]
			if ok && update.clientID != agent.LocalChecksClientID {
				ok = keyaccess.CheckRules(key, params)
			}
			if !ok {
				err = fmt.Errorf("Unknown metric %s", key)
			} else {
				err = c.addRequest(p, r, update.sink, now, update.firstActiveChecksRefreshed)
			}
		}

		if err != nil {
			if c.id > agent.MaxBuiltinClientID {
				if tacc, ok := c.exporters[r.Itemid]; ok {
					log.Debugf("deactivate exporter task for item %d because of error: %s", r.Itemid, err)
					tacc.task().deactivate()
				}
			}
			update.sink.Write(&plugin.Result{Itemid: r.Itemid, Error: err, Ts: now})
			log.Debugf("[%d] cannot monitor metric \"%s\": %s", update.clientID, r.Key, err.Error())
			continue
		}

		if !p.queued() {
			heap.Push(&m.pluginQueue, p)
		} else {
			m.pluginQueue.Update(p)
		}
	}

	m.cleanupClient(c, now)
}

// processQueue processes queued plugins/tasks
func (m *Manager) processQueue(now time.Time) {
	seconds := now.Unix()
	for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() {
		if task := p.peekTask(); task != nil {
			if task.getScheduled().Unix() > seconds {
				break
			}

			heap.Pop(&m.pluginQueue)
			if !p.hasCapacity() {
				// plugin has no free capacity for the next task, keep the plugin out of queue
				// until active tasks finish and the required capacity is released
				continue
			}

			// take the task out of plugin tasks queue and perform it
			m.activeTasksNum++
			p.reserveCapacity(p.popTask())
			task.perform(m)

			// if the plugin has capacity for the next task put it back into plugin queue
			if !p.hasCapacity() {
				continue
			}
			heap.Push(&m.pluginQueue, p)
		} else {
			// plugins with empty task queue should not be in Manager queue
			heap.Pop(&m.pluginQueue)
		}
	}
}
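
// Note on ordering: pluginQueue behaves as a min-heap keyed by the scheduled
// time of each plugin's earliest task, so Peek always yields the plugin whose
// next task is due first. A condensed sketch of the drain pattern above:
//
//	for p := q.Peek(); p != nil; p = q.Peek() {
//		t := p.peekTask()
//		if t == nil {
//			heap.Pop(&q) // plugins without tasks leave the queue
//			continue
//		}
//		if t.getScheduled().Unix() > now.Unix() {
//			break // the earliest task is still in the future
//		}
//		// pop the plugin, reserve capacity, perform the task ...
//	}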

// processAndFlushUserParamQueue performs the due tasks of queued user
// parameter plugins and removes those plugins from the queue; all other
// plugins are pushed back.
func (m *Manager) processAndFlushUserParamQueue(now time.Time) {
	seconds := now.Unix()
	num := m.pluginQueue.Len()
	var pluginsBuf []*pluginAgent

	for p := m.pluginQueue.Peek(); p != nil && num > 0; p = m.pluginQueue.Peek() {
		heap.Pop(&m.pluginQueue)
		num--

		if !p.usrprm {
			pluginsBuf = append(pluginsBuf, p)
			continue
		}

		if task := p.peekTask(); task != nil {
			if !p.hasCapacity() || task.getScheduled().Unix() > seconds {
				continue
			}

			m.activeTasksNum++
			p.reserveCapacity(p.popTask())
			task.perform(m)
		}
	}

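	// the queue was fully drained above and pluginsBuf preserves the ascending
	// pop order, so pushing the elements back directly keeps the heap valid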
	for _, p := range pluginsBuf {
		m.pluginQueue.Push(p)
	}
}

// processFinishRequest handles finished tasks
func (m *Manager) processFinishRequest(task performer) {
	m.activeTasksNum--
	p := task.getPlugin()
	p.releaseCapacity(task)
	if p.active() && task.isActive() && task.isRecurring() {
		if err := task.reschedule(time.Now()); err != nil {
			log.Warningf("cannot reschedule plugin %s: %s", p.impl.Name(), err)
		} else {
			p.enqueueTask(task)
		}
	}
	if !p.queued() && p.hasCapacity() {
		heap.Push(&m.pluginQueue, p)
	}
}
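
// Task lifecycle: processQueue reserves plugin capacity and starts a task in
// its own goroutine via task.perform; when finished, the task calls
// Manager.FinishTask, which feeds processFinishRequest above to release the
// capacity, reschedule recurring tasks and requeue the plugin.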

// rescheduleQueue reschedules all queued tasks. This is done whenever the time
// difference between ticks exceeds the limit (for example, during daylight
// saving time changes).
func (m *Manager) rescheduleQueue(now time.Time) {
	// easier to rebuild queues than update each element
	queue := make(pluginHeap, 0, len(m.pluginQueue))
	for _, p := range m.pluginQueue {
		tasks := p.tasks
		p.tasks = make(performerHeap, 0, len(tasks))
		for _, t := range tasks {
			if err := t.reschedule(now); err == nil {
				p.enqueueTask(t)
			}
		}
		heap.Push(&queue, p)
	}
	m.pluginQueue = queue
}

// deactivatePlugins removes all tasks and creates stopper tasks for active runner plugins
func (m *Manager) deactivatePlugins() {
	m.shutdownSeconds = shutdownTimeout

	m.pluginQueue = make(pluginHeap, 0, len(m.pluginQueue))
	for _, p := range m.plugins {
		if p.refcount != 0 {
			p.tasks = make(performerHeap, 0)
			if _, ok := p.impl.(plugin.Runner); ok {
				task := &stopperTask{
					taskBase: taskBase{plugin: p, active: true},
				}
				p.enqueueTask(task)
				heap.Push(&m.pluginQueue, p)
				log.Debugf("created final stopper task for plugin %s", p.name())
			}
			p.refcount = 0
		}
	}
}

// run() is the main worker loop, running in its own goroutine until stopped
func (m *Manager) run() {
	defer log.PanicHook()
	log.Debugf("starting manager")
	// Align ticker creation with the start of the next second (the 0 nanosecond
	// timestamp). In reality the first tick will arrive at least a few
	// microseconds into the second, which is enough to include all tasks
	// scheduled for that second even with nanosecond priority adjustments.
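	// For example, when lastTick is taken at .25s past a second, the sleep
	// below lasts 1e9-250000000 ns = 750ms, so the ticker starts just after
	// the next second boundary.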
	lastTick := time.Now()
	cleaned := lastTick
	time.Sleep(time.Duration(1e9 - lastTick.Nanosecond()))
	ticker := time.NewTicker(time.Second)
run:
	for {
		select {
		case <-ticker.C:
			now := time.Now()
			diff := now.Sub(lastTick)
			interval := time.Second * 10
			if diff <= -interval || diff >= interval {
				log.Warningf("detected %d time difference between queue checks, rescheduling tasks",
					int(math.Abs(float64(diff))/1e9))
				m.rescheduleQueue(now)
			}
			lastTick = now
			m.processQueue(now)
			if m.shutdownSeconds != shutdownInactive {
				m.shutdownSeconds--
				if m.shutdownSeconds == 0 {
					break run
				}
			} else {
				// cleanup plugins used by passive checks
				if now.Sub(cleaned) >= time.Hour {
					if passive, ok := m.clients[0]; ok {
						m.cleanupClient(passive, now)
					}
					// remove inactive clients
					for _, client := range m.clients {
						if len(client.pluginsInfo) == 0 {
							delete(m.clients, client.ID())
						}
					}
					cleaned = now
				}
			}
		case u := <-m.input:
			if u == nil {
				// nil input starts the graceful shutdown sequence
				m.deactivatePlugins()
				if m.activeTasksNum+len(m.pluginQueue) == 0 {
					break run
				}
				m.processQueue(time.Now())
				// a nil value matches no case in the type switch below, so
				// keep looping until the remaining stopper tasks finish
				continue
			}
			switch v := u.(type) {
			case *updateRequest:
				m.processUpdateRequest(v, time.Now())
				m.processQueue(time.Now())
			case performer:
				m.processFinishRequest(v)
				if m.shutdownSeconds != shutdownInactive && m.activeTasksNum+len(m.pluginQueue) == 0 {
					break run
				}
				m.processQueue(time.Now())
			case *queryRequest:
				if response, err := m.processQuery(v); err != nil {
					v.sink <- "cannot process request: " + err.Error()
				} else {
					v.sink <- response
				}
			case *queryRequestUserParams:
				var keys []string
				var rerr error

				metrics := plugin.ClearUserParamMetrics()

				if keys, rerr = agent.InitUserParameterPlugin(agent.Options.UserParameter,
					agent.Options.UnsafeUserParameters, agent.Options.UserParameterDir); rerr != nil {
					plugin.RestoreUserParamMetrics(metrics)
					v.sink <- "cannot process user parameters request: " + rerr.Error()
					continue
				}

				m.processAndFlushUserParamQueue(time.Now())

				tasks := make(map[string]performerHeap)

				for key, plg := range m.plugins {
					if plg.usrprm {
						tasks[key] = plg.tasks
						delete(m.plugins, key)
					}
				}

				for _, key := range keys {
					m.addUserParamsPlugin(key)
					m.plugins[key].refcount++
				}

				for pluginkey, ltasks := range tasks {
					for task := peekTask(ltasks); task != nil; task = peekTask(ltasks) {
						heap.Pop(&ltasks)

						for _, key := range keys {
							if task.isItemKeyEqual(key) {
								task.setPlugin(m.plugins[pluginkey])
								m.plugins[pluginkey].enqueueTask(task)
							}
						}
					}
				}

				for _, key := range keys {
					heap.Push(&m.pluginQueue, m.plugins[key])
				}

				v.sink <- "ok"
			}
		}
	}
	log.Debugf("manager has been stopped")
	monitor.Unregister(monitor.Scheduler)
}

// pluginOptions describes the subset of a plugin's configuration used by the
// scheduler.
type pluginOptions struct {
	Capacity int `conf:"optional"`
	System   struct {
		ForceActiveChecksOnStart *int `conf:"optional"`
		Capacity                 int  `conf:"optional"`
	} `conf:"optional"`
}

// init initializes the manager state and creates a pluginAgent for every
// registered plugin. Metrics are sorted by plugin name, so consecutive metrics
// belonging to the same plugin share one agent.
func (m *Manager) init() {
	m.input = make(chan interface{}, 10)
	m.pluginQueue = make(pluginHeap, 0, len(plugin.Metrics))
	m.clients = make(map[uint64]*client)
	m.plugins = make(map[string]*pluginAgent)
	m.shutdownSeconds = shutdownInactive

	metrics := make([]*plugin.Metric, 0, len(plugin.Metrics))

	for _, metric := range plugin.Metrics {
		metrics = append(metrics, metric)
	}
	sort.Slice(metrics, func(i, j int) bool {
		return metrics[i].Plugin.Name() < metrics[j].Plugin.Name()
	})

	pagent := &pluginAgent{}
	for _, metric := range metrics {
		if metric.Plugin != pagent.impl {
			capacity, forceActiveChecksOnStart := getPluginOptions(agent.Options.Plugins[metric.Plugin.Name()], metric.Plugin.Name())
			if capacity > metric.Plugin.Capacity() {
				log.Warningf("lowering the plugin %s capacity to %d as the configured capacity %d exceeds limits",
					metric.Plugin.Name(), metric.Plugin.Capacity(), capacity)
				capacity = metric.Plugin.Capacity()
			}

			pagent = &pluginAgent{
				impl:                     metric.Plugin,
				tasks:                    make(performerHeap, 0),
				maxCapacity:              capacity,
				usedCapacity:             0,
				forceActiveChecksOnStart: forceActiveChecksOnStart,
				index:                    -1,
				refcount:                 0,
				usrprm:                   metric.UsrPrm,
			}

			interfaces := ""
			if _, ok := metric.Plugin.(plugin.Exporter); ok {
				interfaces += "exporter, "
			}
			if _, ok := metric.Plugin.(plugin.Collector); ok {
				interfaces += "collector, "
			}
			if _, ok := metric.Plugin.(plugin.Runner); ok {
				interfaces += "runner, "
			}
			if _, ok := metric.Plugin.(plugin.Watcher); ok {
				interfaces += "watcher, "
			}
			if _, ok := metric.Plugin.(plugin.Configurator); ok {
				interfaces += "configurator, "
			}
			interfaces = interfaces[:len(interfaces)-2]

			if metric.Plugin.IsExternal() {
				ext := metric.Plugin.(*external.Plugin)
				metric.Plugin.SetCapacity(1)
				log.Infof("using plugin '%s' (%s) providing following interfaces: %s", metric.Plugin.Name(),
					ext.Path, interfaces)
			} else {
				log.Infof("using plugin '%s' (built-in) providing following interfaces: %s", metric.Plugin.Name(),
					interfaces)
			}
		}
		m.plugins[metric.Key] = pagent
	}
}

// Start registers the scheduler with the monitor and launches the worker loop
// in its own goroutine.
func (m *Manager) Start() {
	log.Infof("Plugin communication protocol version is %s", comms.ProtocolVersion)

	monitor.Register(monitor.Scheduler)
	go m.run()
}

// Stop initiates a graceful shutdown by sending a nil value to the worker loop.
func (m *Manager) Stop() {
	m.input <- nil
}

// UpdateTasks queues an update request with the client's current metrics and
// configuration data.
func (m *Manager) UpdateTasks(clientID uint64, writer plugin.ResultWriter, firstActiveChecksRefreshed bool,
	expressions []*glexpr.Expression, requests []*plugin.Request) {

	m.input <- &updateRequest{clientID: clientID,
		sink:                       writer,
		requests:                   requests,
		expressions:                expressions,
		firstActiveChecksRefreshed: firstActiveChecksRefreshed,
	}
}

// resultWriter adapts a channel to the plugin.ResultWriter interface so that a
// single check result can be awaited synchronously.
type resultWriter chan *plugin.Result

func (r resultWriter) Write(result *plugin.Result) {
	r <- result
}

func (r resultWriter) Flush() {
}

func (r resultWriter) SlotsAvailable() int {
	return 1
}

func (r resultWriter) PersistSlotsAvailable() int {
	return 1
}

// PerformTask performs a single direct check of the specified key and waits up
// to timeout for the result.
func (m *Manager) PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) {
	var lastLogsize uint64
	var mtime int

	w := make(resultWriter, 1)

	m.UpdateTasks(clientID, w, false, nil, []*plugin.Request{{Key: key, LastLogsize: &lastLogsize, Mtime: &mtime}})

	select {
	case r := <-w:
		if r.Error == nil {
			if r.Value != nil {
				result = *r.Value
			} else {
				// single metric requests do not support empty values, return error instead
				err = errors.New("No values have been gathered yet.")
			}
		} else {
			err = r.Error
		}
	case <-time.After(timeout):
		err = errors.New("Timeout occurred while gathering data.")
	}
	return
}
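
// A minimal usage sketch (the key, timeout and client ID are illustrative):
//
//	value, err := m.PerformTask("agent.hostname", 3*time.Second, agent.LocalChecksClientID)
//	if err != nil {
//		log.Warningf("direct check failed: %s", err)
//	}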

// FinishTask is called by tasks from their goroutines to notify the manager
// that they have finished.
func (m *Manager) FinishTask(task performer) {
	m.input <- task
}

// Query sends a status/debug query to the scheduler and returns the response.
func (m *Manager) Query(command string) (status string) {
	request := &queryRequest{command: command, sink: make(chan string)}
	m.input <- request
	return <-request.sink
}

// QueryUserParams requests a reload of the user parameter metrics and returns
// the resulting status.
func (m *Manager) QueryUserParams() (status string) {
	request := &queryRequestUserParams{sink: make(chan string)}
	m.input <- request
	return <-request.sink
}

func (m *Manager) validatePlugins(options *agent.AgentOptions) (err error) {
	for _, p := range plugin.Plugins {
		if c, ok := p.(plugin.Configurator); ok && !p.IsExternal() {
			if err = c.Validate(options.Plugins[p.Name()]); err != nil {
				return fmt.Errorf("invalid plugin %s configuration: %s", p.Name(), err)
			}
		}
	}
	return
}

func (m *Manager) configure(options *agent.AgentOptions) (err error) {
	m.aliases, err = alias.NewManager(options)
	return
}

// NewManager creates and initializes a scheduler manager for the given agent
// options.
func NewManager(options *agent.AgentOptions) (manager *Manager, err error) {
	var m Manager
	m.init()
	if err = m.validatePlugins(options); err != nil {
		return
	}
	return &m, m.configure(options)
}

// addUserParamsPlugin creates a pluginAgent for the user parameter metric with
// the given key. The key is expected to exist in plugin.Metrics, having just
// been registered by InitUserParameterPlugin.
func (m *Manager) addUserParamsPlugin(key string) {
	var metric *plugin.Metric

	for _, metric = range plugin.Metrics {
		if metric.Key == key {
			break
		}
	}

	capacity := metric.Plugin.Capacity()

	pagent := &pluginAgent{
		impl:         metric.Plugin,
		tasks:        make(performerHeap, 0),
		maxCapacity:  capacity,
		usedCapacity: 0,
		index:        -1,
		refcount:     0,
		usrprm:       metric.UsrPrm,
	}

	m.plugins[key] = pagent
}

// peekTask returns the first (earliest scheduled) task in the heap without
// removing it, or nil if the heap is empty.
func peekTask(tasks performerHeap) performer {
	if len(tasks) == 0 {
		return nil
	}

	return tasks[0]
}

// getPluginOptions resolves the effective capacity and ForceActiveChecksOnStart
// settings for the named plugin from its raw configuration.
func getPluginOptions(optsRaw interface{}, name string) (capacity int, forceActiveChecksOnStart int) {
	pluginCap, pluginSystemCap, pluginForceActiveChecksOnStart := getPluginOpts(optsRaw, name)

	if pluginSystemCap > 0 {
		if pluginCap > 0 {
			log.Warningf("both Plugins.%s.Capacity and Plugins.%s.System.Capacity configuration parameters are set, using System.Capacity: %d",
				name, name, pluginSystemCap)
		}
		capacity = pluginSystemCap
	} else if pluginCap > 0 {
		log.Warningf(
			"plugin %s configuration parameter Plugins.%s.Capacity is deprecated, use Plugins.%s.System.Capacity instead",
			name, name, name,
		)
		capacity = pluginCap
	} else {
		capacity = plugin.DefaultCapacity
	}

	if pluginForceActiveChecksOnStart != nil {
		if *pluginForceActiveChecksOnStart > 1 || *pluginForceActiveChecksOnStart < 0 {
			log.Warningf("invalid Plugins.%s.System.ForceActiveChecksOnStart configuration parameter: %d",
				name, *pluginForceActiveChecksOnStart)
			forceActiveChecksOnStart = agent.Options.ForceActiveChecksOnStart
		} else {
			forceActiveChecksOnStart = *pluginForceActiveChecksOnStart
		}
	} else {
		forceActiveChecksOnStart = agent.Options.ForceActiveChecksOnStart
	}

	return
}
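
// Resolution example: if both capacity parameters are set, e.g.
//
//	Plugins.X.Capacity=30
//	Plugins.X.System.Capacity=20
//
// the effective capacity is 20 (System.Capacity takes precedence); with
// neither parameter set it falls back to plugin.DefaultCapacity.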

// getPluginOpts unmarshals the raw plugin configuration into pluginOptions and
// returns the individual values.
func getPluginOpts(optsRaw interface{}, name string) (pluginCap, pluginSystemCap int, forceActiveChecksOnStart *int) {
	var opt pluginOptions

	if optsRaw == nil {
		return
	}

	if err := conf.Unmarshal(optsRaw, &opt, false); err != nil {
		log.Warningf("invalid plugin %s configuration: %s", name, err)

		return
	}

	pluginCap = opt.Capacity
	pluginSystemCap = opt.System.Capacity
	forceActiveChecksOnStart = opt.System.ForceActiveChecksOnStart

	return
}