/* ** Zabbix ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ package scheduler import ( "container/heap" "fmt" "reflect" "strconv" "testing" "time" "zabbix.com/internal/agent" "zabbix.com/internal/agent/alias" "zabbix.com/pkg/conf" "zabbix.com/pkg/itemutil" "zabbix.com/pkg/log" "zabbix.com/pkg/plugin" ) // getNextCheck calculates simplified nextcheck based on the specified delay string and current time func getNextcheck(delay string, from time.Time) (nextcheck time.Time) { simple_delay, _ := strconv.ParseInt(delay, 10, 64) from_seconds := from.Unix() return time.Unix(from_seconds-from_seconds%simple_delay+simple_delay, 0) } type callTracker interface { call(key string) called() map[string][]time.Time } type mockPlugin struct { calls map[string][]time.Time now *time.Time } func (p *mockPlugin) call(key string) { if p.calls == nil { p.calls = make(map[string][]time.Time) } if p.calls[key] == nil { p.calls[key] = make([]time.Time, 0, 20) } p.calls[key] = append(p.calls[key], *p.now) } func (p *mockPlugin) called() map[string][]time.Time { return p.calls } type mockExporterPlugin struct { plugin.Base mockPlugin } func (p *mockExporterPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) { p.call(key) return } type mockCollectorPlugin struct { plugin.Base mockPlugin period int } func (p *mockCollectorPlugin) Collect() (err error) { p.call("$collect") return } func (p *mockCollectorPlugin) Period() (period int) { return p.period } type mockCollectorExporterPlugin struct { plugin.Base mockPlugin period int } func (p *mockCollectorExporterPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) { p.call(key) return } func (p *mockCollectorExporterPlugin) Collect() (err error) { p.call("$collect") return } func (p *mockCollectorExporterPlugin) Period() (period int) { return p.period } type mockRunnerPlugin struct { plugin.Base mockPlugin } func (p *mockRunnerPlugin) Start() { p.call("$start") } func (p *mockRunnerPlugin) Stop() { p.call("$stop") } type mockPassiveRunnerPlugin struct { plugin.Base mockPlugin } func (p *mockPassiveRunnerPlugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) { return } func (p *mockPassiveRunnerPlugin) Start() { p.call("$start") } func (p *mockPassiveRunnerPlugin) Stop() { p.call("$stop") } type watchTracker interface { watched() []*plugin.Request } type mockWatcherPlugin struct { plugin.Base mockPlugin requests []*plugin.Request } func (p *mockWatcherPlugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) { p.call("$watch") p.requests = requests } func (p *mockWatcherPlugin) watched() []*plugin.Request { return p.requests } type mockRunnerWatcherPlugin struct { plugin.Base mockPlugin requests []*plugin.Request } func (p *mockRunnerWatcherPlugin) Start() { p.call("$start") } func (p *mockRunnerWatcherPlugin) Stop() { p.call("$stop") } func (p *mockRunnerWatcherPlugin) Watch(requests []*plugin.Request, ctx plugin.ContextProvider) { p.call("$watch") p.requests = requests } func (p *mockRunnerWatcherPlugin) watched() []*plugin.Request { return p.requests } type mockConfiguratorPlugin struct { plugin.Base mockPlugin options interface{} } func (p *mockConfiguratorPlugin) Configure(global *plugin.GlobalOptions, options interface{}) { p.call("$configure") } func (p *mockConfiguratorPlugin) Validate(options interface{}) (err error) { return } type resultCacheMock struct { results []*plugin.Result } func (c *resultCacheMock) Write(r *plugin.Result) { c.results = append(c.results, r) } func (c *resultCacheMock) Flush() { } func (pc *resultCacheMock) SlotsAvailable() int { return 1 } func (pc *resultCacheMock) PersistSlotsAvailable() int { return 1 } type mockManager struct { Manager sink chan performer now time.Time startTime time.Time } func (m *mockManager) finishTasks() { for { select { case p := <-m.sink: m.processFinishRequest(p) default: return } } } func (m *mockManager) iterate(t *testing.T, iters int) { for i := 0; i < iters; i++ { m.now = m.now.Add(time.Second) m.processQueue(m.now) m.finishTasks() } } func (m *mockManager) mockInit(t *testing.T) { m.init() m.aliases, _ = alias.NewManager(nil) clock := time.Now().Unix() m.startTime = time.Unix(clock-clock%10, 100) t.Logf("starting time %s", m.startTime.Format(time.Stamp)) m.now = m.startTime } func (m *mockManager) update(update *updateRequest) { m.processUpdateRequest(update, m.now) } func (m *mockManager) mockTasks() { index := make(map[exporterTaskAccessor]uint64) for clientid, client := range m.clients { for _, task := range client.exporters { index[task] = clientid } client.exporters = make(map[uint64]exporterTaskAccessor) } for _, p := range m.plugins { tasks := p.tasks p.tasks = make(performerHeap, 0, len(tasks)) for j, task := range tasks { switch t := task.(type) { case *collectorTask: collector := p.impl.(plugin.Collector) mockTask := &mockCollectorTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: getNextcheck(fmt.Sprintf("%d", collector.Period()), m.now).Add(priorityCollectorTaskNs), index: -1, active: task.isActive(), recurring: true, }, sink: m.sink, } p.enqueueTask(mockTask) case *exporterTask: mockTask := &mockExporterTask{ exporterTask: exporterTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: getNextcheck(t.item.delay, m.now).Add(priorityExporterTaskNs), index: -1, active: task.isActive(), recurring: true, }, item: t.item, client: t.client, meta: t.meta, }, sink: m.sink, } p.enqueueTask(mockTask) m.clients[index[t]].exporters[t.item.itemid] = mockTask case *directExporterTask: mockTask := &mockExporterTask{ exporterTask: exporterTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: getNextcheck(t.item.delay, m.now).Add(priorityExporterTaskNs), index: -1, active: task.isActive(), recurring: true, }, item: t.item, client: t.client, meta: t.meta, }, sink: m.sink, } p.enqueueTask(mockTask) case *starterTask: mockTask := &mockStarterTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: m.now, index: -1, active: task.isActive(), }, sink: m.sink, } p.enqueueTask(mockTask) case *stopperTask: mockTask := &mockStopperTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: m.now.Add(priorityStopperTaskNs), index: -1, active: task.isActive(), }, sink: m.sink, } p.enqueueTask(mockTask) case *watcherTask: mockTask := &mockWatcherTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: m.now.Add(priorityWatcherTaskNs), index: -1, active: task.isActive(), }, sink: m.sink, requests: t.requests, client: t.client, } p.enqueueTask(mockTask) case *configuratorTask: mockTask := &mockConfigerTask{ taskBase: taskBase{ plugin: task.getPlugin(), scheduled: m.now.Add(priorityWatcherTaskNs), index: -1, active: task.isActive(), }, options: t.options, sink: m.sink, } p.enqueueTask(mockTask) default: p.enqueueTask(task) } tasks[j].setIndex(-1) } m.pluginQueue.Update(p) } } // checks if the times timestamps match the offsets within the specified range func (m *mockManager) checkTimeline(t *testing.T, name string, times []time.Time, offsets []int, iters int) { start := m.now.Add(-time.Second * time.Duration(iters-1)) to := int(m.now.Sub(m.startTime) / time.Second) from := to - iters + 1 var left, right int // find the range start in timestamps if len(times) != 0 { for times[left].Before(start) { left++ if left == len(times) { break } } } // find the range start in offsets if len(offsets) != 0 { for offsets[right] < from { right++ if right == len(offsets) { break } } } for left < len(times) && right < len(offsets) { if times[left].After(m.now) { if offsets[right] <= to { t.Errorf("Plugin %s: no matching timestamp for offset %d", name, offsets[right]) } return } if offsets[right] > to { t.Errorf("Plugin %s: no matching offset for timestamp %s", name, times[left].Format(time.Stamp)) return } offsetTime := m.startTime.Add(time.Second * time.Duration(offsets[right])) if !offsetTime.Equal(times[left]) { t.Errorf("Plugin %s: offset %d time %s does not match timestamp %s", name, offsets[right], offsetTime.Format(time.Stamp), times[left].Format(time.Stamp)) return } left++ right++ } if left != len(times) && !times[left].After(m.now) { t.Errorf("Plugin %s: no matching offset for timestamp %s", name, times[left].Format(time.Stamp)) return } if right != len(offsets) && offsets[right] <= to { t.Errorf("Plugin %s: no matching timestamp for offset %d", name, offsets[right]) return } } // checks plugin call timeline within the specified range func (m *mockManager) checkPluginTimeline(t *testing.T, plugins []plugin.Accessor, calls []map[string][]int, iters int) { for i, p := range plugins { tracker := p.(callTracker).called() for key, offsets := range calls[i] { m.checkTimeline(t, p.Name()+":"+key, tracker[key], offsets, iters) } } } type mockExporterTask struct { exporterTask sink chan performer } func (t *mockExporterTask) perform(s Scheduler) { key, params, _ := itemutil.ParseKey(t.item.key) _, _ = t.plugin.impl.(plugin.Exporter).Export(key, params, t) t.sink <- t } func (t *mockExporterTask) reschedule(now time.Time) (err error) { t.scheduled = getNextcheck(t.item.delay, t.scheduled) return } func (t *mockExporterTask) task() (task *exporterTask) { return &t.exporterTask } // plugin.ContextProvider interface func (t *mockExporterTask) Output() (output plugin.ResultWriter) { return nil } func (t *mockExporterTask) Meta() (meta *plugin.Meta) { return &t.meta } func (t *mockExporterTask) GlobalRegexp() plugin.RegexpMatcher { return t.client.GlobalRegexp() } type mockCollectorTask struct { taskBase sink chan performer } func (t *mockCollectorTask) perform(s Scheduler) { _ = t.plugin.impl.(plugin.Collector).Collect() t.sink <- t } func (t *mockCollectorTask) reschedule(now time.Time) (err error) { t.scheduled = getNextcheck(fmt.Sprintf("%d", t.plugin.impl.(plugin.Collector).Period()), t.scheduled) return } func (t *mockCollectorTask) getWeight() int { return t.plugin.maxCapacity } type mockStarterTask struct { taskBase sink chan performer } func (t *mockStarterTask) perform(s Scheduler) { t.plugin.impl.(plugin.Runner).Start() t.sink <- t } func (t *mockStarterTask) reschedule(now time.Time) (err error) { return } func (t *mockStarterTask) getWeight() int { return t.plugin.maxCapacity } type mockStopperTask struct { taskBase sink chan performer } func (t *mockStopperTask) perform(s Scheduler) { t.plugin.impl.(plugin.Runner).Stop() t.sink <- t } func (t *mockStopperTask) reschedule(now time.Time) (err error) { return } func (t *mockStopperTask) getWeight() int { return t.plugin.maxCapacity } type mockWatcherTask struct { taskBase sink chan performer resultSink plugin.ResultWriter requests []*plugin.Request client ClientAccessor } func (t *mockWatcherTask) perform(s Scheduler) { log.Debugf("%s %v", t.plugin.impl.Name(), t.requests) t.plugin.impl.(plugin.Watcher).Watch(t.requests, t) t.sink <- t } func (t *mockWatcherTask) reschedule(now time.Time) (err error) { return } func (t *mockWatcherTask) getWeight() int { return t.plugin.maxCapacity } // plugin.ContextProvider interface func (t *mockWatcherTask) ClientID() (clientid uint64) { return t.client.ID() } func (t *mockWatcherTask) ItemID() (itemid uint64) { return 0 } func (t *mockWatcherTask) Output() (output plugin.ResultWriter) { return t.resultSink } func (t *mockWatcherTask) Meta() (meta *plugin.Meta) { return nil } func (t *mockWatcherTask) GlobalRegexp() plugin.RegexpMatcher { return t.client.GlobalRegexp() } type mockConfigerTask struct { taskBase sink chan performer options *agent.AgentOptions } func (t *mockConfigerTask) perform(s Scheduler) { t.plugin.impl.(plugin.Configurator).Configure(agent.GlobalOptions(t.options), t.options.Plugins[t.plugin.name()]) t.sink <- t } func (t *mockConfigerTask) reschedule(now time.Time) (err error) { return } func (t *mockConfigerTask) getWeight() int { return t.plugin.maxCapacity } func checkExporterTasks(t *testing.T, m *Manager, clientID uint64, items []*clientItem) { lastCheck := time.Time{} n := 0 for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() { if task := p.peekTask(); task != nil { if task.getScheduled().Before(lastCheck) { t.Errorf("Out of order tasks detected") } heap.Pop(&m.pluginQueue) p.popTask() n++ if p.peekTask() != nil { heap.Push(&m.pluginQueue, p) } } else { heap.Pop(&m.pluginQueue) } } if len(items) != n { t.Errorf("Expected %d tasks while got %d", len(items), n) } var requestClient *client var ok bool if requestClient, ok = m.clients[clientID]; !ok { t.Errorf("Cannot find owner of the default client") return } for _, item := range items { if tacc, ok := requestClient.exporters[item.itemid]; ok { ti := tacc.task().item if ti.delay != item.delay { t.Errorf("Expected item %d delay %s while got %s", item.itemid, item.delay, ti.delay) } if ti.key != item.key { t.Errorf("Expected item %d key %s while got %s", item.itemid, item.key, ti.key) } } else { t.Errorf("Item %d was not queued", item.itemid) } } if len(items) != len(requestClient.exporters) { t.Errorf("Expected %d queued items while got %d", len(items), len(requestClient.exporters)) } } func TestTaskCreate(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) plugin.ClearRegistry() plugins := make([]mockExporterPlugin, 3) for i := range plugins { p := &plugins[i] name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(p, name, name, "Debug.") } manager, _ := NewManager(&agent.Options) items := []*clientItem{ &clientItem{itemid: 1, delay: "151", key: "debug1"}, &clientItem{itemid: 2, delay: "103", key: "debug2"}, &clientItem{itemid: 3, delay: "79", key: "debug3"}, &clientItem{itemid: 4, delay: "17", key: "debug1"}, &clientItem{itemid: 5, delay: "7", key: "debug2"}, &clientItem{itemid: 6, delay: "1", key: "debug3"}, &clientItem{itemid: 7, delay: "63", key: "debug1"}, &clientItem{itemid: 8, delay: "47", key: "debug2"}, &clientItem{itemid: 9, delay: "31", key: "debug3"}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) if len(manager.pluginQueue) != 3 { t.Errorf("Expected %d plugins queued while got %d", 3, len(manager.pluginQueue)) } checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items) } func TestTaskUpdate(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) plugin.ClearRegistry() plugins := make([]mockExporterPlugin, 3) for i := range plugins { p := &plugins[i] name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(p, name, name, "Debug.") } manager, _ := NewManager(&agent.Options) items := []*clientItem{ &clientItem{itemid: 1, delay: "151", key: "debug1"}, &clientItem{itemid: 2, delay: "103", key: "debug2"}, &clientItem{itemid: 3, delay: "79", key: "debug3"}, &clientItem{itemid: 4, delay: "17", key: "debug1"}, &clientItem{itemid: 5, delay: "7", key: "debug2"}, &clientItem{itemid: 6, delay: "1", key: "debug3"}, &clientItem{itemid: 7, delay: "63", key: "debug1"}, &clientItem{itemid: 8, delay: "47", key: "debug2"}, &clientItem{itemid: 9, delay: "31", key: "debug3"}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) for _, item := range items { item.delay = "10" + item.delay item.key = item.key + "[1]" } update.requests = update.requests[:0] for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) if len(manager.pluginQueue) != 3 { t.Errorf("Expected %d plugins queued while got %d", 3, len(manager.pluginQueue)) } checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items) } func TestTaskUpdateInvalidInterval(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) plugin.ClearRegistry() plugins := make([]mockExporterPlugin, 3) for i := range plugins { p := &plugins[i] name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(p, name, name, "Debug.") } manager, _ := NewManager(&agent.Options) items := []*clientItem{ &clientItem{itemid: 1, delay: "151", key: "debug1"}, &clientItem{itemid: 2, delay: "103", key: "debug2"}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) items[0].delay = "xyz" update.requests = update.requests[:0] for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) if len(manager.plugins["debug1"].tasks) != 0 { t.Errorf("Expected %d tasks queued while got %d", 0, len(manager.plugins["debug1"].tasks)) } } func TestTaskDelete(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) plugin.ClearRegistry() plugins := make([]mockExporterPlugin, 3) for i := range plugins { p := &plugins[i] name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(p, name, name, "Debug.") } manager, _ := NewManager(&agent.Options) items := []*clientItem{ &clientItem{itemid: 1, delay: "151", key: "debug1"}, &clientItem{itemid: 2, delay: "103", key: "debug2"}, &clientItem{itemid: 3, delay: "79", key: "debug3"}, // remove &clientItem{itemid: 4, delay: "17", key: "debug1"}, &clientItem{itemid: 5, delay: "7", key: "debug2"}, &clientItem{itemid: 6, delay: "1", key: "debug3"}, // remove &clientItem{itemid: 7, delay: "63", key: "debug1"}, &clientItem{itemid: 8, delay: "47", key: "debug2"}, // remove &clientItem{itemid: 9, delay: "31", key: "debug3"}, // remove } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) items[2] = items[6] items = items[:cap(items)-4] update.requests = update.requests[:0] for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.processUpdateRequest(&update, time.Now()) if len(manager.plugins["debug3"].tasks) != 0 { t.Errorf("Expected %d tasks queued while got %d", 0, len(manager.plugins["debug3"].tasks)) } checkExporterTasks(t, manager, agent.MaxBuiltinClientID+1, items) } func TestSchedule(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "1", key: "debug1"}, &clientItem{itemid: 2, delay: "2", key: "debug2"}, &clientItem{itemid: 3, delay: "5", key: "debug3"}, } calls := []map[string][]int{ map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}}, map[string][]int{"debug2": []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}}, map[string][]int{"debug3": []int{5, 10, 15, 20}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 20) manager.checkPluginTimeline(t, plugins, calls, 20) } func TestScheduleCapacity(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 2) for i := range plugins { plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) p := manager.plugins["debug2"] p.maxCapacity = 2 items := []*clientItem{ &clientItem{itemid: 1, delay: "1", key: "debug1"}, &clientItem{itemid: 2, delay: "2", key: "debug2"}, &clientItem{itemid: 3, delay: "2", key: "debug2"}, &clientItem{itemid: 4, delay: "2", key: "debug2"}, } calls := []map[string][]int{ map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}, map[string][]int{"debug2": []int{2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 10}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 10) manager.checkPluginTimeline(t, plugins, calls, 10) } func TestScheduleUpdate(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "1", key: "debug1"}, &clientItem{itemid: 2, delay: "1", key: "debug2"}, &clientItem{itemid: 3, delay: "1", key: "debug3"}, } calls := []map[string][]int{ map[string][]int{"debug1": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20}}, map[string][]int{"debug2": []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20}}, map[string][]int{"debug3": []int{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) } func TestCollectorSchedule(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 1) for i := range plugins { plugins[i] = &mockCollectorPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "1", key: "debug1"}, } calls := []map[string][]int{ map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 20) manager.checkPluginTimeline(t, plugins, calls, 20) } func TestCollectorScheduleUpdate(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockCollectorPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug1"}, &clientItem{itemid: 2, delay: "5", key: "debug2"}, &clientItem{itemid: 3, delay: "5", key: "debug3"}, } calls := []map[string][]int{ map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 12, 14}}, map[string][]int{"$collect": []int{2, 4, 6, 8, 10, 22, 24}}, map[string][]int{"$collect": []int{2, 4, 22, 24}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:1] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[1:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) } func TestRunner(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockRunnerPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug1"}, &clientItem{itemid: 2, delay: "5", key: "debug2"}, &clientItem{itemid: 3, delay: "5", key: "debug3"}, } calls := []map[string][]int{ map[string][]int{"$start": []int{1, 5}, "$stop": []int{4, 6}}, map[string][]int{"$start": []int{1, 5, 7}, "$stop": []int{3, 6, 8}}, map[string][]int{"$start": []int{1, 5, 8}, "$stop": []int{2, 6}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[:1] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[1:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) update.requests = update.requests[1:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) } func checkWatchRequests(t *testing.T, p plugin.Accessor, requests []*plugin.Request) { tracker := p.(watchTracker) if !reflect.DeepEqual(tracker.watched(), requests) { expected := "" for _, r := range requests { expected += fmt.Sprintf("%+v,", *r) } returned := "" for _, r := range tracker.watched() { returned += fmt.Sprintf("%+v,", *r) } t.Errorf("Expected watch requests %s while got %s", expected, returned) } } func TestWatcher(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockWatcherPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug1"}, &clientItem{itemid: 2, delay: "5", key: "debug2[1]"}, &clientItem{itemid: 3, delay: "5", key: "debug2[2]"}, &clientItem{itemid: 4, delay: "5", key: "debug3[1]"}, &clientItem{itemid: 5, delay: "5", key: "debug3[2]"}, &clientItem{itemid: 6, delay: "5", key: "debug3[3]"}, } calls := []map[string][]int{ map[string][]int{"$watch": []int{1, 2, 3, 4, 5}}, map[string][]int{"$watch": []int{1, 2, 3, 4, 5}}, map[string][]int{"$watch": []int{1, 2, 3, 5}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:3]) checkWatchRequests(t, plugins[2], update.requests[3:6]) update.requests = update.requests[:5] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:3]) checkWatchRequests(t, plugins[2], update.requests[3:5]) update.requests = update.requests[:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:3]) update.requests = update.requests[:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:2]) update.requests = update.requests[:6] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:3]) checkWatchRequests(t, plugins[2], update.requests[3:6]) } func TestCollectorExporterSchedule(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 1) for i := range plugins { plugins[i] = &mockCollectorExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2} plugin.RegisterMetrics(plugins[i], "debug", "debug", "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "2", key: "debug[1]"}, &clientItem{itemid: 2, delay: "2", key: "debug[2]"}, &clientItem{itemid: 3, delay: "2", key: "debug[3]"}, } calls := []map[string][]int{ map[string][]int{"debug": []int{3, 3, 3, 5, 5, 5, 7, 7, 7, 9, 9, 9}, "$collect": []int{2, 4, 6, 8, 10}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 10) manager.checkPluginTimeline(t, plugins, calls, 10) } func TestRunnerWatcher(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockRunnerWatcherPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug1"}, &clientItem{itemid: 2, delay: "5", key: "debug2[1]"}, &clientItem{itemid: 3, delay: "5", key: "debug2[2]"}, &clientItem{itemid: 4, delay: "5", key: "debug3[1]"}, &clientItem{itemid: 5, delay: "5", key: "debug3[2]"}, &clientItem{itemid: 6, delay: "5", key: "debug3[3]"}, } calls := []map[string][]int{ map[string][]int{"$watch": []int{2, 6, 11, 16}, "$start": []int{1}, "$stop": []int{17}}, map[string][]int{"$watch": []int{2, 6, 11, 22, 26}, "$start": []int{1, 21}, "$stop": []int{12, 27}}, map[string][]int{"$watch": []int{2, 6, 27}, "$start": []int{1, 26}, "$stop": []int{7}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:3]) checkWatchRequests(t, plugins[2], update.requests[3:6]) update.requests = update.requests[:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) checkWatchRequests(t, plugins[0], update.requests[0:1]) checkWatchRequests(t, plugins[1], update.requests[1:3]) update.requests = update.requests[:1] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) checkWatchRequests(t, plugins[0], update.requests[0:1]) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[1:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) checkWatchRequests(t, plugins[1], update.requests[:2]) update.requests = update.requests[2:5] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) checkWatchRequests(t, plugins[2], update.requests[0:3]) } func TestMultiCollectorExporterSchedule(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 1) for i := range plugins { plugins[i] = &mockCollectorExporterPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, period: 2} plugin.RegisterMetrics(plugins[i], "debug", "debug", "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "2", key: "debug[1]"}, } calls := []map[string][]int{ map[string][]int{"debug": []int{3, 3, 5, 5, 7, 9}, "$collect": []int{2, 4, 6, 8, 10}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) update.clientID = agent.MaxBuiltinClientID + 2 manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.clientID = agent.MaxBuiltinClientID + 1 manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) } func TestMultiRunnerWatcher(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 1) for i := range plugins { plugins[i] = &mockRunnerWatcherPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} plugin.RegisterMetrics(plugins[i], "debug", "debug", "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug[1]"}, &clientItem{itemid: 2, delay: "5", key: "debug[2]"}, &clientItem{itemid: 3, delay: "5", key: "debug[3]"}, } calls := []map[string][]int{ map[string][]int{"$watch": []int{2, 3, 6, 7, 11, 17, 21}, "$start": []int{1, 16}, "$stop": []int{12}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) update.clientID = agent.MaxBuiltinClientID + 2 manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.clientID = agent.MaxBuiltinClientID + 1 manager.update(&update) update.clientID = agent.MaxBuiltinClientID + 2 update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.clientID = agent.MaxBuiltinClientID + 1 manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:1] update.clientID = agent.MaxBuiltinClientID + 2 manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.clientID = agent.MaxBuiltinClientID + 1 manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) } func TestPassiveRunner(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { plugins[i] = &mockPassiveRunnerPlugin{Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}} name := fmt.Sprintf("debug%d", i+1) plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug1"}, &clientItem{itemid: 2, delay: "5", key: "debug2"}, &clientItem{itemid: 3, delay: "5", key: "debug3"}, } calls := []map[string][]int{ map[string][]int{"$start": []int{1}, "$stop": []int{}}, map[string][]int{"$start": []int{1}, "$stop": []int{3600*51 + 1}}, map[string][]int{"$start": []int{1}, "$stop": []int{3600*26 + 1}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.PassiveChecksClientID, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } manager.update(&update) manager.mockTasks() manager.iterate(t, 3600) manager.checkPluginTimeline(t, plugins, calls, 3600) update.requests = update.requests[:0] manager.update(&update) manager.mockTasks() manager.iterate(t, 3600) manager.checkPluginTimeline(t, plugins, calls, 3600) update.requests = update.requests[:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 3600*24) manager.checkPluginTimeline(t, plugins, calls, 3600*24) update.requests = update.requests[:1] manager.update(&update) manager.mockTasks() manager.iterate(t, 3600*25) manager.checkPluginTimeline(t, plugins, calls, 3600*25) update.requests = update.requests[:1] manager.update(&update) manager.mockTasks() manager.iterate(t, 1) manager.checkPluginTimeline(t, plugins, calls, 1) } type configuratorOption struct { Params interface{} `conf:"optional"` } func TestConfigurator(t *testing.T) { _ = log.Open(log.Console, log.Debug, "", 0) var opt1, opt2, opt3 configuratorOption _ = conf.Unmarshal([]byte("Delay=5"), &opt1) _ = conf.Unmarshal([]byte("Delay=30"), &opt2) _ = conf.Unmarshal([]byte("Delay=60"), &opt3) agent.Options.Plugins = map[string]interface{}{ "Debug1": opt1.Params, "Debug2": opt2.Params, "Debug3": opt3.Params, } manager := mockManager{sink: make(chan performer, 10)} plugin.ClearRegistry() plugins := make([]plugin.Accessor, 3) for i := range plugins { name := fmt.Sprintf("debug%d", i+1) plugins[i] = &mockConfiguratorPlugin{ Base: plugin.Base{}, mockPlugin: mockPlugin{now: &manager.now}, options: agent.Options.Plugins[name]} plugin.RegisterMetrics(plugins[i], name, name, "Debug.") } manager.mockInit(t) items := []*clientItem{ &clientItem{itemid: 1, delay: "5", key: "debug1"}, &clientItem{itemid: 2, delay: "5", key: "debug2"}, &clientItem{itemid: 3, delay: "5", key: "debug3"}, } calls := []map[string][]int{ map[string][]int{"$configure": []int{1}}, map[string][]int{"$configure": []int{6}}, map[string][]int{"$configure": []int{11}}, } var cache resultCacheMock update := updateRequest{ clientID: agent.MaxBuiltinClientID + 1, sink: &cache, requests: make([]*plugin.Request, 0), } var lastLogsize uint64 var mtime int for _, item := range items { update.requests = append(update.requests, &plugin.Request{ Itemid: item.itemid, Key: item.key, Delay: item.delay, LastLogsize: &lastLogsize, Mtime: &mtime, }) } update.requests = update.requests[:1] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:2] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) update.requests = update.requests[:3] manager.update(&update) manager.mockTasks() manager.iterate(t, 5) manager.checkPluginTimeline(t, plugins, calls, 5) }