Skip to content

Commit

Permalink
Upgrade common-streams to 0.8.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Sep 16, 2024
1 parent 32a1a0e commit 1ec1e0b
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 15 deletions.
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
12 changes: 7 additions & 5 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
# -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll.
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

"output": {
Expand Down Expand Up @@ -139,6 +134,13 @@
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@
# -- indicates an error that needs addressing.
# -- Change to `false` so events go the failed events stream instead of crashing the loader.
"exitOnMissingIgluSchema": true

# -- Configuration of internal http client used for iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {
Expand Down
6 changes: 6 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
"legacyColumns": []
"exitOnMissingIgluSchema": true

"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {
"statsd": ${snowplow.defaults.statsd}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ case class Config[+Source, +Sink](
license: AcceptedLicense,
skipSchemas: List[SchemaCriterion],
legacyColumns: List[SchemaCriterion],
exitOnMissingIgluSchema: Boolean
exitOnMissingIgluSchema: Boolean,
maxConnectionsPerServer: Int
)

object Config {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ package com.snowplowanalytics.snowplow.bigquery

import cats.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry
import retry.RetryPolicy

Expand All @@ -21,7 +19,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, BigQueryUtils, TableManager, Writer}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}

case class Environment[F[_]](
appInfo: AppInfo,
Expand Down Expand Up @@ -55,7 +53,7 @@ object Environment {
sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](HttpClient.Config(config.main.maxConnectionsPerServer))
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object Processing {
}

/** Transform the Event into values compatible with the BigQuery sdk */
private def transform[F[_]: Sync: RegistryLookup](
private def transform[F[_]: Async: RegistryLookup](
env: Environment[F],
badProcessor: BadRowProcessor
): Pipe[F, Batched, BatchAfterTransform] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import eu.timepit.refined.types.all.PosInt
import org.http4s.implicits.http4sLiteralsSyntax
import org.specs2.Specification

Expand Down Expand Up @@ -67,7 +66,6 @@ object KinesisConfigSpec {
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down Expand Up @@ -135,7 +133,6 @@ object KinesisConfigSpec {
workerIdentifier = "test-hostname",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Dependencies {
val bigquery = "2.34.2"

// Snowplow
val streams = "0.8.0-M2"
val streams = "0.8.0-M4"
val igluClient = "3.1.0"

// tests
Expand Down

0 comments on commit 1ec1e0b

Please sign in to comment.