Skip to content

Commit

Permalink
Adding allowlist setting for ingest-useragent and ingest-geoip proces…
Browse files Browse the repository at this point in the history
…sors (opensearch-project#15325)

* Adding allowlist setting for user-agent, geo-ip and updated tests for ingest-common.

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Remove duplicate test in ingest-common

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Adding changelog

Signed-off-by: Sarat Vemulapalli <[email protected]>

---------

Signed-off-by: Sarat Vemulapalli <[email protected]>
  • Loading branch information
saratvemulapalli authored Aug 22, 2024
1 parent b6b0403 commit d5a6c0b
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.IngestPlugin;
Expand All @@ -62,10 +63,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IngestGeoIpModulePlugin extends Plugin implements IngestPlugin, Closeable {
static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.geoip.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);
public static final Setting<Long> CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

static String[] DEFAULT_DATABASE_FILENAMES = new String[] { "GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb" };
Expand All @@ -74,7 +83,7 @@ public class IngestGeoIpModulePlugin extends Plugin implements IngestPlugin, Clo

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(CACHE_SIZE);
return Arrays.asList(CACHE_SIZE, PROCESSORS_ALLOWLIST_SETTING);
}

@Override
Expand All @@ -90,7 +99,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)));
return filterForAllowlistSetting(
parameters.env.settings(),
Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)))
);
}

/*
Expand Down Expand Up @@ -175,6 +187,30 @@ public void close() throws IOException {
}
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@
import com.maxmind.geoip2.model.AbstractResponse;

import org.opensearch.common.network.InetAddresses;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.geoip.IngestGeoIpModulePlugin.GeoIpCache;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -77,4 +89,87 @@ public void testInvalidInit() {
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new GeoIpCache(-1));
assertEquals("geoip max cache size must be 0 or greater", ex.getMessage());
}

public void testAllowList() throws IOException {
runAllowListTest(List.of());
runAllowListTest(List.of("geoip"));
}

public void testInvalidAllowList() throws IOException {
List<String> invalidAllowList = List.of("set");
Settings.Builder settingsBuilder = Settings.builder()
.putList(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), invalidAllowList);
createDb(settingsBuilder);
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settingsBuilder.build()))
);
assertEquals(
"Processor(s) "
+ invalidAllowList
+ " were defined in ["
+ IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist",
e.getMessage()
);
}
}

public void testAllowListNotSpecified() throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.remove(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
createDb(settingsBuilder);
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
final Set<String> expected = Set.of("geoip");
assertEquals(expected, plugin.getProcessors(createParameters(settingsBuilder.build())).keySet());
}
}

private void runAllowListTest(List<String> allowList) throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
createDb(settingsBuilder);
final Settings settings = settingsBuilder.putList(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowList).build();
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
assertEquals(Set.copyOf(allowList), plugin.getProcessors(createParameters(settings)).keySet());
}
}

private void createDb(Settings.Builder settingsBuilder) throws IOException {
Path configDir = createTempDir();
Path userAgentConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(userAgentConfigDir);
settingsBuilder.put("ingest.geoip.database_path", configDir).put("path.home", configDir);
try {
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
configDir.resolve("GeoLite2-City.mmdb")
);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
configDir.resolve("GeoLite2-Country.mmdb")
);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
configDir.resolve("GeoLite2-ASN.mmdb")
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(settings),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.ingest.useragent;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;
Expand All @@ -47,10 +48,19 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IngestUserAgentModulePlugin extends Plugin implements IngestPlugin {

static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.useragent.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);
private final Setting<Long> CACHE_SIZE_SETTING = Setting.longSetting(
"ingest.user_agent.cache_size",
1000,
Expand All @@ -77,7 +87,34 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(UserAgentProcessor.TYPE, new UserAgentProcessor.Factory(userAgentParsers));
return filterForAllowlistSetting(
parameters.env.settings(),
Collections.singletonMap(UserAgentProcessor.TYPE, new UserAgentProcessor.Factory(userAgentParsers))
);
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

static Map<String, UserAgentParser> createUserAgentParsers(Path userAgentConfigDirectory, UserAgentCache cache) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.useragent;

import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

public class IngestUserAgentModulePluginTests extends OpenSearchTestCase {
private Settings.Builder settingsBuilder;

@Before
public void setup() throws IOException {
Path configDir = createTempDir();
Path userAgentConfigDir = configDir.resolve("ingest-user-agent");
Files.createDirectories(userAgentConfigDir);
settingsBuilder = Settings.builder().put("ingest-user-agent", configDir).put("path.home", configDir);

// Copy file, leaving out the device parsers at the end
String regexWithoutDevicesFilename = "regexes_without_devices.yml";
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(UserAgentProcessor.class.getResourceAsStream("/regexes.yml"), StandardCharsets.UTF_8)
);
BufferedWriter writer = Files.newBufferedWriter(userAgentConfigDir.resolve(regexWithoutDevicesFilename));
) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("device_parsers:")) {
break;
}

writer.write(line);
writer.newLine();
}
}
}

public void testAllowList() throws IOException {
runAllowListTest(List.of());
runAllowListTest(List.of("user_agent"));
}

public void testInvalidAllowList() throws IOException {
List<String> invalidAllowList = List.of("set");
final Settings settings = settingsBuilder.putList(
IngestUserAgentModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(),
invalidAllowList
).build();
try (IngestUserAgentModulePlugin plugin = new IngestUserAgentModulePlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settings))
);
assertEquals(
"Processor(s) "
+ invalidAllowList
+ " were defined in ["
+ IngestUserAgentModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist",
e.getMessage()
);
}
}

public void testAllowListNotSpecified() throws IOException {
settingsBuilder.remove(IngestUserAgentModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
try (IngestUserAgentModulePlugin plugin = new IngestUserAgentModulePlugin()) {
final Set<String> expected = Set.of("user_agent");
assertEquals(expected, plugin.getProcessors(createParameters(settingsBuilder.build())).keySet());
}
}

private void runAllowListTest(List<String> allowList) throws IOException {
final Settings settings = settingsBuilder.putList(IngestUserAgentModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowList)
.build();
try (IngestUserAgentModulePlugin plugin = new IngestUserAgentModulePlugin()) {
assertEquals(Set.copyOf(allowList), plugin.getProcessors(createParameters(settings)).keySet());
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(settings),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}

0 comments on commit d5a6c0b

Please sign in to comment.