From 1ec1e0b20a7e2e2478ba93842cf4422fc35e3235 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Mon, 16 Sep 2024 13:47:43 +0300 Subject: [PATCH] Upgrade common-streams to 0.8.0-M4 --- config/config.azure.reference.hocon | 7 +++++++ config/config.kinesis.reference.hocon | 12 +++++++----- config/config.pubsub.reference.hocon | 7 +++++++ modules/core/src/main/resources/reference.conf | 6 ++++++ .../Config.scala | 3 ++- .../Environment.scala | 6 ++---- .../processing/Processing.scala | 2 +- .../snowplow/bigquery/KinesisConfigSpec.scala | 3 --- project/Dependencies.scala | 2 +- 9 files changed, 33 insertions(+), 15 deletions(-) diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 9545c131..2175dd2e 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -117,6 +117,13 @@ # -- Change to `false` so events go the failed events stream instead of crashing the loader. "exitOnMissingIgluSchema": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index 4456af30..2e793783 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -31,11 +31,6 @@ # -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll. "maxRecords": 1000 } - - # -- The number of batches of events which are pre-fetched from kinesis. - # -- Increasing this above 1 is not known to improve performance. - "bufferSize": 1 - } "output": { @@ -139,6 +134,13 @@ # -- Change to `false` so events go the failed events stream instead of crashing the loader. "exitOnMissingIgluSchema": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index d4710624..15bfdcb0 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -119,6 +119,13 @@ # -- indicates an error that needs addressing. # -- Change to `false` so events go the failed events stream instead of crashing the loader. "exitOnMissingIgluSchema": true + + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } "monitoring": { "metrics": { diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 13468414..714366e0 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -44,6 +44,12 @@ "legacyColumns": [] "exitOnMissingIgluSchema": true + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { "statsd": ${snowplow.defaults.statsd} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala index ab7a30f5..36d9ecba 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala @@ -32,7 +32,8 @@ case class Config[+Source, +Sink]( license: AcceptedLicense, skipSchemas: List[SchemaCriterion], legacyColumns: List[SchemaCriterion], - exitOnMissingIgluSchema: Boolean + exitOnMissingIgluSchema: Boolean, + maxConnectionsPerServer: Int ) object Config { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala index b3ba9f4a..13c6680c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala @@ -10,9 +10,7 @@ package com.snowplowanalytics.snowplow.bigquery import cats.implicits._ import cats.effect.{Async, Resource, Sync} -import cats.effect.unsafe.implicits.global import org.http4s.client.Client -import org.http4s.blaze.client.BlazeClientBuilder import io.sentry.Sentry import retry.RetryPolicy @@ -21,7 +19,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, BigQueryUtils, TableManager, Writer} -import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook} case class Environment[F[_]]( appInfo: AppInfo, @@ -55,7 +53,7 @@ object Environment { sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy) appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter))) resolver <- mkResolver[F](config.iglu) - httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource + httpClient <- HttpClient.resource[F](HttpClient.Config(config.main.maxConnectionsPerServer)) _ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth) _ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth) badSink <- diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala index 355308d4..07452e24 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala @@ -141,7 +141,7 @@ object Processing { } /** Transform the Event into values compatible with the BigQuery sdk */ - private def transform[F[_]: Sync: RegistryLookup]( + private def transform[F[_]: Async: RegistryLookup]( env: Environment[F], badProcessor: BadRowProcessor ): Pipe[F, Batched, BatchAfterTransform] = diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala index 20e52909..a65900c7 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala @@ -20,7 +20,6 @@ import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Retrying, Telemetry, Webhook} import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig} import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig -import eu.timepit.refined.types.all.PosInt import org.http4s.implicits.http4sLiteralsSyntax import org.specs2.Specification @@ -67,7 +66,6 @@ object KinesisConfigSpec { workerIdentifier = "test-hostname", initialPosition = KinesisSourceConfig.InitialPosition.Latest, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, cloudwatchCustomEndpoint = None, @@ -135,7 +133,6 @@ object KinesisConfigSpec { workerIdentifier = "test-hostname", initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, cloudwatchCustomEndpoint = None, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7657bcbe..c0c3cb97 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,7 +29,7 @@ object Dependencies { val bigquery = "2.34.2" // Snowplow - val streams = "0.8.0-M2" + val streams = "0.8.0-M4" val igluClient = "3.1.0" // tests