Updates the S3 source README.md to link to the user documentation and retain the developer guide. (opensearch-project#4094)

Signed-off-by: David Venable <[email protected]>
dlvenable authored Feb 13, 2024
1 parent 72e5a92 commit c792812
data-prepper-plugins/s3-source/README.md (2 additions, 233 deletions)
@@ -1,240 +1,9 @@
# S3 Source

This source allows Data Prepper to use S3 as a source. It uses SQS notifications to discover new S3 objects and loads those objects to parse out events.
It also supports a scan pipeline that scans S3 buckets directly and parses the discovered objects into events.
This source ingests data into Data Prepper from [Amazon S3](https://aws.amazon.com/s3/).

## Basic Usage
See the [`s3` source documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) for details on usage.

This source requires an SQS queue which receives
[S3 Event Notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html).
The S3 source loads S3 objects for which it receives Create event notifications.
A user-specified codec parses each S3 object and creates events from it.

Currently, there are three codecs:

* `newline` - Parses files where each single line is a log event.
* `json` - Parses the file for a JSON array. Each object in the JSON array is a log event.
* `csv` - Parses a character-separated file. Each line of data is a log event.



The `compression` property defines how to handle compressed S3 objects. It has the following options.

* `none` - The file is not compressed.
* `gzip` - Applies GZip decompression to the S3 object.
* `automatic` - Attempts to automatically determine the compression. If the S3 object's key name ends in `.gz`, then `gzip` decompression is applied. Otherwise, it is treated as `none`.

### Example: Uncompressed Logs

The following configuration shows a minimal configuration for reading newline-delimited logs that are not compressed.

```
source:
  s3:
    notification_type: "sqs"
    codec:
      newline:
    compression: none
    sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    aws:
      region: "us-east-1"
      sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
```

The following configuration shows a minimal configuration for reading uncompressed content using the S3 Select service and scanning S3 buckets.

```
source-pipeline:
  source:
    s3:
      notification_type: sqs
      compression: none
      codec:
        newline:
      s3_select:
        expression: "select * from s3object s LIMIT 10000"
        expression_type: SQL
        input_serialization: csv
        compression_type: none
        csv:
          file_header_info: use
          quote_escape:
          comments:
        json:
          type: document
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
      aws:
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
      scan:
        start_time: now
        end_time: 2023-12-31T11:59:59
        buckets:
          - bucket:
              name: my-bucket-1
              filter:
                include_prefix:
                  - bucket2/
                exclude_suffix:
                  - .jpeg
                  - .png
```

## Configuration Options

All duration values are strings that represent a duration. They support ISO 8601 notation ("PT20.345S", "PT15M", etc.) as well as simple notation for seconds ("60s") and milliseconds ("1500ms").
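
For example, each of the notations in the following sketch expresses the same one-minute duration (shown here on the `buffer_timeout` option described below):

```
# Equivalent ways to write one minute:
#   "PT1M"     - ISO 8601 notation, minutes
#   "PT60S"    - ISO 8601 notation, seconds
#   "60s"      - simple seconds notation
#   "60000ms"  - simple milliseconds notation
buffer_timeout: "PT1M"
```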

* `s3_select` : S3 Select Configuration. See [S3 Select Configuration](#s3_select_configuration) for details.

* `notification_type` (Optional) : Must be `sqs`.

* `notification_source` (Optional) : Specifies how the notifications are generated. Must be either `s3` or `eventbridge`. Defaults to `s3`. `s3` represents notifications sent directly from S3 to SQS, or via a fan-out pattern from S3 to SNS to SQS. `eventbridge` represents notifications sent from S3 to EventBridge to SQS. Only `Object Created` events are supported.

* `compression` (Optional) : The compression algorithm to apply. May be one of: `none`, `gzip`, or `automatic`. Defaults to `none`.

* `codec` (Required) : The codec to apply. Must be either `newline`, `csv` or `json`.

* `sqs` (Optional) : The SQS configuration. See [SQS Configuration](#sqs_configuration) for details.

* `scan` (Optional) : S3 Scan Configuration. See [S3 Scan Configuration](#s3_scan_configuration) for details.

* `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details.

* `acknowledgments` (Optional) : Enables end-to-end acknowledgments. If set to `true`, the SQS message is deleted only after all events from that message are successfully acknowledged by all sinks. Defaults to `false`.

* `on_error` (Optional) : Determines how to handle errors in SQS. Can be either `retain_messages` or `delete_messages`. If `retain_messages`, then Data Prepper will leave the message in the SQS queue and try again. This is recommended for dead-letter queues. If `delete_messages`, then Data Prepper will delete failed messages. Defaults to `retain_messages`.

* `buffer_timeout` (Optional) : Duration - The timeout for writing events to the Data Prepper buffer. Any Events which the S3 Source cannot write to the Buffer in this time will be discarded. Defaults to 10 seconds.

* `records_to_accumulate` (Optional) : The number of messages to accumulate before writing to the buffer. Defaults to 100.

* `metadata_root_key` (Optional) : String - Sets the base key for adding S3 metadata to each Event. The metadata includes the `key` and `bucket` for each S3 object. Defaults to `s3/`.

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

* `delete_s3_objects_on_read` (Optional) : Boolean - If set to true, then the S3 scan will attempt to delete S3 objects after all events from an S3 object are successfully acknowledged by all sinks. `acknowledgments` must be enabled to delete S3 objects. Defaults to `false`.
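
As a sketch of how several of these options combine (the queue URL and region are placeholders), a source that retains failed messages and deletes objects only once all sinks acknowledge them might look like this:

```
source:
  s3:
    codec:
      newline:
    acknowledgments: true              # required for delete_s3_objects_on_read
    delete_s3_objects_on_read: true    # delete objects after all sinks acknowledge
    on_error: retain_messages          # leave failed messages on the queue
    sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    aws:
      region: "us-east-1"
```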

### <a name="s3_select_configuration">S3 Select Configuration</a>

* `expression` (Required if `s3_select` is enabled) : The S3 Select query to run against objects in the bucket.

* `expression_type` (Optional if `s3_select` is enabled) : The type of the S3 Select expression. For example, `SQL`.

* `compression_type` (Optional if `s3_select` is enabled) : The compression algorithm to apply. May be one of `none` or `gzip`. Defaults to `none`.

* `input_serialization` (Required if `s3_select` is enabled) : The format of the S3 objects (csv/json/Apache Parquet). Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. You must also specify the data serialization format for the response.

* `csv` (Optional) : The CSV configuration for processing CSV data.
  * `file_header_info` (Required if the `csv` block is enabled) : Specifies how to handle the CSV header. One of `use`, `none`, or `ignore`. Default is `use`.
  * `quote_escape` (Optional) : The character used to escape quotes. Examples: `,`, `.`.
  * `comments` (Optional) : The character that marks comment lines. Example: `#`. Default is `#`.

* `json` (Optional) : The JSON configuration for processing JSON data.
  * `type` (Optional) : The JSON type. One of `Lines` or `Document`. Default is `Document`.
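
For instance, a minimal `s3_select` sketch for querying JSON objects stored one record per line (the query is illustrative):

```
s3_select:
  expression: "select * from s3object s LIMIT 10000"
  expression_type: SQL
  input_serialization: json
  json:
    type: Lines
```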

### <a name="sqs_configuration">SQS Configuration</a>

* `queue_url` (Required) : The SQS queue URL of the queue to read from.
* `maximum_messages` (Optional) : Integer - The maximum number of messages to read from the queue in a single request. Defaults to 10.
* `visibility_timeout` (Optional) : Duration - The visibility timeout to apply to messages read from the SQS queue. This should be set to the amount of time that Data Prepper may take to read all the S3 objects in a batch. Defaults to 30 seconds.
* `wait_time` (Optional) : Duration - The time to wait for long-polling on the SQS API. Defaults to 20 seconds.
* `poll_delay` (Optional) : Duration - A delay to place between reading and processing a batch of SQS messages and making a subsequent request. Defaults to 0 seconds.
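
A sketch showing these options with their default values (the queue URL is a placeholder):

```
sqs:
  queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
  maximum_messages: 10
  visibility_timeout: "30s"
  wait_time: "20s"
  poll_delay: "0s"
```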

### <a name="s3_scan_configuration">S3 Scan Configuration</a>
* `start_time` (Optional) : The start time for scanning objects from all buckets. This should follow the [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, or it can be set to the keyword `now`, which represents the current LocalDateTime. This parameter defines a time range together with either `end_time` or `range`. Examples: `2023-01-23T10:00:00`, `now`.
* `end_time` (Optional) : The end time for scanning objects from all buckets. This should follow the [ISO LocalDateTime](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_LOCAL_DATE_TIME) format, or it can be set to the keyword `now`, which represents the current LocalDateTime. This parameter defines a time range together with either `start_time` or `range`. Examples: `2023-01-23T10:00:00`, `now`.
* `range` (Optional) : The duration over which to scan objects from all buckets. This parameter defines a time range together with either `start_time` or `end_time`.
* `scheduling` (Optional) : See [Scheduling Configuration](#scheduling_configuration) for details.
* `bucket` : The S3 bucket information.
  * `name` (Required if the `bucket` block is used) : The S3 bucket name.
  * `filter` (Optional) : Include and exclude lists used to filter objects in the bucket.
    * `include_prefix` (Optional) : A list of key path prefixes to include. For example, `dlq/`.
    * `exclude_suffix` (Optional) : A list of key suffixes to exclude. For example, `.csv`.
  * `start_time` (Optional) : The start time for scanning objects from this bucket. This parameter defines a time range together with either `end_time` or `range`. Example: `2023-01-23T10:00:00`.
  * `end_time` (Optional) : The end time for scanning objects from this bucket. This parameter defines a time range together with either `start_time` or `range`. Example: `2023-01-23T10:00:00`.
  * `range` (Optional) : The duration over which to scan objects from this bucket. This parameter defines a time range together with either `start_time` or `end_time`.

> Note: If a time range is not specified, all objects are included by default. To set a time range, specify any two, and only two, of `start_time`, `end_time`, and `range`. The time range configured on a specific bucket overrides the time range specified at the top level.

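For example, a sketch that pairs `start_time` with `range` (assuming the range extends forward from the start time; the bucket name is a placeholder):

```
scan:
  start_time: 2023-01-23T10:00:00
  range: "P7D"   # scan a 7-day window beginning at start_time
  buckets:
    - bucket:
        name: my-bucket-1
```
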
### <a name="scheduling_configuration">Scheduling Configuration</a>

Configures the interval and the number of times each S3 bucket should be scanned when using S3 Scan. For example,
an `interval` of `PT1H` and a `count` of `3` results in each bucket being scanned 3 times with a 1-hour interval between scans, with the first scan starting once the source is ready
and each subsequent scan starting an hour after the previous one.
* `interval` (Optional) : A string that indicates the minimum interval between scans. If objects from the first scan are not processed within the configured interval, the next scan begins as soon as no objects from the previous scan remain pending.
Supports ISO 8601 notation ("PT20.345S", "PT15M", etc.) as well as simple notation for seconds ("60s") and milliseconds ("1500ms").
Defaults to 8 hours, and is only applicable when `count` is greater than 1.
* `count` (Optional) : An integer that specifies how many times each bucket will be scanned. Defaults to 1.
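
A sketch of the example described above, scanning each bucket 3 times with at least 1 hour between scans (the bucket name is a placeholder):

```
scan:
  scheduling:
    interval: "PT1H"
    count: 3
  buckets:
    - bucket:
        name: my-bucket-1
```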


### <a name="aws_configuration">AWS Configuration</a>

The AWS configuration is the same for both SQS and S3.

* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_external_id` (Optional) : The external ID to attach to AssumeRole requests.

The following policy shows the necessary permissions for the S3 source. `kms:Decrypt` is required if the SQS queue is encrypted with AWS [KMS](https://aws.amazon.com/kms/).
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "s3policy",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "kms:Decrypt"
      ],
      "Resource": "*"
    }
  ]
}
```
* `aws_sts_header_overrides` (Optional) : A map of header overrides to make when assuming the IAM role for this source plugin.
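
A sketch of the header overrides map, assuming it sits in the `aws` block alongside `sts_role_arn`; the header name and value are hypothetical:

```
aws:
  sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
  aws_sts_header_overrides:
    x-example-header: "example-value"   # hypothetical header override
```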

## Metrics

### Counters

* `s3ObjectsFailed` - The number of S3 objects that the S3 Source failed to read.
* `s3ObjectsNotFound` - The number of S3 objects that the S3 Source failed to read due to a Not Found error from S3. These are also counted toward `s3ObjectsFailed`.
* `s3ObjectsAccessDenied` - The number of S3 objects that the S3 Source failed to read due to an Access Denied or Forbidden error. These are also counted toward `s3ObjectsFailed`.
* `s3ObjectNoRecordsFound` - The number of S3 objects that resulted in 0 records added to the buffer.
* `s3ObjectsSucceeded` - The number of S3 objects that the S3 Source successfully read.
* `sqsMessagesReceived` - The number of SQS messages received from the queue by the S3 Source.
* `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the S3 Source.
* `sqsMessagesFailed` - The number of SQS messages that the S3 Source failed to parse.
* `sqsMessagesDeleteFailed` - The number of SQS messages that the S3 Source failed to delete from the SQS queue.
* `s3ObjectsDeleted` - The number of S3 objects deleted by the S3 source.
* `s3ObjectsDeleteFailed` - The number of S3 objects that the S3 source failed to delete.
* `acknowledgementSetCallbackCounter` - The number of times End-to-end acknowledgments created an acknowledgment set.


### Timers

* `s3ObjectReadTimeElapsed` - Measures the time the S3 Source takes to perform a request to GET an S3 object, parse it, and write Events to the buffer.
* `sqsMessageDelay` - Measures the time from when S3 records an event time for the creation of an object to when it was fully parsed.

### Distribution Summaries

* `s3ObjectSizeBytes` - Measures the size of S3 objects as reported by the S3 `Content-Length`. For compressed objects, this is the compressed size.
* `s3ObjectProcessedBytes` - Measures the bytes processed by the S3 source for a given object. For compressed objects, this is the uncompressed size.
* `s3ObjectsEvents` - Measures the number of events (sometimes called records) produced by an S3 object.

## Developer Guide

