Skip to content

Commit

Permalink
Refactor logic for converting starting position
Browse files Browse the repository at this point in the history
  • Loading branch information
hlteoh37 committed Nov 1, 2024
1 parent 250cd20 commit 2eb43c4
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;

/** Implementation of async stream proxy for the Kinesis client. */
@Internal
public class KinesisAsyncStreamProxy implements AsyncStreamProxy {
Expand All @@ -51,7 +53,7 @@ public CompletableFuture<Void> subscribeToShard(
SubscribeToShardRequest.builder()
.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(startingPosition.getSdkStartingPosition())
.startingPosition(toSdkStartingPosition(startingPosition))
.build();
return kinesisAsyncClient.subscribeToShard(request, responseHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.connector.kinesis.source.split;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

Expand Down Expand Up @@ -53,30 +52,6 @@ public Object getStartingMarker() {
return startingMarker;
}

public software.amazon.awssdk.services.kinesis.model.StartingPosition getSdkStartingPosition() {
software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder()
.type(shardIteratorType);

switch (shardIteratorType) {
case LATEST:
case TRIM_HORIZON:
return builder.build();
case AT_TIMESTAMP:
Preconditions.checkArgument(
startingMarker instanceof Instant,
"Invalid StartingPosition. When ShardIteratorType is AT_TIMESTAMP, startingMarker must be an Instant.");
return builder.timestamp((Instant) startingMarker).build();
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
Preconditions.checkArgument(
startingMarker instanceof String,
"Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.");
return builder.sequenceNumber((String) startingMarker).build();
}
throw new IllegalArgumentException("Unsupported shardIteratorType " + shardIteratorType);
}

public static StartingPosition fromTimestamp(final Instant timestamp) {
return new StartingPosition(AT_TIMESTAMP, timestamp);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kinesis.source.split;

import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
import org.apache.flink.util.Preconditions;

import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import java.time.Instant;

/**
* Util class with methods relating to {@link KinesisStreamsSource}'s internal representation of
* {@link StartingPosition}.
*/
public class StartingPositionUtil {

private void StartingPosition() {
// prevent initialization of util class.
}

/**
* @param startingPosition {@link KinesisStreamsSource}'s internal representation of {@link
* StartingPosition}
* @return AWS SDK's representation of {@link StartingPosition}.
*/
public static software.amazon.awssdk.services.kinesis.model.StartingPosition
toSdkStartingPosition(StartingPosition startingPosition) {
ShardIteratorType shardIteratorType = startingPosition.getShardIteratorType();
Object startingMarker = startingPosition.getStartingMarker();

software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder()
.type(shardIteratorType);

switch (shardIteratorType) {
case LATEST:
case TRIM_HORIZON:
return builder.build();
case AT_TIMESTAMP:
Preconditions.checkArgument(
startingMarker instanceof Instant,
"Invalid StartingPosition. When ShardIteratorType is AT_TIMESTAMP, startingMarker must be an Instant.");
return builder.timestamp((Instant) startingMarker).build();
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
Preconditions.checkArgument(
startingMarker instanceof String,
"Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, startingMarker must be a String.");
return builder.sequenceNumber((String) startingMarker).build();
}
throw new IllegalArgumentException("Unsupported shardIteratorType " + shardIteratorType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;
import static org.apache.flink.connector.kinesis.source.util.KinesisAsyncClientProvider.TestingAsyncKinesisClient.SUBSCRIBE_TO_SHARD_RESPONSE_FUTURE;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void testSubscribeToShard(
SubscribeToShardRequest.builder()
.consumerARN(CONSUMER_ARN)
.shardId(shardId)
.startingPosition(startingPosition.getSdkStartingPosition())
.startingPosition(toSdkStartingPosition(startingPosition))
.build();
assertThat(result).isEqualTo(SUBSCRIBE_TO_SHARD_RESPONSE_FUTURE);
assertThat(testKinesisClient.getSubscribeToShardRequest()).isEqualTo(expectedRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -246,7 +247,7 @@ public CompletableFuture<Void> subscribeToShard(
SubscribeToShardRequest.builder()
.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(startingPosition.getSdkStartingPosition())
.startingPosition(toSdkStartingPosition(startingPosition))
.build());

return CompletableFuture.supplyAsync(
Expand Down

0 comments on commit 2eb43c4

Please sign in to comment.