/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/
package resultcache

import (
	"encoding/json"
	"errors"
	"reflect"
	"sync/atomic"
	"time"

	"git.zabbix.com/ap/plugin-support/log"
	"git.zabbix.com/ap/plugin-support/plugin"
	"zabbix.com/internal/agent"
	"zabbix.com/internal/monitor"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/version"
)

type MemoryCache struct {
	*cacheData
	results         []*AgentData
	maxBufferSize   int32
	totalValueNum   int32
	persistValueNum int32
}

func (c *MemoryCache) upload(u Uploader) (err error) {
	if len(c.results) == 0 {
		return
	}

	c.Debugf("upload history data, %d/%d value(s)", len(c.results), cap(c.results))

	request := AgentDataRequest{
		Request: "agent data",
		Data:    c.results,
		Session: u.Session(),
		Host:    u.Hostname(),
		Version: version.Short(),
	}

	var data []byte

	if data, err = json.Marshal(&request); err != nil {
		c.Errf("cannot convert cached history to json: %s", err.Error())
		return
	}

	timeout := len(c.results) * c.timeout
	if timeout > 60 {
		timeout = 60
	}
	if errs := u.Write(data, time.Duration(timeout)*time.Second); errs != nil {
		if !reflect.DeepEqual(errs, c.lastErrors) {
			for i := 0; i < len(errs); i++ {
				c.Warningf("%s", errs[i])
			}
			c.Warningf("history upload to [%s] [%s] started to fail", u.Addr(), u.Hostname())
			c.lastErrors = errs
		}

		return errors.New("history upload failed")
	}

	if c.lastErrors != nil {
		c.Warningf("history upload to [%s] [%s] is working again", u.Addr(), u.Hostname())
		c.lastErrors = nil
	}

	// clear results slice to ensure that the data is garbage collected
	c.results[0] = nil
	for i := 1; i < len(c.results); i *= 2 {
		copy(c.results[i:], c.results[:i])
	}
	c.results = c.results[:0]

	c.totalValueNum = 0
	c.persistValueNum = 0
	return
}

func (c *MemoryCache) flushOutput(u Uploader) {
	if c.retry != nil {
		c.retry.Stop()
		c.retry = nil
	}

	if c.upload(u) != nil && u.CanRetry() {
		c.retry = time.AfterFunc(UploadRetryInterval, func() { c.Upload(u) })
	}
}

// addResult appends received result at the end of results slice
func (c *MemoryCache) addResult(result *AgentData) {
	full := c.persistValueNum >= c.maxBufferSize/2 || c.totalValueNum >= c.maxBufferSize
	c.results = append(c.results, result)
	c.totalValueNum++
	if result.persistent {
		c.persistValueNum++
	}

	if c.persistValueNum >= c.maxBufferSize/2 || c.totalValueNum >= c.maxBufferSize {
		if !full && c.uploader != nil {
			c.flushOutput(c.uploader)
		}
	}
}

// insertResult attempts to insert the received result into results slice by replacing existing value.
// If no appropriate target was found it calls addResult to append value.
func (c *MemoryCache) insertResult(result *AgentData) {
	index := -1
	if !result.persistent {
		for i, r := range c.results {
			if r.Itemid == result.Itemid {
				c.Debugf("cache is full, replacing oldest value for itemid:%d", r.Itemid)
				index = i
				break
			}
		}
	}
	if index == -1 && (!result.persistent || c.persistValueNum < c.maxBufferSize/2) {
		for i, r := range c.results {
			if !r.persistent {
				if result.persistent {
					c.persistValueNum++
				}
				c.Debugf("cache is full, removing oldest value for itemid:%d", r.Itemid)
				index = i
				break
			}
		}
	}
	if index == -1 {
		c.Warningf("cache is full and cannot cannot find a value to replace, adding new instead")
		c.addResult(result)
		return
	}

	copy(c.results[index:], c.results[index+1:])
	c.results[len(c.results)-1] = result
}

func (c *MemoryCache) write(r *plugin.Result) {
	c.lastDataID++
	var value *string
	var state *int
	if r.Error == nil {
		value = r.Value
	} else {
		errmsg := r.Error.Error()
		value = &errmsg
		tmp := itemutil.StateNotSupported
		state = &tmp
	}

	var clock, ns int
	if !r.Ts.IsZero() {
		clock = int(r.Ts.Unix())
		ns = r.Ts.Nanosecond()
	}

	data := &AgentData{
		Id:             c.lastDataID,
		Itemid:         r.Itemid,
		LastLogsize:    r.LastLogsize,
		Mtime:          r.Mtime,
		Clock:          clock,
		Ns:             ns,
		Value:          value,
		State:          state,
		EventSource:    r.EventSource,
		EventID:        r.EventID,
		EventSeverity:  r.EventSeverity,
		EventTimestamp: r.EventTimestamp,
		persistent:     r.Persistent,
	}

	if c.totalValueNum >= c.maxBufferSize {
		c.insertResult(data)
	} else {
		c.addResult(data)
	}
}

func (c *MemoryCache) run() {
	defer log.PanicHook()
	c.Debugf("starting memory cache")

	for {
		u := <-c.input
		if u == nil {
			break
		}
		switch v := u.(type) {
		case Uploader:
			c.flushOutput(v)
		case *plugin.Result:
			c.write(v)
		case *agent.AgentOptions:
			c.updateOptions(v)
		}
	}
	c.Debugf("memory cache has been stopped")
	monitor.Unregister(monitor.Output)
}

func (c *MemoryCache) updateOptions(options *agent.AgentOptions) {
	c.maxBufferSize = int32(options.BufferSize)
	c.timeout = options.Timeout
}

func (c *MemoryCache) init(options *agent.AgentOptions) {
	c.updateOptions(options)
	c.results = make([]*AgentData, 0, c.maxBufferSize)
}

func (c *MemoryCache) Start() {
	// register with secondary group to stop result cache after other components are stopped
	monitor.Register(monitor.Output)
	go c.run()
}

func (c *MemoryCache) SlotsAvailable() int {
	slots := atomic.LoadInt32(&c.maxBufferSize) - atomic.LoadInt32(&c.totalValueNum)
	if slots < 0 {
		slots = 0
	}

	return int(slots)
}

func (c *MemoryCache) PersistSlotsAvailable() int {
	slots := atomic.LoadInt32(&c.maxBufferSize)/2 - atomic.LoadInt32(&c.persistValueNum)
	if slots < 0 {
		slots = 0
	}
	return int(slots)
}