/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

// Package resultcache provides the result caching component.
//
// ResultCache runs in a separate goroutine, caches results and flushes data to
// the specified output interface in JSON format when requested or when the
// cache is full. The cache limit is specified in the configuration file
// (BufferSize). When the limit is reached, the following logic is applied to
// new results:
// * a non-persistent result replaces either the oldest result of the same
//   item, or the oldest non-persistent result if the item was not yet cached;
// * a persistent result replaces the oldest non-persistent result if the
//   total number of persistent results is less than half of the maximum cache
//   size. Otherwise the result is appended, extending the cache beyond the
//   configured limit.
//
// Because of the asynchronous nature of the communications the result cache
// cannot return an error when it cannot accept a new persistent result.
// Instead, before writing a result to the cache, the caller (plugin) must
// check the cache state with the PersistSlotsAvailable() function. This can
// still lead to more results being written than the cache limit allows, but
// that is not a big problem, because the cache buffer is not static and is
// extended as required. The cache limit (BufferSize) is therefore treated as
// a recommendation rather than a hard limit.
package resultcache

import (
	"database/sql"
	"errors"
	"fmt"
	"os"
	"time"

	"git.zabbix.com/ap/plugin-support/log"
	"git.zabbix.com/ap/plugin-support/plugin"
	"zabbix.com/internal/agent"
)

const (
	// UploadRetryInterval is the delay before retrying a failed upload.
	UploadRetryInterval = time.Second
)

// ResultCache provides an interface for managing the cache lifecycle and for
// requesting uploads of the cached data.
type ResultCache interface {
	Start()
	Stop()
	Upload(u Uploader)
	// TODO: will be used once the runtime configuration reload is implemented
	UpdateOptions(options *agent.AgentOptions)
}

// AgentData represents a single cached result in the format used by the
// "data" array of an agent data request.
type AgentData struct {
	Id             uint64  `json:"id"`
	Itemid         uint64  `json:"itemid"`
	LastLogsize    *uint64 `json:"lastlogsize,omitempty"`
	Mtime          *int    `json:"mtime,omitempty"`
	State          *int    `json:"state,omitempty"`
	Value          *string `json:"value,omitempty"`
	EventSource    *string `json:"source,omitempty"`
	EventID        *int    `json:"eventid,omitempty"`
	EventSeverity  *int    `json:"severity,omitempty"`
	EventTimestamp *int    `json:"timestamp,omitempty"`
	Clock          int     `json:"clock,omitempty"`
	Ns             int     `json:"ns,omitempty"`
	persistent     bool
}

// AgentDataRequest is the request body used to upload cached results to the
// server.
type AgentDataRequest struct {
	Request string       `json:"request"`
	Data    []*AgentData `json:"data"`
	Session string       `json:"session"`
	Host    string       `json:"host"`
	Version string       `json:"version"`
}
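
// An illustrative JSON payload produced from AgentDataRequest (the field
// values below are hypothetical):
//
//	{
//	    "request": "agent data",
//	    "data": [
//	        {
//	            "id": 2,
//	            "itemid": 1234,
//	            "value": "73",
//	            "clock": 1670000000,
//	            "ns": 500000000
//	        }
//	    ],
//	    "session": "e7f148d0a7b3e9c2",
//	    "host": "web-01",
//	    "version": "6.4"
//	}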

// Uploader provides an interface for uploading serialized cache contents to
// the server.
type Uploader interface {
	Write(data []byte, timeout time.Duration) (err []error)
	Addr() (s string)
	Hostname() (s string)
	CanRetry() (enabled bool)
	Session() (s string)
}
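
// The following is a minimal, illustrative Uploader sketch (not part of the
// original agent; the name and values are hypothetical): it discards the
// written data and reports success, which can be handy when exercising the
// cache in isolation.
type nullUploader struct{}

func (u *nullUploader) Write(data []byte, timeout time.Duration) (err []error) { return nil }
func (u *nullUploader) Addr() (s string)                                       { return "127.0.0.1:10051" }
func (u *nullUploader) Hostname() (s string)                                   { return "localhost" }
func (u *nullUploader) CanRetry() (enabled bool)                               { return false }
func (u *nullUploader) Session() (s string)                                    { return "" }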

// cacheData holds the state shared by the memory and disk based cache
// implementations.
type cacheData struct {
	log.Logger
	input      chan interface{}
	uploader   Uploader
	clientID   uint64
	lastDataID uint64
	lastErrors []error
	retry      *time.Timer
	timeout    int
}

// Stop signals the cache goroutine to stop by sending nil to its input
// channel.
func (c *cacheData) Stop() {
	c.input <- nil
}

// Write queues a result for caching.
func (c *cacheData) Write(result *plugin.Result) {
	c.input <- result
}

// TODO: will be used once the runtime configuration reload is implemented
func (c *cacheData) UpdateOptions(options *agent.AgentOptions) {
	c.input <- options
}

// Upload requests an upload of the cached data through the specified
// uploader, falling back to the default uploader when u is nil.
func (c *cacheData) Upload(u Uploader) {
	if u == nil {
		u = c.uploader
	}
	if u != nil {
		c.input <- u
	}
}

// Flush requests an upload of the cached data through the default uploader.
func (c *cacheData) Flush() {
	c.Upload(nil)
}

// tableName returns the name of the table with the specified prefix and
// registry id, e.g. tableName("data", 3) returns "data_3".
func tableName(prefix string, index int) string {
	return fmt.Sprintf("%s_%d", prefix, index)
}

// fetchRowAndClose scans the next row into args and closes the result set.
// It returns false if there is no row to fetch or an error occurred.
func fetchRowAndClose(rows *sql.Rows, args ...interface{}) (ok bool, err error) {
	if rows.Next() {
		err = rows.Scan(args...)
		rows.Close()
		return err == nil, err
	}
	return false, rows.Err()
}
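
// A typical call pattern (used in prepareDiskCache below) when exactly one
// row is expected:
//
//	rows, err := database.Query("SELECT id FROM registry WHERE address=? AND hostname=?", addr, host)
//	if err != nil {
//		return err
//	}
//	if ok, err := fetchRowAndClose(rows, &id); !ok {
//		// no matching row, or a scan error reported in err
//	}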

// New creates a new result cache for the specified client, backed by memory
// or by disk depending on the EnablePersistentBuffer option.
func New(options *agent.AgentOptions, clientid uint64, output Uploader) ResultCache {
	data := &cacheData{
		Logger:   log.New(fmt.Sprintf("%d", clientid)),
		clientID: clientid,
		input:    make(chan interface{}, 100),
		uploader: output,
	}

	if options.EnablePersistentBuffer == 0 {
		c := &MemoryCache{cacheData: data}
		c.init(options)
		return c
	}

	c := &DiskCache{cacheData: data}
	c.init(options)
	return c
}
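
// An illustrative call sequence (assumes a package-level agent.Options
// variable and uses the hypothetical nullUploader sketched above):
//
//	cache := New(&agent.Options, clientID, &nullUploader{})
//	cache.Start()
//	defer cache.Stop()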

// createTableQuery returns the statement creating a result table with the
// specified prefix and registry id.
func createTableQuery(table string, id int) string {
	return fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s ("+
			"id INTEGER,"+
			"write_clock INTEGER,"+
			"itemid INTEGER,"+
			"lastlogsize INTEGER,"+
			"mtime INTEGER,"+
			"state INTEGER,"+
			"value TEXT,"+
			"eventsource TEXT,"+
			"eventid INTEGER,"+
			"eventseverity INTEGER,"+
			"eventtimestamp INTEGER,"+
			"clock INTEGER,"+
			"ns INTEGER"+
			")",
		tableName(table, id))
}
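
// For example, createTableQuery("data", 3) produces the following statement
// (whitespace added for readability):
//
//	CREATE TABLE IF NOT EXISTS data_3 (
//	    id INTEGER, write_clock INTEGER, itemid INTEGER, lastlogsize INTEGER,
//	    mtime INTEGER, state INTEGER, value TEXT, eventsource TEXT,
//	    eventid INTEGER, eventseverity INTEGER, eventtimestamp INTEGER,
//	    clock INTEGER, ns INTEGER
//	)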

// prepareDiskCache creates or updates the persistent buffer database: it
// registers each configured (address, hostname) combination, drops the tables
// of combinations that are no longer configured and creates data/log tables
// for new ones.
func prepareDiskCache(options *agent.AgentOptions, addresses [][]string, hostnames []string) (err error) {
	type activeCombination struct {
		address  string
		hostname string
	}

	var database *sql.DB
	database, err = sql.Open("sqlite3", options.PersistentBufferFile)
	if err != nil {
		return fmt.Errorf("Cannot open database %s: %s.", options.PersistentBufferFile, err)
	}
	defer database.Close()

	stmt, err := database.Prepare("CREATE TABLE IF NOT EXISTS registry (id INTEGER PRIMARY KEY,address TEXT,hostname TEXT,UNIQUE(address,hostname))")
	if err != nil {
		return err
	}

	defer stmt.Close()

	if _, err = stmt.Exec(); err != nil {
		return err
	}

	var id int
	var address string
	var hostname string
	ids := make([]int, 0)
	combinations := make([]activeCombination, 0)
	registeredCombinations := make([]activeCombination, 0)

	// build the list of active combinations, using the first address of each
	// address cluster to identify the cluster
	for _, addr := range addresses {
		for _, host := range hostnames {
			combinations = append(combinations, activeCombination{address: addr[0], hostname: host})
		}
	}

	rows, err := database.Query("SELECT id,address,hostname FROM registry")
	if err != nil {
		return err
	}

	for rows.Next() {
		if err = rows.Scan(&id, &address, &hostname); err != nil {
			rows.Close()
			return err
		}
		ids = append(ids, id)
		registeredCombinations = append(registeredCombinations, activeCombination{address: address, hostname: hostname})
	}
	if err = rows.Err(); err != nil {
		return err
	}
addressCheck:
	for i, cr := range registeredCombinations {
		for _, c := range combinations {
			if c.address == cr.address && c.hostname == cr.hostname {
				continue addressCheck
			}
		}
		if _, err = database.Exec(fmt.Sprintf("DELETE FROM registry WHERE ID = %d", ids[i])); err != nil {
			return err
		}
		if _, err = database.Exec(fmt.Sprintf("DROP TABLE data_%d", ids[i])); err != nil {
			return err
		}
		if _, err = database.Exec(fmt.Sprintf("DROP TABLE log_%d", ids[i])); err != nil {
			return err
		}
	}

	for _, c := range combinations {
		stmt, err = database.Prepare("INSERT OR IGNORE INTO registry (address,hostname) VALUES (?,?)")
		if err != nil {
			return err
		}

		defer stmt.Close()

		if _, err = stmt.Exec(c.address, c.hostname); err != nil {
			return err
		}
		rows, err = database.Query("SELECT id FROM registry WHERE address=? AND hostname=?", c.address, c.hostname)
		if err != nil {
			return err
		}

		if ok, err := fetchRowAndClose(rows, &id); !ok {
			if err == nil {
				err = fmt.Errorf("cannot select id for address %s hostname %s", c.address, c.hostname)
			}
			return err
		}

		stmt, err = database.Prepare(createTableQuery("data", id))
		if err != nil {
			return err
		}

		defer stmt.Close()

		if _, err = stmt.Exec(); err != nil {
			return err
		}
		if _, err = database.Exec(fmt.Sprintf("CREATE INDEX IF NOT EXISTS data_%d_1 ON data_%d (write_clock)", id, id)); err != nil {
			return err
		}

		stmt, err = database.Prepare(createTableQuery("log", id))
		if err != nil {
			return err
		}

		defer stmt.Close()

		if _, err = stmt.Exec(); err != nil {
			return err
		}
		if _, err = database.Exec(fmt.Sprintf("CREATE INDEX IF NOT EXISTS log_%d_1 ON log_%d (write_clock)", id, id)); err != nil {
			return err
		}

		// delete gathered logs - they will be rescanned using the lastlogsize received from server
		if _, err = database.Exec(fmt.Sprintf("DELETE FROM log_%d", id)); err != nil {
			return err
		}
	}

	return nil
}
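
// For a registered combination with id 1 the resulting database layout is
// (illustrative):
//
//	registry          - id, address, hostname (one row per combination)
//	data_1, log_1     - cached results, created by createTableQuery
//	data_1_1, log_1_1 - indexes on write_clock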

// Prepare validates the persistent buffer configuration and, when the
// persistent buffer is enabled, prepares the buffer database for the
// configured addresses and hostnames.
func Prepare(options *agent.AgentOptions, addresses [][]string, hostnames []string) (err error) {
	if options.EnablePersistentBuffer == 1 && options.PersistentBufferFile == "" {
		return errors.New("\"EnablePersistentBuffer\" parameter misconfiguration: \"PersistentBufferFile\" parameter is not set")
	}
	if options.EnablePersistentBuffer == 0 {
		if options.PersistentBufferFile != "" {
			return errors.New("\"PersistentBufferFile\" parameter is not empty but \"EnablePersistentBuffer\" is not set")
		}
		return
	}

	if err = prepareDiskCache(options, addresses, hostnames); err != nil {
		// the buffer file might be corrupted or have an incompatible layout -
		// remove it and retry from scratch
		if err = os.Remove(options.PersistentBufferFile); err != nil {
			return
		}
		err = prepareDiskCache(options, addresses, hostnames)
	}
	return
}
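
// An illustrative startup sequence (hypothetical caller; error handling
// abbreviated):
//
//	addresses := [][]string{{"192.168.1.5:10051"}}
//	hostnames := []string{"web-01"}
//	if err := Prepare(&agent.Options, addresses, hostnames); err != nil {
//		// the persistent buffer cannot be prepared - abort startup
//	}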