/* ** Zabbix ** Copyright (C) 2001-2023 Zabbix SIA ** ** This program is free software; you can redistribute it and/or modify ** it under the terms of the GNU General Public License as published by ** the Free Software Foundation; either version 2 of the License, or ** (at your option) any later version. ** ** This program is distributed in the hope that it will be useful, ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** GNU General Public License for more details. ** ** You should have received a copy of the GNU General Public License ** along with this program; if not, write to the Free Software ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. **/ // watch package provides utility functionality for easier Watcher plugin implementation. // Watcher plugin listens for events specified by the item requests and comnverts those // events to item values. This package handles event source initialization/deinitialization, // event filtering and conversion to item values/errors. package watch import ( "sync" "time" "git.zabbix.com/ap/plugin-support/plugin" ) // EventSource generates events by calling Manager.Notify() method and passing arbitrary // data. The data is converted to string values by event filter that was created by the // event source during Manager update. type EventSource interface { // initialize event source and start generating events Initialize() error // stop generating events and release internal resources allocated by event source Release() // create new event filter based on item key NewFilter(key string) (filter EventFilter, err error) } // EventProvider interface provides methods to get event source by item key. An new or existing // event source can be returned. EventProvider is required to create Manager instance. type EventProvider interface { EventSourceByKey(key string) (EventSource, error) } // EventFilter has two responsibilities. The first is to convert data to a string value. // The second is optional - based on internal filter parameters (item key for example) // it can make the data to be ignored by returning nil value. type EventFilter interface { // Process processes data from event source and returns: // value - data is valid and matches filter // error - invalid data // nothing - data is valid, but does not match the filter Process(data interface{}) (value *string, err error) } // eventWriter links event filter to output sink where the filtered results must be written type eventWriter struct { output plugin.ResultWriter filter EventFilter } // Item to monitor type Item struct { Key string Updated time.Time } type Subscriber struct { Itemid uint64 Clientid uint64 } // Client represents monitoring instance - Zabbix server or proxy type Client struct { // the client ID ID uint64 // the items monitored by client: itemid->Item Items map[uint64]*Item } type Manager struct { eventProvider EventProvider clients map[uint64]*Client subscriptions map[EventSource]map[Subscriber]*eventWriter mutex sync.Mutex } // Update updates monitored items for the specified client based on new requests. func (m *Manager) Update(clientid uint64, output plugin.ResultWriter, requests []*plugin.Request) { var client *Client var ok bool // find or create client if client, ok = m.clients[clientid]; !ok { client = &Client{ID: clientid, Items: make(map[uint64]*Item)} m.clients[clientid] = client } // temporary event source error cache failedEventSources := make(map[EventSource]error) now := time.Now() for _, r := range requests { var sub map[Subscriber]*eventWriter subscriber := Subscriber{Clientid: client.ID, Itemid: r.Itemid} if item, ok := client.Items[r.Itemid]; ok { // remove existing subscription if item key was changed if item.Key != r.Key { if es, err := m.eventProvider.EventSourceByKey(item.Key); err == nil { if sub, ok := m.subscriptions[es]; ok { delete(sub, subscriber) } item.Key = r.Key } else { output.Write(&plugin.Result{Itemid: r.Itemid, Ts: now, Error: err}) } } item.Updated = now } else { // register new item to be monitored client.Items[r.Itemid] = &Item{Key: r.Key, Updated: now} } // subscribe new or changed item if es, err := m.eventProvider.EventSourceByKey(r.Key); err == nil { // initialize new event source if sub, ok = m.subscriptions[es]; !ok { // reuse event source initialization error if the initialization did already // fail for this batch if err, ok = failedEventSources[es]; !ok { if err = es.Initialize(); err == nil { sub = make(map[Subscriber]*eventWriter) m.subscriptions[es] = sub } else { // cache initialization error failedEventSources[es] = err } } if err != nil { output.Write(&plugin.Result{Itemid: r.Itemid, Ts: now, Error: err}) } } if sub != nil { if _, ok = sub[subscriber]; !ok { // create subscription if filter, err := es.NewFilter(r.Key); err != nil { output.Write(&plugin.Result{Itemid: r.Itemid, Ts: now, Error: err}) } else { sub[subscriber] = &eventWriter{output: output, filter: filter} } } } } else { output.Write(&plugin.Result{Itemid: r.Itemid, Ts: now, Error: err}) } } // remove unused subscriptions for itemid, item := range client.Items { if !item.Updated.Equal(now) { if es, err := m.eventProvider.EventSourceByKey(item.Key); err == nil { if sub, ok := m.subscriptions[es]; ok { delete(sub, Subscriber{Clientid: client.ID, Itemid: itemid}) } } delete(client.Items, itemid) } } // release unused event sources for es, sub := range m.subscriptions { if len(sub) == 0 { es.Release() delete(m.subscriptions, es) } } } // Notify method notifies manager about a new event from an event source. // Manager checks subscriptions, runs filters and writes the results to the corresponding // output sinks. func (m *Manager) Notify(es EventSource, data interface{}) { now := time.Now() if sub, ok := m.subscriptions[es]; ok { for source, writer := range sub { if value, err := writer.filter.Process(data); value != nil || err != nil { writer.output.Write(&plugin.Result{Itemid: source.Itemid, Ts: now, Value: value, Error: err}) } } } } // Flush method flushes all outputs that are subscribed to the specified event source. func (m *Manager) Flush(es EventSource) { if sub, ok := m.subscriptions[es]; ok { outputs := make([]plugin.ResultWriter, 0, len(sub)) for _, writer := range sub { found := false for _, output := range outputs { if writer.output == output { found = true break } } if !found { outputs = append(outputs, writer.output) writer.output.Flush() } } } } func (m *Manager) Lock() { m.mutex.Lock() } func (m *Manager) Unlock() { m.mutex.Unlock() } func NewManager(e EventProvider) (manager *Manager) { manager = &Manager{ clients: make(map[uint64]*Client), subscriptions: make(map[EventSource]map[Subscriber]*eventWriter), eventProvider: e, } return }