/*
** Zabbix
** Copyright (C) 2001-2023 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

package scheduler

import (
	"errors"
	"fmt"
	"reflect"
	"time"

	"git.zabbix.com/ap/plugin-support/log"
	"git.zabbix.com/ap/plugin-support/plugin"
	"zabbix.com/internal/agent"
	"zabbix.com/internal/agent/resultcache"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/zbxlib"
)

// Task priority within the same second is expressed through the nanosecond
// component of the scheduled time: the reschedule() methods in this file pass
// one of these constants as the nanosecond argument of time.Unix() (or add it
// to the calculated next check time), so tasks of different kinds scheduled
// for the same second get distinct timestamps and a smaller constant yields
// an earlier one.
//
// The values are consecutive, starting at 0 for priorityConfiguratorTaskNs
// and ending at 6 for priorityStopperTaskNs.
const (
	priorityConfiguratorTaskNs = iota
	priorityCommandTaskNs      = iota
	priorityStarterTaskNs
	priorityCollectorTaskNs
	priorityWatcherTaskNs
	priorityExporterTaskNs
	priorityStopperTaskNs
)

// exporterTaskAccessor is used by clients to track item exporter tasks.
type exporterTaskAccessor interface {
	// task returns the exporter task behind the accessor.
	task() *exporterTask
}

// taskBase implements common task properties and functionality.
type taskBase struct {
	// plugin is the plugin agent the task belongs to.
	plugin *pluginAgent
	// scheduled is the time the task is planned for; the nanosecond part
	// carries the task priority.
	scheduled time.Time
	// index is the task position handed to pluginAgent.removeTask();
	// deactivate() treats -1 as "nothing to remove".
	index int
	// active is reported by isActive() and cleared by deactivate().
	active bool
	// recurring is reported by isRecurring().
	recurring bool
}

// getPlugin returns the plugin agent owning the task.
func (t *taskBase) getPlugin() *pluginAgent {
	return t.plugin
}

// setPlugin assigns the plugin agent owning the task.
func (t *taskBase) setPlugin(p *pluginAgent) {
	t.plugin = p
}

// getScheduled returns the time the task is scheduled for.
func (t *taskBase) getScheduled() time.Time {
	return t.scheduled
}

// getWeight returns the default task weight of 1. Task types that embed
// taskBase may override it.
func (t *taskBase) getWeight() int {
	return 1
}

// getIndex returns the stored task index.
func (t *taskBase) getIndex() int {
	return t.index
}

// setIndex stores the task index.
func (t *taskBase) setIndex(index int) {
	t.index = index
}

// deactivate removes the task from its plugin (unless the index is -1) and
// marks the task as inactive.
func (t *taskBase) deactivate() {
	if t.index != -1 {
		t.plugin.removeTask(t.index)
	}

	t.active = false
}

// isActive reports whether the task is still active.
func (t *taskBase) isActive() bool {
	return t.active
}

// isRecurring reports the value of the recurring flag.
func (t *taskBase) isRecurring() bool {
	return t.recurring
}

// isItemKeyEqual always returns false - the base task is not bound to an item key.
func (t *taskBase) isItemKeyEqual(itemkey string) bool {
	return false
}

// collectorTask provides access to plugin Collector interface.
type collectorTask struct {
	taskBase
	// seed shifts the collection time inside the period (seed modulo period).
	seed uint64
}

// perform calls the plugin's Collect() method in a separate goroutine, logs a
// warning if it fails and then reports the task to the scheduler as finished.
func (t *collectorTask) perform(s Scheduler) {
	log.Debugf("plugin %s: executing collector task", t.plugin.name())

	go func() {
		collector, _ := t.plugin.impl.(plugin.Collector)
		if err := collector.Collect(); err != nil {
			log.Warningf("plugin '%s' collector failed: %s", t.plugin.impl.Name(), err.Error())
		}

		s.FinishTask(t)
	}()
}

// reschedule sets the scheduled time to the first moment after now that is
// aligned to the collector period and shifted by the task seed.
//
// An error is returned when the plugin reports a period that is zero or
// negative. A negative period used to pass the check and made the alignment
// loop below step backwards instead of forwards, so it effectively never
// terminated.
func (t *collectorTask) reschedule(now time.Time) (err error) {
	collector, _ := t.plugin.impl.(plugin.Collector)

	period := int64(collector.Period())
	if period <= 0 {
		// for period 0 the message is the same as before the negative check was added
		return fmt.Errorf("invalid collector interval %d seconds", period)
	}

	seconds := now.Unix()

	// start of the current period plus the seed based shift, moved forward
	// until it lies strictly in the future
	nextcheck := period*(seconds/period) + int64(t.seed)%period
	for nextcheck <= seconds {
		nextcheck += period
	}

	t.scheduled = time.Unix(nextcheck, priorityCollectorTaskNs)

	return nil
}

// getWeight returns the maximum capacity of the plugin as the task weight.
func (t *collectorTask) getWeight() int {
	return t.plugin.maxCapacity
}

// isItemKeyEqual always returns false - collector tasks are not bound to an item key.
func (t *collectorTask) isItemKeyEqual(itemkey string) bool {
	return false
}

// exporterTask provides access to plugin Exporter interface. It's used
// for active check items.
type exporterTask struct { taskBase item clientItem failed bool updated time.Time client ClientAccessor meta plugin.Meta output plugin.ResultWriter } func invokeExport(a plugin.Accessor, key string, params []string, ctx plugin.ContextProvider) (any, error) { exporter, _ := a.(plugin.Exporter) if a.HandleTimeout() { return exporter.Export(key, params, ctx) } var ret any var err error tc := make(chan bool) go func() { ret, err = exporter.Export(key, params, ctx) tc <- true }() select { case <-tc: case <-time.After(time.Second * time.Duration(ctx.Timeout())): err = fmt.Errorf("Timeout occurred while gathering data.") } return ret, err } func (t *exporterTask) perform(s Scheduler) { // pass item key as parameter so it can be safely updated while task is being processed in its goroutine go func(itemkey string) { var result *plugin.Result now := time.Now() var key string var params []string var err error if key, params, err = itemutil.ParseKey(itemkey); err == nil { var ret interface{} ret, err = invokeExport(t.plugin.impl, key, params, t) if err == nil { log.Debugf("executed exporter task for itemid:%d key '%s'", t.item.itemid, itemkey) if ret != nil { rt := reflect.TypeOf(ret) switch rt.Kind() { case reflect.Slice: fallthrough case reflect.Array: s := reflect.ValueOf(ret) for i := 0; i < s.Len(); i++ { result = itemutil.ValueToResult(t.item.itemid, now, s.Index(i).Interface()) t.output.Write(result) } default: result = itemutil.ValueToResult(t.item.itemid, now, ret) t.output.Write(result) } } } else { log.Debugf("failed to execute exporter task for itemid:%d key '%s' error: '%s'", t.item.itemid, itemkey, err.Error()) } } if err != nil { result = &plugin.Result{Itemid: t.item.itemid, Error: err, Ts: now} t.output.Write(result) } // set failed state based on last result if result != nil && result.Error != nil { log.Warningf(`check '%s' is not supported: %s`, itemkey, result.Error) t.failed = true } else { t.failed = false } s.FinishTask(t) }(t.item.key) } func (t 
*exporterTask) reschedule(now time.Time) (err error) { var nextcheck time.Time nextcheck, _, err = zbxlib.GetNextcheck(t.item.itemid, t.item.delay, now) if err != nil { return } t.scheduled = nextcheck.Add(priorityExporterTaskNs) return } func (t *exporterTask) task() (task *exporterTask) { return t } // plugin.ContextProvider interface func (t *exporterTask) ClientID() (clientid uint64) { return t.client.ID() } func (t *exporterTask) Output() (output plugin.ResultWriter) { return t.output } func (t *exporterTask) ItemID() (itemid uint64) { return t.item.itemid } func (t *exporterTask) isItemKeyEqual(itemkey string) bool { return t.item.key == itemkey } func (t *exporterTask) Meta() (meta *plugin.Meta) { return &t.meta } func (t *exporterTask) GlobalRegexp() plugin.RegexpMatcher { return t.client.GlobalRegexp() } func (t *exporterTask) Timeout() int { return t.item.timeout } // directExporterTask provides access to plugin Exporter interaface. // It's used for non-recurring exporter requests - single passive checks // and internal requests to obtain HostnameItem, HostMetadataItem, // HostInterfaceItem etc values. 
type directExporterTask struct { taskBase item clientItem done bool expire time.Time client ClientAccessor meta plugin.Meta output plugin.ResultWriter } func (t *directExporterTask) isRecurring() bool { return !t.done } func (t *directExporterTask) invokeExport(key string, params []string) (any, error) { ret, err := invokeExport(t.plugin.impl, key, params, t) if err != nil { log.Debugf("failed to execute direct exporter task for key '%s[%s]' error: '%s'", key, params, err.Error()) return nil, err } log.Debugf("executed direct exporter task for key '%s[%s]'", key, params) if ret != nil { rt := reflect.TypeOf(ret) switch rt.Kind() { case reflect.Slice, reflect.Array: return nil, errors.New("Multiple return values are not supported for single passive checks") default: return ret, nil } } return ret, nil } func (t *directExporterTask) perform(s Scheduler) { // pass item key as parameter so it can be safely updated while task is being processed in its goroutine go func(itemkey string) { var result *plugin.Result now := time.Now() var key string var params []string var err error if now.After(t.expire) { err = errors.New("No data available.") log.Debugf("direct exporter task expired for key '%s' error: '%s'", itemkey, err.Error()) } else { if key, params, err = itemutil.ParseKey(itemkey); err == nil { var ret interface{} log.Debugf("executing direct exporter task for key '%s'", itemkey) ret, err = t.invokeExport(key, params) if err == nil { result = itemutil.ValueToResult(t.item.itemid, now, ret) t.output.Write(result) t.done = true } } } if err != nil { result = &plugin.Result{Itemid: t.item.itemid, Error: err, Ts: now} t.output.Write(result) t.done = true } s.FinishTask(t) }(t.item.key) } func (t *directExporterTask) reschedule(now time.Time) (err error) { if t.scheduled.IsZero() { t.scheduled = time.Unix(now.Unix(), priorityExporterTaskNs) } else { t.scheduled = time.Unix(now.Unix()+1, priorityExporterTaskNs) } return } // plugin.ContextProvider interface func (t 
*directExporterTask) ClientID() (clientid uint64) { return t.client.ID() } func (t *directExporterTask) Output() (output plugin.ResultWriter) { return t.output } func (t *directExporterTask) ItemID() (itemid uint64) { return t.item.itemid } func (t *directExporterTask) isItemKeyEqual(itemkey string) bool { return t.item.key == itemkey } func (t *directExporterTask) Meta() (meta *plugin.Meta) { return &t.meta } func (t *directExporterTask) GlobalRegexp() plugin.RegexpMatcher { return t.client.GlobalRegexp() } func (t *directExporterTask) Timeout() int { return t.item.timeout } // starterTask provides access to plugin Exporter interaface Start() method. type starterTask struct { taskBase } func (t *starterTask) perform(s Scheduler) { log.Debugf("plugin %s: executing starter task", t.plugin.name()) go func() { runner, _ := t.plugin.impl.(plugin.Runner) runner.Start() s.FinishTask(t) }() } func (t *starterTask) reschedule(now time.Time) (err error) { t.scheduled = time.Unix(now.Unix(), priorityStarterTaskNs) return } func (t *starterTask) getWeight() int { return t.plugin.maxCapacity } func (t *starterTask) isItemKeyEqual(itemkey string) bool { return false } // stopperTask provides access to plugin Exporter interaface Start() method. type stopperTask struct { taskBase } func (t *stopperTask) perform(s Scheduler) { log.Debugf("plugin %s: executing stopper task", t.plugin.name()) go func() { runner, _ := t.plugin.impl.(plugin.Runner) runner.Stop() s.FinishTask(t) }() } func (t *stopperTask) reschedule(now time.Time) (err error) { t.scheduled = time.Unix(now.Unix(), priorityStopperTaskNs) return } func (t *stopperTask) getWeight() int { return t.plugin.maxCapacity } func (t *stopperTask) isItemKeyEqual(itemkey string) bool { return false } // stopperTask provides access to plugin Watcher interaface. 
type watcherTask struct { taskBase items []*plugin.Item client ClientAccessor } func (t *watcherTask) perform(s Scheduler) { log.Debugf("plugin %s: executing watcher task", t.plugin.name()) go func() { watcher, _ := t.plugin.impl.(plugin.Watcher) watcher.Watch(t.items, t) s.FinishTask(t) }() } func (t *watcherTask) reschedule(now time.Time) (err error) { t.scheduled = time.Unix(now.Unix(), priorityWatcherTaskNs) return } func (t *watcherTask) getWeight() int { return t.plugin.maxCapacity } // plugin.ContextProvider interface func (t *watcherTask) ClientID() (clientid uint64) { return t.client.ID() } func (t *watcherTask) Output() (output plugin.ResultWriter) { return t.client.Output() } func (t *watcherTask) ItemID() (itemid uint64) { return 0 } func (t *watcherTask) isItemKeyEqual(itemkey string) bool { return false } func (t *watcherTask) Meta() (meta *plugin.Meta) { return nil } func (t *watcherTask) GlobalRegexp() plugin.RegexpMatcher { return t.client.GlobalRegexp() } func (t *watcherTask) Timeout() int { return 0 } // configuratorTask provides access to plugin Configurator interaface. 
type configuratorTask struct { taskBase options *agent.AgentOptions } func (t *configuratorTask) perform(s Scheduler) { log.Debugf("plugin %s: executing configurator task", t.plugin.name()) go func() { config, _ := t.plugin.impl.(plugin.Configurator) config.Configure(agent.GlobalOptions(t.options), t.options.Plugins[t.plugin.name()]) s.FinishTask(t) }() } func (t *configuratorTask) reschedule(now time.Time) (err error) { t.scheduled = time.Unix(now.Unix(), priorityConfiguratorTaskNs) return } func (t *configuratorTask) getWeight() int { return t.plugin.maxCapacity } func (t *configuratorTask) isItemKeyEqual(itemkey string) bool { return false } // commandTask executes remote commands received with active requestes type commandTask struct { taskBase id uint64 params []string output resultcache.Writer } func (t *commandTask) isRecurring() bool { return false } func (t *commandTask) perform(s Scheduler) { // execute remote command go func() { e := t.plugin.impl.(plugin.Exporter) var cr *resultcache.CommandResult if ret, err := e.Export("system.run", t.params, nil); err == nil { if ret != nil { cr = &resultcache.CommandResult{ ID: t.id, Result: itemutil.ValueToString(ret), } } } else { log.Debugf("failed to execute remote command '%s' error: '%s'", t.params[0], err.Error()) cr = &resultcache.CommandResult{ ID: t.id, Error: err, } } t.output.WriteCommand(cr) t.output.Flush() s.FinishTask(t) }() } func (t *commandTask) reschedule(now time.Time) (err error) { t.scheduled = time.Unix(now.Unix(), priorityCommandTaskNs) return }