From 4fc4e27af3635c0b640b6919c120018b39098c6a Mon Sep 17 00:00:00 2001 From: Hong Teoh Date: Mon, 4 Nov 2024 17:33:12 +0000 Subject: [PATCH] [FLINK-31989][docs] Update english docs for KinesisStreamsSource --- .../docs/connectors/datastream/kinesis.md | 776 ++++++------------ 1 file changed, 240 insertions(+), 536 deletions(-) diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md index 72bd5d738..502f2d439 100644 --- a/docs/content/docs/connectors/datastream/kinesis.md +++ b/docs/content/docs/connectors/datastream/kinesis.md @@ -27,644 +27,348 @@ under the License. # Amazon Kinesis Data Streams Connector -The Kinesis connector provides access to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/). - -To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams: - - - - - - - - - - - - - - - - - - -
KDS ConnectivityMaven Dependency
Source{{< connector_artifact flink-connector-kinesis kinesis >}}
Sink{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
+The Kinesis connector allows users to read/write from [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/). -{{< py_connector_download_link "kinesis" >}} +## Dependency -## Using the Amazon Kinesis Streams Service -Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) -to setup Kinesis streams. +To use this connector, add the below dependency to your project: -## Configuring Access to Kinesis with IAM -Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html). +{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}} -Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis. -By default, the `AUTO` Credentials Provider is used. -If the access key ID and secret key are set in the configuration, the `BASIC` provider is used. +For use in PyFlink jobs, use the following dependency: -A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting. - -Supported Credential Providers are: -* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider. -* `BASIC` - Using access key ID and secret key supplied as configuration. -* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables. -* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey. -* `CUSTOM` - Use a custom user class as credential provider. -* `PROFILE` - Use AWS credentials profile file to create the AWS credentials. -* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. -* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. +{{< py_connector_download_link "kinesis" >}} -## Kinesis Consumer -The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis -streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is -responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will -change as shards are closed and created by Kinesis. +## Kinesis Streams Source +The `KinesisStreamsSource` is an exactly-once, parallel streaming data source based on the [FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface). +The source subscribes to a single Amazon Kinesis Data stream, and reads events whilst maintaining order within a specific Kinesis `partitionId`. +The `KinesisStreamsSource` will discover the shards of the stream and start reading from each shard in parallel, depending on the parallelism of the operator. +For more details on selecting the right parallelism, see section on [parallelism](#parallelism-and-number-of-shards). +It also transparently handles discovery of new shards of the Kinesis Data stream if resharding of streams occurs while the job is running. 
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}

-Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder to construct an instance of the `KinesisStreamsSource`.
+The code snippet below illustrates how to do so.

-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
{{< tab "Java" >}}
```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST
+
+// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
+KinesisStreamsSource<String> kdsSource =
+        KinesisStreamsSource.<String>builder()
+                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(new SimpleStringSchema())
+                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
+                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify watermarking strategy and the name of the Kinesis Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+DataStream kinesisRecordsWithEventTimeWatermarks = env.fromSource(kdsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "Kinesis source") + .returns(TypeInformation.of(String.class)) + .uid("custom-uid"); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -val consumerConfig = new Properties() -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") -consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") -consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") - -val env = StreamExecutionEnvironment.getExecutionEnvironment +val sourceConfig = new Configuration() +sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON) // This is optional, by default connector will read from LATEST -val kinesis = env.addSource(new FlinkKinesisConsumer[String]( - "kinesis_stream_name", new SimpleStringSchema, consumerConfig)) -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -consumer_config = { - 'aws.region': 'us-east-1', - 'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id', - 'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key', - 'flink.stream.initpos': 'LATEST' -} +val env = StreamExecutionEnvironment.getExecutionEnvironment() -env = StreamExecutionEnvironment.get_execution_environment() +val kdsSource = KinesisStreamsSource.builder[String]() + .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") + .setSourceConfig(sourceConfig) + .setDeserializationSchema(new SimpleStringSchema()) + .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. + .build() -kinesis = env.add_source(FlinkKinesisConsumer("stream-1", SimpleStringSchema(), consumer_config)) +val kinesisEvents = env.fromSource(kdsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "Kinesis source") + .uid("custom-uid") ``` {{< /tab >}} -{{< /tabs >}} -The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties` -instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and -`ConsumerConfigConstants` (Kinesis consumer parameters). The example -demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which -the AWS access key ID and secret access key are directly supplied in the configuration. Also, data is being consumed -from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` -to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible). - -Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`. - -Note that the configured parallelism of the Flink Kinesis Consumer source -can be completely independent of the total number of shards in the Kinesis streams. 
-When the number of shards is larger than the parallelism of the consumer, -then each consumer subtask can subscribe to multiple shards; otherwise -if the number of shards is smaller than the parallelism of the consumer, -then some consumer subtasks will simply be idle and wait until it gets assigned -new shards (i.e., when the streams are resharded to increase the -number of shards for higher provisioned Kinesis service throughput). - -Also note that the default assignment of shards to subtasks is based on the hashes of the shard and stream names, -which will more-or-less balance the shards across the subtasks. -However, assuming the default Kinesis shard management is used on the stream (UpdateShardCount with `UNIFORM_SCALING`), -setting `UniformShardAssigner` as the shard assigner on the consumer will much more evenly distribute shards to subtasks. -Assuming the incoming Kinesis records are assigned random Kinesis `PartitionKey` or `ExplicitHashKey` values, -the result is consistent subtask loading. -If neither the default assigner nor the `UniformShardAssigner` suffice, a custom implementation of `KinesisShardAssigner` can be set. - -### The `DeserializationSchema` - -Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis Data Stream into Java objects. -The `KinesisDeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)` -method gets called for each Kinesis record. +The above is a simple example of using the `KinesisStreamsSource`. +- The Kinesis stream being read from is specified using the Kinesis Stream ARN. +- Configuration for the `Source` is supplied using an instance of Flink's `Configuration` class. + The configuration keys can be taken from `AWSConfigOptions` (AWS-specific configuration) and `KinesisSourceConfigOptions` (Kinesis Source configuration). +- The example specifies the starting position as `TRIM_HORIZON` (see [Configuring Starting Position](#configuring-starting-position) for more information). +- The deserialization format is as `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information). +- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information). +- The example also specifies an increasing `WatermarkStrategy`, which means each record will be tagged with event time specified using `approximateArrivalTimestamp`. + Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted after 1 second. -For convenience, Flink provides the following schemas out of the box: - -1. `TypeInformationSerializationSchema` which creates a schema based on a Flink's `TypeInformation`. - This is useful if the data is both written and read by Flink. - This schema is a performant Flink-specific alternative to other generic serialization approaches. - -2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup the writer's schema (schema which was used to write the record) - in [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). 
Using this, deserialization schema record will be - read with the schema retrieved from AWS Glue Schema Registry and transformed to either `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema` - that represents generic record with a manually provided schema or a JAVA POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema). - -
To use this deserialization schema one has to add the following additional dependency:
-
-{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
-{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
-{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
-
-3. `AvroDeserializationSchema` which reads data serialized with Avro format using a statically provided schema. It can
-   infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords`
-   with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that
-   the serialized records DO NOT contain the embedded schema.
+### Configuring Access to Kinesis with IAM
+Access to Kinesis streams is controlled via IAM identities. Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).

-   - You can use [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
-     to retrieve the writer’s schema. Similarly, the deserialization record will be read with the schema from AWS Glue Schema Registry and transformed
-     (either through `GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or `GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
-     For more information on integrating the AWS Glue Schema Registry with Apache Flink see
-     [Use Case: Amazon Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+Depending on your deployment, you can select a suitable AWS Credentials Provider.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
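+
+For example, a minimal sketch of supplying static credentials (the `BASIC` path) through the source configuration could look like the following. It reuses the `AWSConfigConstants` keys shown elsewhere on this page and assumes they can be set as plain string options on Flink's `Configuration`:
+
+```java
+Configuration sourceConfig = new Configuration();
+sourceConfig.setString(AWSConfigConstants.AWS_REGION, "us-east-1");
+// Supplying both the access key ID and the secret key makes the connector fall back to the BASIC provider.
+sourceConfig.setString(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sourceConfig.setString(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+```
+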
To use this deserialization schema one has to add the following additional dependency: - -{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}} -{{< tab "AvroDeserializationSchema" >}} -{{< artifact flink-avro >}} -{{< /tab >}} -{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}} -{{< connector_artifact flink-avro-glue-schema-registry kinesis >}} -{{< /tab >}} -{{< /tabs >}} +A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting. -### Configuring Starting Position +Supported Credential Providers are: +* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider. +* `BASIC` - Using access key ID and secret key supplied as configuration. +* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables. +* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey. +* `CUSTOM` - Use a custom user class as credential provider. +* `PROFILE` - Use AWS credentials profile file to create the AWS credentials. +* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. +* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. -The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to -one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)): +### Configuring Starting Position +To specify where the `KinesisStreamsSource` starts reading from the Kinesis stream, users can set the `KinesisSourceConfigOptions.STREAM_INITIAL_POSITION` in configuration. +The values used follow [the namings used by the AWS Kinesis Data Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax): -- `LATEST`: read all shards of all streams starting from the latest record. -- `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings). -- `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern : +- `LATEST`: read all shards of the stream starting from the latest record. +- `TRIM_HORIZON`: read all shards of the stream starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings). +- `AT_TIMESTAMP`: read all shards of the stream starting from a specified timestamp. The timestamp must also be specified in the configuration + properties by providing a value for `KinesisSourceConfigOptions.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern : - a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`). 
- - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`. - If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` - (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern). + - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT`. + If `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` + (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern). + ### Fault Tolerance for Exactly-Once User-Defined State Update Semantics -With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and +With Flink's checkpointing enabled, the `KinesisStreamsSource` will consume records from shards in Kinesis streams and periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that was stored in the checkpoint. -The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure. +Note that when restoring from a snapshot, the configured starting positions will be ignored. The `KinesisStreamsSource` +will proceed to read from where it left off in the snapshot. If the restored snapshot is stale (e.g. the shards saved are +now expired and past the retention period of the Kinesis stream), it will read the earliest possible event (effectively a `TRIM_HORIZON`) -To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment: +If users want to restore a Flink job from an existing snapshot but want to respect the configured starting position of +the stream, users can change the `uid` of the `KinesisStreamsSource` operator to effectively restore this operator without state. -{{< tabs "b1399ed7-5855-446d-9684-7a49de9b4c97" >}} -{{< tab "Java" >}} -```java -final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.enableCheckpointing(5000); // checkpoint every 5000 msecs -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() -env.enableCheckpointing(5000) // checkpoint every 5000 msecs -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -env = StreamExecutionEnvironment.get_execution_environment() -env.enable_checkpointing(5000) # checkpoint every 5000 msecs -``` -{{< /tab >}} -{{< /tabs >}} -Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. -Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. -Flink on YARN supports automatic restart of lost YARN containers. +### Shard Assignment Strategy +For most use cases, users would prefer a uniform distribution of records across parallel subtasks. This prevents data skew if data is evenly distributed in the Kinesis Data Stream. 
+This is achieved by the `UniformShardAssigner`, which is the default shard assignment strategy. Users can implement their own custom strategy by implementing the interface for `KinesisShardAssigner`. -### Using Enhanced Fan-Out +The uniform distribution of shards across parallel subtasks is a tricky situation, especially if the stream has been resharded. +Amazon Kinesis Data streams distributes `partitionId`s evenly across the entire `HashKeyRange` of a given stream, and these ranges are evenly distributed across all open shards if `UNIFORM_SCALING` is used. +However, there will be a mixture of Open and Closed shards on the Kinesis Data Stream, and the status of each shard can change during a rescaling operation. -[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum -number of concurrent consumers per Kinesis stream. -Without EFO, all concurrent consumers share a single read quota per shard. -Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers. -Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/). - -In order to enable EFO two additional configuration parameters are required: +To ensure a uniform distribution of `partitionId`s across each parallel subtask, the `UniformShardAssigner` uses the `HashKeyRange` of each shard to decide which parallel subtask will read from the discovered shard. -- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default `RecordPublisher` is `POLLING`. -- `EFO_CONSUMER_NAME`: A name to identify the consumer. -For a given Kinesis data stream, each consumer must have a unique name. -However, consumer names do not have to be unique across data streams. -Reusing a consumer name will result in existing subscriptions being terminated. +### Record ordering +Kinesis maintains the write order of records per `partitionId` within a Kinesis stream. The `KinesisStreamsSource` reads +records in the same order within a given `partitionId`, even through resharding operations. It does this by first checking if +a given shard's parents (up to 2 shards) have been completely read, before proceeding to read from the given shard. -The code snippet below shows a simple example configurating an EFO consumer. +### Deserialization Schema +The `KinesisStreamsSource` retrieves binary data from the Kinesis Data Stream, and needs a schema to convert it into Java objects. +Both Flink's `DeserializationSchema` and the custom `KinesisDeserializationSchema` are accepted by the `KinesisStreamsSource`. +The `KinesisDeserializationSchema` provides additional Kinesis-specific metadata per record to allow users to make serialization decisions based off the metadata. -{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}} -{{< tab "Java" >}} -```java -Properties consumerConfig = new Properties(); -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); +For convenience, Flink provides the following schemas out of the box: +1. `SimpleStringSchema` and `JsonSerializationSchema`. -consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, - ConsumerConfigConstants.RecordPublisherType.EFO.name()); -consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); +2. `TypeInformationSerializationSchema` which creates a schema based on a Flink's `TypeInformation`. 
+ This is useful if the data is both written and read by Flink. + This schema is a performant Flink-specific alternative to other generic serialization approaches. -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +3. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup the writer's schema (schema which was used to write the record) + in [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Using this, deserialization schema record will be + read with the schema retrieved from AWS Glue Schema Registry and transformed to either `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema` + that represents generic record with a manually provided schema or a JAVA POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema). -DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( - "kinesis_stream_name", new SimpleStringSchema(), consumerConfig)); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val consumerConfig = new Properties() -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") +
To use this deserialization schema one has to add the following additional dependency: -consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, - ConsumerConfigConstants.RecordPublisherType.EFO.name()); -consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); +{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}} +{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}} +{{< connector_artifact flink-json-glue-schema-registry kinesis >}} +{{< /tab >}} +{{< /tabs >}} -val env = StreamExecutionEnvironment.getExecutionEnvironment() +4. `AvroDeserializationSchema` which reads data serialized with Avro format using a statically provided schema. It can + infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords` + with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that + the serialized records DO NOT contain the embedded schema. -val kinesis = env.addSource(new FlinkKinesisConsumer[String]( - "kinesis_stream_name", new SimpleStringSchema, consumerConfig)) -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -consumer_config = { - 'aws.region': 'us-east-1', - 'flink.stream.initpos': 'LATEST', - 'flink.stream.recordpublisher': 'EFO', - 'flink.stream.efo.consumername': 'my-flink-efo-consumer' -} + - You can use [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) + to retrieve the writer’s schema. Similarly, the deserialization record will be read with the schema from AWS Glue Schema Registry and transformed + (either through `GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or `GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`). + For more information on integrating the AWS Glue Schema Registry with Apache Flink see + [Use Case: Amazon Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink). -env = StreamExecutionEnvironment.get_execution_environment() +
To use this deserialization schema one has to add the following additional dependency: -kinesis = env.add_source(FlinkKinesisConsumer( - "kinesis_stream_name", SimpleStringSchema(), consumer_config)) -``` +{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}} +{{< tab "AvroDeserializationSchema" >}} +{{< artifact flink-avro >}} +{{< /tab >}} +{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}} +{{< connector_artifact flink-avro-glue-schema-registry kinesis >}} {{< /tab >}} {{< /tabs >}} -#### EFO Stream Consumer Registration/Deregistration - -In order to use EFO, a stream consumer must be registered against each stream you wish to consume. -By default, the `FlinkKinesisConsumer` will register the stream consumer automatically when the Flink job starts. -The stream consumer will be registered using the name provided by the `EFO_CONSUMER_NAME` configuration. -`FlinkKinesisConsumer` provides three registration strategies: - -- Registration - - `LAZY` (default): Stream consumers are registered when the Flink job starts running. - If the stream consumer already exists, it will be reused. - This is the preferred strategy for the majority of applications. - However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN. - For jobs with very large parallelism this can result in an increased start-up time. - The `DescribeStreamConsumer` operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html), - this means application startup time will increase by roughly `parallelism/20 seconds`. - - `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor. - If the stream consumer already exists, it will be reused. - This will result in registration occurring when the job is constructed, - either on the Flink Job Manager or client environment submitting the job. - Using this strategy results in a single thread registering and retrieving the stream consumer ARN, - reducing startup time over `LAZY` (with large parallelism). - However, consider that the client environment will require access to the AWS services. - - `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`. - Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/) - to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html). - Stream consumer ARNs should be provided to the job via the consumer configuration. -- Deregistration - - `LAZY` (default): Stream consumers are deregistered when the job is shutdown gracefully. - In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active. - In this situation the stream consumers will be gracefully reused when the application restarts. - - `EAGER|NONE`: Stream consumer deregistration is not performed by `FlinkKinesisConsumer`. 
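+
+As an illustration, any of the schemas above is passed to the builder in the same way as the `SimpleStringSchema` used earlier. The sketch below wires an Avro specific-record schema into the source; `UserAvroRecord` is a hypothetical Avro-generated class and `sourceConfig` is the configuration from the earlier example:
+
+```java
+// Read Avro-serialized records from the stream and deserialize them into the generated UserAvroRecord class.
+KinesisStreamsSource<UserAvroRecord> avroSource =
+        KinesisStreamsSource.<UserAvroRecord>builder()
+                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(AvroDeserializationSchema.forSpecific(UserAvroRecord.class))
+                .build();
+```
+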
- -Below is an example configuration to use the `EAGER` registration strategy: - -{{< tabs "a85d716b-6c1c-46d8-9ee4-12d8380a0c06" >}} -{{< tab "Java" >}} -```java -Properties consumerConfig = new Properties(); -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); - -consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, - ConsumerConfigConstants.RecordPublisherType.EFO.name()); -consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); +### Parallelism and Number of Shards +The configured parallelism of the `KinesisStreamsSource` is independent of the total number of shards in the Kinesis streams. +- If the parallelism of the `KinesisStreamsSource` is less than the total number of shards, then a single parallel subtask would handle multiple shards. +- If the parallelism of the `KinesisStreamsSource` is more than the total number of shards, then there will be some parallel subtasks that do not read from any shards. If this is the case, users will need to set up `withIdleness` on the `WatermarkStrategy`. Failing to do so will mean watermark generation will get blocked due to idle subtasks. -consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, - ConsumerConfigConstants.EFORegistrationType.EAGER.name()); +### Watermark Handling in the source -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - -DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( - "kinesis_stream_name", new SimpleStringSchema(), consumerConfig)); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val consumerConfig = new Properties() -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") +The `KinesisStreamsSource` supplies the `approximateArrivalTimestamp` provided by Kinesis as the event time associated with each record read. +For more information on event time handling in Flink, see [Event time]({{< ref "docs/concepts/time" >}}). +Note that this timestamp is typically referred to as a Kinesis server-side timestamp, and there are no guarantees +about the accuracy or order correctness (i.e., the timestamps may not always be ascending). -consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, - ConsumerConfigConstants.RecordPublisherType.EFO.name()); -consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); +### Event Time Alignment for Shard Readers -consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, - ConsumerConfigConstants.EFORegistrationType.EAGER.name()); +Since the shards are being consumed in parallel, some shards might be reading records far ahead from other shards. +Flink supports synchronization between these reading speed using split-specific watermark alignment. The `KinesisStreamsSource` +supports split-specific watermark alignment, and will pause reading from specific shards if the watermark from that shard +is too far ahead of others. It will resume reading from that specific split once the other shards catch up. 
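+
+A sketch of opting in to this behaviour is shown below. It combines `withIdleness` (for subtasks that own no shards) with Flink's watermark alignment API on the `WatermarkStrategy` passed to `fromSource`; the alignment group name and drift values are illustrative rather than defaults, and `kdsSource` refers to the source built in the earlier example:
+
+```java
+WatermarkStrategy<String> alignedStrategy =
+        WatermarkStrategy.<String>forMonotonousTimestamps()
+                // Mark subtasks without shards as idle so they do not hold back the watermark.
+                .withIdleness(Duration.ofSeconds(1))
+                // Pause shards whose watermark drifts more than 20 seconds ahead of the alignment group.
+                .withWatermarkAlignment("kinesis-source-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
+
+DataStream<String> alignedEvents = env
+        .fromSource(kdsSource, alignedStrategy, "Kinesis source")
+        .returns(TypeInformation.of(String.class))
+        .uid("custom-uid");
+```
+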
-val env = StreamExecutionEnvironment.getExecutionEnvironment() +### Threading Model -val kinesis = env.addSource(new FlinkKinesisConsumer[String]( - "kinesis_stream_name", new SimpleStringSchema, consumerConfig)) -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -consumer_config = { - 'aws.region': 'us-east-1', - 'flink.stream.initpos': 'LATEST', - 'flink.stream.recordpublisher': 'EFO', - 'flink.stream.efo.consumername': 'my-flink-efo-consumer', - 'flink.stream.efo.registration': 'EAGER' -} +The `KinesisStreamsSource` uses multiple threads for shard discovery and data consumption. -env = StreamExecutionEnvironment.get_execution_environment() +#### Shard Discovery -kinesis = env.add_source(FlinkKinesisConsumer( - "kinesis_stream_name", SimpleStringSchema(), consumer_config)) -``` -{{< /tab >}} -{{< /tabs >}} +For shard discovery, the `SplitEnumerator` runs on the JobManager to periodically discover new shards using the `ListShard` API. +Once new shards are discovered, it will confirm if the parent shards have been completed. If all parents have been completed, +the shards will be assigned to the `SplitReader` on the TaskManagers to be read. -Below is an example configuration to use the `NONE` registration strategy: +#### Polling (default) Split Reader -{{< tabs "00b46c87-7740-4263-8040-2aa7e2960513" >}} -{{< tab "Java" >}} -```java -Properties consumerConfig = new Properties(); -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); +For `POLLING` data consumption, a single thread will be created per-parallel subtask to consume allocated shards. This means +that the number of open threads scale with the parallelism of the Flink operator. -consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, - ConsumerConfigConstants.RecordPublisherType.EFO.name()); -consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); +#### Enhanced Fan-Out Split Reader -consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, - ConsumerConfigConstants.EFORegistrationType.NONE.name()); -consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"), - "arn:aws:kinesis::>:stream//consumer/:"); +For `EFO` data consumption the threading model is the same as `POLLING` - one thread per-parallel subtask. However, +there are additional thread pools to handle asynchronous communication with Kinesis. AWS SDK v2.x `KinesisAsyncClient` +uses additional threads for Netty to handle IO and asynchronous response. Each parallel subtask will have their own +instance of the `KinesisAsyncClient`. In other words, if the consumer is run with a parallelism of 10, there will be a +total of 10 `KinesisAsyncClient` instances. A separate client will be created and subsequently destroyed when +registering and deregistering stream consumers. -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +### Internally Used Kinesis APIs -DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( - "kinesis_stream_name", new SimpleStringSchema(), consumerConfig)); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val consumerConfig = new Properties() -consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") -consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") +The `KinesisStreamsSource` uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs +for shard discovery and data consumption. 
Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), +the `KinesisStreamsSource` will compete with other non-Flink consuming applications that the user may be running. +Below is a list of APIs called by the consumer with description of how the consumer uses the API, as well as information +on how to deal with any errors or warnings that the `KinesisStreamsSource` may have due to these service limits. -consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, - ConsumerConfigConstants.RecordPublisherType.EFO.name()); -consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); +#### Shard Discovery -consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, - ConsumerConfigConstants.EFORegistrationType.NONE.name()); -consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"), - "arn:aws:kinesis::>:stream//consumer/:"); +- *[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)*: this is periodically called + by the `SplitEnumerator`, one per Flink job, to discover any new shards as a result of stream resharding. By default, + the `SplitEnumerator` performs the shard discovery at an interval of 10 seconds. If this interferes with other non-Flink + consuming applications, users can slow down the calls to this API by setting a value for + `KinesisSourceConfigOptions.SHARD_DISCOVERY_INTERVAL` in the supplied `Configuration`. + This sets the discovery interval to a different value. Note that this setting directly impacts + the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval. -val env = StreamExecutionEnvironment.getExecutionEnvironment() +#### Polling (default) Split Reader -val kinesis = env.addSource(new FlinkKinesisConsumer[String]( - "kinesis_stream_name", new SimpleStringSchema, consumerConfig)) -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -consumer_config = { - 'aws.region': 'us-east-1', - 'flink.stream.initpos': 'LATEST', - 'flink.stream.recordpublisher': 'EFO', - 'flink.stream.efo.consumername': 'my-flink-efo-consumer', - 'flink.stream.efo.consumerarn.stream-name': - 'arn:aws:kinesis::>:stream//consumer/:' -} +- *[GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called once per shard, Note that since the rate limit for this API is per shard (not per stream), + the split reader itself should not exceed the limit. -env = StreamExecutionEnvironment.get_execution_environment() +- *[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there + are any other non-Flink consuming applications running), the per shard rate limit may be exceeded. By default, on each call + of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has exceeded. -kinesis = env.add_source(FlinkKinesisConsumer( - "kinesis_stream_name", SimpleStringSchema(), consumer_config)) -``` -{{< /tab >}} -{{< /tabs >}} +#### Enhanced Fan-Out Split Reader -### Event Time for Consumed Records +- *[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)*: this is called per shard to obtain shard subscriptions. 
A shard subscription is typically active for 5 minutes, + but subscriptions will be re-acquired if any recoverable errors are thrown. Once a subscription is acquired, the consumer + will receive a stream of [SubscribeToShardEvents](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)s. -If streaming topologies choose to use the [event time notion]({{< ref "docs/concepts/time" >}}) for record -timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they -were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side -timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be -ascending). +- *[DescribeStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html)*: + this is called only during application startup per parallel subtask of the operator. This is to retrieve the `consumerArn` + for the `ACTIVE` consumer attached to the stream. The retry strategy can be configured using `KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY*` options -Users can choose to override this default with a custom timestamp, as described [here]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}), -or use one from the [predefined ones]({{< ref "docs/dev/datastream/event-time/built_in" >}}). After doing so, -it can be passed to the consumer in the following way: +- *[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*: + this is called once per stream during stream consumer registration, unless the `SELF_MANAGED` consumer lifecycle is configured. -{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6600" >}} -{{< tab "Java" >}} -```java -FlinkKinesisConsumer consumer = new FlinkKinesisConsumer<>( - "kinesis_stream_name", - new SimpleStringSchema(), - kinesisConsumerConfig); -consumer.setPeriodicWatermarkAssigner(new CustomAssignerWithPeriodicWatermarks()); -DataStream stream = env - .addSource(consumer) - .print(); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val consumer = new FlinkKinesisConsumer[String]( - "kinesis_stream_name", - new SimpleStringSchema(), - kinesisConsumerConfig); -consumer.setPeriodicWatermarkAssigner(new CustomAssignerWithPeriodicWatermarks()); -val stream = env - .addSource(consumer) - .print(); -``` -{{< /tab >}} -{{< tab "Python" >}} -```python -consumer = FlinkKinesisConsumer( - "kinesis_stream_name", - SimpleStringSchema(), - consumer_config) -stream = env.add_source(consumer).print() -``` -{{< /tab >}} -{{< /tabs >}} +- *[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*: + this is called once per stream during stream consumer deregistration, unless the `SELF_MANAGED` registration strategy is configured. -Internally, an instance of the assigner is executed per shard / consumer thread (see threading model below). -When an assigner is specified, for each record read from Kinesis, the extractTimestamp(T element, long previousElementTimestamp) -is called to assign a timestamp to the record and getCurrentWatermark() to determine the new watermark for the shard. -The watermark of the consumer subtask is then determined as the minimum watermark of all its shards and emitted periodically. 
-The per shard watermark is essential to deal with varying consumption speed between shards, that otherwise could lead -to issues with downstream logic that relies on the watermark, such as incorrect late data dropping. +### Using Enhanced Fan-Out -By default, the watermark is going to stall if shards do not deliver new records. -The property `ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS` can be used to avoid this potential issue through a -timeout that will allow the watermark to progress despite of idle shards. +[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum number of concurrent consumers per Kinesis stream. +Without EFO, all concurrent consumers share a single read quota per shard. +Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers. +Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/). -### Event Time Alignment for Shard Consumers +In order to enable EFO two additional configuration parameters are required: -The Flink Kinesis Consumer optionally supports synchronization between parallel consumer subtasks (and their threads) -to avoid the event time skew related problems described in [Event time synchronization across sources](https://issues.apache.org/jira/browse/FLINK-10886). +- `READER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default `ReaderType` is `POLLING`. +- `EFO_CONSUMER_NAME`: A name to identify the consumer. + For a given Kinesis data stream, each consumer must have a unique name. + However, consumer names do not have to be unique across data streams. + Reusing a consumer name will result in existing subscriptions being terminated. -To enable synchronization, set the watermark tracker on the consumer: +The code snippet below shows a simple example configuring an EFO consumer. -{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6601" >}} +{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}} {{< tab "Java" >}} ```java -JobManagerWatermarkTracker watermarkTracker = - new JobManagerWatermarkTracker("myKinesisSource"); -consumer.setWatermarkTracker(watermarkTracker); +Configuration sourceConfig = new Configuration(); +sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); +sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer"); ``` {{< /tab >}} -{{< tab "Python" >}} -```python -watermark_tracker = WatermarkTracker.job_manager_watermark_tracker("myKinesisSource") -consumer.set_watermark_tracker(watermark_tracker) +{{< tab "Scala" >}} +```scala +val sourceConfig = new Configuration() +sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO) +sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer") ``` {{< /tab >}} {{< /tabs >}} -The `JobManagerWatermarkTracker` will use a global aggregate to synchronize the per subtask watermarks. Each subtask -uses a per shard queue to control the rate at which records are emitted downstream based on how far ahead of the global -watermark the next record in the queue is. - -The "emit ahead" limit is configured via `ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS`. Smaller values reduce -the skew but also the throughput. Larger values will allow the subtask to proceed further before waiting for the global -watermark to advance. 
- -Another variable in the throughput equation is how frequently the watermark is propagated by the tracker. -The interval can be configured via `ConsumerConfigConstants.WATERMARK_SYNC_MILLIS`. -Smaller values reduce emitter waits and come at the cost of increased communication with the job manager. - -Since records accumulate in the queues when skew occurs, increased memory consumption needs to be expected. -How much depends on the average record size. With larger sizes, it may be necessary to adjust the emitter queue capacity via -`ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY`. - -### Threading Model - -The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption. +#### EFO Stream Consumer Lifecycle Management -#### Shard Discovery - -For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard -information even if the subtask initially did not have shards to read from when the consumer was started. In other words, if -the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless -of the total amount of shards in the subscribed streams. - -#### Polling (default) Record Publisher +In order to use EFO, a stream consumer must be registered against the stream you wish to consume. +By default, the `KinesisStreamsSource` will manage the lifecycle of the stream consumer automatically when the Flink job starts/stops. +The stream consumer will be registered using the name provided by the `EFO_CONSUMER_NAME` configuration, and de-registered when job stops gracefully. +`KinesisStreamsSource` provides two lifecycle options: -For `POLLING` data consumption, a single thread will be created to consume each discovered shard. Threads will terminate when the -shard it is responsible of consuming is closed as a result of stream resharding. In other words, there will always be -one thread per open shard. +- `JOB_MANAGED` (default): Stream consumers are registered when the Flink job starts running. + If the stream consumer already exists, it will be reused. + When the job stops gracefully, the consumer will be de-registered. + This is the preferred strategy for the majority of applications. +- `SELF_MANAGED`: Stream consumer registration/de-registration will not be performed by the `KinesisStreamsSource`. + Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/) + to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html). + Stream consumer ARNs should be provided to the job via the consumer configuration. -#### Enhanced Fan-Out Record Publisher - -For `EFO` data consumption the threading model is the same as `POLLING`, with additional thread pools to handle -asynchronous communication with Kinesis. AWS SDK v2.x `KinesisAsyncClient` uses additional threads for -Netty to handle IO and asynchronous response. Each parallel consumer subtask will have their own instance of the `KinesisAsyncClient`. -In other words, if the consumer is run with a parallelism of 10, there will be a total of 10 `KinesisAsyncClient` instances. -A separate client will be created and subsequently destroyed when registering and deregistering stream consumers. - -### Internally Used Kinesis APIs +To specify the lifecycle management, simply specify the `KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE` in the `Configuration` provided. 
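+
+A sketch of selecting the lifecycle is shown below; the enum used for the value (`ConsumerLifecycle`) is an assumption and should be checked against the `KinesisSourceConfigOptions` shipped with the connector version in use:
+
+```java
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
+sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+// SELF_MANAGED skips automatic registration/de-registration; the externally registered
+// consumer must then be supplied to the job via the consumer configuration.
+sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE, KinesisSourceConfigOptions.ConsumerLifecycle.SELF_MANAGED);
+```
+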
-The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs -for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) -on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running. -Below is a list of APIs called by the consumer with description of how the consumer uses the API, as well as information -on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits. +## Kinesis Consumer -#### Shard Discovery +{{< hint warning >}} +The old Kinesis source `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Source]({{}}) instead. +{{< /hint >}} -- *[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)*: this is constantly called -by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default, -the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result -from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the consumer of -calling this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied -configuration properties. This sets the discovery interval to a different value. Note that this setting directly impacts -the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval. - -#### Polling (default) Record Publisher - -- *[GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called -only once when per shard consuming threads are started, and will retry if Kinesis complains that the transaction limit for the -API has exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream), -the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down any other -non-Flink consuming applications of calling this API, or modify the retry behaviour of this API call in the consumer by -setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties. - -- *[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called -by per shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there -are any other non-Flink consuming applications running), the per shard rate limit may be exceeded. By default, on each call -of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has exceeded, -up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput -of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and -`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. 
Setting the former -adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10,000), while -the latter modifies the sleep interval between each fetch (default is 200). The retry behaviour of the -consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`. - -#### Enhanced Fan-Out Record Publisher - -- *[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)*: this is called -by per shard consuming threads to obtain shard subscriptions. A shard subscription is typically active for 5 minutes, -but subscriptions will be reaquired if any recoverable errors are thrown. Once a subscription is acquired, the consumer -will receive a stream of [SubscribeToShardEvents](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)s. -Retry and backoff parameters can be configured using the `ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_*` keys. - -- *[DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)*: this is called -once per stream, during stream consumer registration. By default, the `LAZY` registration strategy will scale the -number of calls by the job parallelism. `EAGER` will invoke this once per stream and `NONE` will not invoke this API. -Retry and backoff parameters can be configured using the -`ConsumerConfigConstants.STREAM_DESCRIBE_*` keys. +The `FlinkKinesisConsumer` is the deprecated connector based off the old source interface. We do not recommend users +to use this. -- *[DescribeStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html)*: -this is called during stream consumer registration and deregistration. For each stream this service will be invoked -periodically until the stream consumer is reported `ACTIVE`/`not found` for registration/deregistration. By default, -the `LAZY` registration strategy will scale the number of calls by the job parallelism. `EAGER` will call the service -once per stream for registration only. `NONE` will not invoke this service. Retry and backoff parameters can be configured using the -`ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys. - -- *[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*: -this is called once per stream during stream consumer registration, unless the `NONE` registration strategy is configured. -Retry and backoff parameters can be configured using the `ConsumerConfigConstants.REGISTER_STREAM_*` keys. - -- *[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*: -this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured. -Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys. +Note that there is no state compatibility between the `FlinkKinesisConsumer` and `KinesisStreamsSource`. +To migrate, consider starting the `KinesisStreamsSource` from `AT_TIMESTAMP` slightly before the time when the `FlinkKinesisConsumer` was stopped. +Note that this will result in some re-processed some records. ## Kinesis Streams Sink