Skip to content

Commit

Permalink
Faster metadata - Ian's amendments
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 15, 2024
1 parent 3a513fc commit 245341d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ object Environment {
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled, incomplete.isDefined))
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http4s))
metadata <- metadataReporter[F](file, processor.artifact, http4s)
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
remoteAdapters <- prepareRemoteAdapters[F](file.remoteAdapters, metrics)
adapterRegistry = new AdapterRegistry(remoteAdapters, file.adaptersSchemas)
Expand Down Expand Up @@ -275,11 +275,16 @@ object Environment {
config: ConfigFile,
appName: String,
httpClient: Http4sClient[F]
): F[Metadata[F]] =
config.experimental
.flatMap(_.metadata)
.map(metadataConfig => Metadata.build[F](metadataConfig, Metadata.HttpMetadataReporter[F](metadataConfig, appName, httpClient)))
.getOrElse(Metadata.noop[F].pure[F])
): Resource[F, Metadata[F]] =
config.experimental.flatMap(_.metadata) match {
case Some(metadataConfig) =>
for {
reporter <- Metadata.HttpMetadataReporter.resource(metadataConfig, appName, httpClient)
metadata <- Resource.eval(Metadata.build(metadataConfig, reporter))
} yield metadata
case None =>
Resource.pure(Metadata.noop[F])
}

private implicit class EitherTOps[F[_], E: Show, A](eitherT: EitherT[F, E, A]) {
def resource(implicit F: Sync[F]): Resource[F, A] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object Metadata {
case (event, entitiesAndCount) =>
reporter.report(snapshot.periodStart, snapshot.periodEnd, event, entitiesAndCount)
}
_ <- reporter.flush
_ <- reporter.flush()
} yield ()

trait MetadataReporter[F[_]] {
Expand All @@ -117,17 +117,35 @@ object Metadata {
def flush(): F[Unit]
}

case class HttpMetadataReporter[F[_]: Async](
case class HttpMetadataReporter[F[_]: Sync](
config: MetadataConfig,
appName: String,
client: Client[F]
tracker: Tracker[F]
) extends MetadataReporter[F] {
// Probably shouldn't do this?
// Should this still be a resource?
// How would I release the resource automatically?
val tracker: Resource[F, Tracker[F]] = initTracker(config, appName, client)

def initTracker(
def report(
periodStart: Instant,
periodEnd: Instant,
event: MetadataEvent,
entitiesAndCount: EntitiesAndCount
): F[Unit] =
Logger[F].debug(s"Tracking observed event ${event.schema.toSchemaUri}") >>
tracker.trackSelfDescribingEvent(
mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, entitiesAndCount.count),
mkWebhookContexts(entitiesAndCount.entities).toSeq
)

def flush(): F[Unit] = tracker.flushEmitters()
}

object HttpMetadataReporter {
def resource[F[_]: Async](
config: MetadataConfig,
appName: String,
client: Client[F]
): Resource[F, HttpMetadataReporter[F]] =
initTracker(config, appName, client).map(t => HttpMetadataReporter(config, t))

private def initTracker[F[_]: Async](
config: MetadataConfig,
appName: String,
client: Client[F]
Expand All @@ -143,30 +161,11 @@ object Metadata {
client,
bufferConfig = Emitter.BufferConfig.PayloadSize(100000),
retryPolicy = Emitter.RetryPolicy.MaxAttempts(10),
callback = Some(emitterCallback _)
callback = Some(emitterCallback[F](_, _, _))
)
} yield new Tracker(NonEmptyList.of(emitter), "tracker-metadata", appName)

def report(
periodStart: Instant,
periodEnd: Instant,
event: MetadataEvent,
entitiesAndCount: EntitiesAndCount
): F[Unit] =
tracker.allocated.flatMap { case (t, _) =>
Logger[F].debug(s"Tracking observed event ${event.schema.toSchemaUri}") >>
t.trackSelfDescribingEvent(
mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, entitiesAndCount.count),
mkWebhookContexts(entitiesAndCount.entities).toSeq
)
}

def flush(): F[Unit] =
tracker.allocated.flatMap { case (t, _) =>
t.flushEmitters()
}

private def emitterCallback(
private def emitterCallback[F[_]: Sync](
params: Emitter.EndpointParams,
req: Emitter.Request,
res: Emitter.Result
Expand Down

0 comments on commit 245341d

Please sign in to comment.