Merge branch 'main' into plugin-aware-thread-context
cwperks committed Aug 28, 2024
2 parents 653beb4 + 8d17c8d commit a872e7c
Showing 158 changed files with 5,172 additions and 542 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle-check.yml
@@ -20,7 +20,7 @@ jobs:
- uses: actions/checkout@v4
- name: Get changed files
id: changed-files-specific
uses: tj-actions/changed-files@v44
uses: tj-actions/changed-files@v45
with:
files_ignore: |
release-notes/*.md
16 changes: 15 additions & 1 deletion CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs ([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([#14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
- [Workload Management] Add Delete QueryGroup API Logic ([#14735](https://github.com/opensearch-project/OpenSearch/pull/14735))
- [Streaming Indexing] Enhance RestClient with a new streaming API support ([#14437](https://github.com/opensearch-project/OpenSearch/pull/14437))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))
@@ -22,15 +23,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralized snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.16.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861), [#15205](https://github.com/opensearch-project/OpenSearch/pull/15205))
- OpenJDK Update (July 2024 Patch releases) ([#14998](https://github.com/opensearch-project/OpenSearch/pull/14998))
- Bump `com.microsoft.azure:msal4j` from 1.16.1 to 1.16.2 ([#14995](https://github.com/opensearch-project/OpenSearch/pull/14995))
- Bump `com.microsoft.azure:msal4j` from 1.16.1 to 1.17.0 ([#14995](https://github.com/opensearch-project/OpenSearch/pull/14995), [#15420](https://github.com/opensearch-project/OpenSearch/pull/15420))
- Bump `actions/github-script` from 6 to 7 ([#14997](https://github.com/opensearch-project/OpenSearch/pull/14997))
- Bump `org.tukaani:xz` from 1.9 to 1.10 ([#15110](https://github.com/opensearch-project/OpenSearch/pull/15110))
- Bump `actions/setup-java` from 1 to 4 ([#15104](https://github.com/opensearch-project/OpenSearch/pull/15104))
@@ -45,6 +50,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.azure:azure-core-http-netty` from 1.15.1 to 1.15.3 ([#15300](https://github.com/opensearch-project/OpenSearch/pull/15300))
- Bump `com.gradle.develocity` from 3.17.6 to 3.18 ([#15297](https://github.com/opensearch-project/OpenSearch/pull/15297))
- Bump `commons-cli:commons-cli` from 1.8.0 to 1.9.0 ([#15298](https://github.com/opensearch-project/OpenSearch/pull/15298))
- Bump `opentelemetry` from 1.40.0 to 1.41.0 ([#15361](https://github.com/opensearch-project/OpenSearch/pull/15361))
- Bump `opentelemetry-semconv` from 1.26.0-alpha to 1.27.0-alpha ([#15361](https://github.com/opensearch-project/OpenSearch/pull/15361))
- Bump `tj-actions/changed-files` from 44 to 45 ([#15422](https://github.com/opensearch-project/OpenSearch/pull/15422))
- Bump `dnsjava:dnsjava` from 3.6.0 to 3.6.1 ([#15418](https://github.com/opensearch-project/OpenSearch/pull/15418))
- Bump `com.netflix.nebula.ospackage-base` from 11.9.1 to 11.10.0 ([#15419](https://github.com/opensearch-project/OpenSearch/pull/15419))
- Bump `org.roaringbitmap:RoaringBitmap` from 1.1.0 to 1.2.1 ([#15423](https://github.com/opensearch-project/OpenSearch/pull/15423))

### Changed
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))
@@ -64,6 +75,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
- Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194))
- Fix incorrect parameter names in MinHash token filter configuration handling ([#15233](https://github.com/opensearch-project/OpenSearch/pull/15233))
- Fix indexing error when flat_object field is explicitly null ([#15375](https://github.com/opensearch-project/OpenSearch/pull/15375))
- Fix split response processor not included in allowlist ([#15393](https://github.com/opensearch-project/OpenSearch/pull/15393))
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))

### Security

6 changes: 3 additions & 3 deletions buildSrc/version.properties
@@ -37,7 +37,7 @@ reactor_netty = 1.1.22
reactor = 3.5.20

# client dependencies
httpclient5 = 5.2.3
httpclient5 = 5.3.1
httpcore5 = 5.2.5
httpclient = 4.5.14
httpcore = 4.4.16
@@ -74,5 +74,5 @@ jzlib = 1.1.3
resteasy = 6.2.4.Final

# opentelemetry dependencies
opentelemetry = 1.40.0
opentelemetrysemconv = 1.26.0-alpha
opentelemetry = 1.41.0
opentelemetrysemconv = 1.27.0-alpha
1 change: 0 additions & 1 deletion client/rest/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/rest/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
50 changes: 49 additions & 1 deletion client/rest/src/main/java/org/opensearch/client/RestClient.java
@@ -114,6 +114,7 @@
import java.util.zip.GZIPOutputStream;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@@ -416,7 +417,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
try {
final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
if (responseOrResponseException.responseException == null) {
return Mono.just(message);
return Mono.just(
new Message<>(
message.getHead(),
Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))
)
);
} else {
if (nodeTuple.nodes.hasNext()) {
return Mono.from(streamRequest(nodeTuple, request));
@@ -431,6 +437,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
});
}

/**
* Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence.
* @param b {@link ByteBuffer} to split
* @return individual chunks
*/
private static Collection<ByteBuffer> frame(ByteBuffer b) {
final Collection<ByteBuffer> buffers = new ArrayList<>();

int position = b.position();
while (b.hasRemaining()) {
// Skip the chunk separator when it comes right at the beginning
if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) {
if (b.get() == '\n') {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

// Do not copy the '\r\n' sequence
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
position = b.position();
}
}
}

if (buffers.isEmpty()) {
return Collections.singleton(b);
}

// Copy last chunk
if (position != b.position()) {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));
}

return buffers;
}

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
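To make the new streaming behavior concrete: the frame(ByteBuffer) helper added to RestClient above splits each response buffer on '\r\n' boundaries and drops the separators, so each downstream ByteBuffer carries one complete chunk (plus any trailing partial chunk). The standalone sketch below is purely illustrative — the class and method names are invented, and it mirrors the splitting logic on a String payload rather than reusing the private frame() method:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FramingSketch {
    // Splits the payload on "\r\n" and drops the separators, mirroring RestClient#frame.
    static List<String> frames(String payload) {
        final ByteBuffer b = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
        final List<String> chunks = new ArrayList<>();
        int start = b.position();
        while (b.hasRemaining()) {
            // Like the original, a "\r\n" at the very start of the buffer is not treated as a boundary.
            if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) {
                if (b.get() == '\n') {
                    final int end = b.position() - 2; // exclude the "\r\n" itself
                    chunks.add(new String(b.array(), start, end - start, StandardCharsets.UTF_8));
                    start = b.position();
                }
            }
        }
        // Any trailing bytes without a separator become the last chunk.
        if (start != b.position()) {
            chunks.add(new String(b.array(), start, b.position() - start, StandardCharsets.UTF_8));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Two complete chunks followed by a partial one.
        System.out.println(frames("{\"a\":1}\r\n{\"b\":2}\r\n{\"c\""));
        // prints: [{"a":1}, {"b":2}, {"c"]
    }
}
```

In the commit itself this framing is applied through Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b))), so subscribers of the streaming API see chunk-aligned buffers instead of arbitrary network reads.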
1 change: 0 additions & 1 deletion client/sniffer/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/sniffer/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
2 changes: 1 addition & 1 deletion distribution/packages/build.gradle
@@ -63,7 +63,7 @@ import java.util.regex.Pattern
*/

plugins {
id "com.netflix.nebula.ospackage-base" version "11.9.1"
id "com.netflix.nebula.ospackage-base" version "11.10.0"
}

void addProcessFilesTask(String type, boolean jdk) {
modules/ingest-geoip/src/main/java/org/opensearch/ingest/geoip/IngestGeoIpModulePlugin.java
@@ -44,6 +44,7 @@
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.IngestPlugin;
@@ -62,10 +63,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IngestGeoIpModulePlugin extends Plugin implements IngestPlugin, Closeable {
static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.geoip.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);
public static final Setting<Long> CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

static String[] DEFAULT_DATABASE_FILENAMES = new String[] { "GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb" };
@@ -74,7 +83,7 @@ public class IngestGeoIpModulePlugin extends Plugin implements IngestPlugin, Clo

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(CACHE_SIZE);
return Arrays.asList(CACHE_SIZE, PROCESSORS_ALLOWLIST_SETTING);
}

@Override
@@ -90,7 +99,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)));
return filterForAllowlistSetting(
parameters.env.settings(),
Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)))
);
}

/*
@@ -175,6 +187,30 @@ public void close() throws IOException {
}
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
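The IngestGeoIpModulePlugin change above gates processor registration behind the new ingest.geoip.processors.allowed setting: when the setting is present, only the listed processor types are kept, and allowlisted names that match no known processor fail with an IllegalArgumentException. Below is a minimal, hypothetical sketch of that filtering pattern — it reuses the Setting/Settings APIs shown in the diff, but stands in plain Strings for the Processor.Factory instances and uses an invented class name, so it is an illustration rather than the plugin's actual wiring:

```java
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class AllowlistSketch {
    // Same key, default, and scope as PROCESSORS_ALLOWLIST_SETTING in the diff.
    static final Setting<List<String>> ALLOWLIST = Setting.listSetting(
        "ingest.geoip.processors.allowed",
        List.of(),
        Function.identity(),
        Setting.Property.NodeScope
    );

    // Strings stand in for Processor.Factory instances to keep the sketch self-contained.
    static Map<String, String> filter(Settings settings, Map<String, String> processors) {
        if (ALLOWLIST.exists(settings) == false) {
            return Map.copyOf(processors); // setting absent: every processor stays registered
        }
        final Set<String> allowed = Set.copyOf(ALLOWLIST.get(settings));
        // The real plugin additionally rejects allowlisted names that match no known processor.
        return processors.entrySet()
            .stream()
            .filter(e -> allowed.contains(e.getKey()))
            .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        final Settings settings = Settings.builder()
            .putList("ingest.geoip.processors.allowed", List.of("geoip"))
            .build();
        // Only the allowlisted "geoip" entry survives the filtering.
        System.out.println(filter(settings, Map.of("geoip", "factory", "other", "factory")).keySet());
    }
}
```

When the setting is omitted entirely, the full map is returned unchanged, which is the behavior the new testAllowListNotSpecified test in the diff below asserts.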
modules/ingest-geoip/src/test/java/org/opensearch/ingest/geoip/IngestGeoIpModulePluginTests.java
@@ -35,8 +35,20 @@
import com.maxmind.geoip2.model.AbstractResponse;

import org.opensearch.common.network.InetAddresses;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.geoip.IngestGeoIpModulePlugin.GeoIpCache;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import static org.mockito.Mockito.mock;

@@ -77,4 +89,87 @@ public void testInvalidInit() {
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new GeoIpCache(-1));
assertEquals("geoip max cache size must be 0 or greater", ex.getMessage());
}

public void testAllowList() throws IOException {
runAllowListTest(List.of());
runAllowListTest(List.of("geoip"));
}

public void testInvalidAllowList() throws IOException {
List<String> invalidAllowList = List.of("set");
Settings.Builder settingsBuilder = Settings.builder()
.putList(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), invalidAllowList);
createDb(settingsBuilder);
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settingsBuilder.build()))
);
assertEquals(
"Processor(s) "
+ invalidAllowList
+ " were defined in ["
+ IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist",
e.getMessage()
);
}
}

public void testAllowListNotSpecified() throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.remove(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
createDb(settingsBuilder);
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
final Set<String> expected = Set.of("geoip");
assertEquals(expected, plugin.getProcessors(createParameters(settingsBuilder.build())).keySet());
}
}

private void runAllowListTest(List<String> allowList) throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
createDb(settingsBuilder);
final Settings settings = settingsBuilder.putList(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowList).build();
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
assertEquals(Set.copyOf(allowList), plugin.getProcessors(createParameters(settings)).keySet());
}
}

private void createDb(Settings.Builder settingsBuilder) throws IOException {
Path configDir = createTempDir();
Path userAgentConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(userAgentConfigDir);
settingsBuilder.put("ingest.geoip.database_path", configDir).put("path.home", configDir);
try {
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
configDir.resolve("GeoLite2-City.mmdb")
);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
configDir.resolve("GeoLite2-Country.mmdb")
);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
configDir.resolve("GeoLite2-ASN.mmdb")
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(settings),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}