diff --git a/data-prepper-plugins/service-map-stateful/build.gradle b/data-prepper-plugins/service-map-stateful/build.gradle index fa83d4e6bc..ab2300f020 100644 --- a/data-prepper-plugins/service-map-stateful/build.gradle +++ b/data-prepper-plugins/service-map-stateful/build.gradle @@ -19,6 +19,7 @@ dependencies { exclude group: 'com.google.protobuf', module: 'protobuf-java' } implementation libs.protobuf.core + testImplementation project(':data-prepper-test-common') } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfig.java b/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfig.java index 8c337b2737..7f72fb5286 100644 --- a/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfig.java +++ b/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfig.java @@ -5,8 +5,20 @@ package org.opensearch.dataprepper.plugins.processor; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + public class ServiceMapProcessorConfig { - static final String WINDOW_DURATION = "window_duration"; + private static final String WINDOW_DURATION = "window_duration"; static final int DEFAULT_WINDOW_DURATION = 180; static final String DEFAULT_DB_PATH = "data/service-map/"; + + @JsonProperty(WINDOW_DURATION) + @JsonPropertyDescription("Represents the fixed time window, in seconds, " + + "during which service map relationships are evaluated. Default value is 180.") + private int windowDuration = DEFAULT_WINDOW_DURATION; + + public int getWindowDuration() { + return windowDuration; + } } diff --git a/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessor.java b/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessor.java index c02ccb17d6..75041a09b4 100644 --- a/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessor.java +++ b/data-prepper-plugins/service-map-stateful/src/main/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessor.java @@ -6,9 +6,11 @@ package org.opensearch.dataprepper.plugins.processor; import org.apache.commons.codec.DecoderException; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.annotations.SingleThread; -import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding; @@ -40,7 +42,8 @@ import java.util.concurrent.atomic.AtomicInteger; @SingleThread -@DataPrepperPlugin(name = "service_map", deprecatedName = "service_map_stateful", pluginType = Processor.class) +@DataPrepperPlugin(name = "service_map", deprecatedName = "service_map_stateful", pluginType = Processor.class, + pluginConfigurationType = ServiceMapProcessorConfig.class) public class ServiceMapStatefulProcessor extends AbstractProcessor, Record> implements RequiresPeerForwarding { static final String SPANS_DB_SIZE = "spansDbSize"; @@ -75,20 +78,24 @@ public class ServiceMapStatefulProcessor extends AbstractProcessor private final int thisProcessorId; - public ServiceMapStatefulProcessor(final PluginSetting pluginSetting) { - this(pluginSetting.getIntegerOrDefault(ServiceMapProcessorConfig.WINDOW_DURATION, ServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION) * TO_MILLIS, + @DataPrepperPluginConstructor + public ServiceMapStatefulProcessor( + final ServiceMapProcessorConfig serviceMapProcessorConfig, + final PluginMetrics pluginMetrics, + final PipelineDescription pipelineDescription) { + this((long) serviceMapProcessorConfig.getWindowDuration() * TO_MILLIS, new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH), Clock.systemUTC(), - pluginSetting.getNumberOfProcessWorkers(), - pluginSetting); + pipelineDescription.getNumberOfProcessWorkers(), + pluginMetrics); } - public ServiceMapStatefulProcessor(final long windowDurationMillis, + ServiceMapStatefulProcessor(final long windowDurationMillis, final File databasePath, final Clock clock, final int processWorkers, - final PluginSetting pluginSetting) { - super(pluginSetting); + final PluginMetrics pluginMetrics) { + super(pluginMetrics); ServiceMapStatefulProcessor.clock = clock; this.thisProcessorId = processorsCreated.getAndIncrement(); diff --git a/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfigTest.java b/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfigTest.java new file mode 100644 index 0000000000..35ef3b0c07 --- /dev/null +++ b/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapProcessorConfigTest.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugins.processor; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; + +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.opensearch.dataprepper.plugins.processor.ServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION; + +class ServiceMapProcessorConfigTest { + private ServiceMapProcessorConfig serviceMapProcessorConfig; + Random random; + + @BeforeEach + void setUp() { + serviceMapProcessorConfig = new ServiceMapProcessorConfig(); + random = new Random(); + } + + @Test + void testDefaultConfig() { + assertThat(serviceMapProcessorConfig.getWindowDuration(), equalTo(DEFAULT_WINDOW_DURATION)); + } + + @Test + void testGetter() throws NoSuchFieldException, IllegalAccessException { + final int windowDuration = 1 + random.nextInt(300); + ReflectivelySetField.setField( + ServiceMapProcessorConfig.class, + serviceMapProcessorConfig, + "windowDuration", + windowDuration); + assertThat(serviceMapProcessorConfig.getWindowDuration(), equalTo(windowDuration)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessorTest.java b/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessorTest.java index 28789615aa..b565642e19 100644 --- a/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessorTest.java +++ b/data-prepper-plugins/service-map-stateful/src/test/java/org/opensearch/dataprepper/plugins/processor/ServiceMapStatefulProcessorTest.java @@ -14,6 +14,8 @@ import org.mockito.Mockito; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.trace.Span; @@ -43,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.processor.ServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION; public class ServiceMapStatefulProcessorTest { @@ -54,12 +57,20 @@ public class ServiceMapStatefulProcessorTest { private static final String PAYMENT_SERVICE = "PAY"; private static final String CART_SERVICE = "CART"; private PluginSetting pluginSetting; + private PluginMetrics pluginMetrics; + private PipelineDescription pipelineDescription; + private ServiceMapProcessorConfig serviceMapProcessorConfig; @BeforeEach public void setup() throws NoSuchFieldException, IllegalAccessException { resetServiceMapStatefulProcessorStatic(); MetricsTestUtil.initMetrics(); pluginSetting = mock(PluginSetting.class); + pipelineDescription = mock(PipelineDescription.class); + serviceMapProcessorConfig = mock(ServiceMapProcessorConfig.class); + when(serviceMapProcessorConfig.getWindowDuration()).thenReturn(DEFAULT_WINDOW_DURATION); + pluginMetrics = PluginMetrics.fromNames( + "testServiceMapProcessor", "testPipelineName"); when(pluginSetting.getName()).thenReturn("testServiceMapProcessor"); when(pluginSetting.getPipelineName()).thenReturn("testPipelineName"); } @@ -116,13 +127,11 @@ private Set evaluateEdges(Set serv } @Test - public void testPluginSettingConstructor() { - - final PluginSetting pluginSetting = new PluginSetting("testPluginSetting", Collections.emptyMap()); - pluginSetting.setProcessWorkers(4); - pluginSetting.setPipelineName("TestPipeline"); + public void testDataPrepperConstructor() { + when(pipelineDescription.getNumberOfProcessWorkers()).thenReturn(4); //Nothing is accessible to validate, so just verify that no exception is thrown. - final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor(pluginSetting); + final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor( + serviceMapProcessorConfig, pluginMetrics, pipelineDescription); } @Test @@ -132,8 +141,8 @@ public void testTraceGroupsWithEventRecordData() throws Exception { Mockito.when(clock.instant()).thenReturn(Instant.now()); ExecutorService threadpool = Executors.newCachedThreadPool(); final File path = new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH); - final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting); - final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting); + final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics); + final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics); final byte[] rootSpanId1Bytes = ServiceMapTestUtils.getRandomBytes(8); final byte[] rootSpanId2Bytes = ServiceMapTestUtils.getRandomBytes(8); @@ -327,8 +336,8 @@ public void testTraceGroupsWithIsolatedServiceEventRecordData() throws Exception Mockito.when(clock.instant()).thenReturn(Instant.now()); ExecutorService threadpool = Executors.newCachedThreadPool(); final File path = new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH); - final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting); - final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting); + final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics); + final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics); final byte[] rootSpanIdBytes = ServiceMapTestUtils.getRandomBytes(8); final byte[] traceIdBytes = ServiceMapTestUtils.getRandomBytes(16); @@ -383,7 +392,7 @@ public void testTraceGroupsWithIsolatedServiceEventRecordData() throws Exception @Test public void testPrepareForShutdownWithEventRecordData() { final File path = new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH); - final ServiceMapStatefulProcessor serviceMapStateful = new ServiceMapStatefulProcessor(100, path, Clock.systemUTC(), 1, pluginSetting); + final ServiceMapStatefulProcessor serviceMapStateful = new ServiceMapStatefulProcessor(100, path, Clock.systemUTC(), 1, pluginMetrics); final byte[] rootSpanId1Bytes = ServiceMapTestUtils.getRandomBytes(8); final byte[] traceId1Bytes = ServiceMapTestUtils.getRandomBytes(16); @@ -411,11 +420,9 @@ public void testPrepareForShutdownWithEventRecordData() { @Test public void testGetIdentificationKeys() { - final PluginSetting pluginSetting = new PluginSetting("testPluginSetting", Collections.emptyMap()); - pluginSetting.setProcessWorkers(4); - pluginSetting.setPipelineName("TestPipeline"); - - final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor(pluginSetting); + when(pipelineDescription.getNumberOfProcessWorkers()).thenReturn(4); + final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor( + serviceMapProcessorConfig, pluginMetrics, pipelineDescription); final Collection expectedIdentificationKeys = serviceMapStatefulProcessor.getIdentificationKeys(); assertThat(expectedIdentificationKeys, equalTo(Collections.singleton("traceId")));