Add configuration parsing specifications.
We make sure all default, minimal and reference configurations are parsed correctly for Azure/GCP/AWS.
pondzix committed Mar 6, 2024
1 parent 016b844 commit 7ae3f3c
Showing 16 changed files with 652 additions and 25 deletions.
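The pattern behind such parsing specs is straightforward: load each sample file and assert that it parses and decodes. A minimal sketch of the idea, assuming plain Typesafe Config and munit rather than the loader's own decoders and test framework; the `output.good` path is likewise an assumption about the files' nesting:

```scala
import java.io.File
import com.typesafe.config.ConfigFactory

class ConfigParseSpec extends munit.FunSuite {

  // The six sample files touched by this commit, one minimal and one
  // reference config per cloud (Azure, AWS/Kinesis, GCP/Pub/Sub)
  val sampleConfigs = List(
    "config/config.azure.minimal.hocon",
    "config/config.azure.reference.hocon",
    "config/config.kinesis.minimal.hocon",
    "config/config.kinesis.reference.hocon",
    "config/config.pubsub.minimal.hocon",
    "config/config.pubsub.reference.hocon"
  )

  sampleConfigs.foreach { path =>
    test(s"$path parses as valid HOCON") {
      // resolve() forces substitutions such as ${?ENV_VAR} to be evaluated
      val parsed = ConfigFactory.parseFile(new File(path)).resolve()
      // "output.good" is an assumed path; the real specs decode full config types
      assert(parsed.hasPath("output.good"))
    }
  }
}
```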
2 changes: 1 addition & 1 deletion config/config.azure.minimal.hocon
@@ -12,7 +12,7 @@

   "good": {
     "project": "my-project"
-    "dataset": "snowplow"
+    "dataset": "my-dataset"
   }

   "bad": {
16 changes: 13 additions & 3 deletions config/config.azure.reference.hocon
@@ -27,7 +27,7 @@
     "project": "my-project"

     # -- the BigQuery dataset to which events will be loaded
-    "dataset": "my-project"
+    "dataset": "my-dataset"

     # -- the table within the BigQuery dataset to which events will be loaded
     "table": "events"
@@ -84,7 +84,7 @@
     # -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
     # -- table by adding new columns
     "alterTableWait": {
-      "delay": 1 second"
+      "delay": "1 second"
     }
   }
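The `delay` fix above is worth a note: the old value `1 second"` has an unbalanced quote and is rejected by the HOCON parser, exactly the kind of breakage a parsing spec catches. A quick sketch of how the corrected value behaves, using Typesafe Config's duration parsing:

```scala
import java.util.concurrent.TimeUnit
import com.typesafe.config.ConfigFactory

object DelayCheck extends App {
  // "1 second" is a valid HOCON duration string; getDuration converts it
  val cfg = ConfigFactory.parseString("""delay: "1 second"""")
  assert(cfg.getDuration("delay", TimeUnit.MILLISECONDS) == 1000L)
}
```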

@@ -105,7 +105,7 @@
       "period": "1 minute"

       # -- Prefix used for the metric name when sending to statsd
-      "prefix": "snowplow.bigquery-loader"
+      "prefix": "snowplow.bigquery.loader"
     }
   }

@@ -118,6 +118,16 @@
         "myTag": "xyz"
       }
     }
+
+    # -- Report alerts to the webhook
+    "webhook": {
+      # An actual HTTP endpoint
+      "endpoint": "https://webhook.acme.com",
+      # Set of arbitrary key-value pairs attached to the payload
+      "tags": {
+        "pipeline": "production"
+      }
+    }
   }

   # -- Optional, configure telemetry
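The webhook block is the one genuinely new piece of configuration in these reference files (the other edits correct placeholder values). A sketch of how it might be read, assuming plain Typesafe Config and that the block sits under a `monitoring` section; the loader's actual types and field names may differ:

```scala
import com.typesafe.config.{Config, ConfigFactory}
import scala.jdk.CollectionConverters._

// Hypothetical model of the new block, for illustration only
final case class Webhook(endpoint: String, tags: Map[String, String])

object Webhook {
  // Returns None when the optional block is absent
  def fromConfig(root: Config): Option[Webhook] =
    if (root.hasPath("monitoring.webhook")) {
      val c = root.getConfig("monitoring.webhook")
      val tags = c.getObject("tags").unwrapped.asScala.collect {
        case (k, v: String) => k -> v
      }.toMap
      Some(Webhook(c.getString("endpoint"), tags))
    } else None
}
```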
2 changes: 1 addition & 1 deletion config/config.kinesis.minimal.hocon
@@ -11,7 +11,7 @@

   "good": {
     "project": "my-project"
-    "dataset": "snowplow"
+    "dataset": "my-dataset"
   }

   "bad": {
16 changes: 13 additions & 3 deletions config/config.kinesis.reference.hocon
@@ -45,7 +45,7 @@
     "project": "my-project"

     # -- the BigQuery dataset to which events will be loaded
-    "dataset": "my-project"
+    "dataset": "my-dataset"

     # -- the table within the BigQuery dataset to which events will be loaded
     "table": "events"
@@ -106,7 +106,7 @@
     # -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
     # -- table by adding new columns
     "alterTableWait": {
-      "delay": 1 second"
+      "delay": "1 second"
     }
   }

@@ -127,7 +127,7 @@
       "period": "1 minute"

       # -- Prefix used for the metric name when sending to statsd
-      "prefix": "snowplow.bigquery-loader"
+      "prefix": "snowplow.bigquery.loader"
     }
   }

@@ -140,6 +140,16 @@
         "myTag": "xyz"
       }
     }
+
+    # -- Report alerts to the webhook
+    "webhook": {
+      # An actual HTTP endpoint
+      "endpoint": "https://webhook.acme.com",
+      # Set of arbitrary key-value pairs attached to the payload
+      "tags": {
+        "pipeline": "production"
+      }
+    }
   }

   # -- Optional, configure telemetry
4 changes: 2 additions & 2 deletions config/config.pubsub.minimal.hocon
@@ -11,11 +11,11 @@

   "good": {
     "project": "my-project"
-    "dataset": "snowplow"
+    "dataset": "my-dataset"
   }

   "bad": {
-    "topic": "projects/myproject/topics/snowplow-bad"
+    "topic": "projects/my-project/topics/snowplow-bad"
   }

 }
20 changes: 15 additions & 5 deletions config/config.pubsub.reference.hocon
@@ -6,7 +6,7 @@

   "input": {
     # -- pubsub subscription for the source of enriched events
-    "subscription": "projects/myproject/subscriptions/snowplow-enriched"
+    "subscription": "projects/my-project/subscriptions/snowplow-enriched"

     # -- How many threads are used by the pubsub client library for fetching events
     "parallelPullCount": 3
@@ -32,7 +32,7 @@
     "project": "my-project"

     # -- the BigQuery dataset to which events will be loaded
-    "dataset": "my-project"
+    "dataset": "my-dataset"

     # -- the table within the BigQuery dataset to which events will be loaded
     "table": "events"
@@ -44,7 +44,7 @@

   "bad": {
     # -- output pubsub topic for emitting failed events that could not be processed
-    "topic": "projects/myproject/topics/snowplow-bad"
+    "topic": "projects/my-project/topics/snowplow-bad"

     # -- Failed sends events to pubsub in batches not exceeding this size.
     "batchSize": 100
@@ -86,7 +86,7 @@
     # -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
     # -- table by adding new columns
     "alterTableWait": {
-      "delay": 1 second"
+      "delay": "1 second"
     }
   }

@@ -107,7 +107,7 @@
       "period": "1 minute"

       # -- Prefix used for the metric name when sending to statsd
-      "prefix": "snowplow.bigquery-loader"
+      "prefix": "snowplow.bigquery.loader"
     }
   }

@@ -120,6 +120,16 @@
         "myTag": "xyz"
       }
     }
+
+    # -- Report alerts to the webhook
+    "webhook": {
+      # An actual HTTP endpoint
+      "endpoint": "https://webhook.acme.com",
+      # Set of arbitrary key-value pairs attached to the payload
+      "tags": {
+        "pipeline": "production"
+      }
+    }
   }

   # -- Optional, configure telemetry
2 changes: 1 addition & 1 deletion modules/core/src/main/resources/reference.conf
@@ -3,7 +3,7 @@
   "gcpUserAgent": {
     "productName": "Snowplow OSS"
     "productName": ${?GCP_USER_AGENT_PRODUCT_NAME}
-    "productVersion": "lake-loader"
+    "productVersion": "bigquery-loader"
   }

   "output": {
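The pair of `productName` lines above relies on HOCON's optional substitution: `${?NAME}` overrides the earlier value only when it resolves (for an all-caps name like this, typically from an environment variable) and silently disappears otherwise. A small sketch of that behavior:

```scala
import com.typesafe.config.ConfigFactory

object UserAgentCheck extends App {
  val cfg = ConfigFactory.parseString(
    """
    productName: "Snowplow OSS"
    productName: ${?GCP_USER_AGENT_PRODUCT_NAME}
    """
  ).resolve()
  // Prints "Snowplow OSS" unless GCP_USER_AGENT_PRODUCT_NAME is set
  println(cfg.getString("productName"))
}
```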
@@ -18,7 +18,7 @@ import com.snowplowanalytics.iglu.schemaddl.parquet.Caster

 import scala.jdk.CollectionConverters._

-/** Converts schema-ddl values into objects which are compatible with the snowflake ingest sdk */
+/** Converts schema-ddl values into objects which are compatible with the BigQuery sdk */
 private[processing] object BigQueryCaster extends Caster[AnyRef] {

   override def nullValue: Null = null
@@ -135,7 +135,7 @@ object Processing {
     } yield ParseResult(events, badRows, numBytes, token)
   }

-  /** Transform the Event into values compatible with the snowflake ingest sdk */
+  /** Transform the Event into values compatible with the BigQuery sdk */
   private def transform[F[_]: Sync: RegistryLookup](
     env: Environment[F],
     badProcessor: BadRowProcessor
@@ -104,9 +104,9 @@ object MockEnvironment {
   }

   val appInfo = new AppInfo {
-    def name = "snowflake-loader-test"
+    def name = "bigquery-loader-test"
     def version = "0.0.0"
-    def dockerAlias = "snowplow/snowflake-loader-test:0.0.0"
+    def dockerAlias = "snowplow/bigquery-loader-test:0.0.0"
     def cloud = "OnPrem"
   }