Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabh6788 committed Mar 29, 2024
2 parents 8f07f5b + eba3b57 commit 46f32ae
Show file tree
Hide file tree
Showing 61 changed files with 1,870 additions and 355 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand All @@ -113,6 +117,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
- Improve built-in secure transports support ([#12907](https://github.com/opensearch-project/OpenSearch/pull/12907))

### Deprecated

Expand Down
12 changes: 7 additions & 5 deletions distribution/src/bin/opensearch
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ fi

# get keystore password before setting java options to avoid
# conflicting GC configurations for the keystore tools
unset KEYSTORE_PASSWORD
KEYSTORE_PASSWORD=
if [[ $CHECK_KEYSTORE = true ]] \
&& bin/opensearch-keystore has-passwd --silent
then
if ! read -s -r -p "OpenSearch keystore password: " KEYSTORE_PASSWORD ; then
echo "Failed to read keystore password on console" 1>&2
exit 1
if [[ ! -z "${KEYSTORE_PASSWORD}" ]]; then
echo "Using value of KEYSTORE_PASSWORD from the environment"
else
if ! read -s -r -p "OpenSearch keystore password: " KEYSTORE_PASSWORD ; then
echo "Failed to read keystore password on console" 1>&2
exit 1
fi
fi
fi

Expand Down
13 changes: 8 additions & 5 deletions distribution/src/bin/opensearch.bat
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@ if not exist "%SERVICE_LOG_DIR%" (
mkdir "%SERVICE_LOG_DIR%"
)

SET KEYSTORE_PASSWORD=
IF "%checkpassword%"=="Y" (
CALL "%~dp0opensearch-keystore.bat" has-passwd --silent
IF !ERRORLEVEL! EQU 0 (
SET /P KEYSTORE_PASSWORD=OpenSearch keystore password:
IF !ERRORLEVEL! NEQ 0 (
ECHO Failed to read keystore password on standard input
EXIT /B !ERRORLEVEL!
if defined KEYSTORE_PASSWORD (
ECHO Using value of KEYSTORE_PASSWORD from the environment
) else (
SET /P KEYSTORE_PASSWORD=OpenSearch keystore password:
IF !ERRORLEVEL! NEQ 0 (
ECHO Failed to read keystore password on standard input
EXIT /B !ERRORLEVEL!
)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.ingest.common;

import org.opensearch.common.network.InetAddresses;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
Expand Down Expand Up @@ -118,6 +119,19 @@ public Object convert(Object value) {
return value.toString();
}
},
IP {
@Override
public Object convert(Object value) {
// If the value is a valid ipv4/ipv6 address, we return the original value directly because IpFieldType
// can accept string value, this is simpler than we return an InetAddress object which needs to do more
// work such as serialization
if (value instanceof String && InetAddresses.isInetAddress(value.toString())) {
return value;
} else {
throw new IllegalArgumentException("[" + value + "] is not a valid ipv4/ipv6 address");
}
}
},
AUTO {
@Override
public Object convert(Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,29 @@ public void testTargetField() throws Exception {
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(String.valueOf(randomInt)));
assertThat(ingestDocument.getFieldValue(targetField, Integer.class), equalTo(randomInt));
}

public void testConvertIP() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String validIPString;
if (randomBoolean()) {
validIPString = "1.2.3.4";
} else {
validIPString = "::1";
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, validIPString);

Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, fieldName, fieldName, Type.IP, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(validIPString));

String invalidIPString = randomAlphaOfLength(10);
fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, invalidIPString);
Processor processorWithInvalidIP = new ConvertProcessor(randomAlphaOfLength(10), null, fieldName, fieldName, Type.IP, false);
try {
processorWithInvalidIP.execute(ingestDocument);
fail("processor execute should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("[" + invalidIPString + "] is not a valid ipv4/ipv6 address"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "1"
ignore: 404

---
"Test convert processor with ip type":
- skip:
version: " - 2.13.99"
reason: "introduced in 2.14.0"
- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"convert" : {
"field" : "raw_ip",
"type": "ip"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /\[1.1.1.\] is not a valid ipv4\/ipv6 address/
index:
index: test
id: 1
pipeline: "1"
body: {
raw_ip: "1.1.1."
}

- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"convert" : {
"field" : "raw_ip",
"target_field" : "ip_field",
"type" : "ip",
"ignore_failure" : true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "1"
body: {
raw_ip: "1.1.1."
}
- do:
get:
index: test
id: 1
- match: { _source: { raw_ip: "1.1.1."} }

- do:
index:
index: test
id: 1
pipeline: "1"
body: {
raw_ip: "1.1.1.1"
}
- do:
get:
index: test
id: 1
- match: { _source: { raw_ip: "1.1.1.1", ip_field: "1.1.1.1"} }
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,27 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.netty4.Netty4HttpChannel;
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.plugins.SecureTransportSettingsProvider;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.plugins.TransportExceptionHandler;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.transport.TransportAdapterProvider;
import org.opensearch.transport.netty4.ssl.SslUtils;

import javax.net.ssl.SSLEngine;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
Expand All @@ -59,9 +67,14 @@
* @see <a href="https://github.com/opensearch-project/security/blob/d526c9f6c2a438c14db8b413148204510b9fe2e2/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java">SecuritySSLNettyHttpServerTransport</a>
*/
public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport {
public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";
public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor";

private static final Logger logger = LogManager.getLogger(SecureNetty4HttpServerTransport.class);
private final SecureTransportSettingsProvider secureTransportSettingsProvider;
private final SecureTransportSettingsProvider.ServerExceptionHandler exceptionHandler;
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
private final TransportExceptionHandler exceptionHandler;
private final ChannelInboundHandlerAdapter headerVerifier;
private final TransportAdapterProvider<HttpServerTransport> decompressorProvider;

public SecureNetty4HttpServerTransport(
final Settings settings,
Expand All @@ -72,7 +85,7 @@ public SecureNetty4HttpServerTransport(
final Dispatcher dispatcher,
final ClusterSettings clusterSettings,
final SharedGroupFactory sharedGroupFactory,
final SecureTransportSettingsProvider secureTransportSettingsProvider,
final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
final Tracer tracer
) {
super(
Expand All @@ -86,9 +99,45 @@ public SecureNetty4HttpServerTransport(
sharedGroupFactory,
tracer
);
this.secureTransportSettingsProvider = secureTransportSettingsProvider;
this.exceptionHandler = secureTransportSettingsProvider.buildHttpServerExceptionHandler(settings, this)
.orElse(SecureTransportSettingsProvider.ServerExceptionHandler.NOOP);

this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
this.exceptionHandler = secureHttpTransportSettingsProvider.buildHttpServerExceptionHandler(settings, this)
.orElse(TransportExceptionHandler.NOOP);

final List<ChannelInboundHandlerAdapter> headerVerifiers = secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(
settings
)
.stream()
.filter(p -> REQUEST_HEADER_VERIFIER.equalsIgnoreCase(p.name()))
.map(p -> p.create(settings, this, ChannelInboundHandlerAdapter.class))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

if (headerVerifiers.size() > 1) {
throw new IllegalArgumentException("Cannot have more than one header verifier configured, supplied " + headerVerifiers.size());
}

final Optional<TransportAdapterProvider<HttpServerTransport>> decompressorProviderOpt = secureHttpTransportSettingsProvider
.getHttpTransportAdapterProviders(settings)
.stream()
.filter(p -> REQUEST_DECOMPRESSOR.equalsIgnoreCase(p.name()))
.findFirst();
// There could be multiple request decompressor providers configured, using the first one
decompressorProviderOpt.ifPresent(p -> logger.debug("Using request decompressor provider: {}", p));

this.headerVerifier = headerVerifiers.isEmpty() ? null : headerVerifiers.get(0);
this.decompressorProvider = decompressorProviderOpt.orElseGet(() -> new TransportAdapterProvider<HttpServerTransport>() {
@Override
public String name() {
return REQUEST_DECOMPRESSOR;
}

@Override
public <C> Optional<C> create(Settings settings, HttpServerTransport transport, Class<C> adapterClass) {
return Optional.empty();
}
});
}

@Override
Expand Down Expand Up @@ -152,7 +201,7 @@ protected SslHttpChannelHandler(final Netty4HttpServerTransport transport, final
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);

final SSLEngine sslEngine = secureTransportSettingsProvider.buildSecureHttpServerEngine(
final SSLEngine sslEngine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(
settings,
SecureNetty4HttpServerTransport.this
).orElseGet(SslUtils::createDefaultServerSSLEngine);
Expand All @@ -166,4 +215,17 @@ protected void configurePipeline(Channel ch) {
ch.pipeline().addLast(new Http2OrHttpHandler());
}
}

protected ChannelInboundHandlerAdapter createHeaderVerifier() {
if (headerVerifier != null) {
return headerVerifier;
} else {
return super.createHeaderVerifier();
}
}

@Override
protected ChannelInboundHandlerAdapter createDecompressor() {
return decompressorProvider.create(settings, this, ChannelInboundHandlerAdapter.class).orElseGet(super::createDecompressor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.http.netty4.ssl.SecureNetty4HttpServerTransport;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.plugins.SecureTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -160,7 +161,7 @@ public Map<String, Supplier<HttpServerTransport>> getSecureHttpTransports(
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings,
SecureTransportSettingsProvider secureTransportSettingsProvider,
SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
Tracer tracer
) {
return Collections.singletonMap(
Expand All @@ -174,7 +175,7 @@ public Map<String, Supplier<HttpServerTransport>> getSecureHttpTransports(
dispatcher,
clusterSettings,
getSharedGroupFactory(settings),
secureTransportSettingsProvider,
secureHttpTransportSettingsProvider,
tracer
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.SecureTransportSettingsProvider;
import org.opensearch.plugins.TransportExceptionHandler;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class SecureNetty4Transport extends Netty4Transport {

private static final Logger logger = LogManager.getLogger(SecureNetty4Transport.class);
private final SecureTransportSettingsProvider secureTransportSettingsProvider;
private final SecureTransportSettingsProvider.ServerExceptionHandler exceptionHandler;
private final TransportExceptionHandler exceptionHandler;

public SecureNetty4Transport(
final Settings settings,
Expand Down Expand Up @@ -100,7 +101,7 @@ public SecureNetty4Transport(

this.secureTransportSettingsProvider = secureTransportSettingsProvider;
this.exceptionHandler = secureTransportSettingsProvider.buildServerTransportExceptionHandler(settings, this)
.orElse(SecureTransportSettingsProvider.ServerExceptionHandler.NOOP);
.orElse(TransportExceptionHandler.NOOP);
}

@Override
Expand Down
Loading

0 comments on commit 46f32ae

Please sign in to comment.