Skip to content

Commit

Permalink
Merge branch 'main' into pointrange_optimization
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi authored Aug 28, 2024
2 parents 900a685 + f5da8c8 commit c238f62
Show file tree
Hide file tree
Showing 70 changed files with 1,226 additions and 129 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))

### Dependencies
Expand Down Expand Up @@ -55,6 +58,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `dnsjava:dnsjava` from 3.6.0 to 3.6.1 ([#15418](https://github.com/opensearch-project/OpenSearch/pull/15418))
- Bump `com.netflix.nebula.ospackage-base` from 11.9.1 to 11.10.0 ([#15419](https://github.com/opensearch-project/OpenSearch/pull/15419))
- Bump `org.roaringbitmap:RoaringBitmap` from 1.1.0 to 1.2.1 ([#15423](https://github.com/opensearch-project/OpenSearch/pull/15423))
- Bump `icu4j` from 70.1 to 75.1 ([#15469](https://github.com/opensearch-project/OpenSearch/pull/15469))

### Changed
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))
Expand Down
4 changes: 2 additions & 2 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jts = 1.15.0
jackson = 2.17.2
jackson_databind = 2.17.2
snakeyaml = 2.1
icu4j = 70.1
icu4j = 75.1
supercsv = 2.4.0
log4j = 2.21.0
slf4j = 1.7.36
Expand All @@ -37,7 +37,7 @@ reactor_netty = 1.1.22
reactor = 3.5.20

# client dependencies
httpclient5 = 5.2.3
httpclient5 = 5.3.1
httpcore5 = 5.2.5
httpclient = 4.5.14
httpcore = 4.4.16
Expand Down
1 change: 0 additions & 1 deletion client/rest/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/rest/licenses/httpclient5-5.3.1.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
50 changes: 49 additions & 1 deletion client/rest/src/main/java/org/opensearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import java.util.zip.GZIPOutputStream;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

Expand Down Expand Up @@ -416,7 +417,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
try {
final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
if (responseOrResponseException.responseException == null) {
return Mono.just(message);
return Mono.just(
new Message<>(
message.getHead(),
Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))
)
);
} else {
if (nodeTuple.nodes.hasNext()) {
return Mono.from(streamRequest(nodeTuple, request));
Expand All @@ -431,6 +437,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
});
}

/**
* Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence.
* @param b {@link ByteBuffer} to split
* @return individual chunks
*/
private static Collection<ByteBuffer> frame(ByteBuffer b) {
final Collection<ByteBuffer> buffers = new ArrayList<>();

int position = b.position();
while (b.hasRemaining()) {
// Skip the chunk separator when it comes right at the beginning
if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) {
if (b.get() == '\n') {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

// Do not copy the '\r\n' sequence
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
position = b.position();
}
}
}

if (buffers.isEmpty()) {
return Collections.singleton(b);
}

// Copy last chunk
if (position != b.position()) {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));
}

return buffers;
}

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
Expand Down
1 change: 0 additions & 1 deletion client/sniffer/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/sniffer/licenses/httpclient5-5.3.1.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
1 change: 0 additions & 1 deletion plugins/analysis-icu/licenses/icu4j-70.1.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/analysis-icu/licenses/icu4j-75.1.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e8f8dcc2967f5ec2cfae185172293adfa5599b78
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,43 @@
import org.apache.logging.log4j.Logger;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.mgt.SecurityManager;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.identity.PluginSubject;
import org.opensearch.identity.Subject;
import org.opensearch.identity.tokens.TokenManager;
import org.opensearch.plugins.IdentityPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
* Identity implementation with Shiro
*
* @opensearch.experimental
*/
@ExperimentalApi
public final class ShiroIdentityPlugin extends Plugin implements IdentityPlugin {
private Logger log = LogManager.getLogger(this.getClass());

private final Settings settings;
private final ShiroTokenManager authTokenHandler;

private ThreadPool threadPool;

/**
* Create a new instance of the Shiro Identity Plugin
*
Expand All @@ -42,13 +62,31 @@ public ShiroIdentityPlugin(final Settings settings) {
SecurityUtils.setSecurityManager(securityManager);
}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver expressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.threadPool = threadPool;
return Collections.emptyList();
}

/**
* Return a Shiro Subject based on the provided authTokenHandler and current subject
*
* @return The current subject
*/
@Override
public Subject getSubject() {
public Subject getCurrentSubject() {
return new ShiroSubject(authTokenHandler, SecurityUtils.getSubject());
}

Expand All @@ -61,4 +99,9 @@ public Subject getSubject() {
public TokenManager getTokenManager() {
return this.authTokenHandler;
}

@Override
public PluginSubject getPluginSubject(Plugin plugin) {
return new ShiroPluginSubject(threadPool);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.identity.shiro;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.identity.NamedPrincipal;
import org.opensearch.identity.PluginSubject;
import org.opensearch.threadpool.ThreadPool;

import java.security.Principal;
import java.util.concurrent.Callable;

/**
* Implementation of subject that is always authenticated
* <p>
* This class and related classes in this package will not return nulls or fail permissions checks
*
* This class is used by the ShiroIdentityPlugin to initialize IdentityAwarePlugins
*
* @opensearch.experimental
*/
@ExperimentalApi
public class ShiroPluginSubject implements PluginSubject {
private final ThreadPool threadPool;

ShiroPluginSubject(ThreadPool threadPool) {
super();
this.threadPool = threadPool;
}

@Override
public Principal getPrincipal() {
return NamedPrincipal.UNAUTHENTICATED;
}

@Override
public <T> T runAs(Callable<T> callable) throws Exception {
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
return callable.call();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.identity.shiro;

import org.opensearch.identity.Subject;
import org.opensearch.identity.UserSubject;
import org.opensearch.identity.tokens.AuthToken;

import java.security.Principal;
Expand All @@ -19,7 +20,7 @@
*
* @opensearch.experimental
*/
public class ShiroSubject implements Subject {
public class ShiroSubject implements UserSubject {
private final ShiroTokenManager authTokenHandler;
private final org.apache.shiro.subject.Subject shiroSubject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.identity.IdentityService;
import org.opensearch.plugins.IdentityPlugin;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;

import java.util.List;

Expand All @@ -24,19 +25,23 @@
public class ShiroIdentityPluginTests extends OpenSearchTestCase {

public void testSingleIdentityPluginSucceeds() {
TestThreadPool threadPool = new TestThreadPool(getTestName());
IdentityPlugin identityPlugin1 = new ShiroIdentityPlugin(Settings.EMPTY);
List<IdentityPlugin> pluginList1 = List.of(identityPlugin1);
IdentityService identityService1 = new IdentityService(Settings.EMPTY, pluginList1);
IdentityService identityService1 = new IdentityService(Settings.EMPTY, threadPool, pluginList1);
assertThat(identityService1.getTokenManager(), is(instanceOf(ShiroTokenManager.class)));
terminate(threadPool);
}

public void testMultipleIdentityPluginsFail() {
TestThreadPool threadPool = new TestThreadPool(getTestName());
IdentityPlugin identityPlugin1 = new ShiroIdentityPlugin(Settings.EMPTY);
IdentityPlugin identityPlugin2 = new ShiroIdentityPlugin(Settings.EMPTY);
IdentityPlugin identityPlugin3 = new ShiroIdentityPlugin(Settings.EMPTY);
List<IdentityPlugin> pluginList = List.of(identityPlugin1, identityPlugin2, identityPlugin3);
Exception ex = assertThrows(OpenSearchException.class, () -> new IdentityService(Settings.EMPTY, pluginList));
Exception ex = assertThrows(OpenSearchException.class, () -> new IdentityService(Settings.EMPTY, threadPool, pluginList));
assert (ex.getMessage().contains("Multiple identity plugins are not supported,"));
terminate(threadPool);
}

}
Loading

0 comments on commit c238f62

Please sign in to comment.