<?php
/*
** Copyright (C) 2001-2025 Zabbix SIA
**
** This program is free software: you can redistribute it and/or modify it under the terms of
** the GNU Affero General Public License as published by the Free Software Foundation, version 3.
**
** This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
** without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
** See the GNU Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public License along with this program.
** If not, see <https://www.gnu.org/licenses/>.
**/


/**
 * A helper class for working with Elasticsearch.
 */
class CElasticsearchHelper {

	/**
	 * Maximum number of documents requested per page; larger or unbounded requests are paged via the Scroll API.
	 */
	const MAX_RESULT_WINDOW = 10000;

	/**
	 * How long Elasticsearch should keep a scroll context alive between page requests.
	 */
	const KEEP_CONTEXT_PERIOD = '10s';

	/**
	 * Scroll ID returned by the most recent response that contained one.
	 */
	private static $scroll_id;

	/**
	 * Set of all scroll IDs seen during the current query (IDs are array keys), deleted when paging finishes.
	 */
	private static $scrolls;

	/**
	 * Perform request to Elasticsearch.
	 *
	 * @param string $method    HTTP method to be used to perform request
	 * @param string $endpoint  requested url
	 * @param mixed  $request   data to be sent (JSON-encoded when not empty)
	 *
	 * @return string|false  raw response body, or false if the request could not be performed
	 */
	private static function request($method, $endpoint, $request = null) {
		$time_start = microtime(true);
		$options = [
			'http' => [
				'header' => "Content-Type: application/json; charset=UTF-8",
				'method' => $method,
				'ignore_errors' => true // To get error messages from Elasticsearch.
			]
		];

		if ($request) {
			$request = json_encode($request);
			$options['http']['content'] = $request;
		}

		// Initialized so that a failed request yields false instead of an undefined variable.
		$result = false;

		try {
			$result = file_get_contents($endpoint, false, stream_context_create($options));
		}
		catch (Exception $e) {
			error($e->getMessage());
		}

		CProfiler::getInstance()->profileElasticsearch(microtime(true) - $time_start, $method, $endpoint, $request);

		return $result;
	}

	/**
	 * Get Elasticsearch endpoint for scroll API requests.
	 * Endpoint should be in following format: <Elasticsearch url>/<indices>/<values>/<action><query string>.
	 * The last two path segments are stripped and "/_search/scroll" is appended.
	 *
	 * @param string $endpoint  endpoint of the initial request
	 *
	 * @return string|null  scroll API url, or null if it cannot be derived from the given endpoint
	 */
	private static function getScrollApiEndpoint($endpoint) {
		$url = $endpoint;

		for ($i = 0; $i < 2; $i++) {
			if (($pos = strrpos($url, '/')) !== false) {
				$url = substr($url, 0, $pos);
			}
			else {
				// Endpoint is in different format, no way to get scroll API url.
				error(_s('Elasticsearch error: %1$s.',
					_('cannot perform Scroll API request, data could be truncated'))
				);

				return null;
			}
		}

		return $url.'/_search/scroll';
	}

	/**
	 * Perform request(s) to Elasticsearch and parse the results.
	 *
	 * Search requests without "size", or with "size" above MAX_RESULT_WINDOW, are paged through the Scroll API;
	 * when "size" is given, the merged result is truncated to that many entries.
	 *
	 * @param string $method    HTTP method to be used to perform request
	 * @param string $endpoint  requested url
	 * @param mixed  $request   data to be sent
	 *
	 * @return array parsed result
	 */
	public static function query($method, $endpoint, $request = null) {
		$parse_as = ELASTICSEARCH_RESPONSE_PLAIN;

		// For non-search requests no additional parsing is done.
		if (substr($endpoint, -strlen('/_search')) === '/_search') {
			$parse_as = ELASTICSEARCH_RESPONSE_DOCUMENTS;

			// Aggregations are returned on their own only when no documents are requested ("size" is 0).
			if (is_array($request) && array_key_exists('aggs', $request)) {
				$parse_as = (array_key_exists('size', $request) && $request['size'] == 0)
					? ELASTICSEARCH_RESPONSE_AGGREGATION
					: ELASTICSEARCH_RESPONSE_PLAIN;
			}
		}

		if (is_array($request)
				&& (!array_key_exists('size', $request) || $request['size'] > self::MAX_RESULT_WINDOW)) {
			// Scroll API should be used to retrieve all data.
			$results = [];
			$limit = array_key_exists('size', $request) ? $request['size'] : null;
			$request['size'] = self::MAX_RESULT_WINDOW;

			self::$scroll_id = null;
			self::$scrolls = [];

			$scroll_endpoint = self::getScrollApiEndpoint($endpoint);

			if ($scroll_endpoint !== null) {
				$endpoint .= '?scroll='.self::KEEP_CONTEXT_PERIOD;
			}

			$slice = self::parseResult(self::request($method, $endpoint, $request), $parse_as);
			$results = array_merge($results, $slice);

			// Further pages can only be fetched when a scroll context was opened and the scroll url is known.
			if (self::$scroll_id === null || $scroll_endpoint === null) {
				$slice = null;
			}

			while ($slice) {
				if (count($slice) < self::MAX_RESULT_WINDOW) {
					// No need to continue as there are no more data.
					break;
				}

				$scroll = [
					'scroll' => self::KEEP_CONTEXT_PERIOD,
					'scroll_id' => self::$scroll_id
				];

				$slice = self::parseResult(self::request($method, $scroll_endpoint, $scroll), $parse_as);
				$results = array_merge($results, $slice);

				if ($limit !== null && count($results) >= $limit) {
					$results = array_slice($results, 0, $limit);

					// No need to perform additional queries as limit is reached.
					break;
				}
			}

			// Scrolls should be deleted when they are not required anymore.
			if ($scroll_endpoint !== null && count(self::$scrolls) > 0) {
				self::request('DELETE', $scroll_endpoint, ['scroll_id' => array_keys(self::$scrolls)]);
			}

			return $results;
		}

		return self::parseResult(self::request($method, $endpoint, $request), $parse_as);
	}

	/**
	 * Parse result based on request data.
	 *
	 * Also records any "_scroll_id" found in the response in self::$scroll_id and self::$scrolls.
	 *
	 * @param string|false $data      result as a string (false when the request failed)
	 * @param int          $parse_as  result type
	 *
	 * @return array parsed result, or an empty array on error
	 */
	private static function parseResult($data, $parse_as) {
		$result = is_string($data) ? json_decode($data, true) : null;

		if (!is_array($result)) {
			error(_s('Elasticsearch error: %1$s.', _('failed to parse JSON')));

			return [];
		}

		if (array_key_exists('error', $result)) {
			$error = (is_array($result['error']) && array_key_exists('reason', $result['error']))
				? $result['error']['reason']
				: _('Unknown error');

			error(_s('Elasticsearch error: %1$s.', $error));

			return [];
		}

		if (array_key_exists('_scroll_id', $result)) {
			self::$scroll_id = $result['_scroll_id'];
			self::$scrolls[self::$scroll_id] = true;
		}

		switch ($parse_as) {
			// Return aggregations only.
			case ELASTICSEARCH_RESPONSE_AGGREGATION:
				if (array_key_exists('aggregations', $result) && is_array($result['aggregations'])) {
					return $result['aggregations'];
				}
				break;

			// Return documents only.
			case ELASTICSEARCH_RESPONSE_DOCUMENTS:
				if (array_key_exists('hits', $result) && is_array($result['hits'])
						&& array_key_exists('hits', $result['hits']) && is_array($result['hits']['hits'])) {
					$values = [];

					foreach ($result['hits']['hits'] as $row) {
						if (!is_array($row) || !array_key_exists('_source', $row)) {
							continue;
						}

						$values[] = $row['_source'];
					}

					return $values;
				}
				break;

			// Return result "as is".
			case ELASTICSEARCH_RESPONSE_PLAIN:
				return $result;
		}

		return [];
	}

	/**
	 * Add filters to Elasticsearch query.
	 *
	 * @param array $schema   DB schema
	 * @param array $query    Elasticsearch query.
	 * @param array $options  Filtering options.
	 *
	 * @return array Elasticsearch query with added filtering
	 */
	public static function addFilter($schema, $query, $options) {
		foreach ($options['filter'] as $field => $value) {
			// Skip missing fields, textual fields (different mapping is needed for exact matching) and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])
					|| $schema['fields'][$field]['type']
						& (DB::FIELD_TYPE_CHAR | DB::FIELD_TYPE_TEXT | DB::FIELD_TYPE_NCLOB)) {
				continue;
			}

			if ($options['searchByAny']) {
				$type = 'should';
				// "minimum_should_match" is a parameter of the bool query, not of the search request body.
				$query['query']['bool']['minimum_should_match'] = 1;
			}
			else {
				$type = 'must';
			}

			$query['query']['bool'][$type][] = [
				'terms' => [
					// "terms" requires a JSON array: wrap scalars and drop keys so that json_encode() emits a list.
					$field => array_values((array) $value)
				]
			];
		}

		return $query;
	}

	/**
	 * Add search criteria to Elasticsearch query.
	 *
	 * @param array $schema   DB schema
	 * @param array $query    Elasticsearch query
	 * @param array $options  search options
	 *
	 * @return array Elasticsearch query with added search criteria
	 */
	public static function addSearch($schema, $query, $options) {
		$start = $options['startSearch'] ? '' : '*';
		$exclude = $options['excludeSearch'] ? 'must_not' : 'must';

		if ($options['searchByAny'] && !$options['excludeSearch']) {
			$exclude = 'should';
			// "minimum_should_match" is a parameter of the bool query, not of the search request body.
			$query['query']['bool']['minimum_should_match'] = 1;
		}

		// Backslash is the wildcard escape character, so it must be escaped too; strtr() does not re-scan
		// replaced text, so the order of the map entries does not matter. "?" is always matched literally.
		$escape_wildcards_enabled = ['\\' => '\\\\', '?' => '\\?'];
		$escape_wildcards_disabled = $escape_wildcards_enabled + ['*' => '\\*'];

		foreach ($options['search'] as $field => $value) {
			// Skip missing fields, non textual fields and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])
					|| ($schema['fields'][$field]['type']
						& (DB::FIELD_TYPE_TEXT | DB::FIELD_TYPE_NCLOB | DB::FIELD_TYPE_CHAR)) == 0) {
				continue;
			}

			// A single search phrase may be given as a scalar.
			foreach ((array) $value as $phrase) {
				if (!$options['searchWildcardsEnabled']) {
					$criteria = [
						'wildcard' => [
							$field => $start.strtr($phrase, $escape_wildcards_disabled).'*'
						]
					];
				}
				else {
					$criteria = [
						'wildcard' => [
							$field => strtr($phrase, $escape_wildcards_enabled)
						]
					];
				}

				if ($options['excludeSearch'] && $options['searchByAny']) {
					// Exclude documents matching any of the phrases.
					$query['query']['bool']['must_not']['bool']['should'][] = $criteria;
				}
				else {
					$query['query']['bool'][$exclude][] = $criteria;
				}
			}
		}

		return $query;
	}

	/**
	 * Add sorting criteria to Elasticsearch query.
	 *
	 * @param array $query    Elasticsearch query.
	 * @param array $options  Sorting options.
	 *
	 * @return array Elasticsearch query with added sorting options
	 */
	public static function addSort($query, $options) {
		foreach ($options['sortfield'] as $i => $sortfield) {
			// Add sort field to order.
			if (is_array($options['sortorder'])) {
				$sortorder = array_key_exists($i, $options['sortorder']) ? $options['sortorder'][$i] : ZBX_SORT_UP;
			}
			else {
				$sortorder = ($options['sortorder'] !== '') ? $options['sortorder'] : ZBX_SORT_UP;
			}

			// Every criterion is appended as a list element so that json_encode() always produces a JSON array;
			// mixing string keys with list entries would turn "sort" into an object with numeric field names.
			$query['sort'][] = ($sortorder === ZBX_SORT_DOWN) ? [$sortfield => $sortorder] : $sortfield;
		}

		return $query;
	}
}