Migrate IndexToolsAdapter to OS Java client (#25030)

* migrate IndexToolsAdapter

* fix test

* simplify timestamp

---------

Co-authored-by: Tomas Dvorak <tomas.dvorak@graylog.com>
This commit is contained in:
Matthias Oesterheld
2026-03-03 15:45:38 +01:00
committed by GitHub
parent 5bc5da03d2
commit 256a90cd3d
7 changed files with 388 additions and 150 deletions

View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch2;
import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
import org.graylog.testing.elasticsearch.SearchInstance;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.IndexToolsAdapterIT;
/**
 * Runs the shared {@link IndexToolsAdapterIT} test cases against the search server provided by the
 * {@code opensearch2} testing {@link OpenSearchInstance}.
 */
public class IndexToolsAdapterOS2IT extends IndexToolsAdapterIT {

    // NOTE(review): presumably discovered via the @SearchInstance annotation by the test harness — confirm.
    @SearchInstance
    public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create();

    /**
     * @return the OpenSearch instance held by this test class
     */
    @Override
    protected SearchServerInstance searchServer() {
        return this.openSearchInstance;
    }
}

View File

@@ -0,0 +1,169 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch3;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import jakarta.inject.Inject;
import org.graylog2.indexer.IndexToolsAdapter;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.opensearch.client.opensearch._types.ExpandWildcard;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.aggregations.Aggregation;
import org.opensearch.client.opensearch._types.aggregations.DateHistogramBucket;
import org.opensearch.client.opensearch._types.aggregations.FilterAggregate;
import org.opensearch.client.opensearch._types.aggregations.StringTermsBucket;
import org.opensearch.client.opensearch._types.query_dsl.BoolQuery;
import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * {@link IndexToolsAdapter} that runs its searches through the {@link OfficialOpensearchClient}.
 */
public class IndexToolsAdapterOS implements IndexToolsAdapter {
    // Names under which the aggregations are registered in the request and looked up in the response.
    private static final String AGG_DATE_HISTOGRAM = "source_date_histogram";
    private static final String AGG_MESSAGE_FIELD = "message_field";
    private static final String AGG_FILTER = "message_filter";

    private final OfficialOpensearchClient client;

    @Inject
    public IndexToolsAdapterOS(OfficialOpensearchClient client) {
        this.client = client;
    }

    /**
     * Computes, per time interval, how often each value of {@code fieldName} occurs.
     *
     * @param fieldName       field whose values are counted by the nested terms aggregation
     * @param indices         indices to search
     * @param includedStreams if present, only documents matching the stream filter are aggregated
     * @param interval        fixed histogram interval, sent to the server as {@code interval + "ms"}
     * @return immutable map from histogram bucket key (as a UTC {@link DateTime}) to the term counts of that bucket
     */
    @Override
    public Map<DateTime, Map<String, Long>> fieldHistogram(String fieldName, Set<String> indices,
                                                           Optional<Set<String>> includedStreams, long interval) {
        final Aggregation filterAgg = streamFilteredHistogram(fieldName, includedStreams, interval);

        final SearchRequest request = SearchRequest.of(builder -> builder
                .index(indices.stream().toList())
                .query(Query.of(q -> q.matchAll(m -> m)))
                .aggregations(AGG_FILTER, filterAgg)
                .size(0));

        final SearchResponse<Void> response = client.sync(
                c -> c.search(request, Void.class),
                "Unable to retrieve field histogram.");

        final FilterAggregate filtered = response.aggregations().get(AGG_FILTER).filter();
        final List<DateHistogramBucket> intervals = filtered.aggregations()
                .get(AGG_DATE_HISTOGRAM)
                .dateHistogram()
                .buckets()
                .array();

        final Map<DateTime, Map<String, Long>> histogram = Maps.newHashMapWithExpectedSize(intervals.size());
        for (final DateHistogramBucket intervalBucket : intervals) {
            histogram.put(new DateTime(intervalBucket.key(), DateTimeZone.UTC), termCounts(intervalBucket));
        }
        return ImmutableMap.copyOf(histogram);
    }

    /**
     * Counts the documents in the given indices that match the stream filter.
     *
     * @param indices         indices to count in
     * @param includedStreams if present, only documents matching the stream filter are counted
     * @return the total hit count reported by the search response
     */
    @Override
    public long count(Set<String> indices, Optional<Set<String>> includedStreams) {
        final Query streamFilter = buildStreamIdFilter(includedStreams);

        // size(0): only the total is needed, no hits are fetched.
        final SearchRequest request = SearchRequest.of(builder -> builder
                .index(indices.stream().toList())
                .ignoreUnavailable(true)
                .allowNoIndices(true)
                .expandWildcards(ExpandWildcard.Open)
                .query(streamFilter)
                .trackTotalHits(t -> t.enabled(true))
                .size(0));

        final SearchResponse<Void> response = client.sync(
                c -> c.search(request, Void.class),
                "Unable to count documents of index.");

        return response.hits().total().value();
    }

    /**
     * Builds the filter aggregation that wraps a date histogram on {@code timestamp}, which in turn
     * carries a terms aggregation on {@code fieldName}.
     */
    private Aggregation streamFilteredHistogram(String fieldName, Optional<Set<String>> includedStreams,
                                                long interval) {
        final Query streamFilter = buildStreamIdFilter(includedStreams);
        final Aggregation termsAgg = Aggregation.of(ta -> ta.terms(t -> t.field(fieldName)));

        final Aggregation dateHistogramAgg = Aggregation.of(a -> a
                .dateHistogram(dh -> dh
                        .field("timestamp")
                        .fixedInterval(fi -> fi.time(interval + "ms"))
                        // "min_doc_count" keeps empty buckets out of the histogram result. Without it,
                        // a histogram over a really large date range can cause out-of-memory errors.
                        // See: https://github.com/Graylog2/graylog-plugin-archive/issues/59
                        .minDocCount(1))
                .aggregations(AGG_MESSAGE_FIELD, termsAgg));

        return Aggregation.of(a -> a
                .filter(streamFilter)
                .aggregations(AGG_DATE_HISTOGRAM, dateHistogramAgg));
    }

    /**
     * Reads the string terms buckets nested in one histogram bucket into a key-to-doc-count map.
     */
    private static Map<String, Long> termCounts(DateHistogramBucket intervalBucket) {
        final List<StringTermsBucket> terms = intervalBucket.aggregations()
                .get(AGG_MESSAGE_FIELD)
                .sterms()
                .buckets()
                .array();

        final Map<String, Long> counts = Maps.newHashMapWithExpectedSize(terms.size());
        terms.forEach(term -> counts.put(term.key(), term.docCount()));
        return counts;
    }

    /**
     * Builds the query restricting results to the given streams.
     * Without included streams, the query is a bool query with a single match-all {@code must} clause.
     */
    private Query buildStreamIdFilter(Optional<Set<String>> includedStreams) {
        final BoolQuery.Builder root = BoolQuery.builder().must(MatchAllQuery.builder().build().toQuery());

        // If the included streams are not present, we do not filter on streams
        if (includedStreams.isEmpty()) {
            return root.build().toQuery();
        }

        final Set<String> streamIds = includedStreams.get();
        final BoolQuery.Builder anyOf = new BoolQuery.Builder();

        // When the default stream is requested, documents without any stream assignment are included too.
        // Those documents have basically been in the "default stream", which didn't exist in Graylog <2.2.0.
        if (streamIds.contains(Stream.DEFAULT_STREAM_ID)) {
            anyOf.should(Query.of(q -> q
                    .bool(b -> b.mustNot(mn -> mn.exists(e -> e.field(Message.FIELD_STREAMS))))));
        }

        // Only select messages which are assigned to the given streams
        anyOf.should(Query.of(q -> q
                .terms(t -> t
                        .field(Message.FIELD_STREAMS)
                        .terms(tv -> tv.value(streamIds.stream().map(FieldValue::of).toList())))));

        root.filter(anyOf.build().toQuery());
        return root.build().toQuery();
    }
}

View File

@@ -1,147 +0,0 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch3;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import jakarta.inject.Inject;
import org.graylog.shaded.opensearch2.org.opensearch.action.search.SearchRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.search.SearchResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.IndicesOptions;
import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountRequest;
import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountResponse;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.BoolQueryBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.AggregationBuilders;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.filter.Filter;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.terms.Terms;
import org.graylog.shaded.opensearch2.org.opensearch.search.builder.SearchSourceBuilder;
import org.graylog2.indexer.IndexToolsAdapter;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.boolQuery;
import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.existsQuery;
import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.termsQuery;
/**
 * {@link IndexToolsAdapter} implementation that builds its requests with the shaded OpenSearch 2
 * request/query builder classes and sends them through {@link OpenSearchClient}.
 */
public class IndexToolsAdapterOS2 implements IndexToolsAdapter {
// Names under which the aggregations are registered in the request and looked up in the response.
private static final String AGG_DATE_HISTOGRAM = "source_date_histogram";
private static final String AGG_MESSAGE_FIELD = "message_field";
private static final String AGG_FILTER = "message_filter";
private final OpenSearchClient client;
@Inject
public IndexToolsAdapterOS2(OpenSearchClient client) {
this.client = client;
}
/**
 * Computes, per time interval, how often each value of {@code fieldName} occurs.
 *
 * @param fieldName       field whose values are counted by the nested terms aggregation
 * @param indices         indices to search
 * @param includedStreams if present, only documents matching the stream filter are aggregated
 * @param interval        fixed histogram interval, passed to the server as {@code interval + "ms"}
 * @return immutable map from histogram bucket key (as a UTC {@link DateTime}) to the term counts of that bucket
 */
@Override
public Map<DateTime, Map<String, Long>> fieldHistogram(String fieldName, Set<String> indices, Optional<Set<String>> includedStreams, long interval) {
final BoolQueryBuilder queryBuilder = buildStreamIdFilter(includedStreams);
// Filter aggregation -> date histogram on "timestamp" -> terms aggregation on the requested field.
final FilterAggregationBuilder the_filter = AggregationBuilders.filter(AGG_FILTER, queryBuilder)
.subAggregation(AggregationBuilders.dateHistogram(AGG_DATE_HISTOGRAM)
.field("timestamp")
.subAggregation(AggregationBuilders.terms(AGG_MESSAGE_FIELD).field(fieldName))
.fixedInterval(new DateHistogramInterval(interval + "ms"))
// We use "min_doc_count" here to avoid empty buckets in the histogram result.
// This is needed to avoid out-of-memory errors when creating a histogram for a really large
// date range. See: https://github.com/Graylog2/graylog-plugin-archive/issues/59
.minDocCount(1L));
// The top-level query matches everything; the stream restriction lives in the filter aggregation.
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
.aggregation(the_filter);
final SearchRequest searchRequest = new SearchRequest()
.source(searchSourceBuilder)
.indices(indices.toArray(new String[0]));
final SearchResponse searchResult = client.search(searchRequest, "Unable to retrieve field histogram.");
final Filter filterAggregation = searchResult.getAggregations().get(AGG_FILTER);
final ParsedDateHistogram dateHistogram = filterAggregation.getAggregations().get(AGG_DATE_HISTOGRAM);
// Unchecked cast: the buckets are treated as ParsedDateHistogram.ParsedBucket instances.
final List<ParsedDateHistogram.ParsedBucket> histogramBuckets = (List<ParsedDateHistogram.ParsedBucket>) dateHistogram.getBuckets();
final Map<DateTime, Map<String, Long>> result = Maps.newHashMapWithExpectedSize(histogramBuckets.size());
for (ParsedDateHistogram.ParsedBucket bucket : histogramBuckets) {
// The bucket key is cast to ZonedDateTime and converted to a Joda DateTime in UTC via epoch millis.
final ZonedDateTime zonedDateTime = (ZonedDateTime) bucket.getKey();
final DateTime date = new DateTime(zonedDateTime.toInstant().toEpochMilli(), DateTimeZone.UTC);
final Terms sourceFieldAgg = bucket.getAggregations().get(AGG_MESSAGE_FIELD);
final List<? extends Terms.Bucket> termBuckets = sourceFieldAgg.getBuckets();
final HashMap<String, Long> termCounts = Maps.newHashMapWithExpectedSize(termBuckets.size());
for (Terms.Bucket termBucket : termBuckets) {
termCounts.put(termBucket.getKeyAsString(), termBucket.getDocCount());
}
result.put(date, termCounts);
}
return ImmutableMap.copyOf(result);
}
/**
 * Counts the documents in the given indices that match the stream filter, using a count request
 * configured with {@code IndicesOptions.LENIENT_EXPAND_OPEN}.
 *
 * @param indices         indices to count in
 * @param includedStreams if present, only documents matching the stream filter are counted
 * @return the count reported by the count response
 */
@Override
public long count(Set<String> indices, Optional<Set<String>> includedStreams) {
final CountRequest request = new CountRequest(indices.toArray(new String[0]), buildStreamIdFilter(includedStreams))
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
final CountResponse result = client.execute((c, requestOptions) -> c.count(request, requestOptions), "Unable to count documents of index.");
return result.getCount();
}
/**
 * Builds the bool query restricting results to the given streams.
 * Without included streams, the query only carries a match-all {@code must} clause.
 */
private BoolQueryBuilder buildStreamIdFilter(Optional<Set<String>> includedStreams) {
final BoolQueryBuilder queryBuilder = boolQuery().must(matchAllQuery());
// If the included streams are not present, we do not filter on streams
if (includedStreams.isPresent()) {
final Set<String> streams = includedStreams.get();
final BoolQueryBuilder filterBuilder = boolQuery();
// If the included streams set contains the default stream, we also want all documents which do not
// have any stream assigned. Those documents have basically been in the "default stream" which didn't
// exist in Graylog <2.2.0.
if (streams.contains(Stream.DEFAULT_STREAM_ID)) {
filterBuilder.should(boolQuery().mustNot(existsQuery(Message.FIELD_STREAMS)));
}
// Only select messages which are assigned to the given streams
filterBuilder.should(termsQuery(Message.FIELD_STREAMS, streams));
queryBuilder.filter(filterBuilder);
}
return queryBuilder;
}
}

View File

@@ -81,10 +81,12 @@ public class OpenSearch3Module extends VersionAwareModule {
bindForSupportedVersion(IndexTemplateAdapter.class).to(LegacyIndexTemplateAdapter.class);
}
bindForSupportedVersion(IndexFieldTypePollerAdapter.class).to(IndexFieldTypePollerAdapterOS.class);
bindForSupportedVersion(IndexToolsAdapter.class).to(IndexToolsAdapterOS2.class);
bindForSupportedVersion(IndexToolsAdapter.class).to(IndexToolsAdapterOS.class);
bindForSupportedVersion(MessagesAdapter.class).to(MessagesAdapterOS.class);
bindForSupportedVersion(MultiChunkResultRetriever.class).to(PaginationOS.class);
bindForSupportedVersion(MoreSearchAdapter.class).to(MoreSearchAdapterOS.class);
bindForSupportedVersion(NodeAdapter.class).to(NodeAdapterOS.class);
bindForSupportedVersion(SearchesAdapter.class).to(SearchesAdapterOS.class);
bindForSupportedVersion(V20170607164210_MigrateReopenedIndicesToAliases.ClusterState.class)

View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch3;
import org.graylog.storage.opensearch3.testing.OpenSearchInstance;
import org.graylog.testing.elasticsearch.SearchInstance;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.IndexToolsAdapterIT;
/**
 * Runs the shared {@link IndexToolsAdapterIT} test cases against the search server provided by the
 * {@code opensearch3} testing {@link OpenSearchInstance}.
 */
public class IndexToolsAdapterOSIT extends IndexToolsAdapterIT {

    // NOTE(review): presumably discovered via the @SearchInstance annotation by the test harness — confirm.
    @SearchInstance
    public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create();

    /**
     * @return the OpenSearch instance held by this test class
     */
    @Override
    protected SearchServerInstance searchServer() {
        return this.openSearchInstance;
    }
}

View File

@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.graylog.plugins.views.search.searchfilters.db.IgnoreSearchFilters;
import org.graylog.storage.opensearch3.CountsAdapterOS;
import org.graylog.storage.opensearch3.IndexFieldTypePollerAdapterOS;
import org.graylog.storage.opensearch3.IndexToolsAdapterOS2;
import org.graylog.storage.opensearch3.IndexToolsAdapterOS;
import org.graylog.storage.opensearch3.IndicesAdapterOS;
import org.graylog.storage.opensearch3.MessagesAdapterOS;
import org.graylog.storage.opensearch3.NodeAdapterOS;
@@ -101,7 +101,7 @@ public class AdaptersOS implements Adapters {
@Override
public IndexToolsAdapter indexToolsAdapter() {
return new IndexToolsAdapterOS2(client);
return new IndexToolsAdapterOS(officialOpensearchClient);
}
@Override

View File

@@ -0,0 +1,148 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.indexer;
import org.graylog.testing.elasticsearch.BulkIndexRequest;
import org.graylog.testing.elasticsearch.ElasticsearchBaseTest;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Search-server independent integration tests for {@link IndexToolsAdapter}. Concrete subclasses
 * supply the search server the adapter under test talks to.
 */
public abstract class IndexToolsAdapterIT extends ElasticsearchBaseTest {
    private static final String INDEX_NAME = "graylog_0";
    private static final String STREAM_1 = "000000000000000000000001";
    private static final String STREAM_2 = "000000000000000000000002";

    private IndexToolsAdapter adapter;

    /** Creates the test index, waits for green status on it and obtains the adapter under test. */
    @BeforeEach
    public void setUp() {
        client().createIndex(INDEX_NAME);
        client().waitForGreenStatus(INDEX_NAME);
        this.adapter = searchServer().adapters().indexToolsAdapter();
    }

    @Test
    public void countReturnsZeroForEmptyIndex() {
        assertThat(countIn(Optional.empty())).isEqualTo(0L);
    }

    @Test
    public void countWithoutStreamFilter() {
        indexTestDocuments();

        assertThat(countIn(Optional.empty())).isEqualTo(4L);
    }

    @Test
    public void countWithStreamFilter() {
        indexTestDocuments();

        // Messages 1, 2 and 4 are in STREAM_1; messages 2 and 3 are in STREAM_2.
        assertThat(countIn(Optional.of(Set.of(STREAM_1)))).isEqualTo(3L);
        assertThat(countIn(Optional.of(Set.of(STREAM_2)))).isEqualTo(2L);
        assertThat(countIn(Optional.of(Set.of(STREAM_1, STREAM_2)))).isEqualTo(4L);
    }

    @Test
    public void fieldHistogramReturnsGroupedResults() {
        indexTestDocuments();

        final Map<DateTime, Map<String, Long>> histogram = adapter.fieldHistogram(
                "source", Set.of(INDEX_NAME), Optional.empty(), 3600000L);

        assertThat(histogram).isNotEmpty();
        assertThat(sumOfAllCounts(histogram)).isEqualTo(4L);
    }

    @Test
    public void fieldHistogramWithStreamFilter() {
        indexTestDocuments();

        final Map<DateTime, Map<String, Long>> histogram = adapter.fieldHistogram(
                "source", Set.of(INDEX_NAME), Optional.of(Set.of(STREAM_1)), 3600000L);

        assertThat(histogram).isNotEmpty();
        assertThat(sumOfAllCounts(histogram)).isEqualTo(3L);
    }

    /** Counts documents of the test index, optionally restricted to the given streams. */
    private long countIn(Optional<Set<String>> streams) {
        return adapter.count(Set.of(INDEX_NAME), streams);
    }

    /** Adds up every term count across all histogram buckets. */
    private static long sumOfAllCounts(Map<DateTime, Map<String, Long>> histogram) {
        long total = 0L;
        for (final Map<String, Long> termCounts : histogram.values()) {
            for (final Long termCount : termCounts.values()) {
                total += termCount.longValue();
            }
        }
        return total;
    }

    /**
     * Bulk-indexes four messages sharing one timestamp (the current UTC time) and refreshes the node.
     */
    private void indexTestDocuments() {
        final String timestamp = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
                .withZoneUTC()
                .print(DateTime.now(DateTimeZone.UTC));

        final BulkIndexRequest bulk = new BulkIndexRequest();
        addMessage(bulk, "test message 1", "source-a", timestamp, java.util.List.of(STREAM_1));
        addMessage(bulk, "test message 2", "source-a", timestamp, java.util.List.of(STREAM_1, STREAM_2));
        addMessage(bulk, "test message 3", "source-b", timestamp, java.util.List.of(STREAM_2));
        addMessage(bulk, "test message 4", "source-a", timestamp, java.util.List.of(STREAM_1));

        client().bulkIndex(bulk);
        client().refreshNode();
    }

    /** Appends one message document for the test index to the bulk request. */
    private static void addMessage(BulkIndexRequest bulk, String message, String source, String timestamp,
                                   java.util.List<String> streams) {
        bulk.addRequest(INDEX_NAME, Map.of(
                "message", message,
                "source", source,
                "timestamp", timestamp,
                "streams", streams
        ));
    }
}