Skip to content

Commit

Permalink
DEV-2679 better kinesis source docs (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
zsid60 authored Jan 11, 2024
1 parent b24849a commit b489075
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ schemas
SDK
SearchRequest
SHA
ShardIteratorType
Signifier
signup
SLA
Expand Down
31 changes: 29 additions & 2 deletions docs/examples/api-reference/source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime

import pandas as pd

Expand Down Expand Up @@ -167,8 +167,9 @@ class UserKafkaSourcedDataset:
)
stream = kinesis.stream(
stream_arn="<SOME_STREAM_ARN>",
# Start ingesting from Nov 5, 2023
init_position=InitPosition.AT_TIMESTAMP,
init_timestamp=datetime.now() - timedelta(days=14),
init_timestamp=datetime(2023, 11, 5),
)


Expand All @@ -182,6 +183,32 @@ class UserKinesisSourcedDataset:
...


# /docsnip

# docsnip kinesis_source_latest
from fennel.sources import InitPosition

kinesis = sources.Kinesis(
name="kinesis_src",
role_arn="<SOME_ROLE_ARN>",
)
stream = kinesis.stream(
stream_arn="<SOME_STREAM_ARN>",
# Ingest all new records from now
init_position=InitPosition.LATEST,
)


@source(stream)
@meta(owner="[email protected]")
@dataset
class UserKinesisSourcedDataset2:
uid: int = field(key=True)
email: str
timestamp: datetime
...


# /docsnip

# docsnip s3_delta_lake_source
Expand Down
27 changes: 17 additions & 10 deletions docs/pages/api-reference/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,22 @@ The following fields need to be defined on the topic:

The following fields need to be defined for the source:

1. **`name`** - A name to identify the source. The name should be unique across all sources.
1. `role_arn` - The role that Fennel should use to access the Kinesis stream
2. `stream_arn` - AWS `ARN` of the stream
3. `init_position` - The Kinesis `ShardIterator` type used to begin ingestion. One of `LATEST`, `TRIM_HORIZON` or `AT_TIMESTAMP`
4. `init_timestamp` - If the `init_position` is `AT_TIMESTAMP` this is the datetime at which to begin ingestion
5. `format` - The format of the incoming data. Currently only JSON is supported and `"json"` is specified by default
1. `name: str` - A name to identify the source. The name should be unique across all sources.
1. `role_arn: str` - The role that Fennel should use to access the Kinesis stream
2. `stream_arn: str` - AWS `ARN` of the stream
3. `init_position: fennel.sources.InitPosition` - The Kinesis `ShardIterator` type used to begin ingestion. One of `LATEST`, `TRIM_HORIZON` or `AT_TIMESTAMP`.
Note that for `LATEST`, Fennel will begin consuming records that come into Kinesis a few minutes after `sync()` is called. For a completely deterministic position, use `AT_TIMESTAMP`
See [Kinesis ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax) for more info.
4. `init_timestamp: Optional[datetime]` - If the `init_position` is `AT_TIMESTAMP` this is the datetime at which to begin ingestion. Do not specify this for `LATEST` or `TRIM_HORIZON`
5. `format: Optional[str]` - The format of the incoming data. Currently only JSON is supported and `"json"` is specified by default

**Example Using AT_TIMESTAMP**

<pre snippet="api-reference/source#kinesis_source"></pre>

**Example Using LATEST** (TRIM_HORIZON is used the same way)

<pre snippet="api-reference/source#kinesis_source_latest"></pre>

:::info
Fennel creates a special role with name prefixed by `FennelDataAccessRole-` in your AWS account for role-based access. The `role_arn` specified should have a trust policy allowing this role to assume the kinesis role.
Expand Down Expand Up @@ -245,7 +255,4 @@ Also attach the following permission policy. Add more streams to the Resource fi
}
```

</details>


<pre snippet="api-reference/source#kinesis_source"></pre>
</details>

0 comments on commit b489075

Please sign in to comment.