From 8d17c8d87e67db8924ed66ab0128f0bdc6aff274 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 28 Aug 2024 11:23:56 -0400 Subject: [PATCH 1/5] [Streaming Indexing] Introduce bulk HTTP API streaming flavor (#15381) * [Streaming Indexing] Introduce bulk HTTP API streaming flavor Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko * Add more test cases Signed-off-by: Andriy Redko * Add more test cases Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + buildSrc/version.properties | 2 +- .../rest/licenses/httpclient5-5.2.3.jar.sha1 | 1 - .../rest/licenses/httpclient5-5.3.1.jar.sha1 | 1 + .../org/opensearch/client/RestClient.java | 50 +++++- .../licenses/httpclient5-5.2.3.jar.sha1 | 1 - .../licenses/httpclient5-5.3.1.jar.sha1 | 1 + .../rest/ReactorNetty4StreamingIT.java | 163 +++++++++++++++++- .../ReactorNetty4StreamingHttpChannel.java | 1 - ...ReactorNetty4StreamingRequestConsumer.java | 2 +- ...tty4HttpServerTransportStreamingTests.java | 2 +- .../xcontent/support/XContentHttpChunk.java | 5 +- .../document/RestBulkStreamingAction.java | 71 +++++--- 13 files changed, 271 insertions(+), 30 deletions(-) delete mode 100644 client/rest/licenses/httpclient5-5.2.3.jar.sha1 create mode 100644 client/rest/licenses/httpclient5-5.3.1.jar.sha1 delete mode 100644 client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 create mode 100644 client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cd030290d4de..6b95e9ec57733 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325)) - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895)) - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) +- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381)) - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index ccec8e2891a65..98f474a7f0b90 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -37,7 +37,7 @@ reactor_netty = 1.1.22 reactor = 3.5.20 # client dependencies -httpclient5 = 5.2.3 +httpclient5 = 5.3.1 httpcore5 = 5.2.5 httpclient = 4.5.14 httpcore = 4.4.16 diff --git a/client/rest/licenses/httpclient5-5.2.3.jar.sha1 b/client/rest/licenses/httpclient5-5.2.3.jar.sha1 deleted file mode 100644 index 43e233e72001a..0000000000000 --- a/client/rest/licenses/httpclient5-5.2.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5d753a99d299756998a08c488f2efdf9cf26198e \ No newline at end of file diff --git a/client/rest/licenses/httpclient5-5.3.1.jar.sha1 b/client/rest/licenses/httpclient5-5.3.1.jar.sha1 new file mode 100644 index 0000000000000..c8f32c1ec23a1 --- /dev/null +++ b/client/rest/licenses/httpclient5-5.3.1.jar.sha1 @@ -0,0 +1 @@ +56b53c8f4bcdaada801d311cf2ff8a24d6d96883 \ No newline at end of file diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java index 5c87e3fda5701..ab112ca5219e7 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClient.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java @@ -114,6 +114,7 @@ import java.util.zip.GZIPOutputStream; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; @@ -416,7 +417,12 @@ private Publisher>> streamRequest( try { final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message); if (responseOrResponseException.responseException == null) { - return Mono.just(message); + return Mono.just( + new Message<>( + message.getHead(), + Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b))) + ) + ); } else { if (nodeTuple.nodes.hasNext()) { return Mono.from(streamRequest(nodeTuple, request)); @@ -431,6 +437,48 @@ private Publisher>> streamRequest( }); } + /** + * Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence. + * @param b {@link ByteBuffer} to split + * @return individual chunks + */ + private static Collection frame(ByteBuffer b) { + final Collection buffers = new ArrayList<>(); + + int position = b.position(); + while (b.hasRemaining()) { + // Skip the chunk separator when it comes right at the beginning + if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) { + if (b.get() == '\n') { + final byte[] chunk = new byte[b.position() - position]; + + b.position(position); + b.get(chunk); + + // Do not copy the '\r\n' sequence + buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2)); + position = b.position(); + } + } + } + + if (buffers.isEmpty()) { + return Collections.singleton(b); + } + + // Copy last chunk + if (position != b.position()) { + final byte[] chunk = new byte[b.position() - position]; + + b.position(position); + b.get(chunk); + + buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length)); + } + + return buffers; + } + private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse) throws IOException { RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); diff --git a/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 b/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 deleted file mode 100644 index 43e233e72001a..0000000000000 --- a/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5d753a99d299756998a08c488f2efdf9cf26198e \ No newline at end of file diff --git a/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 b/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 new file mode 100644 index 0000000000000..c8f32c1ec23a1 --- /dev/null +++ b/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 @@ -0,0 +1 @@ +56b53c8f4bcdaada801d311cf2ff8a24d6d96883 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java index c564e289e3f88..6f3895fffa437 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java @@ -44,7 +44,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testStreamingRequest() throws IOException { + public void testStreamingRequestNoBatching() throws IOException { final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true); final Stream stream = IntStream.range(1, 6) @@ -85,6 +85,167 @@ public void testStreamingRequest() throws IOException { assertThat(count, equalTo(5)); } + public void testStreamingRequestOneBatchBySize() throws IOException, InterruptedException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_size", "5"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"1\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"2\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"3\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"4\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"5\"") + ) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + + public void testStreamingRequestManyBatchesBySize() throws IOException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_size", "3"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"1\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"2\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"3\"") + ) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"4\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"5\"") + ) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + + public void testStreamingRequestManyBatchesByInterval() throws IOException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofMillis(500); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_interval", "5s"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + // We don't check for a other documents here since those may appear in any of the chunks (it is very + // difficult to get the timing right). But at the end, the total number of the documents is being checked. + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"1\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"2\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"3\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"4\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"5\"") + ) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + + public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofSeconds(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_interval", "3s"); + streamingRequest.addParameter("batch_size", "5"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + // We don't check for a other documents here since those may appear in any of the chunks (it is very + // difficult to get the timing right). But at the end, the total number of the documents is being checked. + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"5\"")) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + public void testStreamingBadRequest() throws IOException { final Stream stream = Stream.of( "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n" diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java index 1aa03aa9967e2..12ed847c0c0de 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -103,7 +103,6 @@ public void receiveChunk(HttpChunk message) { } } catch (final Exception ex) { producer.error(ex); - } finally { message.close(); } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java index 8ed6710c8a1e3..282a82dc39fda 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -44,7 +44,7 @@ public void subscribe(Subscriber s) { } HttpChunk createChunk(HttpContent chunk, boolean last) { - return new ReactorNetty4HttpChunk(chunk.copy().content(), last); + return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last); } StreamingHttpChannel httpChannel() { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java index a7bf71e58e9b6..df0e4027cc474 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -191,7 +191,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } catch (final IOException ex) { throw new UncheckedIOException(ex); } - }).collect(Collectors.joining("")))); + }).collect(Collectors.joining("\r\n", "", "\r\n")))); } finally { response.release(); } diff --git a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java index 15b63a0ac2030..a7f1d30cd05dd 100644 --- a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java +++ b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java @@ -12,6 +12,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.http.HttpChunk; @@ -19,6 +20,7 @@ * Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk} */ public final class XContentHttpChunk implements HttpChunk { + private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' }; private final BytesReference content; /** @@ -42,7 +44,8 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) { if (builder == null /* no content */) { content = BytesArray.EMPTY; } else { - content = BytesReference.bytes(builder); + // Always finalize the output chunk with '\r\n' sequence + content = CompositeBytesReference.of(BytesReference.bytes(builder), new BytesArray(CHUNK_SEPARATOR)); } } diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java index a38244fe9ff20..2e0d1b8ead814 100644 --- a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -95,6 +96,17 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null); final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); final String refresh = request.param("refresh"); + final TimeValue batchInterval = request.paramAsTime("batch_interval", null); + final int batchSize = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */ + final boolean hasBatchSize = request.hasParam("batch_size"); /* is batch_size explicitly specified or default is used */ + + if (batchInterval != null && batchInterval.duration() <= 0) { + throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batchInterval.millis() + "ms]."); + } + + if (batchSize <= 0) { + throw new IllegalArgumentException("The batch_size value should be non-negative [" + batchSize + "]."); + } final StreamingRestChannelConsumer consumer = (channel) -> { final MediaType mediaType = request.getMediaType(); @@ -114,39 +126,38 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // Set the content type and the status code before sending the response stream over channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters()))); - // This is initial implementation at the moment which transforms each single request stream chunk into - // individual bulk request and streams each response back. Another source of inefficiency comes from converting - // bulk response from raw (json/yaml/...) to model and back to raw (json/yaml/...). - // TODOs: - // - add batching (by interval and/or count) // - eliminate serialization inefficiencies - Flux.from(channel).zipWith(Flux.fromStream(Stream.generate(() -> { + createBufferedFlux(batchInterval, batchSize, hasBatchSize, channel).zipWith(Flux.fromStream(Stream.generate(() -> { BulkRequest bulkRequest = Requests.bulkRequest(); bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards()); bulkRequest.timeout(prepareBulkRequest.timeout()); bulkRequest.setRefreshPolicy(prepareBulkRequest.getRefreshPolicy()); return bulkRequest; }))).map(t -> { - final HttpChunk chunk = t.getT1(); + boolean isLast = false; + final List chunks = t.getT1(); final BulkRequest bulkRequest = t.getT2(); - try (chunk) { - bulkRequest.add( - chunk.content(), - defaultIndex, - defaultRouting, - defaultFetchSourceContext, - defaultPipeline, - defaultRequireAlias, - allowExplicitIndex, - request.getMediaType() - ); - } catch (final IOException ex) { - throw new UncheckedIOException(ex); + for (final HttpChunk chunk : chunks) { + isLast |= chunk.isLast(); + try (chunk) { + bulkRequest.add( + chunk.content(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + allowExplicitIndex, + request.getMediaType() + ); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } } - return Tuple.tuple(chunk.isLast(), bulkRequest); + return Tuple.tuple(isLast, bulkRequest); }).flatMap(tuple -> { final CompletableFuture f = new CompletableFuture<>(); @@ -222,4 +233,22 @@ public boolean supportsStreaming() { public boolean allowsUnsafeBuffers() { return true; } + + private Flux> createBufferedFlux( + final TimeValue batchInterval, + final int batchSize, + final boolean hasBatchSize, + StreamingRestChannel channel + ) { + if (batchInterval != null) { + // If non-default batch size is specified, buffer by interval and batch + if (hasBatchSize) { + return Flux.from(channel).bufferTimeout(batchSize, Duration.ofMillis(batchInterval.millis())); + } else { + return Flux.from(channel).buffer(Duration.ofMillis(batchInterval.millis())); + } + } else { + return Flux.from(channel).buffer(batchSize); + } + } } From c0bcacb52155c9114352765c5105a7caa19dc763 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Wed, 28 Aug 2024 09:27:14 -0700 Subject: [PATCH 2/5] Add query group stats constructs (#15343) * add query group stats constructs Signed-off-by: Kaushal Kumar * add changelog entry Signed-off-by: Kaushal Kumar * add packageinfo for stats Signed-off-by: Kaushal Kumar * add total cancellations Signed-off-by: Kaushal Kumar * add more granular level rejections Signed-off-by: Kaushal Kumar * add toXContent test cases Signed-off-by: Kaushal Kumar * move ResourceType enum to wlm Signed-off-by: Kaushal Kumar * update the comment for query group stats Signed-off-by: Kaushal Kumar --------- Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 1 + .../service/QueryGroupPersistenceService.java | 2 +- .../plugin/wlm/QueryGroupTestUtils.java | 2 +- .../QueryGroupPersistenceServiceTests.java | 2 +- .../cluster/metadata/QueryGroup.java | 2 +- .../SearchBackpressureService.java | 2 +- .../trackers/NodeDuressTrackers.java | 2 +- .../wlm/QueryGroupLevelResourceUsageView.java | 1 - .../{search => wlm}/ResourceType.java | 14 +- .../opensearch/wlm/stats/QueryGroupState.java | 103 ++++++++ .../opensearch/wlm/stats/QueryGroupStats.java | 228 ++++++++++++++++++ .../opensearch/wlm/stats/package-info.java | 12 + ...QueryGroupResourceUsageTrackerService.java | 2 +- .../metadata/QueryGroupMetadataTests.java | 2 +- .../cluster/metadata/QueryGroupTests.java | 2 +- .../SearchBackpressureServiceTests.java | 6 +- .../trackers/NodeDuressTrackersTests.java | 2 +- ...QueryGroupLevelResourceUsageViewTests.java | 1 - .../{search => wlm}/ResourceTypeTests.java | 3 +- .../wlm/stats/QueryGroupStateTests.java | 73 ++++++ .../wlm/stats/QueryGroupStatsTests.java | 75 ++++++ ...GroupResourceUsageTrackerServiceTests.java | 2 +- 22 files changed, 517 insertions(+), 22 deletions(-) rename server/src/main/java/org/opensearch/{search => wlm}/ResourceType.java (87%) create mode 100644 server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java create mode 100644 server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java create mode 100644 server/src/main/java/org/opensearch/wlm/stats/package-info.java rename server/src/test/java/org/opensearch/{search => wlm}/ResourceTypeTests.java (96%) create mode 100644 server/src/test/java/org/opensearch/wlm/stats/QueryGroupStateTests.java create mode 100644 server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b95e9ec57733..f3d7525fec7f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381)) - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) +- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java index ba5161a2c855e..7561a2f6f99c3 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java @@ -29,7 +29,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest; -import org.opensearch.search.ResourceType; +import org.opensearch.wlm.ResourceType; import java.util.Collection; import java.util.EnumMap; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java index 5ba1ad5334712..e165645775d5c 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java @@ -31,7 +31,7 @@ import java.util.Set; import static org.opensearch.cluster.metadata.QueryGroup.builder; -import static org.opensearch.search.ResourceType.fromName; +import static org.opensearch.wlm.ResourceType.fromName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java index a516ffdde839e..5cb3d8fc6d11f 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java @@ -24,9 +24,9 @@ import org.opensearch.plugin.wlm.QueryGroupTestUtils; import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest; -import org.opensearch.search.ResourceType; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.ResourceType; import java.util.ArrayList; import java.util.Collection; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java index 9b5c6bc2369a6..a971aa58940ba 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -17,7 +17,7 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.search.ResourceType; +import org.opensearch.wlm.ResourceType; import org.joda.time.Instant; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index c26c5d63a3573..a85bc69b766cb 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.monitor.process.ProcessProbe; -import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -43,6 +42,7 @@ import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.ResourceType; import java.io.IOException; import java.util.ArrayList; diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java index ae60a82fc2816..c27c50ac12c0f 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java @@ -9,7 +9,7 @@ package org.opensearch.search.backpressure.trackers; import org.opensearch.common.util.Streak; -import org.opensearch.search.ResourceType; +import org.opensearch.wlm.ResourceType; import java.util.Map; import java.util.function.BooleanSupplier; diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java b/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java index 2fd743dc3f83f..7577c8573ec10 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java @@ -8,7 +8,6 @@ package org.opensearch.wlm; -import org.opensearch.search.ResourceType; import org.opensearch.tasks.Task; import java.util.List; diff --git a/server/src/main/java/org/opensearch/search/ResourceType.java b/server/src/main/java/org/opensearch/wlm/ResourceType.java similarity index 87% rename from server/src/main/java/org/opensearch/search/ResourceType.java rename to server/src/main/java/org/opensearch/wlm/ResourceType.java index 0cba2222a6e20..adf384995c91d 100644 --- a/server/src/main/java/org/opensearch/search/ResourceType.java +++ b/server/src/main/java/org/opensearch/wlm/ResourceType.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.search; +package org.opensearch.wlm; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamOutput; @@ -21,15 +21,17 @@ */ @PublicApi(since = "2.x") public enum ResourceType { - CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU)), - MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY)); + CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU), true), + MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY), true); private final String name; private final Function getResourceUsage; + private final boolean statsEnabled; - ResourceType(String name, Function getResourceUsage) { + ResourceType(String name, Function getResourceUsage, boolean statsEnabled) { this.name = name; this.getResourceUsage = getResourceUsage; + this.statsEnabled = statsEnabled; } /** @@ -63,4 +65,8 @@ public String getName() { public long getResourceUsage(Task task) { return getResourceUsage.apply(task); } + + public boolean hasStatsEnabled() { + return statsEnabled; + } } diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java new file mode 100644 index 0000000000000..93cfcea697c43 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.stats; + +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.wlm.ResourceType; + +import java.util.EnumMap; +import java.util.Map; + +/** + * This class will keep the point in time view of the query group stats + */ +public class QueryGroupState { + /** + * completions at the query group level, this is a cumulative counter since the Opensearch start time + */ + final CounterMetric completions = new CounterMetric(); + + /** + * rejections at the query group level, this is a cumulative counter since the OpenSearch start time + */ + final CounterMetric totalRejections = new CounterMetric(); + + /** + * this will track the cumulative failures in a query group + */ + final CounterMetric failures = new CounterMetric(); + + /** + * This will track total number of cancellations in the query group due to all resource type breaches + */ + final CounterMetric totalCancellations = new CounterMetric(); + + /** + * This is used to store the resource type state both for CPU and MEMORY + */ + private final Map resourceState; + + public QueryGroupState() { + resourceState = new EnumMap<>(ResourceType.class); + for (ResourceType resourceType : ResourceType.values()) { + if (resourceType.hasStatsEnabled()) { + resourceState.put(resourceType, new ResourceTypeState(resourceType)); + } + } + } + + /** + * + * @return completions in the query group + */ + public long getCompletions() { + return completions.count(); + } + + /** + * + * @return rejections in the query group + */ + public long getTotalRejections() { + return totalRejections.count(); + } + + /** + * + * @return failures in the query group + */ + public long getFailures() { + return failures.count(); + } + + public long getTotalCancellations() { + return totalCancellations.count(); + } + + /** + * getter for query group resource state + * @return the query group resource state + */ + public Map getResourceState() { + return resourceState; + } + + /** + * This class holds the resource level stats for the query group + */ + public static class ResourceTypeState { + final ResourceType resourceType; + final CounterMetric cancellations = new CounterMetric(); + final CounterMetric rejections = new CounterMetric(); + + public ResourceTypeState(ResourceType resourceType) { + this.resourceType = resourceType; + } + } +} diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java new file mode 100644 index 0000000000000..d39bf104332da --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.wlm.ResourceType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * { + * "queryGroupID": { + * "completions": 1233234234, + * "rejections": 12, + * "failures": 97, + * "total_cancellations": 474, + * "CPU": { "current_usage": 49.6, "cancellation": 432, "rejections": 8 }, + * "MEMORY": { "current_usage": 39.6, "cancellation": 42, "rejections": 4 } + * }, + * ... + * ... + * } + */ +public class QueryGroupStats implements ToXContentObject, Writeable { + private final Map stats; + + public QueryGroupStats(Map stats) { + this.stats = stats; + } + + public QueryGroupStats(StreamInput in) throws IOException { + stats = in.readMap(StreamInput::readString, QueryGroupStatsHolder::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(stats, StreamOutput::writeString, QueryGroupStatsHolder::writeTo); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("query_groups"); + for (Map.Entry queryGroupStats : stats.entrySet()) { + builder.startObject(queryGroupStats.getKey()); + queryGroupStats.getValue().toXContent(builder, params); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QueryGroupStats that = (QueryGroupStats) o; + return Objects.equals(stats, that.stats); + } + + @Override + public int hashCode() { + return Objects.hash(stats); + } + + /** + * This is a stats holder object which will hold the data for a query group at a point in time + * the instance will only be created on demand through stats api + */ + public static class QueryGroupStatsHolder implements ToXContentObject, Writeable { + public static final String COMPLETIONS = "completions"; + public static final String REJECTIONS = "rejections"; + public static final String TOTAL_CANCELLATIONS = "total_cancellations"; + public static final String FAILURES = "failures"; + private final long completions; + private final long rejections; + private final long failures; + private final long totalCancellations; + private final Map resourceStats; + + public QueryGroupStatsHolder( + long completions, + long rejections, + long failures, + long totalCancellations, + Map resourceStats + ) { + this.completions = completions; + this.rejections = rejections; + this.failures = failures; + this.totalCancellations = totalCancellations; + this.resourceStats = resourceStats; + } + + public QueryGroupStatsHolder(StreamInput in) throws IOException { + this.completions = in.readVLong(); + this.rejections = in.readVLong(); + this.failures = in.readVLong(); + this.totalCancellations = in.readVLong(); + this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new); + } + + /** + * Writes the {@param statsHolder} to {@param out} + * @param out StreamOutput + * @param statsHolder QueryGroupStatsHolder + * @throws IOException exception + */ + public static void writeTo(StreamOutput out, QueryGroupStatsHolder statsHolder) throws IOException { + out.writeVLong(statsHolder.completions); + out.writeVLong(statsHolder.rejections); + out.writeVLong(statsHolder.failures); + out.writeVLong(statsHolder.totalCancellations); + out.writeMap(statsHolder.resourceStats, (o, val) -> o.writeString(val.getName()), ResourceStats::writeTo); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + QueryGroupStatsHolder.writeTo(out, this); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(COMPLETIONS, completions); + builder.field(REJECTIONS, rejections); + builder.field(FAILURES, failures); + builder.field(TOTAL_CANCELLATIONS, totalCancellations); + for (Map.Entry resourceStat : resourceStats.entrySet()) { + ResourceType resourceType = resourceStat.getKey(); + ResourceStats resourceStats1 = resourceStat.getValue(); + builder.startObject(resourceType.getName()); + resourceStats1.toXContent(builder, params); + builder.endObject(); + } + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QueryGroupStatsHolder that = (QueryGroupStatsHolder) o; + return completions == that.completions + && rejections == that.rejections + && Objects.equals(resourceStats, that.resourceStats) + && failures == that.failures + && totalCancellations == that.totalCancellations; + } + + @Override + public int hashCode() { + return Objects.hash(completions, rejections, totalCancellations, failures, resourceStats); + } + } + + /** + * point in time resource level stats holder + */ + public static class ResourceStats implements ToXContentObject, Writeable { + public static final String CURRENT_USAGE = "current_usage"; + public static final String CANCELLATIONS = "cancellations"; + public static final double PRECISION = 1e-9; + private final double currentUsage; + private final long cancellations; + private final long rejections; + + public ResourceStats(double currentUsage, long cancellations, long rejections) { + this.currentUsage = currentUsage; + this.cancellations = cancellations; + this.rejections = rejections; + } + + public ResourceStats(StreamInput in) throws IOException { + this.currentUsage = in.readDouble(); + this.cancellations = in.readVLong(); + this.rejections = in.readVLong(); + } + + /** + * Writes the {@param stats} to {@param out} + * @param out StreamOutput + * @param stats QueryGroupStatsHolder + * @throws IOException exception + */ + public static void writeTo(StreamOutput out, ResourceStats stats) throws IOException { + out.writeDouble(stats.currentUsage); + out.writeVLong(stats.cancellations); + out.writeVLong(stats.rejections); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + ResourceStats.writeTo(out, this); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(CURRENT_USAGE, currentUsage); + builder.field(CANCELLATIONS, cancellations); + builder.field(QueryGroupStatsHolder.REJECTIONS, rejections); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ResourceStats that = (ResourceStats) o; + return (currentUsage - that.currentUsage) < PRECISION && cancellations == that.cancellations && rejections == that.rejections; + } + + @Override + public int hashCode() { + return Objects.hash(currentUsage, cancellations, rejections); + } + } +} diff --git a/server/src/main/java/org/opensearch/wlm/stats/package-info.java b/server/src/main/java/org/opensearch/wlm/stats/package-info.java new file mode 100644 index 0000000000000..2facf8d16df22 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/stats/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Query group stats related artifacts + */ +package org.opensearch.wlm.stats; diff --git a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java index bfbf5d8a452d1..15852b5bbe6a8 100644 --- a/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java +++ b/server/src/main/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerService.java @@ -8,11 +8,11 @@ package org.opensearch.wlm.tracker; -import org.opensearch.search.ResourceType; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.wlm.QueryGroupLevelResourceUsageView; import org.opensearch.wlm.QueryGroupTask; +import org.opensearch.wlm.ResourceType; import java.util.EnumMap; import java.util.EnumSet; diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java index 06734b8e0bac2..f5e667de73d93 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java @@ -14,8 +14,8 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.search.ResourceType; import org.opensearch.test.AbstractDiffableSerializationTestCase; +import org.opensearch.wlm.ResourceType; import java.io.IOException; import java.util.Collections; diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java index 884b364fb26b8..f4d3e5ceb1784 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java @@ -14,8 +14,8 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.search.ResourceType; import org.opensearch.test.AbstractSerializingTestCase; +import org.opensearch.wlm.ResourceType; import org.joda.time.Instant; import java.io.IOException; diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index 15d0fcd10d701..a444eb42eac2e 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -16,7 +16,6 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -40,6 +39,7 @@ import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.ResourceType; import org.junit.After; import org.junit.Before; @@ -56,9 +56,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; -import static org.opensearch.search.ResourceType.CPU; -import static org.opensearch.search.ResourceType.MEMORY; import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; +import static org.opensearch.wlm.ResourceType.CPU; +import static org.opensearch.wlm.ResourceType.MEMORY; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java index 801576bdf89d4..7c52840c099d4 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java @@ -8,9 +8,9 @@ package org.opensearch.search.backpressure.trackers; -import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.wlm.ResourceType; import java.util.EnumMap; diff --git a/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java b/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java index 7f6419505fec2..532bf3de95bd6 100644 --- a/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java +++ b/server/src/test/java/org/opensearch/wlm/QueryGroupLevelResourceUsageViewTests.java @@ -10,7 +10,6 @@ import org.opensearch.action.search.SearchAction; import org.opensearch.core.tasks.TaskId; -import org.opensearch.search.ResourceType; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; diff --git a/server/src/test/java/org/opensearch/search/ResourceTypeTests.java b/server/src/test/java/org/opensearch/wlm/ResourceTypeTests.java similarity index 96% rename from server/src/test/java/org/opensearch/search/ResourceTypeTests.java rename to server/src/test/java/org/opensearch/wlm/ResourceTypeTests.java index 78827b8b1bdad..737cbb37b554c 100644 --- a/server/src/test/java/org/opensearch/search/ResourceTypeTests.java +++ b/server/src/test/java/org/opensearch/wlm/ResourceTypeTests.java @@ -6,14 +6,13 @@ * compatible open source license. */ -package org.opensearch.search; +package org.opensearch.wlm; import org.opensearch.action.search.SearchShardTask; import org.opensearch.core.tasks.resourcetracker.ResourceStats; import org.opensearch.tasks.CancellableTask; import org.opensearch.test.OpenSearchTestCase; -import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStateTests.java b/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStateTests.java new file mode 100644 index 0000000000000..576eec7be1888 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStateTests.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.stats; + +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.wlm.ResourceType; + +import java.util.ArrayList; +import java.util.List; + +public class QueryGroupStateTests extends OpenSearchTestCase { + QueryGroupState queryGroupState; + + public void testRandomQueryGroupsStateUpdates() { + queryGroupState = new QueryGroupState(); + List updaterThreads = new ArrayList<>(); + + for (int i = 0; i < 25; i++) { + if (i % 5 == 0) { + updaterThreads.add(new Thread(() -> queryGroupState.completions.inc())); + } else if (i % 5 == 1) { + updaterThreads.add(new Thread(() -> { + queryGroupState.totalRejections.inc(); + if (randomBoolean()) { + queryGroupState.getResourceState().get(ResourceType.CPU).rejections.inc(); + } else { + queryGroupState.getResourceState().get(ResourceType.MEMORY).rejections.inc(); + } + })); + } else if (i % 5 == 2) { + updaterThreads.add(new Thread(() -> queryGroupState.failures.inc())); + } else if (i % 5 == 3) { + updaterThreads.add(new Thread(() -> queryGroupState.getResourceState().get(ResourceType.CPU).cancellations.inc())); + } else { + updaterThreads.add(new Thread(() -> queryGroupState.getResourceState().get(ResourceType.MEMORY).cancellations.inc())); + } + + if (i % 5 == 3 || i % 5 == 4) { + updaterThreads.add(new Thread(() -> queryGroupState.totalCancellations.inc())); + } + } + + // trigger the updates + updaterThreads.forEach(Thread::start); + // wait for updates to be finished + updaterThreads.forEach(thread -> { + try { + thread.join(); + } catch (InterruptedException ignored) { + + } + }); + + assertEquals(5, queryGroupState.getCompletions()); + assertEquals(5, queryGroupState.getTotalRejections()); + + final long sumOfRejectionsDueToResourceTypes = queryGroupState.getResourceState().get(ResourceType.CPU).rejections.count() + + queryGroupState.getResourceState().get(ResourceType.MEMORY).rejections.count(); + assertEquals(sumOfRejectionsDueToResourceTypes, queryGroupState.getTotalRejections()); + + assertEquals(5, queryGroupState.getFailures()); + assertEquals(10, queryGroupState.getTotalCancellations()); + assertEquals(5, queryGroupState.getResourceState().get(ResourceType.CPU).cancellations.count()); + assertEquals(5, queryGroupState.getResourceState().get(ResourceType.MEMORY).cancellations.count()); + } + +} diff --git a/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java b/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java new file mode 100644 index 0000000000000..661c3a7beae40 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.stats; + +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.AbstractWireSerializingTestCase; +import org.opensearch.wlm.ResourceType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class QueryGroupStatsTests extends AbstractWireSerializingTestCase { + + public void testToXContent() throws IOException { + final Map stats = new HashMap<>(); + final String queryGroupId = "afakjklaj304041-afaka"; + stats.put( + queryGroupId, + new QueryGroupStats.QueryGroupStatsHolder( + 123456789, + 2, + 0, + 13, + Map.of(ResourceType.CPU, new QueryGroupStats.ResourceStats(0.3, 13, 2)) + ) + ); + XContentBuilder builder = JsonXContent.contentBuilder(); + QueryGroupStats queryGroupStats = new QueryGroupStats(stats); + builder.startObject(); + queryGroupStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + assertEquals( + "{\"query_groups\":{\"afakjklaj304041-afaka\":{\"completions\":123456789,\"rejections\":2,\"failures\":0,\"total_cancellations\":13,\"cpu\":{\"current_usage\":0.3,\"cancellations\":13,\"rejections\":2}}}}", + builder.toString() + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return QueryGroupStats::new; + } + + @Override + protected QueryGroupStats createTestInstance() { + Map stats = new HashMap<>(); + stats.put( + randomAlphaOfLength(10), + new QueryGroupStats.QueryGroupStatsHolder( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + Map.of( + ResourceType.CPU, + new QueryGroupStats.ResourceStats( + randomDoubleBetween(0.0, 0.90, false), + randomNonNegativeLong(), + randomNonNegativeLong() + ) + ) + ) + ); + return new QueryGroupStats(stats); + } +} diff --git a/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java index 967119583c25f..ca2891cb532f2 100644 --- a/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java +++ b/server/src/test/java/org/opensearch/wlm/tracker/QueryGroupResourceUsageTrackerServiceTests.java @@ -12,7 +12,6 @@ import org.opensearch.action.search.SearchTask; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.tasks.resourcetracker.ResourceStats; -import org.opensearch.search.ResourceType; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskResourceTrackingService; @@ -21,6 +20,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.wlm.QueryGroupLevelResourceUsageView; import org.opensearch.wlm.QueryGroupTask; +import org.opensearch.wlm.ResourceType; import org.junit.After; import org.junit.Before; From b30df02b9f86fc4730620a39aeaeffc5bc6c54d9 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Wed, 28 Aug 2024 09:34:20 -0700 Subject: [PATCH 3/5] Update skip versions for flat_object tests (#15448) While fixing a flat_object bug, I noticed that we were skipping all related yamlRestTests on pre-3.0 versions. Since the flat object field type was added in 2.7, that should be the correct skip level for most tests. The bug I fixed is still present in 2.16 and earlier, so the test that covers that fix needs to be skipped pre-2.17. Signed-off-by: Michael Froh --- .../rest-api-spec/test/index/100_partial_flat_object.yml | 8 ++++---- .../test/index/105_partial_flat_object_nested.yml | 8 ++++---- .../resources/rest-api-spec/test/index/90_flat_object.yml | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/100_partial_flat_object.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/100_partial_flat_object.yml index e1bc86f1c9f3d..6fc2654bcfc8f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/100_partial_flat_object.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/100_partial_flat_object.yml @@ -114,8 +114,8 @@ teardown: # and no dynamic fields were created. "Mappings": - skip: - version: " - 2.99.99" - reason: "flat_object is introduced in 3.0.0 in main branch" + version: " - 2.16.99" + reason: "flat_object field with null (doc 4) throws exception before 2.17" - do: indices.get_mapping: @@ -131,8 +131,8 @@ teardown: --- "Supported queries": - skip: - version: " - 2.99.99" - reason: "flat_object is introduced in 3.0.0 in main branch" + version: " - 2.16.99" + reason: "flat_object field with null (doc 4) throws exception before 2.17" # Verify Document Count diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/105_partial_flat_object_nested.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/105_partial_flat_object_nested.yml index ce172c2773e1f..549ddbdde7bab 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/105_partial_flat_object_nested.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/105_partial_flat_object_nested.yml @@ -89,8 +89,8 @@ teardown: # and no dynamic fields were created. "Mappings": - skip: - version: " - 2.99.99" - reason: "flat_object is introduced in 3.0.0 in main branch" + version: " - 2.6.99" + reason: "flat_object is introduced in 2.7" - do: indices.get_mapping: @@ -106,8 +106,8 @@ teardown: --- "Supported queries": - skip: - version: " - 2.99.99" - reason: "flat_object is introduced in 3.0.0 in main branch" + version: " - 2.6.99" + reason: "flat_object is introduced in 2.7" # Verify Document Count diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml index 0a5f7444efd17..83d3d273ebd93 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml @@ -97,8 +97,8 @@ teardown: --- "Supported queries": - skip: - version: " - 2.99.99" - reason: "flat_object is introduced in 3.0.0 in main branch" + version: " - 2.6.99" + reason: "flat_object is introduced in 2.7" # Verify Document Count - do: @@ -607,8 +607,8 @@ teardown: --- "Unsupported": - skip: - version: " - 2.99.99" - reason: "flat_object is introduced in 3.0.0 in main branch" + version: " - 2.6.99" + reason: "flat_object is introduced in 2.7" # Mapping parameters (such as index/search analyzers) are currently not supported # The plan is to support them in the next version From ee17ecacc0daeed27e4b9640a0ad0e5c3f766960 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Wed, 28 Aug 2024 14:53:44 -0400 Subject: [PATCH 4/5] Add runAs to Subject interface and introduce IdentityAwarePlugin extension point (#14630) * Create ExecutionContext and show example with ActionPluginProxy Signed-off-by: Craig Perkins * Only allow core to set the ExecutionContext Signed-off-by: Craig Perkins * WIP on plugin aware thread context Signed-off-by: Craig Perkins * Plugin Aware API Handling Signed-off-by: Craig Perkins * Add test to verify that ExecutionContext is being populated during RestHandling Signed-off-by: Craig Perkins * Clear context in a finally block Signed-off-by: Craig Perkins * Create switchContext method in ThreadContext and make pluginExecutionStack a stack Signed-off-by: Craig Perkins * WIP on plugin aware stash context Signed-off-by: Craig Perkins * Create class called PluginAwareNodeClient that provides a method called switchContext Signed-off-by: Craig Perkins * Remove ExecutionContext class Signed-off-by: Craig Perkins * Update javadoc Signed-off-by: Craig Perkins * Change createComponents to take in PluginAwareNodeClient Signed-off-by: Craig Perkins * Update all instances of createComponents Signed-off-by: Craig Perkins * Initialize clients Signed-off-by: Craig Perkins * Remove casting Signed-off-by: Craig Perkins * WIP on notion of ContextSwitcher Signed-off-by: Craig Perkins * Make stashContext package-private Signed-off-by: Craig Perkins * Make markAsSystemContext package-private Signed-off-by: Craig Perkins * Add javadoc on param Signed-off-by: Craig Perkins * Remove SystemContextSwitcher Signed-off-by: Craig Perkins * Merge with main Signed-off-by: Craig Perkins * Cleanup Signed-off-by: Craig Perkins * Remove SystemIndexFilter Signed-off-by: Craig Perkins * Add notion of Forbidden Headers to the ThreadContext Signed-off-by: Craig Perkins * Fix tests Signed-off-by: Craig Perkins * Fix test Signed-off-by: Craig Perkins * Add method to initialize plugins Signed-off-by: Craig Perkins * Create concept of pluginNodeClient that can be used for executing transport actions as the plugin Signed-off-by: Craig Perkins * Add test Signed-off-by: Craig Perkins * Add another test for setPluginNodeClient Signed-off-by: Craig Perkins * Remove newline Signed-off-by: Craig Perkins * Add another test Signed-off-by: Craig Perkins * Subject.runAs and introduce PluginSubject Signed-off-by: Craig Perkins * Do nothing when runAs is called for ShiroSubject and NoopSubject Signed-off-by: Craig Perkins * Remove extraneous changes Signed-off-by: Craig Perkins * Test all methods in PluginSubject Signed-off-by: Craig Perkins * Pass a Callable to runAs Signed-off-by: Craig Perkins * Update import Signed-off-by: Craig Perkins * Simplify PR, make NoopPluginSubject and introduce IdentityAwarePlugin Signed-off-by: Craig Perkins * Add final Signed-off-by: Craig Perkins * Remove server dependency Signed-off-by: Craig Perkins * Remove AbstractSubject Signed-off-by: Craig Perkins * Remove unnecessary changes Signed-off-by: Craig Perkins * Add javadoc to NoopPluginSubject Signed-off-by: Craig Perkins * Rename to assignSubject Signed-off-by: Craig Perkins * Add experimental label Signed-off-by: Craig Perkins * Add getPluginSubject(plugin) to IdentityPlugin Signed-off-by: Craig Perkins * Make runAs generic Signed-off-by: Craig Perkins * package-private constructor Signed-off-by: Craig Perkins * Move IdentityAwarePlugin initialization Signed-off-by: Craig Perkins * Create separate PluginSubject interface Signed-off-by: Craig Perkins * Remove authenticate method Signed-off-by: Craig Perkins * Remove import Signed-off-by: Craig Perkins * Separate UserSubject and PluginSubject Signed-off-by: Craig Perkins * Terminate TestThreadPool Signed-off-by: Craig Perkins * mock ThreadPool in RestSendToExtensionActionTests Signed-off-by: Craig Perkins * Fix Thread leak Signed-off-by: Craig Perkins * Add to CHANGELOG Signed-off-by: Craig Perkins * Rename to getCurrentSubject Signed-off-by: Craig Perkins * Add type check Signed-off-by: Craig Perkins * Rename to pluginSubject Signed-off-by: Craig Perkins * Add runAs to ActionRequest and surround doExecute in AbstractClient Signed-off-by: Craig Perkins * Return this Signed-off-by: Craig Perkins * Switch back to void Signed-off-by: Craig Perkins * Revert change to ActionRequest Signed-off-by: Craig Perkins --------- Signed-off-by: Craig Perkins --- CHANGELOG.md | 1 + .../identity/shiro/ShiroIdentityPlugin.java | 45 +++++++++++++- .../identity/shiro/ShiroPluginSubject.java | 49 ++++++++++++++++ .../identity/shiro/ShiroSubject.java | 3 +- .../shiro/ShiroIdentityPluginTests.java | 9 ++- .../extensions/NoopExtensionsManager.java | 5 +- .../rest/RestSendToExtensionAction.java | 2 +- .../opensearch/identity/IdentityService.java | 20 +++++-- .../opensearch/identity/PluginSubject.java | 19 ++++++ .../java/org/opensearch/identity/Subject.java | 14 ++--- .../org/opensearch/identity/UserSubject.java | 29 ++++++++++ .../identity/noop/NoopIdentityPlugin.java | 16 ++++- .../identity/noop/NoopPluginSubject.java | 49 ++++++++++++++++ .../opensearch/identity/noop/NoopSubject.java | 3 +- .../opensearch/identity/tokens/AuthToken.java | 3 + .../identity/tokens/OnBehalfOfClaims.java | 5 ++ .../identity/tokens/TokenManager.java | 4 ++ .../main/java/org/opensearch/node/Node.java | 30 +++++----- .../plugins/IdentityAwarePlugin.java | 34 +++++++++++ .../opensearch/plugins/IdentityPlugin.java | 17 +++++- .../org/opensearch/rest/RestController.java | 9 ++- .../opensearch/action/ActionModuleTests.java | 5 +- .../bootstrap/IdentityPluginTests.java | 19 +++--- .../extensions/ExtensionsManagerTests.java | 4 +- .../rest/ExtensionRestRequestTests.java | 8 ++- .../RestInitializeExtensionActionTests.java | 6 +- .../rest/RestSendToExtensionActionTests.java | 6 +- .../identity/noop/NoopPluginSubjectTests.java | 58 +++++++++++++++++++ .../opensearch/rest/RestControllerTests.java | 3 +- .../rest/RestHttpResponseHeadersTests.java | 4 +- .../indices/RestValidateQueryActionTests.java | 2 +- .../test/rest/RestActionTestCase.java | 5 +- 32 files changed, 423 insertions(+), 63 deletions(-) create mode 100644 plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroPluginSubject.java create mode 100644 server/src/main/java/org/opensearch/identity/PluginSubject.java create mode 100644 server/src/main/java/org/opensearch/identity/UserSubject.java create mode 100644 server/src/main/java/org/opensearch/identity/noop/NoopPluginSubject.java create mode 100644 server/src/main/java/org/opensearch/plugins/IdentityAwarePlugin.java create mode 100644 server/src/test/java/org/opensearch/identity/noop/NoopPluginSubjectTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f3d7525fec7f2..fb89dd2723b62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) +- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroIdentityPlugin.java b/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroIdentityPlugin.java index 77cab13880c27..af802596ebaa7 100644 --- a/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroIdentityPlugin.java +++ b/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroIdentityPlugin.java @@ -12,23 +12,43 @@ import org.apache.logging.log4j.Logger; import org.apache.shiro.SecurityUtils; import org.apache.shiro.mgt.SecurityManager; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.identity.PluginSubject; import org.opensearch.identity.Subject; import org.opensearch.identity.tokens.TokenManager; import org.opensearch.plugins.IdentityPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.function.Supplier; /** * Identity implementation with Shiro * * @opensearch.experimental */ +@ExperimentalApi public final class ShiroIdentityPlugin extends Plugin implements IdentityPlugin { private Logger log = LogManager.getLogger(this.getClass()); private final Settings settings; private final ShiroTokenManager authTokenHandler; + private ThreadPool threadPool; + /** * Create a new instance of the Shiro Identity Plugin * @@ -42,13 +62,31 @@ public ShiroIdentityPlugin(final Settings settings) { SecurityUtils.setSecurityManager(securityManager); } + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver expressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.threadPool = threadPool; + return Collections.emptyList(); + } + /** * Return a Shiro Subject based on the provided authTokenHandler and current subject * * @return The current subject */ @Override - public Subject getSubject() { + public Subject getCurrentSubject() { return new ShiroSubject(authTokenHandler, SecurityUtils.getSubject()); } @@ -61,4 +99,9 @@ public Subject getSubject() { public TokenManager getTokenManager() { return this.authTokenHandler; } + + @Override + public PluginSubject getPluginSubject(Plugin plugin) { + return new ShiroPluginSubject(threadPool); + } } diff --git a/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroPluginSubject.java b/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroPluginSubject.java new file mode 100644 index 0000000000000..31dde34f447d4 --- /dev/null +++ b/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroPluginSubject.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity.shiro; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.identity.NamedPrincipal; +import org.opensearch.identity.PluginSubject; +import org.opensearch.threadpool.ThreadPool; + +import java.security.Principal; +import java.util.concurrent.Callable; + +/** + * Implementation of subject that is always authenticated + *

+ * This class and related classes in this package will not return nulls or fail permissions checks + * + * This class is used by the ShiroIdentityPlugin to initialize IdentityAwarePlugins + * + * @opensearch.experimental + */ +@ExperimentalApi +public class ShiroPluginSubject implements PluginSubject { + private final ThreadPool threadPool; + + ShiroPluginSubject(ThreadPool threadPool) { + super(); + this.threadPool = threadPool; + } + + @Override + public Principal getPrincipal() { + return NamedPrincipal.UNAUTHENTICATED; + } + + @Override + public T runAs(Callable callable) throws Exception { + try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { + return callable.call(); + } + } +} diff --git a/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroSubject.java b/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroSubject.java index e55204593621c..72a168f23c5cd 100644 --- a/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroSubject.java +++ b/plugins/identity-shiro/src/main/java/org/opensearch/identity/shiro/ShiroSubject.java @@ -9,6 +9,7 @@ package org.opensearch.identity.shiro; import org.opensearch.identity.Subject; +import org.opensearch.identity.UserSubject; import org.opensearch.identity.tokens.AuthToken; import java.security.Principal; @@ -19,7 +20,7 @@ * * @opensearch.experimental */ -public class ShiroSubject implements Subject { +public class ShiroSubject implements UserSubject { private final ShiroTokenManager authTokenHandler; private final org.apache.shiro.subject.Subject shiroSubject; diff --git a/plugins/identity-shiro/src/test/java/org/opensearch/identity/shiro/ShiroIdentityPluginTests.java b/plugins/identity-shiro/src/test/java/org/opensearch/identity/shiro/ShiroIdentityPluginTests.java index 626cd44d13ec8..bc14410d81de0 100644 --- a/plugins/identity-shiro/src/test/java/org/opensearch/identity/shiro/ShiroIdentityPluginTests.java +++ b/plugins/identity-shiro/src/test/java/org/opensearch/identity/shiro/ShiroIdentityPluginTests.java @@ -13,6 +13,7 @@ import org.opensearch.identity.IdentityService; import org.opensearch.plugins.IdentityPlugin; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; import java.util.List; @@ -24,19 +25,23 @@ public class ShiroIdentityPluginTests extends OpenSearchTestCase { public void testSingleIdentityPluginSucceeds() { + TestThreadPool threadPool = new TestThreadPool(getTestName()); IdentityPlugin identityPlugin1 = new ShiroIdentityPlugin(Settings.EMPTY); List pluginList1 = List.of(identityPlugin1); - IdentityService identityService1 = new IdentityService(Settings.EMPTY, pluginList1); + IdentityService identityService1 = new IdentityService(Settings.EMPTY, threadPool, pluginList1); assertThat(identityService1.getTokenManager(), is(instanceOf(ShiroTokenManager.class))); + terminate(threadPool); } public void testMultipleIdentityPluginsFail() { + TestThreadPool threadPool = new TestThreadPool(getTestName()); IdentityPlugin identityPlugin1 = new ShiroIdentityPlugin(Settings.EMPTY); IdentityPlugin identityPlugin2 = new ShiroIdentityPlugin(Settings.EMPTY); IdentityPlugin identityPlugin3 = new ShiroIdentityPlugin(Settings.EMPTY); List pluginList = List.of(identityPlugin1, identityPlugin2, identityPlugin3); - Exception ex = assertThrows(OpenSearchException.class, () -> new IdentityService(Settings.EMPTY, pluginList)); + Exception ex = assertThrows(OpenSearchException.class, () -> new IdentityService(Settings.EMPTY, threadPool, pluginList)); assert (ex.getMessage().contains("Multiple identity plugins are not supported,")); + terminate(threadPool); } } diff --git a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java index 81b1b91b11481..5bc655af4df7b 100644 --- a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java @@ -20,7 +20,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.List; import java.util.Optional; import java.util.Set; @@ -31,8 +30,8 @@ */ public class NoopExtensionsManager extends ExtensionsManager { - public NoopExtensionsManager() throws IOException { - super(Set.of(), new IdentityService(Settings.EMPTY, List.of())); + public NoopExtensionsManager(IdentityService identityService) throws IOException { + super(Set.of(), identityService); } @Override diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index f4503ce55e6bc..dc508e30b1895 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -249,7 +249,7 @@ public String executor() { Map> filteredHeaders = filterHeaders(headers, allowList, denyList); TokenManager tokenManager = identityService.getTokenManager(); - Subject subject = this.identityService.getSubject(); + Subject subject = this.identityService.getCurrentSubject(); OnBehalfOfClaims claims = new OnBehalfOfClaims(discoveryExtensionNode.getId(), subject.getPrincipal().getName()); transportService.sendRequest( diff --git a/server/src/main/java/org/opensearch/identity/IdentityService.java b/server/src/main/java/org/opensearch/identity/IdentityService.java index 3129c201b9a39..03f937180f4ba 100644 --- a/server/src/main/java/org/opensearch/identity/IdentityService.java +++ b/server/src/main/java/org/opensearch/identity/IdentityService.java @@ -11,7 +11,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.identity.noop.NoopIdentityPlugin; import org.opensearch.identity.tokens.TokenManager; +import org.opensearch.plugins.IdentityAwarePlugin; import org.opensearch.plugins.IdentityPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; import java.util.List; import java.util.stream.Collectors; @@ -27,12 +30,12 @@ public class IdentityService { private final Settings settings; private final IdentityPlugin identityPlugin; - public IdentityService(final Settings settings, final List identityPlugins) { + public IdentityService(final Settings settings, final ThreadPool threadPool, final List identityPlugins) { this.settings = settings; if (identityPlugins.size() == 0) { log.debug("Identity plugins size is 0"); - identityPlugin = new NoopIdentityPlugin(); + identityPlugin = new NoopIdentityPlugin(threadPool); } else if (identityPlugins.size() == 1) { log.debug("Identity plugins size is 1"); identityPlugin = identityPlugins.get(0); @@ -47,8 +50,8 @@ public IdentityService(final Settings settings, final List ident /** * Gets the current Subject */ - public Subject getSubject() { - return identityPlugin.getSubject(); + public Subject getCurrentSubject() { + return identityPlugin.getCurrentSubject(); } /** @@ -57,4 +60,13 @@ public Subject getSubject() { public TokenManager getTokenManager() { return identityPlugin.getTokenManager(); } + + public void initializeIdentityAwarePlugins(final List identityAwarePlugins) { + if (identityAwarePlugins != null) { + for (IdentityAwarePlugin plugin : identityAwarePlugins) { + PluginSubject pluginSubject = identityPlugin.getPluginSubject((Plugin) plugin); + plugin.assignSubject(pluginSubject); + } + } + } } diff --git a/server/src/main/java/org/opensearch/identity/PluginSubject.java b/server/src/main/java/org/opensearch/identity/PluginSubject.java new file mode 100644 index 0000000000000..3ea42182d3fc3 --- /dev/null +++ b/server/src/main/java/org/opensearch/identity/PluginSubject.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Similar to {@link Subject}, but represents a plugin executing actions + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface PluginSubject extends Subject {} diff --git a/server/src/main/java/org/opensearch/identity/Subject.java b/server/src/main/java/org/opensearch/identity/Subject.java index cbfdadb5cf6a7..0fb0e53848d80 100644 --- a/server/src/main/java/org/opensearch/identity/Subject.java +++ b/server/src/main/java/org/opensearch/identity/Subject.java @@ -5,15 +5,17 @@ package org.opensearch.identity; -import org.opensearch.identity.tokens.AuthToken; +import org.opensearch.common.annotation.ExperimentalApi; import java.security.Principal; +import java.util.concurrent.Callable; /** * An individual, process, or device that causes information to flow among objects or change to the system state. * * @opensearch.experimental */ +@ExperimentalApi public interface Subject { /** @@ -22,11 +24,9 @@ public interface Subject { Principal getPrincipal(); /** - * Authenticate via an auth token - * throws UnsupportedAuthenticationMethod - * throws InvalidAuthenticationToken - * throws SubjectNotFound - * throws SubjectDisabled + * runAs allows the caller to run a callable function as this subject */ - void authenticate(final AuthToken token); + default T runAs(Callable callable) throws Exception { + return callable.call(); + }; } diff --git a/server/src/main/java/org/opensearch/identity/UserSubject.java b/server/src/main/java/org/opensearch/identity/UserSubject.java new file mode 100644 index 0000000000000..50f8ac6b37be3 --- /dev/null +++ b/server/src/main/java/org/opensearch/identity/UserSubject.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.identity.tokens.AuthToken; + +/** + * An instance of a subject representing a User. UserSubjects must pass credentials for authentication. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface UserSubject extends Subject { + /** + * Authenticate via an auth token + * throws UnsupportedAuthenticationMethod + * throws InvalidAuthenticationToken + * throws SubjectNotFound + * throws SubjectDisabled + */ + void authenticate(final AuthToken token); +} diff --git a/server/src/main/java/org/opensearch/identity/noop/NoopIdentityPlugin.java b/server/src/main/java/org/opensearch/identity/noop/NoopIdentityPlugin.java index 090b1f1d025e0..6279388c76f96 100644 --- a/server/src/main/java/org/opensearch/identity/noop/NoopIdentityPlugin.java +++ b/server/src/main/java/org/opensearch/identity/noop/NoopIdentityPlugin.java @@ -8,9 +8,12 @@ package org.opensearch.identity.noop; +import org.opensearch.identity.PluginSubject; import org.opensearch.identity.Subject; import org.opensearch.identity.tokens.TokenManager; import org.opensearch.plugins.IdentityPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; /** * Implementation of identity plugin that does not enforce authentication or authorization @@ -21,12 +24,18 @@ */ public class NoopIdentityPlugin implements IdentityPlugin { + private final ThreadPool threadPool; + + public NoopIdentityPlugin(ThreadPool threadPool) { + this.threadPool = threadPool; + } + /** * Get the current subject * @return Must never return null */ @Override - public Subject getSubject() { + public Subject getCurrentSubject() { return new NoopSubject(); } @@ -38,4 +47,9 @@ public Subject getSubject() { public TokenManager getTokenManager() { return new NoopTokenManager(); } + + @Override + public PluginSubject getPluginSubject(Plugin plugin) { + return new NoopPluginSubject(threadPool); + } } diff --git a/server/src/main/java/org/opensearch/identity/noop/NoopPluginSubject.java b/server/src/main/java/org/opensearch/identity/noop/NoopPluginSubject.java new file mode 100644 index 0000000000000..20e075276f317 --- /dev/null +++ b/server/src/main/java/org/opensearch/identity/noop/NoopPluginSubject.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity.noop; + +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.identity.NamedPrincipal; +import org.opensearch.identity.PluginSubject; +import org.opensearch.threadpool.ThreadPool; + +import java.security.Principal; +import java.util.concurrent.Callable; + +/** + * Implementation of subject that is always authenticated + *

+ * This class and related classes in this package will not return nulls or fail permissions checks + * + * This class is used by the NoopIdentityPlugin to initialize IdentityAwarePlugins + * + * @opensearch.internal + */ +@InternalApi +public class NoopPluginSubject implements PluginSubject { + private final ThreadPool threadPool; + + NoopPluginSubject(ThreadPool threadPool) { + super(); + this.threadPool = threadPool; + } + + @Override + public Principal getPrincipal() { + return NamedPrincipal.UNAUTHENTICATED; + } + + @Override + public T runAs(Callable callable) throws Exception { + try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { + return callable.call(); + } + } +} diff --git a/server/src/main/java/org/opensearch/identity/noop/NoopSubject.java b/server/src/main/java/org/opensearch/identity/noop/NoopSubject.java index 964a218db3cf5..fda88a8b7e8af 100644 --- a/server/src/main/java/org/opensearch/identity/noop/NoopSubject.java +++ b/server/src/main/java/org/opensearch/identity/noop/NoopSubject.java @@ -10,6 +10,7 @@ import org.opensearch.identity.NamedPrincipal; import org.opensearch.identity.Subject; +import org.opensearch.identity.UserSubject; import org.opensearch.identity.tokens.AuthToken; import java.security.Principal; @@ -22,7 +23,7 @@ * * @opensearch.internal */ -public class NoopSubject implements Subject { +public class NoopSubject implements UserSubject { @Override public Principal getPrincipal() { diff --git a/server/src/main/java/org/opensearch/identity/tokens/AuthToken.java b/server/src/main/java/org/opensearch/identity/tokens/AuthToken.java index 88bb855a6e70d..57e4ac4a82ae3 100644 --- a/server/src/main/java/org/opensearch/identity/tokens/AuthToken.java +++ b/server/src/main/java/org/opensearch/identity/tokens/AuthToken.java @@ -8,11 +8,14 @@ package org.opensearch.identity.tokens; +import org.opensearch.common.annotation.ExperimentalApi; + /** * Interface for all token formats to support to authenticate user such as UserName/Password tokens, Access tokens, and more. * * @opensearch.experimental */ +@ExperimentalApi public interface AuthToken { String asAuthHeaderValue(); diff --git a/server/src/main/java/org/opensearch/identity/tokens/OnBehalfOfClaims.java b/server/src/main/java/org/opensearch/identity/tokens/OnBehalfOfClaims.java index 00e50a59e9486..2b37ed954e7d4 100644 --- a/server/src/main/java/org/opensearch/identity/tokens/OnBehalfOfClaims.java +++ b/server/src/main/java/org/opensearch/identity/tokens/OnBehalfOfClaims.java @@ -8,9 +8,14 @@ package org.opensearch.identity.tokens; +import org.opensearch.common.annotation.ExperimentalApi; + /** * This class represents the claims of an OnBehalfOf token. + * + * @opensearch.experimental */ +@ExperimentalApi public class OnBehalfOfClaims { private final String audience; diff --git a/server/src/main/java/org/opensearch/identity/tokens/TokenManager.java b/server/src/main/java/org/opensearch/identity/tokens/TokenManager.java index 972a9a1080955..b9340e618245a 100644 --- a/server/src/main/java/org/opensearch/identity/tokens/TokenManager.java +++ b/server/src/main/java/org/opensearch/identity/tokens/TokenManager.java @@ -8,11 +8,15 @@ package org.opensearch.identity.tokens; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.identity.Subject; /** * This interface defines the expected methods of a token manager + * + * @opensearch.experimental */ +@ExperimentalApi public interface TokenManager { /** diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7e867d3966ff5..388e00bedab0c 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -201,6 +201,7 @@ import org.opensearch.plugins.DiscoveryPlugin; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.ExtensionAwarePlugin; +import org.opensearch.plugins.IdentityAwarePlugin; import org.opensearch.plugins.IdentityPlugin; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.IngestPlugin; @@ -526,19 +527,6 @@ protected Node( identityPlugins.addAll(pluginsService.filterPlugins(IdentityPlugin.class)); } - final IdentityService identityService = new IdentityService(settings, identityPlugins); - - if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - final List extensionAwarePlugins = pluginsService.filterPlugins(ExtensionAwarePlugin.class); - Set> additionalSettings = new HashSet<>(); - for (ExtensionAwarePlugin extAwarePlugin : extensionAwarePlugins) { - additionalSettings.addAll(extAwarePlugin.getExtensionSettings()); - } - this.extensionsManager = new ExtensionsManager(additionalSettings, identityService); - } else { - this.extensionsManager = new NoopExtensionsManager(); - } - final Set additionalRoles = pluginsService.filterPlugins(Plugin.class) .stream() .map(Plugin::getRoles) @@ -576,6 +564,19 @@ protected Node( runnableTaskListener = new AtomicReference<>(); final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, executorBuilders.toArray(new ExecutorBuilder[0])); + final IdentityService identityService = new IdentityService(settings, threadPool, identityPlugins); + + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + final List extensionAwarePlugins = pluginsService.filterPlugins(ExtensionAwarePlugin.class); + Set> additionalSettings = new HashSet<>(); + for (ExtensionAwarePlugin extAwarePlugin : extensionAwarePlugins) { + additionalSettings.addAll(extAwarePlugin.getExtensionSettings()); + } + this.extensionsManager = new ExtensionsManager(additionalSettings, identityService); + } else { + this.extensionsManager = new NoopExtensionsManager(identityService); + } + final SetOnce repositoriesServiceReference = new SetOnce<>(); final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(repositoriesServiceReference::get, threadPool); localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreNodeService); @@ -1012,6 +1013,9 @@ protected Node( // Add the telemetryAwarePlugin components to the existing pluginComponents collection. pluginComponents.addAll(telemetryAwarePluginComponents); + List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); + identityService.initializeIdentityAwarePlugins(identityAwarePlugins); + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( diff --git a/server/src/main/java/org/opensearch/plugins/IdentityAwarePlugin.java b/server/src/main/java/org/opensearch/plugins/IdentityAwarePlugin.java new file mode 100644 index 0000000000000..b19dcfe5544ec --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/IdentityAwarePlugin.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.identity.PluginSubject; +import org.opensearch.identity.Subject; + +/** + * Plugin that performs transport actions with a plugin system context. IdentityAwarePlugins are initialized + * with a {@link Subject} that they can utilize to perform transport actions outside the default subject. + * + * When the Security plugin is installed, the default subject is the authenticated user. In particular, + * SystemIndexPlugins utilize the {@link Subject} to perform transport actions that interact with system indices. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface IdentityAwarePlugin { + + /** + * Passes necessary classes for this plugin to operate as an IdentityAwarePlugin + * + * @param pluginSubject A subject for running transport actions in the plugin context for system index + * interaction + */ + default void assignSubject(PluginSubject pluginSubject) {} +} diff --git a/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java b/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java index 410535504f0dd..b40af14231fb9 100644 --- a/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java @@ -8,6 +8,8 @@ package org.opensearch.plugins; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.identity.PluginSubject; import org.opensearch.identity.Subject; import org.opensearch.identity.tokens.TokenManager; @@ -16,17 +18,28 @@ * * @opensearch.experimental */ +@ExperimentalApi public interface IdentityPlugin { /** * Get the current subject. + * * @return Should never return null * */ - public Subject getSubject(); + Subject getCurrentSubject(); /** * Get the Identity Plugin's token manager implementation * @return Should never return null. */ - public TokenManager getTokenManager(); + TokenManager getTokenManager(); + + /** + * Gets a subject corresponding to the passed plugin that can be utilized to perform transport actions + * in the plugin system context + * + * @param plugin The corresponding plugin + * @return Subject corresponding to the plugin + */ + PluginSubject getPluginSubject(Plugin plugin); } diff --git a/server/src/main/java/org/opensearch/rest/RestController.java b/server/src/main/java/org/opensearch/rest/RestController.java index 7d0c1e2260de1..9889f5d67e966 100644 --- a/server/src/main/java/org/opensearch/rest/RestController.java +++ b/server/src/main/java/org/opensearch/rest/RestController.java @@ -58,6 +58,7 @@ import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; import org.opensearch.identity.Subject; +import org.opensearch.identity.UserSubject; import org.opensearch.identity.tokens.AuthToken; import org.opensearch.identity.tokens.RestTokenExtractor; import org.opensearch.usage.UsageService; @@ -593,9 +594,11 @@ private boolean handleAuthenticateUser(final RestRequest request, final RestChan // Authentication did not fail so return true. Authorization is handled at the action level. return true; } - final Subject currentSubject = identityService.getSubject(); - currentSubject.authenticate(token); - logger.debug("Logged in as user " + currentSubject); + final Subject currentSubject = identityService.getCurrentSubject(); + if (currentSubject instanceof UserSubject) { + ((UserSubject) currentSubject).authenticate(token); + logger.debug("Logged in as user " + currentSubject); + } } catch (final Exception e) { try { final BytesRestResponse bytesRestResponse = BytesRestResponse.createSimpleErrorResponse( diff --git a/server/src/test/java/org/opensearch/action/ActionModuleTests.java b/server/src/test/java/org/opensearch/action/ActionModuleTests.java index 8479f011adf48..6b8951dd43d11 100644 --- a/server/src/test/java/org/opensearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/opensearch/action/ActionModuleTests.java @@ -74,6 +74,7 @@ import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; public class ActionModuleTests extends OpenSearchTestCase { public void testSetupActionsContainsKnownBuiltin() { @@ -142,8 +143,8 @@ public void testSetupRestHandlerContainsKnownBuiltin() throws IOException { null, usageService, null, - new IdentityService(Settings.EMPTY, new ArrayList<>()), - new ExtensionsManager(Set.of(), new IdentityService(Settings.EMPTY, List.of())) + new IdentityService(Settings.EMPTY, mock(ThreadPool.class), new ArrayList<>()), + new ExtensionsManager(Set.of(), new IdentityService(Settings.EMPTY, mock(ThreadPool.class), List.of())) ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail diff --git a/server/src/test/java/org/opensearch/bootstrap/IdentityPluginTests.java b/server/src/test/java/org/opensearch/bootstrap/IdentityPluginTests.java index 2129810a99879..d7b9f5917c366 100644 --- a/server/src/test/java/org/opensearch/bootstrap/IdentityPluginTests.java +++ b/server/src/test/java/org/opensearch/bootstrap/IdentityPluginTests.java @@ -15,6 +15,7 @@ import org.opensearch.identity.noop.NoopTokenManager; import org.opensearch.plugins.IdentityPlugin; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; import java.util.List; @@ -24,19 +25,23 @@ public class IdentityPluginTests extends OpenSearchTestCase { public void testSingleIdentityPluginSucceeds() { - IdentityPlugin identityPlugin1 = new NoopIdentityPlugin(); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + IdentityPlugin identityPlugin1 = new NoopIdentityPlugin(threadPool); List pluginList1 = List.of(identityPlugin1); - IdentityService identityService1 = new IdentityService(Settings.EMPTY, pluginList1); - assertTrue(identityService1.getSubject().getPrincipal().getName().equalsIgnoreCase("Unauthenticated")); + IdentityService identityService1 = new IdentityService(Settings.EMPTY, threadPool, pluginList1); + assertTrue(identityService1.getCurrentSubject().getPrincipal().getName().equalsIgnoreCase("Unauthenticated")); assertThat(identityService1.getTokenManager(), is(instanceOf(NoopTokenManager.class))); + terminate(threadPool); } public void testMultipleIdentityPluginsFail() { - IdentityPlugin identityPlugin1 = new NoopIdentityPlugin(); - IdentityPlugin identityPlugin2 = new NoopIdentityPlugin(); - IdentityPlugin identityPlugin3 = new NoopIdentityPlugin(); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + IdentityPlugin identityPlugin1 = new NoopIdentityPlugin(threadPool); + IdentityPlugin identityPlugin2 = new NoopIdentityPlugin(threadPool); + IdentityPlugin identityPlugin3 = new NoopIdentityPlugin(threadPool); List pluginList = List.of(identityPlugin1, identityPlugin2, identityPlugin3); - Exception ex = assertThrows(OpenSearchException.class, () -> new IdentityService(Settings.EMPTY, pluginList)); + Exception ex = assertThrows(OpenSearchException.class, () -> new IdentityService(Settings.EMPTY, threadPool, pluginList)); assert (ex.getMessage().contains("Multiple identity plugins are not supported,")); + terminate(threadPool); } } diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 3c25dbdff3342..5ae1bdce48cd5 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -154,7 +154,7 @@ public List> getExtensionSettings() { new NodeClient(Settings.EMPTY, threadPool), new NoneCircuitBreakerService(), new UsageService(), - new IdentityService(Settings.EMPTY, List.of()) + new IdentityService(Settings.EMPTY, threadPool, List.of()) ); when(actionModule.getDynamicActionRegistry()).thenReturn(mock(DynamicActionRegistry.class)); when(actionModule.getRestController()).thenReturn(restController); @@ -171,7 +171,7 @@ public List> getExtensionSettings() { Collections.emptyList() ); client = new NoOpNodeClient(this.getTestName()); - identityService = new IdentityService(Settings.EMPTY, List.of()); + identityService = new IdentityService(Settings.EMPTY, threadPool, List.of()); } @Override diff --git a/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java index 8b73f2e81972f..7d9ebe1d1e26a 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestRequestTests.java @@ -29,6 +29,7 @@ import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest.Method; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import java.nio.charset.StandardCharsets; import java.security.Principal; @@ -38,6 +39,7 @@ import java.util.Map; import static java.util.Map.entry; +import static org.mockito.Mockito.mock; public class ExtensionRestRequestTests extends OpenSearchTestCase { @@ -72,12 +74,12 @@ public void setUp() throws Exception { userPrincipal = () -> "user1"; expectedHttpVersion = HttpRequest.HttpVersion.HTTP_1_1; extensionTokenProcessor = "placeholder_extension_token_processor"; - identityService = new IdentityService(Settings.EMPTY, List.of()); + identityService = new IdentityService(Settings.EMPTY, mock(ThreadPool.class), List.of()); TokenManager tokenManager = identityService.getTokenManager(); - Subject subject = this.identityService.getSubject(); + Subject subject = this.identityService.getCurrentSubject(); OnBehalfOfClaims claims = new OnBehalfOfClaims("testID", subject.getPrincipal().getName()); expectedRequestIssuerIdentity = identityService.getTokenManager() - .issueOnBehalfOfToken(identityService.getSubject(), claims) + .issueOnBehalfOfToken(identityService.getCurrentSubject(), claims) .asAuthHeaderValue(); } diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java index 0dae0ae1b4e0b..ac818c3bb4a7b 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -121,7 +121,7 @@ public void testRestInitializeExtensionActionResponse() throws Exception { } public void testRestInitializeExtensionActionFailure() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(Set.of(), new IdentityService(Settings.EMPTY, List.of())); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of(), new IdentityService(Settings.EMPTY, threadPool, List.of())); RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"\",\"hostAddress\":\"127.0.0.1\"," @@ -156,7 +156,7 @@ public void testRestInitializeExtensionActionResponseWithAdditionalSettings() th ); ExtensionsManager extensionsManager = new ExtensionsManager( Set.of(boolSetting, stringSetting, intSetting, listSetting), - new IdentityService(Settings.EMPTY, List.of()) + new IdentityService(Settings.EMPTY, threadPool, List.of()) ); ExtensionsManager spy = spy(extensionsManager); @@ -206,7 +206,7 @@ public void testRestInitializeExtensionActionResponseWithAdditionalSettingsUsing ); ExtensionsManager extensionsManager = new ExtensionsManager( Set.of(boolSetting, stringSetting, intSetting, listSetting), - new IdentityService(Settings.EMPTY, List.of()) + new IdentityService(Settings.EMPTY, threadPool, List.of()) ); ExtensionsManager spy = spy(extensionsManager); diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java index 9da976de7d7f6..e9c910ea361fb 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java @@ -122,10 +122,10 @@ public void setup() throws Exception { null, usageService, null, - new IdentityService(Settings.EMPTY, new ArrayList<>()), - new ExtensionsManager(Set.of(), new IdentityService(Settings.EMPTY, List.of())) + new IdentityService(Settings.EMPTY, mock(ThreadPool.class), new ArrayList<>()), + new ExtensionsManager(Set.of(), new IdentityService(Settings.EMPTY, mock(ThreadPool.class), List.of())) ); - identityService = new IdentityService(Settings.EMPTY, new ArrayList<>()); + identityService = new IdentityService(Settings.EMPTY, mock(ThreadPool.class), new ArrayList<>()); dynamicActionRegistry = actionModule.getDynamicActionRegistry(); } diff --git a/server/src/test/java/org/opensearch/identity/noop/NoopPluginSubjectTests.java b/server/src/test/java/org/opensearch/identity/noop/NoopPluginSubjectTests.java new file mode 100644 index 0000000000000..79c26a7eb790d --- /dev/null +++ b/server/src/test/java/org/opensearch/identity/noop/NoopPluginSubjectTests.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity.noop; + +import org.opensearch.common.settings.Settings; +import org.opensearch.identity.IdentityService; +import org.opensearch.identity.NamedPrincipal; +import org.opensearch.identity.PluginSubject; +import org.opensearch.plugins.IdentityAwarePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class NoopPluginSubjectTests extends OpenSearchTestCase { + public static class TestPlugin extends Plugin implements IdentityAwarePlugin { + private PluginSubject subject; + + @Override + public void assignSubject(PluginSubject subject) { + this.subject = subject; + } + + public PluginSubject getSubject() { + return subject; + } + } + + public void testInitializeIdentityAwarePlugin() throws Exception { + ThreadPool threadPool = new TestThreadPool(getTestName()); + IdentityService identityService = new IdentityService(Settings.EMPTY, threadPool, List.of()); + + TestPlugin testPlugin = new TestPlugin(); + identityService.initializeIdentityAwarePlugins(List.of(testPlugin)); + + PluginSubject testPluginSubject = new NoopPluginSubject(threadPool); + assertThat(testPlugin.getSubject().getPrincipal().getName(), equalTo(NamedPrincipal.UNAUTHENTICATED.getName())); + assertThat(testPluginSubject.getPrincipal().getName(), equalTo(NamedPrincipal.UNAUTHENTICATED.getName())); + threadPool.getThreadContext().putHeader("test_header", "foo"); + assertThat(threadPool.getThreadContext().getHeader("test_header"), equalTo("foo")); + testPluginSubject.runAs(() -> { + assertNull(threadPool.getThreadContext().getHeader("test_header")); + return null; + }); + assertThat(threadPool.getThreadContext().getHeader("test_header"), equalTo("foo")); + terminate(threadPool); + } +} diff --git a/server/src/test/java/org/opensearch/rest/RestControllerTests.java b/server/src/test/java/org/opensearch/rest/RestControllerTests.java index b7239e7b59742..ef9257d746573 100644 --- a/server/src/test/java/org/opensearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/opensearch/rest/RestControllerTests.java @@ -61,6 +61,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.client.NoOpNodeClient; import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; import org.junit.After; import org.junit.Before; @@ -114,7 +115,7 @@ public void setup() { // we can do this here only because we know that we don't adjust breaker settings dynamically in the test inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); - identityService = new IdentityService(Settings.EMPTY, List.of()); + identityService = new IdentityService(Settings.EMPTY, mock(ThreadPool.class), List.of()); HttpServerTransport httpServerTransport = new TestHttpServerTransport(); client = new NoOpNodeClient(this.getTestName()); diff --git a/server/src/test/java/org/opensearch/rest/RestHttpResponseHeadersTests.java b/server/src/test/java/org/opensearch/rest/RestHttpResponseHeadersTests.java index 5d677247b8b6d..983121a4f481d 100644 --- a/server/src/test/java/org/opensearch/rest/RestHttpResponseHeadersTests.java +++ b/server/src/test/java/org/opensearch/rest/RestHttpResponseHeadersTests.java @@ -44,6 +44,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.rest.FakeRestChannel; import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; import java.util.ArrayList; @@ -55,6 +56,7 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; public class RestHttpResponseHeadersTests extends OpenSearchTestCase { @@ -106,7 +108,7 @@ public void testUnsupportedMethodResponseHttpHeader() throws Exception { final Settings settings = Settings.EMPTY; UsageService usageService = new UsageService(); - final IdentityService identityService = new IdentityService(settings, List.of()); + final IdentityService identityService = new IdentityService(settings, mock(ThreadPool.class), List.of()); RestController restController = new RestController( Collections.emptySet(), null, diff --git a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java index 3fb6764846da6..c3cf33f4e9034 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java @@ -75,7 +75,7 @@ public class RestValidateQueryActionTests extends AbstractSearchTestCase { private static NodeClient client = new NodeClient(Settings.EMPTY, threadPool); private static UsageService usageService = new UsageService(); - private static IdentityService identityService = new IdentityService(Settings.EMPTY, List.of()); + private static IdentityService identityService = new IdentityService(Settings.EMPTY, threadPool, List.of()); private static RestController controller = new RestController( emptySet(), null, diff --git a/test/framework/src/main/java/org/opensearch/test/rest/RestActionTestCase.java b/test/framework/src/main/java/org/opensearch/test/rest/RestActionTestCase.java index a77865579f3b3..c7a0fe35b0237 100644 --- a/test/framework/src/main/java/org/opensearch/test/rest/RestActionTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/rest/RestActionTestCase.java @@ -47,6 +47,7 @@ import org.opensearch.tasks.TaskListener; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; import org.junit.After; import org.junit.Before; @@ -56,6 +57,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import static org.mockito.Mockito.mock; + /** * A common base class for Rest*ActionTests. Provides access to a {@link RestController} * that can be used to register individual REST actions, and test request handling. @@ -67,7 +70,7 @@ public abstract class RestActionTestCase extends OpenSearchTestCase { @Before public void setUpController() { verifyingClient = new VerifyingClient(this.getTestName()); - final IdentityService identityService = new IdentityService(Settings.EMPTY, List.of()); + final IdentityService identityService = new IdentityService(Settings.EMPTY, mock(ThreadPool.class), List.of()); controller = new RestController( Collections.emptySet(), null, From f5da8c8d8da644a92bcf5690182742d04b69e039 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 28 Aug 2024 15:05:48 -0400 Subject: [PATCH 5/5] Bump icu4j from 70.1 to 75.1 (#15469) Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + buildSrc/version.properties | 2 +- plugins/analysis-icu/licenses/icu4j-70.1.jar.sha1 | 1 - plugins/analysis-icu/licenses/icu4j-75.1.jar.sha1 | 1 + 4 files changed, 3 insertions(+), 2 deletions(-) delete mode 100644 plugins/analysis-icu/licenses/icu4j-70.1.jar.sha1 create mode 100644 plugins/analysis-icu/licenses/icu4j-75.1.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index fb89dd2723b62..b7e4548100df3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `dnsjava:dnsjava` from 3.6.0 to 3.6.1 ([#15418](https://github.com/opensearch-project/OpenSearch/pull/15418)) - Bump `com.netflix.nebula.ospackage-base` from 11.9.1 to 11.10.0 ([#15419](https://github.com/opensearch-project/OpenSearch/pull/15419)) - Bump `org.roaringbitmap:RoaringBitmap` from 1.1.0 to 1.2.1 ([#15423](https://github.com/opensearch-project/OpenSearch/pull/15423)) +- Bump `icu4j` from 70.1 to 75.1 ([#15469](https://github.com/opensearch-project/OpenSearch/pull/15469)) ### Changed - Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 98f474a7f0b90..0f066de481ea2 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -10,7 +10,7 @@ jts = 1.15.0 jackson = 2.17.2 jackson_databind = 2.17.2 snakeyaml = 2.1 -icu4j = 70.1 +icu4j = 75.1 supercsv = 2.4.0 log4j = 2.21.0 slf4j = 1.7.36 diff --git a/plugins/analysis-icu/licenses/icu4j-70.1.jar.sha1 b/plugins/analysis-icu/licenses/icu4j-70.1.jar.sha1 deleted file mode 100644 index 6fad735235417..0000000000000 --- a/plugins/analysis-icu/licenses/icu4j-70.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -dfa3a1fbc55bf5db8c6e79fc0935ac7ab1202950 \ No newline at end of file diff --git a/plugins/analysis-icu/licenses/icu4j-75.1.jar.sha1 b/plugins/analysis-icu/licenses/icu4j-75.1.jar.sha1 new file mode 100644 index 0000000000000..4ee1fbe23b117 --- /dev/null +++ b/plugins/analysis-icu/licenses/icu4j-75.1.jar.sha1 @@ -0,0 +1 @@ +e8f8dcc2967f5ec2cfae185172293adfa5599b78 \ No newline at end of file