diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index 0858b1f64..0d9962cc8 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -213,7 +213,7 @@ object Environment { igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled, incomplete.isDefined)) - metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http4s)) + metadata <- metadataReporter[F](file, processor.artifact, http4s) assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache) remoteAdapters <- prepareRemoteAdapters[F](file.remoteAdapters, metrics) adapterRegistry = new AdapterRegistry(remoteAdapters, file.adaptersSchemas) @@ -275,11 +275,16 @@ object Environment { config: ConfigFile, appName: String, httpClient: Http4sClient[F] - ): F[Metadata[F]] = - config.experimental - .flatMap(_.metadata) - .map(metadataConfig => Metadata.build[F](metadataConfig, Metadata.HttpMetadataReporter[F](metadataConfig, appName, httpClient))) - .getOrElse(Metadata.noop[F].pure[F]) + ): Resource[F, Metadata[F]] = + config.experimental.flatMap(_.metadata) match { + case Some(metadataConfig) => + for { + reporter <- Metadata.HttpMetadataReporter.resource(metadataConfig, appName, httpClient) + metadata <- Resource.eval(Metadata.build(metadataConfig, reporter)) + } yield metadata + case None => + Resource.pure(Metadata.noop[F]) + } private implicit class EitherTOps[F[_], E: Show, A](eitherT: EitherT[F, E, A]) { def resource(implicit F: Sync[F]): Resource[F, A] = { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala index 432e35315..af134d49a 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala @@ -103,7 +103,7 @@ object Metadata { case (event, entitiesAndCount) => reporter.report(snapshot.periodStart, snapshot.periodEnd, event, entitiesAndCount) } - _ <- reporter.flush + _ <- reporter.flush() } yield () trait MetadataReporter[F[_]] { @@ -117,17 +117,35 @@ object Metadata { def flush(): F[Unit] } - case class HttpMetadataReporter[F[_]: Async]( + case class HttpMetadataReporter[F[_]: Sync]( config: MetadataConfig, - appName: String, - client: Client[F] + tracker: Tracker[F] ) extends MetadataReporter[F] { - // Probably shouldn't do this? - // Should this still be a resource? - // How would I release the resource automatically? - val tracker: Resource[F, Tracker[F]] = initTracker(config, appName, client) - def initTracker( + def report( + periodStart: Instant, + periodEnd: Instant, + event: MetadataEvent, + entitiesAndCount: EntitiesAndCount + ): F[Unit] = + Logger[F].debug(s"Tracking observed event ${event.schema.toSchemaUri}") >> + tracker.trackSelfDescribingEvent( + mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, entitiesAndCount.count), + mkWebhookContexts(entitiesAndCount.entities).toSeq + ) + + def flush(): F[Unit] = tracker.flushEmitters() + } + + object HttpMetadataReporter { + def resource[F[_]: Async]( + config: MetadataConfig, + appName: String, + client: Client[F] + ): Resource[F, HttpMetadataReporter[F]] = + initTracker(config, appName, client).map(t => HttpMetadataReporter(config, t)) + + private def initTracker[F[_]: Async]( config: MetadataConfig, appName: String, client: Client[F] @@ -143,30 +161,11 @@ object Metadata { client, bufferConfig = Emitter.BufferConfig.PayloadSize(100000), retryPolicy = Emitter.RetryPolicy.MaxAttempts(10), - callback = Some(emitterCallback _) + callback = Some(emitterCallback[F](_, _, _)) ) } yield new Tracker(NonEmptyList.of(emitter), "tracker-metadata", appName) - def report( - periodStart: Instant, - periodEnd: Instant, - event: MetadataEvent, - entitiesAndCount: EntitiesAndCount - ): F[Unit] = - tracker.allocated.flatMap { case (t, _) => - Logger[F].debug(s"Tracking observed event ${event.schema.toSchemaUri}") >> - t.trackSelfDescribingEvent( - mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, entitiesAndCount.count), - mkWebhookContexts(entitiesAndCount.entities).toSeq - ) - } - - def flush(): F[Unit] = - tracker.allocated.flatMap { case (t, _) => - t.flushEmitters() - } - - private def emitterCallback( + private def emitterCallback[F[_]: Sync]( params: Emitter.EndpointParams, req: Emitter.Request, res: Emitter.Result