From b4890753a82d9da41ec68a1efc1588942a95ba20 Mon Sep 17 00:00:00 2001 From: Zaki Siddiqui Date: Thu, 11 Jan 2024 00:13:02 -0800 Subject: [PATCH] DEV-2679 better kinesis source docs (#334) --- .wordlist.txt | 1 + docs/examples/api-reference/source.py | 31 +++++++++++++++++++++++++-- docs/pages/api-reference/sources.md | 27 ++++++++++++++--------- 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/.wordlist.txt b/.wordlist.txt index 396cfbbc0..7a79fb133 100644 --- a/.wordlist.txt +++ b/.wordlist.txt @@ -168,6 +168,7 @@ schemas SDK SearchRequest SHA +ShardIteratorType Signifier signup SLA diff --git a/docs/examples/api-reference/source.py b/docs/examples/api-reference/source.py index d96ee12d0..04d66c90d 100644 --- a/docs/examples/api-reference/source.py +++ b/docs/examples/api-reference/source.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime import pandas as pd @@ -167,8 +167,9 @@ class UserKafkaSourcedDataset: ) stream = kinesis.stream( stream_arn="", + # Start ingesting from Nov 5, 2023 init_position=InitPosition.AT_TIMESTAMP, - init_timestamp=datetime.now() - timedelta(days=14), + init_timestamp=datetime(2023, 11, 5), ) @@ -182,6 +183,32 @@ class UserKinesisSourcedDataset: ... +# /docsnip + +# docsnip kinesis_source_latest +from fennel.sources import InitPosition + +kinesis = sources.Kinesis( + name="kinesis_src", + role_arn="", +) +stream = kinesis.stream( + stream_arn="", + # Ingest all new records from now + init_position=InitPosition.LATEST, +) + + +@source(stream) +@meta(owner="abc@email.com") +@dataset +class UserKinesisSourcedDataset2: + uid: int = field(key=True) + email: str + timestamp: datetime + ... + + # /docsnip # docsnip s3_delta_lake_source diff --git a/docs/pages/api-reference/sources.md b/docs/pages/api-reference/sources.md index 779d0abc0..6580e797a 100644 --- a/docs/pages/api-reference/sources.md +++ b/docs/pages/api-reference/sources.md @@ -182,12 +182,22 @@ The following fields need to be defined on the topic: The following fields need to be defined for the source: -1. **`name`** - A name to identify the source. The name should be unique across all sources. -1. `role_arn` - The role that Fennel should use to access the Kinesis stream -2. `stream_arn` - AWS `ARN` of the stream -3. `init_position` - The Kinesis `ShardIterator` type used to begin ingestion. One of `LATEST`, `TRIM_HORIZON` or `AT_TIMESTAMP` -4. `init_timestamp` - If the `init_position` is `AT_TIMESTAMP` this is the datetime at which to begin ingestion -5. `format` - The format of the incoming data. Currently only JSON is supported and `"json"` is specified by default +1. `name: str` - A name to identify the source. The name should be unique across all sources. +1. `role_arn: str` - The role that Fennel should use to access the Kinesis stream +2. `stream_arn: str` - AWS `ARN` of the stream +3. `init_position: fennel.sources.InitPosition` - The Kinesis `ShardIterator` type used to begin ingestion. One of `LATEST`, `TRIM_HORIZON` or `AT_TIMESTAMP`. +Note that for `LATEST`, Fennel will begin consuming records that come into Kinesis a few minutes after `sync()` is called. For a completely deterministic position, use `AT_TIMESTAMP` +See [Kinesis ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax) for more info. +4. `init_timestamp: Optional[datetime]` - If the `init_position` is `AT_TIMESTAMP` this is the datetime at which to begin ingestion. Do not specify this for `LATEST` or `TRIM_HORIZON` +5. `format: Optional[str]` - The format of the incoming data. Currently only JSON is supported and `"json"` is specified by default + +**Example Using AT_TIMESTAMP** + +

+
+**Example Using LATEST** (TRIM_HORIZON is used the same way)
+
+

 
 :::info
 Fennel creates a special role with name prefixed by `FennelDataAccessRole-` in your AWS account for role-based access. The `role_arn` specified should have a trust policy allowing this role to assume the kinesis role.
@@ -245,7 +255,4 @@ Also attach the following permission policy. Add more streams to the Resource fi
 }
 ```
 
-
-
-
-

\ No newline at end of file
+
\ No newline at end of file