From c1133911d533af2c507450df9dca8092ac3bf6f3 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 24 Sep 2024 11:02:27 -0400 Subject: [PATCH] GH-245: Expose customizers for KCL configs Fixes: #245 Issue link: https://github.com/spring-projects/spring-integration-aws/issues/245 * Add something like `setLeaseManagementConfigCustomizer(Consumer leaseManagementConfigCustomizer)` to the `KclMessageDrivenChannelAdapter` and call them before creating a `Scheduler` * Also add a simple `emptyRecordList` property for the `ProcessorConfig` --- README.md | 2 +- .../KclMessageDrivenChannelAdapter.java | 103 ++++++++++++++++-- .../KclMessageDrivenChannelAdapterTests.java | 42 +++++-- 3 files changed, 128 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index be13e4ee..62775647 100644 --- a/README.md +++ b/README.md @@ -435,7 +435,7 @@ For example, users may want to fully read any parent shards before starting to r return openShards.stream() .filter(shard -> !openShardIds.contains(shard.getParentShardId()) && !openShardIds.contains(shard.getAdjacentParentShardId())) - .collect(Collectors.toList()); + .toList(); } ``` diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index f138a5f7..b521860e 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -21,7 +21,7 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; +import java.util.function.Consumer; import javax.annotation.Nullable; @@ -40,10 +40,12 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.exceptions.ThrottlingException; +import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; @@ -55,6 +57,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -151,6 +154,20 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport private MetricsLevel metricsLevel = MetricsLevel.DETAILED; + private Consumer coordinatorConfigCustomizer = (config) -> { + }; + + private Consumer lifecycleConfigCustomizer = (config) -> { + }; + + private Consumer metricsConfigCustomizer = (config) -> { + }; + + private Consumer leaseManagementConfigCustomizer = (config) -> { + }; + + private boolean emptyRecordList; + public KclMessageDrivenChannelAdapter(String... streams) { this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams); } @@ -283,9 +300,70 @@ public void setMetricsLevel(MetricsLevel metricsLevel) { this.metricsLevel = metricsLevel; } + /** + * Set a {@link Consumer} to configure a {@link CoordinatorConfig}. + * @param coordinatorConfigCustomizer the {@link Consumer} to configure a {@link CoordinatorConfig}. + * @since 3.0.8 + * @see CoordinatorConfig + */ + public void setCoordinatorConfigCustomizer(Consumer coordinatorConfigCustomizer) { + Assert.notNull(coordinatorConfigCustomizer, "'coordinatorConfigCustomizer' must not be null"); + this.coordinatorConfigCustomizer = coordinatorConfigCustomizer; + } + + /** + * Set a {@link Consumer} to configure a {@link LifecycleConfig}. + * @param lifecycleConfigCustomizer the {@link Consumer} to configure a {@link LifecycleConfig}. + * @since 3.0.8 + * @see LifecycleConfig + */ + public void setLifecycleConfigCustomizer(Consumer lifecycleConfigCustomizer) { + Assert.notNull(lifecycleConfigCustomizer, "'lifecycleConfigCustomizer' must not be null"); + this.lifecycleConfigCustomizer = lifecycleConfigCustomizer; + } + + /** + * Set a {@link Consumer} to configure a {@link MetricsConfig}. + * May override whatever could be set individually, like {@link #setMetricsLevel(MetricsLevel)}. + * @param metricsConfigCustomizer the {@link Consumer} to configure a {@link MetricsConfig}. + * @since 3.0.8 + * @see MetricsConfig + */ + public void setMetricsConfigCustomizer(Consumer metricsConfigCustomizer) { + Assert.notNull(metricsConfigCustomizer, "'metricsConfigCustomizer' must not be null"); + this.metricsConfigCustomizer = metricsConfigCustomizer; + } + + /** + * Set a {@link Consumer} to configure a {@link LeaseManagementConfig}. + * @param leaseManagementConfigCustomizer the {@link Consumer} to configure a {@link LeaseManagementConfig}. + * @since 3.0.8 + * @see LeaseManagementConfig + */ + public void setLeaseManagementConfigCustomizer(Consumer leaseManagementConfigCustomizer) { + Assert.notNull(leaseManagementConfigCustomizer, "'leaseManagementConfigCustomizer' must not be null"); + this.leaseManagementConfigCustomizer = leaseManagementConfigCustomizer; + } + + /** + * Whether to return an empty record list from consumer to the processor. + * Works only in {@link ListenerMode#batch} mode. + * The message will be sent into the output channel with an empty {@link List} as a payload. + * @param emptyRecordList true to return an empty record list. + * @since 3.0.8 + * @see ProcessorConfig#callProcessRecordsEvenForEmptyRecordList(boolean) + */ + public void setEmptyRecordList(boolean emptyRecordList) { + this.emptyRecordList = emptyRecordList; + } + @Override protected void onInit() { super.onInit(); + if (this.listenerMode.equals(ListenerMode.record) && this.emptyRecordList) { + this.emptyRecordList = false; + logger.warn("The 'emptyRecordList' is processed only in the [ListenerMode.batch]."); + } this.config = new ConfigsBuilder(buildStreamTracker(), this.consumerGroup, @@ -316,8 +394,9 @@ protected void doStart() { + "because it does not make sense in case of [ListenerMode.batch]."); } - LifecycleConfig lifecycleConfig = this.config.lifecycleConfig(); + LifecycleConfig lifecycleConfig = this.config.lifecycleConfig(); lifecycleConfig.taskBackoffTimeMillis(this.consumerBackoff); + this.lifecycleConfigCustomizer.accept(lifecycleConfig); RetrievalSpecificConfig retrievalSpecificConfig; String singleStreamName = this.streams.length == 1 ? this.streams[0] : null; @@ -342,15 +421,25 @@ protected void doStart() { if (MetricsLevel.NONE.equals(this.metricsLevel)) { metricsConfig.metricsFactory(new NullMetricsFactory()); } + this.metricsConfigCustomizer.accept(metricsConfig); + + CoordinatorConfig coordinatorConfig = this.config.coordinatorConfig(); + this.coordinatorConfigCustomizer.accept(coordinatorConfig); + + LeaseManagementConfig leaseManagementConfig = this.config.leaseManagementConfig(); + this.leaseManagementConfigCustomizer.accept(leaseManagementConfig); + + ProcessorConfig processorConfig = this.config.processorConfig() + .callProcessRecordsEvenForEmptyRecordList(this.emptyRecordList); this.scheduler = new Scheduler( this.config.checkpointConfig(), - this.config.coordinatorConfig(), - this.config.leaseManagementConfig(), + coordinatorConfig, + leaseManagementConfig, lifecycleConfig, metricsConfig, - this.config.processorConfig(), + processorConfig, retrievalConfig); this.executor.execute(this.scheduler); @@ -542,7 +631,7 @@ private void processMultipleRecords(List records, records.stream() .map(this::prepareMessageForRecord) .map(AbstractIntegrationMessageBuilder::build) - .collect(Collectors.toList()); + .toList(); messageBuilder = getMessageBuilderFactory().withPayload(payload); } @@ -557,7 +646,7 @@ else if (KclMessageDrivenChannelAdapter.this.converter != null) { return KclMessageDrivenChannelAdapter.this.converter.convert(r.data().array()); }) - .collect(Collectors.toList()); + .toList(); messageBuilder = getMessageBuilderFactory().withPayload(payload) .setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys) diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index db88b5d5..b4696a17 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -86,7 +86,6 @@ static void setup() { .join(); } - @AfterAll static void tearDown() { AMAZON_KINESIS @@ -126,24 +125,40 @@ void kclChannelAdapterReceivesRecords() { @Test public void metricsLevelOfMetricsConfigShouldBeSetToMetricsLevelOfAdapter() { - MetricsLevel metricsLevel = TestUtils.getPropertyValue( - this.kclMessageDrivenChannelAdapter, - "scheduler.metricsConfig.metricsLevel", - MetricsLevel.class - ); + MetricsLevel metricsLevel = + TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, + "scheduler.metricsConfig.metricsLevel", + MetricsLevel.class); assertThat(metricsLevel).isEqualTo(MetricsLevel.NONE); } @Test public void metricsFactoryOfSchedulerShouldBeSetNullMetricsFactoryIfMetricsLevelIsNone() { - MetricsFactory metricsFactory = TestUtils.getPropertyValue( - this.kclMessageDrivenChannelAdapter, - "scheduler.metricsFactory", - MetricsFactory.class - ); + MetricsFactory metricsFactory = + TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, + "scheduler.metricsFactory", + MetricsFactory.class); assertThat(metricsFactory).isInstanceOf(NullMetricsFactory.class); } + @Test + public void maxLeasesForWorkerOverriddenByCustomizer() { + Integer maxLeasesForWorker = + TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, + "scheduler.leaseCoordinator.leaseTaker.maxLeasesForWorker", + Integer.class); + assertThat(maxLeasesForWorker).isEqualTo(10); + } + + @Test + public void shardConsumerDispatchPollIntervalMillisOverriddenByCustomizer() { + Long shardConsumerDispatchPollIntervalMillis = + TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, + "scheduler.shardConsumerDispatchPollIntervalMillis", + Long.class); + assertThat(shardConsumerDispatchPollIntervalMillis).isEqualTo(500L); + } + @Configuration @EnableIntegration public static class TestConfiguration { @@ -159,7 +174,12 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { adapter.setConsumerGroup("single_stream_group"); adapter.setFanOut(false); adapter.setMetricsLevel(MetricsLevel.NONE); + adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig -> + leaseManagementConfig.maxLeasesForWorker(10)); + adapter.setCoordinatorConfigCustomizer(coordinatorConfig -> + coordinatorConfig.shardConsumerDispatchPollIntervalMillis(500L)); adapter.setBindSourceRecord(true); + adapter.setEmptyRecordList(true); return adapter; }