/* ** Copyright (C) 2001-2025 Zabbix SIA ** ** This program is free software: you can redistribute it and/or modify it under the terms of ** the GNU Affero General Public License as published by the Free Software Foundation, version 3. ** ** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; ** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ** See the GNU Affero General Public License for more details. ** ** You should have received a copy of the GNU Affero General Public License along with this program. ** If not, see <https://www.gnu.org/licenses/>. **/ package server import ( "bytes" "encoding/json" "fmt" "io" "net" "net/http" "strconv" "strings" "time" "git.zabbix.com/ZT/kafka-connector/kafka" "git.zabbix.com/ap/plugin-support/errs" "git.zabbix.com/ap/plugin-support/log" "git.zabbix.com/ap/plugin-support/zbxnet" ) const ( contentType = "Content-Type" applicationXndJSON = "application/x-ndjson" applicationJSON = "application/json" ) var _ http.ResponseWriter = &BufferedResponseWriter{} // BufferedResponseWriter response writer for http handler. type BufferedResponseWriter struct { w http.ResponseWriter buffer bytes.Buffer code int header http.Header } type handler struct { authToken string producer kafka.Producer allowedPeers *zbxnet.AllowedPeers } type event struct { EventID int `json:"eventid"` Data string `json:"data"` } type item struct { ItemID int `json:"itemid"` Data string `json:"data"` } // ServerInit initializes a http server with provided parameters. func ServerInit(port string, router http.Handler, timeout int) *http.Server { return &http.Server{ Addr: fmt.Sprintf(":%s", port), Handler: router, ReadHeaderTimeout: time.Duration(timeout) * time.Second, } } // Run starts the server. func Run(server *http.Server, cert, key string, tls bool, errors chan<- error) { if tls { runTLS(server, cert, key, errors) return } run(server, errors) } // NewRouter creates a mux http handler with all the routing handled. func NewRouter(producer *kafka.DefaultProducer, auth string, allowedIPs *zbxnet.AllowedPeers) http.Handler { router := http.NewServeMux() h := handler{ authToken: auth, producer: producer, allowedPeers: allowedIPs, } router.HandleFunc( "/api/v1/events", allowedMethodsMW( []string{http.MethodPost}, h.accessMW( errorHandlingMW(h.events), ), ), ) router.HandleFunc( "/api/v1/items", allowedMethodsMW( []string{http.MethodPost}, h.accessMW( errorHandlingMW(h.items), ), ), ) return notFoundMW(router) } // Header returns set headers. func (b *BufferedResponseWriter) Header() http.Header { return b.header } // Write writes data into buffer. func (b *BufferedResponseWriter) Write(data []byte) (int, error) { n, err := b.buffer.Write(data) if err != nil { return n, errs.Wrap(err, "failed to write data to response writer") } return n, nil } // WriteHeader sets code for response. func (b *BufferedResponseWriter) WriteHeader(code int) { b.code = code } // WriteResponse writes response as a json encoded text. func (b *BufferedResponseWriter) WriteResponse() { b.w.Header().Set("Content-Type", applicationJSON) b.w.Header().Set("X-Content-Type-Options", "nosniff") b.w.WriteHeader(b.code) for k, v := range b.header { for _, vv := range v { b.w.Header().Add(k, vv) } } _, err := b.w.Write(b.buffer.Bytes()) if err != nil { log.Errf("failed to write response %s", err) } } //nolint:revive // checks 3 things no reason to split up because of complexity func (h *handler) accessMW(handler http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { err := h.checkIP(r) if err != nil { write( w, http.StatusForbidden, jsonResponse( map[string]string{ "response": "fail", "error": fmt.Sprintf("ip validation failed, %s", err.Error()), }, ), ) return } if h.authToken != "" { code, err := h.validateBearerToken(r) if err != nil { write( w, code, jsonResponse( map[string]string{ "response": "fail", "error": fmt.Sprintf("bearer token validation failed, %s", err.Error()), }, ), ) return } } ct := r.Header.Get(contentType) if ct != "" && ct != applicationXndJSON { write( w, http.StatusUnsupportedMediaType, jsonResponse( map[string]string{ "response": "fail", "error": fmt.Sprintf("%s header must contain %s", contentType, applicationXndJSON), }, ), ) return } handler(w, r) } } func (h *handler) checkIP(req *http.Request) error { host, _, err := net.SplitHostPort(req.RemoteAddr) if err != nil { return errs.Wrap(err, "failed to split request ip and port") } if !h.allowedPeers.CheckPeer(net.ParseIP(host)) { return errs.New("ip not allowed") } return nil } func (h *handler) validateBearerToken(r *http.Request) (int, error) { splitToken := strings.Split(r.Header.Get("Authorization"), "Bearer ") if len(splitToken) < 2 { return http.StatusBadRequest, errs.New("failed to retrieve bearer auth token") } if h.authToken != splitToken[1] { return http.StatusUnauthorized, errs.New("incorrect bearer auth token") } return 0, nil } func (h handler) events(w http.ResponseWriter, r *http.Request) error { events, err := decodeEvents(r.Body) if err != nil { return errs.Wrap(err, "failed to read request") } if len(events) == 0 { return errs.New("empty request") } for _, v := range events { h.producer.ProduceEvent(strconv.Itoa(v.EventID), v.Data) } write( w, http.StatusCreated, jsonResponse( map[string]string{ "response": "success", }, ), ) return nil } func (h handler) items(w http.ResponseWriter, r *http.Request) error { items, err := decodeItems(r.Body) if err != nil { return errs.Wrap(err, "failed to read request") } if len(items) == 0 { return errs.New("empty request") } for _, v := range items { h.producer.ProduceItem(strconv.Itoa(v.ItemID), v.Data) } write( w, http.StatusCreated, jsonResponse( map[string]string{ "response": "success", }, ), ) return nil } func notFoundMW(handler http.Handler) http.Handler { return http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { bw := &BufferedResponseWriter{ w: w, header: http.Header{}, code: http.StatusOK, } handler.ServeHTTP(bw, r) if bw.code == http.StatusNotFound { write( w, http.StatusNotFound, jsonResponse( map[string]string{ "response": "fail", "error": http.StatusText(http.StatusNotFound), }, ), ) return } bw.WriteResponse() }, ) } func allowedMethodsMW(allowedMethods []string, handler http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { for _, method := range allowedMethods { if r.Method == method { handler(w, r) return } } write( w, http.StatusMethodNotAllowed, jsonResponse( map[string]string{ "response": "fail", "error": http.StatusText(http.StatusMethodNotAllowed), }, ), ) } } func errorHandlingMW( handler func(w http.ResponseWriter, r *http.Request) error, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { err := handler(w, r) if err != nil { log.Errf("failed handle request, %s", err.Error()) write( w, http.StatusInternalServerError, jsonResponse( map[string]string{ "response": "fail", "error": err.Error(), }, ), ) } } } func write(w http.ResponseWriter, status int, message string) { w.WriteHeader(status) _, err := w.Write([]byte(message)) if err != nil { log.Errf("failed to write response, %s", err) } } func decodeEvents(r io.Reader) ([]event, error) { var ( d any events []event ) decoder := json.NewDecoder(r) for decoder.More() { err := decoder.Decode(&d) if err != nil { return nil, errs.Wrap(err, "failed to decode incoming item data") } b, err := json.Marshal(d) if err != nil { return nil, errs.Wrap(err, "failed to marshal incoming item data") } var e event err = json.Unmarshal(b, &e) if err != nil { return nil, errs.Wrap(err, "failed to unmarshal incoming item data") } e.Data = string(b) log.Tracef("Received event with ID %d", e.EventID) events = append(events, e) } return events, nil } func decodeItems(r io.Reader) ([]item, error) { var ( d any items []item ) decoder := json.NewDecoder(r) for decoder.More() { err := decoder.Decode(&d) if err != nil { return nil, errs.Wrap(err, "failed to decode incoming item data") } b, err := json.Marshal(d) if err != nil { return nil, errs.Wrap(err, "failed to marshal incoming item data") } var i item err = json.Unmarshal(b, &i) if err != nil { return nil, errs.Wrap(err, "failed to unmarshal incoming item data") } i.Data = string(b) log.Tracef("Received item with ID %d", i.ItemID) items = append(items, i) } return items, nil } func run(server *http.Server, e chan<- error) { err := server.ListenAndServe() if err != nil { e <- errs.Wrap(err, "failed to start the server") } } func runTLS(server *http.Server, cert, key string, e chan<- error) { err := validateTLS(cert, key) if err != nil { e <- errs.Wrap(err, "failed to start the server") return } err = server.ListenAndServeTLS(cert, key) if err != nil { e <- errs.Wrap(err, "failed to start the server") } } func validateTLS(certPath, keyPath string) error { if certPath == "" || keyPath == "" { return errs.New("both tls certificate and key file paths must be set") } return nil } func jsonResponse(msg map[string]string) string { out, err := json.Marshal(msg) if err != nil { log.Errf("failed to create json response, %s", err.Error()) return "{}" } return string(out) }