From 9fb9c1295ccc8f55df3cbd8f3ea6786f93de3f73 Mon Sep 17 00:00:00 2001 From: Hong Teoh Date: Wed, 6 Nov 2024 12:29:46 +0000 Subject: [PATCH] [FLINK-31989][docs] Update english docs for DynamoDbStreamsSource --- .../docs/connectors/datastream/dynamodb.md | 150 +++++++++++++++++- 1 file changed, 144 insertions(+), 6 deletions(-) diff --git a/docs/content/docs/connectors/datastream/dynamodb.md b/docs/content/docs/connectors/datastream/dynamodb.md index 834ac6643..4a7380346 100644 --- a/docs/content/docs/connectors/datastream/dynamodb.md +++ b/docs/content/docs/connectors/datastream/dynamodb.md @@ -23,16 +23,154 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> +# Amazon DynamoDB Connector +The DynamoDB connector allows users to read/write from [Amazon DynamoDB](https://aws.amazon.com/dynamodb/). -# Amazon DynamoDB Sink +As a source, the connector allows users to read change data capture stream from DynamoDB tables using [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html). -The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) -to setup a table. +As a sink, the connector allows users to write directly to Amazon DynamoDB tables using the [BatchWriteItem API](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html). + +## Dependency + +Apache Flink ships the connector for users to utilize. To use the connector, add the following Maven dependency to your project: {{< connector_artifact flink-connector-dynamodb dynamodb >}} + +## Amazon DynamoDB Streams Source + +The DynamoDB Streams source reads from [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). +Follow the instructions from the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to set up and configure the change data capture stream. + +The actual events streamed to the DynamoDB Stream depend on the `StreamViewType` specified by the DynamoDB Stream itself. +See [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information. + +### Usage + +The `DynamoDbStreamsSource` provides a fluent builder to construct an instance of the `DynamoDbStreamsSource`. +The code snippet below illustrates how to do so. + +{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120001" >}} +{{< tab "Java" >}} +```java +// Configure the DynamodbStreamsSource +Configuration sourceConfig = new Configuration(); +sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST + +// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream. +DynamoDbStreamsSource dynamoDbStreamsSource = + DynamoDbStreamsSource.builder() + .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380") + .setSourceConfig(sourceConfig) + // User must implement their own deserialization schema to translate change data capture events into custom data types + .setDeserializationSchema(dynamodbDeserializationSchema) + .build(); + +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// Specify watermarking strategy and the name of the DynamoDB Streams Source operator. +// Specify return type using TypeInformation. +// Specify UID of operator in line with Flink best practice. +DataStream cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source") + .returns(TypeInformation.of(String.class)) + .uid("custom-uid"); +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +// Configure the DynamodbStreamsSource +val sourceConfig = new Configuration() +sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON) // This is optional, by default connector will read from LATEST + +// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream. +val dynamoDbStreamsSource = DynamoDbStreamsSource.builder[String]() + .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380") + .setSourceConfig(sourceConfig) + // User must implement their own deserialization schema to translate change data capture events into custom data types + .setDeserializationSchema(dynamodbDeserializationSchema) + .build() + +val env = StreamExecutionEnvironment.getExecutionEnvironment() + +// Specify watermarking strategy and the name of the DynamoDB Streams Source operator. +// Specify return type using TypeInformation. +// Specify UID of operator in line with Flink best practice. +val cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source") + .uid("custom-uid") +``` +{{< /tab >}} +{{< /tabs >}} + +The above is a simple example of using the `DynamoDbStreamsSource`. +- The DynamoDB Stream being read from is specified using the stream ARN. +- Configuration for the `Source` is supplied using an instance of Flink's `Configuration` class. + The configuration keys can be taken from `AWSConfigOptions` (AWS-specific configuration) and `DynamodbStreamsSourceConfigConstants` (DynamoDB Streams Source configuration). +- The example specifies the starting position as `TRIM_HORIZON` (see [Configuring Starting Position](#configuring-starting-position) for more information). +- The deserialization format is as `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information). +- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information). +- The example also specifies an increasing `WatermarkStrategy`, which means each record will be tagged with event time specified using `approximateCreationDateTime`. + Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted after 1 second. + +### Configuring Starting Position + +To specify the starting position of the `DynamodbStreamsSource`, users can set the `DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION` in configuration. +- `LATEST`: read all shards of the stream starting from the latest record. +- `TRIM_HORIZON`: read all shards of the stream starting from the earliest record possible (data is trimmed by DynamoDB after 24 hours). + +### Deserialization Schema + +The `DynamoDbStreamsSource` provides the `DynamoDbStreamsDeserializationSchema` interface to allow users to implement their own +deserialization schema to convert DynamoDB change data capture events into custom event types. + +The `DynamoDbStreamsDeserializationSchema#deserialize` method takes in an instance of `Record` from the DynamoDB model. +The `Record` can contain different content, depending on the configuration of the DynamoDB Stream. See [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information. + +### Event Ordering + +Events are written into DynamoDB Streams, maintaining ordering within the same primary key. +This is done by ensuring that events within the same primary key are written to the same shard lineage. +When there are shard splits (one parent shard splitting into two child shards), the ordering will be maintained as long as the parent shard is read completely before starting to read from the child shards. + +The `DynamoDbStreamsSource` ensures that shards are assigned in a manner that respects parent-child shard ordering. +This means that the shard will only be passed to the shard assigner if the parent shard has been completely read. +This helps to ensure that the events from the change data capture stream are read in-order within the same DynamoDB primary key. + +### Shard Assignment Strategy + +The `UniformShardAssigner` allocates the shards of the DynamoDB Stream evenly across the parallel subtasks of the source operator. +DynamoDB Stream shards are ephemeral and are created and deleted automatically, as required. +The `UniformShardAssigner` allocates new shards to the subtask with the lowest number of currently allocated shards. + +Users can also implement their own shard assignment strategy by implementing the `DynamoDbStreamsShardAssigner` interface. + +### Configuration + +#### Retry Strategy + +The `DynamoDbStreamsSource` interacts with Amazon DynamoDB using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +The retry strategy used by the AWS SDK client can be tuned using the following configuration options: +- `DYNAMODB_STREAMS_RETRY_COUNT`: Maximum number of API retries on retryable errors, before it will restart the Flink job. +- `DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY`: The base delay used for calculation of the exponential backoff. +- `DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY`: The maximum delay for exponential backoff. + +#### Shard Discovery + +The `DynamoDbStreamsSource` periodically discovers newly created shards on the DynamoDB Stream. This can come from shard splitting, or shard rotations. +By default this is set to discover shards every 60 seconds. However, users can customize this to a smaller value by configuring the `SHARD_DISCOVERY_INTERVAL`. + +There is an issue for shard discovery where the shard graph returned from DynamoDB might have inconsistencies. +In this case, the `DynamoDbStreamsSource` automatically detects the inconsistency and retries the shard discovery process. +The maximum number of retries can be configured using `DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT`. + + +## Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + {{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}} {{< tab "Java" >}} ```java @@ -90,7 +228,7 @@ flinkStream.sinkTo(dynamoDbSink) {{< /tab >}} {{< /tabs >}} -## Configurations +### Configurations Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.builder()`. @@ -130,7 +268,7 @@ Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.