Skip to content

Commit

Permalink
NIFI-13487 ConsumeKinesisStream initial stream position handling erro…
Browse files Browse the repository at this point in the history
…r after AWS SDK 2.x migration

Signed-off-by: Joe Gresock <[email protected]>
This closes apache#9035.
  • Loading branch information
kzsihovszki authored and gresockj committed Jul 4, 2024
1 parent e4014b5 commit c73d1cf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

Expand Down Expand Up @@ -611,7 +612,6 @@ public void stopConsuming(final ProcessContext context) {
synchronized Scheduler prepareScheduler(final ProcessContext context, final ProcessSessionFactory sessionFactory, final String schedulerId) {
final KinesisAsyncClient kinesisClient = getClient(context);
final ConfigsBuilder configsBuilder = prepareConfigsBuilder(context, schedulerId, sessionFactory);

final MetricsConfig metricsConfig = configsBuilder.metricsConfig();
if (!isReportCloudWatchMetrics(context)) {
metricsConfig.metricsFactory(new NullMetricsFactory());
Expand All @@ -623,18 +623,13 @@ synchronized Scheduler prepareScheduler(final ProcessContext context, final Proc
.streamName(streamName);
final CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig().workerStateChangeListener(workerState::set);

final InitialPositionInStream initialPositionInStream = getInitialPositionInStream(context);
final InitialPositionInStreamExtended initialPositionInStreamValue = (InitialPositionInStream.AT_TIMESTAMP == initialPositionInStream)
? InitialPositionInStreamExtended.newInitialPositionAtTimestamp(getStartStreamTimestamp(context))
: InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
leaseManagementConfig.initialPositionInStream(initialPositionInStreamValue);

final List<PropertyDescriptor> dynamicProperties = context.getProperties()
.keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());


final RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));

Expand Down Expand Up @@ -715,8 +710,13 @@ private String getEndpointPrefix(final ProcessContext context) {
*/
@VisibleForTesting
ConfigsBuilder prepareConfigsBuilder(final ProcessContext context, final String workerId, final ProcessSessionFactory sessionFactory) {
final InitialPositionInStream initialPositionInStream = getInitialPositionInStream(context);
final InitialPositionInStreamExtended initialPositionInStreamValue = (InitialPositionInStream.AT_TIMESTAMP == initialPositionInStream)
? InitialPositionInStreamExtended.newInitialPositionAtTimestamp(getStartStreamTimestamp(context))
: InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);

return new ConfigsBuilder(
getStreamName(context),
new SingleStreamTracker(getStreamName(context), initialPositionInStreamValue),
getApplicationName(context),
getClient(context),
getDynamoClient(context),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private void runWorker(final boolean withCredentials, final boolean waitForFailu
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.APPLICATION_NAME, "test-application");
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.REGION, Regions.EU_WEST_2.getName());
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.TIMEOUT, "5 secs");
mockConsumeKinesisStreamRunner.setProperty(ConsumeKinesisStream.INITIAL_STREAM_POSITION, "TRIM_HORIZON");

final AWSCredentialsProviderService awsCredentialsProviderService = new AWSCredentialsProviderControllerService();
mockConsumeKinesisStreamRunner.addControllerService("aws-credentials", awsCredentialsProviderService);
Expand Down Expand Up @@ -326,7 +327,8 @@ private void assertSchedulerConfigs(final Scheduler scheduler, final String host
assertTrue(scheduler.leaseManagementConfig().workerIdentifier().startsWith(hostname));
assertEquals(scheduler.coordinatorConfig().applicationName(), "test-application");
assertEquals(scheduler.leaseManagementConfig().streamName(), "test-stream");
assertEquals(scheduler.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.LATEST);
assertEquals(scheduler.retrievalConfig().streamTracker().streamConfigList().get(0).initialPositionInStreamExtended().getInitialPositionInStream(),
InitialPositionInStream.TRIM_HORIZON );
assertEquals(scheduler.coordinatorConfig().parentShardPollIntervalMillis(), 1);
}

Expand Down

0 comments on commit c73d1cf

Please sign in to comment.