diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/IndexToolsAdapterOS2IT.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/IndexToolsAdapterOS2IT.java new file mode 100644 index 0000000000..c84034d21c --- /dev/null +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/IndexToolsAdapterOS2IT.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.storage.opensearch2; + +import org.graylog.storage.opensearch2.testing.OpenSearchInstance; +import org.graylog.testing.elasticsearch.SearchInstance; +import org.graylog.testing.elasticsearch.SearchServerInstance; +import org.graylog2.indexer.IndexToolsAdapterIT; + +public class IndexToolsAdapterOS2IT extends IndexToolsAdapterIT { + + @SearchInstance + public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create(); + + @Override + protected SearchServerInstance searchServer() { + return openSearchInstance; + } +} diff --git a/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/IndexToolsAdapterOS.java b/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/IndexToolsAdapterOS.java new file mode 100644 index 0000000000..758606a815 --- /dev/null +++ b/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/IndexToolsAdapterOS.java @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.storage.opensearch3; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import jakarta.inject.Inject; +import org.graylog2.indexer.IndexToolsAdapter; +import org.graylog2.plugin.Message; +import org.graylog2.plugin.streams.Stream; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.opensearch.client.opensearch._types.ExpandWildcard; +import org.opensearch.client.opensearch._types.FieldValue; +import org.opensearch.client.opensearch._types.aggregations.Aggregation; +import org.opensearch.client.opensearch._types.aggregations.DateHistogramBucket; +import org.opensearch.client.opensearch._types.aggregations.FilterAggregate; +import org.opensearch.client.opensearch._types.aggregations.StringTermsBucket; +import org.opensearch.client.opensearch._types.query_dsl.BoolQuery; +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class IndexToolsAdapterOS implements IndexToolsAdapter { + private static final String AGG_DATE_HISTOGRAM = "source_date_histogram"; + private static final String AGG_MESSAGE_FIELD = "message_field"; + private static final String AGG_FILTER = "message_filter"; + private final OfficialOpensearchClient client; + + @Inject + public IndexToolsAdapterOS(OfficialOpensearchClient client) { + this.client = client; + } + + @Override + public Map> fieldHistogram(String fieldName, Set indices, + Optional> includedStreams, long interval) { + final Query streamFilter = buildStreamIdFilter(includedStreams); + + final Aggregation dateHistogramAgg = Aggregation.of(a -> a + .dateHistogram(dh -> dh + .field("timestamp") + .fixedInterval(fi -> fi.time(interval + "ms")) + // We use "min_doc_count" here to avoid empty buckets in the histogram result. + // This is needed to avoid out-of-memory errors when creating a histogram for a really large + // date range. See: https://github.com/Graylog2/graylog-plugin-archive/issues/59 + .minDocCount(1) + ) + .aggregations(AGG_MESSAGE_FIELD, Aggregation.of(ta -> ta + .terms(t -> t.field(fieldName)) + )) + ); + + final Aggregation filterAgg = Aggregation.of(a -> a + .filter(streamFilter) + .aggregations(AGG_DATE_HISTOGRAM, dateHistogramAgg) + ); + + final SearchRequest searchRequest = SearchRequest.of(sr -> sr + .index(indices.stream().toList()) + .query(Query.of(q -> q.matchAll(m -> m))) + .aggregations(AGG_FILTER, filterAgg) + .size(0) + ); + + final SearchResponse searchResult = client.sync( + c -> c.search(searchRequest, Void.class), + "Unable to retrieve field histogram." + ); + + final FilterAggregate filterAggregate = searchResult.aggregations().get(AGG_FILTER).filter(); + final List histogramBuckets = filterAggregate.aggregations() + .get(AGG_DATE_HISTOGRAM).dateHistogram().buckets().array(); + + final Map> result = Maps.newHashMapWithExpectedSize(histogramBuckets.size()); + + for (final DateHistogramBucket bucket : histogramBuckets) { + final DateTime date = new DateTime(bucket.key(), DateTimeZone.UTC); + + final List termBuckets = bucket.aggregations() + .get(AGG_MESSAGE_FIELD).sterms().buckets().array(); + + final HashMap termCounts = Maps.newHashMapWithExpectedSize(termBuckets.size()); + for (final StringTermsBucket termBucket : termBuckets) { + termCounts.put(termBucket.key(), termBucket.docCount()); + } + + result.put(date, termCounts); + } + + return ImmutableMap.copyOf(result); + } + + @Override + public long count(Set indices, Optional> includedStreams) { + final Query query = buildStreamIdFilter(includedStreams); + + final SearchRequest searchRequest = SearchRequest.of(sr -> sr + .index(indices.stream().toList()) + .query(query) + .ignoreUnavailable(true) + .allowNoIndices(true) + .expandWildcards(ExpandWildcard.Open) + .trackTotalHits(t -> t.enabled(true)) + .size(0) + ); + + final SearchResponse response = client.sync( + c -> c.search(searchRequest, Void.class), + "Unable to count documents of index." + ); + + return response.hits().total().value(); + } + + private Query buildStreamIdFilter(Optional> includedStreams) { + BoolQuery.Builder queryBuilder = BoolQuery.builder().must(MatchAllQuery.builder().build().toQuery()); + + // If the included streams are not present, we do not filter on streams + if (includedStreams.isPresent()) { + final Set streams = includedStreams.get(); + final BoolQuery.Builder filterBuilder = new BoolQuery.Builder(); + + // If the included streams set contains the default stream, we also want all documents which do not + // have any stream assigned. Those documents have basically been in the "default stream" which didn't + // exist in Graylog <2.2.0. + if (streams.contains(Stream.DEFAULT_STREAM_ID)) { + final Query noStreamsField = Query.of(q -> q + .bool(b -> b.mustNot(mn -> mn.exists(e -> e.field(Message.FIELD_STREAMS)))) + ); + filterBuilder.should(noStreamsField); + } + + // Only select messages which are assigned to the given streams + final Query termsQuery = Query.of(q -> q + .terms(t -> t + .field(Message.FIELD_STREAMS) + .terms(tv -> tv.value(streams.stream().map(FieldValue::of).toList())) + ) + ); + filterBuilder.should(termsQuery); + + queryBuilder.filter(filterBuilder.build().toQuery()); + } + + return queryBuilder.build().toQuery(); + } +} diff --git a/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/IndexToolsAdapterOS2.java b/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/IndexToolsAdapterOS2.java deleted file mode 100644 index 74bb82e759..0000000000 --- a/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/IndexToolsAdapterOS2.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.storage.opensearch3; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import jakarta.inject.Inject; -import org.graylog.shaded.opensearch2.org.opensearch.action.search.SearchRequest; -import org.graylog.shaded.opensearch2.org.opensearch.action.search.SearchResponse; -import org.graylog.shaded.opensearch2.org.opensearch.action.support.IndicesOptions; -import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountRequest; -import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountResponse; -import org.graylog.shaded.opensearch2.org.opensearch.index.query.BoolQueryBuilder; -import org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders; -import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.AggregationBuilders; -import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.filter.Filter; -import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder; -import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.histogram.ParsedDateHistogram; -import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.bucket.terms.Terms; -import org.graylog.shaded.opensearch2.org.opensearch.search.builder.SearchSourceBuilder; -import org.graylog2.indexer.IndexToolsAdapter; -import org.graylog2.plugin.Message; -import org.graylog2.plugin.streams.Stream; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; - -import java.time.ZonedDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.boolQuery; -import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.existsQuery; -import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.matchAllQuery; -import static org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders.termsQuery; - -public class IndexToolsAdapterOS2 implements IndexToolsAdapter { - private static final String AGG_DATE_HISTOGRAM = "source_date_histogram"; - private static final String AGG_MESSAGE_FIELD = "message_field"; - private static final String AGG_FILTER = "message_filter"; - private final OpenSearchClient client; - - @Inject - public IndexToolsAdapterOS2(OpenSearchClient client) { - this.client = client; - } - - @Override - public Map> fieldHistogram(String fieldName, Set indices, Optional> includedStreams, long interval) { - final BoolQueryBuilder queryBuilder = buildStreamIdFilter(includedStreams); - - final FilterAggregationBuilder the_filter = AggregationBuilders.filter(AGG_FILTER, queryBuilder) - .subAggregation(AggregationBuilders.dateHistogram(AGG_DATE_HISTOGRAM) - .field("timestamp") - .subAggregation(AggregationBuilders.terms(AGG_MESSAGE_FIELD).field(fieldName)) - .fixedInterval(new DateHistogramInterval(interval + "ms")) - // We use "min_doc_count" here to avoid empty buckets in the histogram result. - // This is needed to avoid out-of-memory errors when creating a histogram for a really large - // date range. See: https://github.com/Graylog2/graylog-plugin-archive/issues/59 - .minDocCount(1L)); - - final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()) - .aggregation(the_filter); - - final SearchRequest searchRequest = new SearchRequest() - .source(searchSourceBuilder) - .indices(indices.toArray(new String[0])); - final SearchResponse searchResult = client.search(searchRequest, "Unable to retrieve field histogram."); - - final Filter filterAggregation = searchResult.getAggregations().get(AGG_FILTER); - final ParsedDateHistogram dateHistogram = filterAggregation.getAggregations().get(AGG_DATE_HISTOGRAM); - - - final List histogramBuckets = (List) dateHistogram.getBuckets(); - final Map> result = Maps.newHashMapWithExpectedSize(histogramBuckets.size()); - - for (ParsedDateHistogram.ParsedBucket bucket : histogramBuckets) { - final ZonedDateTime zonedDateTime = (ZonedDateTime) bucket.getKey(); - final DateTime date = new DateTime(zonedDateTime.toInstant().toEpochMilli(), DateTimeZone.UTC); - - final Terms sourceFieldAgg = bucket.getAggregations().get(AGG_MESSAGE_FIELD); - final List termBuckets = sourceFieldAgg.getBuckets(); - - final HashMap termCounts = Maps.newHashMapWithExpectedSize(termBuckets.size()); - - for (Terms.Bucket termBucket : termBuckets) { - termCounts.put(termBucket.getKeyAsString(), termBucket.getDocCount()); - } - - result.put(date, termCounts); - } - - return ImmutableMap.copyOf(result); - } - - @Override - public long count(Set indices, Optional> includedStreams) { - final CountRequest request = new CountRequest(indices.toArray(new String[0]), buildStreamIdFilter(includedStreams)) - .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - - final CountResponse result = client.execute((c, requestOptions) -> c.count(request, requestOptions), "Unable to count documents of index."); - - return result.getCount(); - } - - private BoolQueryBuilder buildStreamIdFilter(Optional> includedStreams) { - final BoolQueryBuilder queryBuilder = boolQuery().must(matchAllQuery()); - - // If the included streams are not present, we do not filter on streams - if (includedStreams.isPresent()) { - final Set streams = includedStreams.get(); - final BoolQueryBuilder filterBuilder = boolQuery(); - - // If the included streams set contains the default stream, we also want all documents which do not - // have any stream assigned. Those documents have basically been in the "default stream" which didn't - // exist in Graylog <2.2.0. - if (streams.contains(Stream.DEFAULT_STREAM_ID)) { - filterBuilder.should(boolQuery().mustNot(existsQuery(Message.FIELD_STREAMS))); - } - - // Only select messages which are assigned to the given streams - filterBuilder.should(termsQuery(Message.FIELD_STREAMS, streams)); - - queryBuilder.filter(filterBuilder); - } - - return queryBuilder; - } -} diff --git a/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/OpenSearch3Module.java b/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/OpenSearch3Module.java index ddec8e3fa9..09dd52ee4a 100644 --- a/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/OpenSearch3Module.java +++ b/graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/OpenSearch3Module.java @@ -81,10 +81,12 @@ public class OpenSearch3Module extends VersionAwareModule { bindForSupportedVersion(IndexTemplateAdapter.class).to(LegacyIndexTemplateAdapter.class); } bindForSupportedVersion(IndexFieldTypePollerAdapter.class).to(IndexFieldTypePollerAdapterOS.class); - bindForSupportedVersion(IndexToolsAdapter.class).to(IndexToolsAdapterOS2.class); + + bindForSupportedVersion(IndexToolsAdapter.class).to(IndexToolsAdapterOS.class); bindForSupportedVersion(MessagesAdapter.class).to(MessagesAdapterOS.class); bindForSupportedVersion(MultiChunkResultRetriever.class).to(PaginationOS.class); bindForSupportedVersion(MoreSearchAdapter.class).to(MoreSearchAdapterOS.class); + bindForSupportedVersion(NodeAdapter.class).to(NodeAdapterOS.class); bindForSupportedVersion(SearchesAdapter.class).to(SearchesAdapterOS.class); bindForSupportedVersion(V20170607164210_MigrateReopenedIndicesToAliases.ClusterState.class) diff --git a/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/IndexToolsAdapterOSIT.java b/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/IndexToolsAdapterOSIT.java new file mode 100644 index 0000000000..3970f207cb --- /dev/null +++ b/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/IndexToolsAdapterOSIT.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.storage.opensearch3; + +import org.graylog.storage.opensearch3.testing.OpenSearchInstance; +import org.graylog.testing.elasticsearch.SearchInstance; +import org.graylog.testing.elasticsearch.SearchServerInstance; +import org.graylog2.indexer.IndexToolsAdapterIT; + +public class IndexToolsAdapterOSIT extends IndexToolsAdapterIT { + + @SearchInstance + public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create(); + + @Override + protected SearchServerInstance searchServer() { + return openSearchInstance; + } +} diff --git a/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/testing/AdaptersOS.java b/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/testing/AdaptersOS.java index d56ecf085d..cdf90fc09a 100644 --- a/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/testing/AdaptersOS.java +++ b/graylog-storage-opensearch3/src/test/java/org/graylog/storage/opensearch3/testing/AdaptersOS.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.graylog.plugins.views.search.searchfilters.db.IgnoreSearchFilters; import org.graylog.storage.opensearch3.CountsAdapterOS; import org.graylog.storage.opensearch3.IndexFieldTypePollerAdapterOS; -import org.graylog.storage.opensearch3.IndexToolsAdapterOS2; +import org.graylog.storage.opensearch3.IndexToolsAdapterOS; import org.graylog.storage.opensearch3.IndicesAdapterOS; import org.graylog.storage.opensearch3.MessagesAdapterOS; import org.graylog.storage.opensearch3.NodeAdapterOS; @@ -101,7 +101,7 @@ public class AdaptersOS implements Adapters { @Override public IndexToolsAdapter indexToolsAdapter() { - return new IndexToolsAdapterOS2(client); + return new IndexToolsAdapterOS(officialOpensearchClient); } @Override diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/IndexToolsAdapterIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/IndexToolsAdapterIT.java new file mode 100644 index 0000000000..5c2a173b75 --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog2/indexer/IndexToolsAdapterIT.java @@ -0,0 +1,148 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.indexer; + +import org.graylog.testing.elasticsearch.BulkIndexRequest; +import org.graylog.testing.elasticsearch.ElasticsearchBaseTest; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class IndexToolsAdapterIT extends ElasticsearchBaseTest { + + private static final String INDEX_NAME = "graylog_0"; + private static final String STREAM_1 = "000000000000000000000001"; + private static final String STREAM_2 = "000000000000000000000002"; + + private IndexToolsAdapter adapter; + + @BeforeEach + public void setUp() { + client().createIndex(INDEX_NAME); + client().waitForGreenStatus(INDEX_NAME); + this.adapter = searchServer().adapters().indexToolsAdapter(); + } + + @Test + public void countReturnsZeroForEmptyIndex() { + final long count = adapter.count(Set.of(INDEX_NAME), Optional.empty()); + assertThat(count).isEqualTo(0L); + } + + @Test + public void countWithoutStreamFilter() { + indexTestDocuments(); + + final long count = adapter.count(Set.of(INDEX_NAME), Optional.empty()); + assertThat(count).isEqualTo(4L); + } + + @Test + public void countWithStreamFilter() { + indexTestDocuments(); + + final long countStream1 = adapter.count(Set.of(INDEX_NAME), Optional.of(Set.of(STREAM_1))); + assertThat(countStream1).isEqualTo(3L); + + final long countStream2 = adapter.count(Set.of(INDEX_NAME), Optional.of(Set.of(STREAM_2))); + assertThat(countStream2).isEqualTo(2L); + + final long countBothStreams = adapter.count(Set.of(INDEX_NAME), Optional.of(Set.of(STREAM_1, STREAM_2))); + assertThat(countBothStreams).isEqualTo(4L); + } + + @Test + public void fieldHistogramReturnsGroupedResults() { + indexTestDocuments(); + + final Map> result = adapter.fieldHistogram( + "source", Set.of(INDEX_NAME), Optional.empty(), 3600000L + ); + + assertThat(result).isNotEmpty(); + + long totalSourceCounts = result.values().stream() + .flatMap(m -> m.values().stream()) + .mapToLong(Long::longValue) + .sum(); + assertThat(totalSourceCounts).isEqualTo(4L); + } + + @Test + public void fieldHistogramWithStreamFilter() { + indexTestDocuments(); + + final Map> result = adapter.fieldHistogram( + "source", Set.of(INDEX_NAME), Optional.of(Set.of(STREAM_1)), 3600000L + ); + + assertThat(result).isNotEmpty(); + + long totalSourceCounts = result.values().stream() + .flatMap(m -> m.values().stream()) + .mapToLong(Long::longValue) + .sum(); + assertThat(totalSourceCounts).isEqualTo(3L); + } + + private void indexTestDocuments() { + final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC(); + final String timestamp = formatter.print(DateTime.now(DateTimeZone.UTC)); + + final BulkIndexRequest bulkRequest = new BulkIndexRequest(); + + bulkRequest.addRequest(INDEX_NAME, Map.of( + "message", "test message 1", + "source", "source-a", + "timestamp", timestamp, + "streams", java.util.List.of(STREAM_1) + )); + + bulkRequest.addRequest(INDEX_NAME, Map.of( + "message", "test message 2", + "source", "source-a", + "timestamp", timestamp, + "streams", java.util.List.of(STREAM_1, STREAM_2) + )); + + bulkRequest.addRequest(INDEX_NAME, Map.of( + "message", "test message 3", + "source", "source-b", + "timestamp", timestamp, + "streams", java.util.List.of(STREAM_2) + )); + + bulkRequest.addRequest(INDEX_NAME, Map.of( + "message", "test message 4", + "source", "source-a", + "timestamp", timestamp, + "streams", java.util.List.of(STREAM_1) + )); + + client().bulkIndex(bulkRequest); + client().refreshNode(); + } +}