diff --git a/example/src/main/scala/example/observers.scala b/example/src/main/scala/example/observers.scala index a166d027d..aac983e73 100644 --- a/example/src/main/scala/example/observers.scala +++ b/example/src/main/scala/example/observers.scala @@ -32,7 +32,7 @@ object observers { val cloudwatch: CloudWatchObserver[IO] = CloudWatchObserver(CloudWatch[IO](_.region(Region.AP_SOUTHEAST_2))) .includeHistogram(_.withP50.withP95.withMax) - .includeDimensions(_.withServiceID.withServiceName.withDigest) + .includeDimensions(_.withServiceID.withServiceName) .unifyMeasurementUnit(_.withInfoUnit(_.BYTES)) val sqsObserver: SqsObserver[IO] = diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Batch.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Batch.scala index 78234a29d..bfa8a0ee3 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Batch.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Batch.scala @@ -118,7 +118,7 @@ object Batch { for { meas <- measure(jobs.size, BatchKind.Quasi, mode) case (fd, details) <- Resource.eval(exec(meas)) - } yield List(QuasiResult(fd.toJava, mode, details.sortBy(_.job.index))) + } yield List(QuasiResult(metrics.metricLabel, fd.toJava, mode, details.sortBy(_.job.index))) } override val fully: Resource[F, List[A]] = { @@ -156,6 +156,7 @@ object Batch { details <- Resource.eval(exec(meas)) } yield List( QuasiResult( + label = metrics.metricLabel, spent = details.map(_.took).foldLeft(Duration.ZERO)(_ plus _), mode = mode, details = details.sortBy(_.job.index) diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/BatchJob.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/BatchJob.scala index 2d4be09d3..ea093ac1b 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/BatchJob.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/BatchJob.scala @@ -1,6 +1,7 @@ package com.github.chenharryhua.nanjin.guard.action import cats.Show import cats.syntax.all.* +import com.github.chenharryhua.nanjin.guard.config.MetricLabel import com.github.chenharryhua.nanjin.guard.translator.fmt import io.circe.syntax.EncoderOps import io.circe.{Encoder, Json} @@ -34,11 +35,12 @@ object BatchMode { final case class BatchJob(name: Option[String], index: Int) final case class Detail(job: BatchJob, took: Duration, done: Boolean) -final case class QuasiResult(spent: Duration, mode: BatchMode, details: List[Detail]) +final case class QuasiResult(label: MetricLabel, spent: Duration, mode: BatchMode, details: List[Detail]) object QuasiResult { implicit val encoderQuasiResult: Encoder[QuasiResult] = { (a: QuasiResult) => val (done, fail) = a.details.partition(_.done) Json.obj( + "batch" -> Json.fromString(a.label.label), "mode" -> a.mode.asJson, "spent" -> Json.fromString(fmt.format(a.spent)), "done" -> Json.fromInt(done.length), diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Herald.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Herald.scala index e4ae100e5..e4ed5f561 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Herald.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Herald.scala @@ -71,7 +71,7 @@ object Herald { private def toText(sm: ServiceMessage): String = { val color = ColorScheme.decorate(sm).run(textHelper.consoleColor).value - val msg = jsonHelper.jsonServiceMessage(sm).noSpaces + val msg = jsonHelper.json_service_message(sm).noSpaces s"${sm.timestamp.format(fmt)} Console $color - $msg" } diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Retry.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Retry.scala index db69361ff..ec5a42b1c 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Retry.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/action/Retry.scala @@ -1,5 +1,6 @@ package com.github.chenharryhua.nanjin.guard.action +import cats.data.{EitherT, Kleisli} import cats.effect.Temporal import cats.implicits.{toFlatMapOps, toFunctorOps} import com.github.chenharryhua.nanjin.common.chrono.{Tick, TickStatus} @@ -10,6 +11,8 @@ sealed trait Retry[F[_]] { def apply[A](arrow: (Tick, Option[Throwable]) => F[Either[Throwable, A]]): F[A] def apply[A](tfa: Tick => F[A]): F[A] def apply[A](fa: F[A]): F[A] + + def apply[A](mt: Kleisli[EitherT[F, Throwable, *], (Tick, Option[Throwable]), A]): F[A] } object Retry { @@ -44,5 +47,8 @@ object Retry { override def apply[A](arrow: (Tick, Option[Throwable]) => F[Either[Throwable, A]]): F[A] = comprehensive(arrow) + + override def apply[A](mt: Kleisli[EitherT[F, Throwable, *], (Tick, Option[Throwable]), A]): F[A] = + comprehensive((t: Tick, o: Option[Throwable]) => mt.run((t, o)).value) } } diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/config/MetricID.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/config/MetricID.scala index f138e78b1..42a810e81 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/config/MetricID.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/config/MetricID.scala @@ -5,7 +5,6 @@ import com.github.chenharryhua.nanjin.guard.event.MeasurementUnit import enumeratum.{CirceEnum, Enum, EnumEntry} import io.circe.Encoder import io.circe.generic.JsonCodec -import org.apache.commons.codec.digest.DigestUtils import java.util.UUID import scala.concurrent.duration.FiniteDuration @@ -85,20 +84,7 @@ object MetricName { } @JsonCodec -final case class MetricLabel private (label: String, digest: String, measurement: String) -object MetricLabel { - def apply(serviceParams: ServiceParams, measurement: Measurement, label: String): MetricLabel = { - val full_name: List[String] = - serviceParams.taskName.value :: serviceParams.serviceName.value :: measurement.value :: label :: Nil - val digest = DigestUtils.sha256Hex(full_name.mkString("/")).take(8) - - MetricLabel( - label = label, - digest = digest, - measurement = measurement.value - ) - } -} +final case class MetricLabel(label: String, measurement: Measurement) @JsonCodec final case class MetricID(metricLabel: MetricLabel, metricName: MetricName, category: Category) { diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/MetricSnapshot.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/MetricSnapshot.scala index e1e946959..ac42c4db8 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/MetricSnapshot.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/MetricSnapshot.scala @@ -10,6 +10,7 @@ import org.typelevel.cats.time.instances.duration import squants.time.{Frequency, Hertz} import java.time.Duration +import java.util.UUID import scala.jdk.CollectionConverters.* sealed trait Snapshot extends Product with Serializable { def metricId: MetricID } @@ -94,12 +95,19 @@ final case class MetricSnapshot( histograms.map(_.metricId) def hasDuplication: Boolean = { - val stable = metricIDs.map(id => (id.metricLabel, id.metricName.name, id.category)) + val stable = metricIDs.map(id => (id.metricLabel, id.metricName.name)) stable.distinct.size != stable.size } + + def lookupCount: Map[UUID, Long] = { + meters.map(m => m.metricId.metricName.uuid -> m.meter.sum) ::: + timers.map(t => t.metricId.metricName.uuid -> t.timer.calls) ::: + histograms.map(h => h.metricId.metricName.uuid -> h.histogram.updates) + }.toMap } object MetricSnapshot extends duration { + val empty: MetricSnapshot = MetricSnapshot(Nil, Nil, Nil, Nil, Nil) def counters(metricRegistry: MetricRegistry): List[Snapshot.Counter] = metricRegistry.getCounters().asScala.toList.mapFilter { case (name, counter) => diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/NJEvent.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/NJEvent.scala index c0ce74330..29c250954 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/NJEvent.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/NJEvent.scala @@ -48,6 +48,7 @@ object NJEvent { final case class MetricReport( index: MetricIndex, serviceParams: ServiceParams, + previous: MetricSnapshot, snapshot: MetricSnapshot, timestamp: ZonedDateTime) // land time extends MetricEvent diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/eventFilters.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/eventFilters.scala index 02477a0ed..07ad7d7dc 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/eventFilters.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/event/eventFilters.scala @@ -21,7 +21,7 @@ object eventFilters { */ def sampling(interval: FiniteDuration)(evt: NJEvent): Boolean = evt match { - case MetricReport(mrt, sp, _, _) => + case MetricReport(mrt, sp, _, _, _) => mrt match { case MetricIndex.Adhoc(_) => true case MetricIndex.Periodic(tick) => @@ -42,7 +42,7 @@ object eventFilters { */ def sampling(divisor: Refined[Int, Positive])(evt: NJEvent): Boolean = evt match { - case MetricReport(mrt, _, _, _) => + case MetricReport(mrt, _, _, _, _) => mrt match { case MetricIndex.Adhoc(_) => true case MetricIndex.Periodic(tick) => (tick.index % divisor.value) === 0 @@ -54,7 +54,7 @@ object eventFilters { */ def sampling(cronExpr: CronExpr)(evt: NJEvent): Boolean = evt match { - case MetricReport(mrt, _, _, _) => + case MetricReport(mrt, _, _, _, _) => mrt match { case MetricIndex.Adhoc(_) => true case MetricIndex.Periodic(tick) => diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/Metrics.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/Metrics.scala index cd0165881..fb811b7bf 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/Metrics.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/Metrics.scala @@ -6,11 +6,8 @@ import cats.effect.kernel.{Async, Ref, Resource} import cats.syntax.all.* import com.codahale.metrics.MetricRegistry import com.github.chenharryhua.nanjin.guard.config.MetricLabel -import com.github.chenharryhua.nanjin.guard.event.NJUnits import com.github.chenharryhua.nanjin.guard.translator.{decimal_fmt, fmt} -import scala.concurrent.duration.DurationInt - trait KleisliLike[F[_], A] { def run(a: A): F[Unit] @@ -21,6 +18,7 @@ trait KleisliLike[F[_], A] { } sealed trait Metrics[F[_]] { + def metricLabel: MetricLabel def counter(name: String, f: Endo[NJCounter.Builder]): Resource[F, NJCounter[F]] final def counter(name: String): Resource[F, NJCounter[F]] = counter(name, identity) @@ -58,46 +56,30 @@ sealed trait Metrics[F[_]] { } object Metrics { - private[guard] class Impl[F[_]]( - metricLabel: MetricLabel, - metricRegistry: MetricRegistry, - isEnabled: Boolean)(implicit F: Async[F]) + private[guard] class Impl[F[_]](val metricLabel: MetricLabel, metricRegistry: MetricRegistry)(implicit + F: Async[F]) extends Metrics[F] { - override def counter(name: String, f: Endo[NJCounter.Builder]): Resource[F, NJCounter[F]] = { - val init = new NJCounter.Builder(isEnabled, false) - f(init).build[F](metricLabel, name, metricRegistry) - } - - override def meter(name: String, f: Endo[NJMeter.Builder]): Resource[F, NJMeter[F]] = { - val init = new NJMeter.Builder(isEnabled, NJUnits.COUNT) - f(init).build[F](metricLabel, name, metricRegistry) - } - - override def histogram(name: String, f: Endo[NJHistogram.Builder]): Resource[F, NJHistogram[F]] = { - val init = new NJHistogram.Builder(isEnabled, NJUnits.COUNT, None) - f(init).build[F](metricLabel, name, metricRegistry) - } - - override def timer(name: String, f: Endo[NJTimer.Builder]): Resource[F, NJTimer[F]] = { - val init = new NJTimer.Builder(isEnabled, None) - f(init).build[F](metricLabel, name, metricRegistry) - } - - override def healthCheck(name: String, f: Endo[NJHealthCheck.Builder]): NJHealthCheck[F] = { - val init = new NJHealthCheck.Builder(isEnabled, timeout = 5.seconds) - f(init).build[F](metricLabel, name, metricRegistry) - } - - override def ratio(name: String, f: Endo[NJRatio.Builder]): Resource[F, NJRatio[F]] = { - val init = new NJRatio.Builder(isEnabled, NJRatio.translator) - f(init).build[F](metricLabel, name, metricRegistry) - } - - override def gauge(name: String, f: Endo[NJGauge.Builder]): NJGauge[F] = { - val init = new NJGauge.Builder(isEnabled, timeout = 5.seconds) - f(init).build[F](metricLabel, name, metricRegistry) - } + override def counter(name: String, f: Endo[NJCounter.Builder]): Resource[F, NJCounter[F]] = + f(NJCounter.initial).build[F](metricLabel, name, metricRegistry) + + override def meter(name: String, f: Endo[NJMeter.Builder]): Resource[F, NJMeter[F]] = + f(NJMeter.initial).build[F](metricLabel, name, metricRegistry) + + override def histogram(name: String, f: Endo[NJHistogram.Builder]): Resource[F, NJHistogram[F]] = + f(NJHistogram.initial).build[F](metricLabel, name, metricRegistry) + + override def timer(name: String, f: Endo[NJTimer.Builder]): Resource[F, NJTimer[F]] = + f(NJTimer.initial).build[F](metricLabel, name, metricRegistry) + + override def healthCheck(name: String, f: Endo[NJHealthCheck.Builder]): NJHealthCheck[F] = + f(NJHealthCheck.initial).build[F](metricLabel, name, metricRegistry) + + override def ratio(name: String, f: Endo[NJRatio.Builder]): Resource[F, NJRatio[F]] = + f(NJRatio.initial).build[F](metricLabel, name, metricRegistry) + + override def gauge(name: String, f: Endo[NJGauge.Builder]): NJGauge[F] = + f(NJGauge.initial).build[F](metricLabel, name, metricRegistry) override def idleGauge(name: String, f: Endo[NJGauge.Builder]): Resource[F, NJIdleGauge[F]] = for { diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJCounter.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJCounter.scala index 49e47edaf..357f6172e 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJCounter.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJCounter.scala @@ -45,6 +45,8 @@ object NJCounter { } + val initial: Builder = new Builder(isEnabled = true, isRisk = false) + final class Builder private[guard] (isEnabled: Boolean, isRisk: Boolean) extends EnableConfig[Builder] { def asRisk: Builder = new Builder(isEnabled, true) diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJGauge.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJGauge.scala index 42cc06901..6ed7296e3 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJGauge.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJGauge.scala @@ -15,7 +15,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import java.time.ZoneId -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.Try sealed trait NJGauge[F[_]] { @@ -78,6 +78,8 @@ object NJGauge { } } + val initial: Builder = new Builder(isEnabled = true, timeout = 5.seconds) + final class Builder private[guard] (isEnabled: Boolean, timeout: FiniteDuration) extends EnableConfig[Builder] { diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHealthCheck.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHealthCheck.scala index 3e6a6cb88..0ff5c7d8b 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHealthCheck.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHealthCheck.scala @@ -11,7 +11,7 @@ import com.github.chenharryhua.nanjin.guard.config.* import com.github.chenharryhua.nanjin.guard.config.CategoryKind.GaugeKind import java.time.ZoneId -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.Try sealed trait NJHealthCheck[F[_]] { @@ -71,6 +71,8 @@ object NJHealthCheck { } } + val initial: Builder = new Builder(isEnabled = true, timeout = 5.seconds) + final class Builder private[guard] (isEnabled: Boolean, timeout: FiniteDuration) extends EnableConfig[Builder] { diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHistogram.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHistogram.scala index 64075ba06..e3cf6beda 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHistogram.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJHistogram.scala @@ -51,6 +51,8 @@ object NJHistogram { val unregister: F[Unit] = F.delay(metricRegistry.remove(histogram_name)).void } + val initial: Builder = new Builder(isEnabled = true, unit = NJUnits.COUNT, reservoir = None) + final class Builder private[guard] (isEnabled: Boolean, unit: MeasurementUnit, reservoir: Option[Reservoir]) extends EnableConfig[Builder] { diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJMeter.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJMeter.scala index 67f3c81ca..355eedbc8 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJMeter.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJMeter.scala @@ -42,6 +42,8 @@ object NJMeter { val unregister: F[Unit] = F.delay(metricRegistry.remove(meter_name)).void } + val initial: Builder = new Builder(isEnabled = true, unit = NJUnits.COUNT) + final class Builder private[guard] (isEnabled: Boolean, unit: MeasurementUnit) extends EnableConfig[Builder] { diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJRatio.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJRatio.scala index 5119f1d15..52d0cf6e1 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJRatio.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJRatio.scala @@ -76,6 +76,8 @@ object NJRatio { } } + val initial: Builder = new Builder(isEnabled = true, translator = translator) + final class Builder private[guard] ( isEnabled: Boolean, translator: Ior[Long, Long] => Json diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJTimer.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJTimer.scala index cc38805b2..28cf95d1f 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJTimer.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/metrics/NJTimer.scala @@ -70,6 +70,8 @@ object NJTimer { } + val initial: Builder = new Builder(isEnabled = true, reservoir = None) + final class Builder private[guard] ( isEnabled: Boolean, reservoir: Option[Reservoir] diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/PrettyJsonTranslator.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/PrettyJsonTranslator.scala index 8463fbd29..ced8dfb53 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/PrettyJsonTranslator.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/PrettyJsonTranslator.scala @@ -15,7 +15,7 @@ object PrettyJsonTranslator { "took" -> Json.fromString(fmt.format(dur)) private def uptime(evt: NJEvent): (String, Json) = - "upTime" -> Json.fromString(fmt.format(evt.upTime)) + "up_time" -> Json.fromString(fmt.format(evt.upTime)) private def pretty_metrics(ss: MetricSnapshot): (String, Json) = "metrics" -> new SnapshotPolyglot(ss).toPrettyJson @@ -26,19 +26,19 @@ object PrettyJsonTranslator { // events handlers private def service_started(evt: ServiceStart): Json = Json.obj( - EventName.ServiceStart.camel -> + EventName.ServiceStart.snake -> Json.obj( - jsonHelper.serviceParams(evt.serviceParams), + jsonHelper.service_params(evt.serviceParams), uptime(evt), jsonHelper.index(evt.tick), "snoozed" -> Json.fromString(fmt.format(evt.tick.snooze)))) private def service_panic(evt: ServicePanic): Json = Json.obj( - EventName.ServicePanic.camel -> + EventName.ServicePanic.snake -> Json.obj( - jsonHelper.serviceName(evt.serviceParams), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.service_name(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), uptime(evt), jsonHelper.index(evt.tick), active(evt.tick), @@ -49,23 +49,23 @@ object PrettyJsonTranslator { private def service_stopped(evt: ServiceStop): Json = Json.obj( - EventName.ServiceStop.camel -> + EventName.ServiceStop.snake -> Json.obj( - jsonHelper.serviceName(evt.serviceParams), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.service_name(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), uptime(evt), jsonHelper.policy(evt.serviceParams.servicePolicies.restart), - jsonHelper.exitCode(evt.cause), - jsonHelper.exitCause(evt.cause) + jsonHelper.exit_code(evt.cause), + jsonHelper.exit_cause(evt.cause) )) private def metric_report(evt: MetricReport): Json = Json.obj( - EventName.MetricReport.camel -> + EventName.MetricReport.snake -> Json.obj( - jsonHelper.metricIndex(evt.index), - jsonHelper.serviceName(evt.serviceParams), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.metric_index(evt.index), + jsonHelper.service_name(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), jsonHelper.policy(evt.serviceParams.servicePolicies.metricReport), uptime(evt), took(evt.took), @@ -74,11 +74,11 @@ object PrettyJsonTranslator { private def metric_reset(evt: MetricReset): Json = Json.obj( - EventName.MetricReset.camel -> + EventName.MetricReset.snake -> Json.obj( - jsonHelper.metricIndex(evt.index), - jsonHelper.serviceName(evt.serviceParams), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.metric_index(evt.index), + jsonHelper.service_name(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), jsonHelper.policy(evt.serviceParams.servicePolicies.metricReset), uptime(evt), took(evt.took), @@ -86,7 +86,7 @@ object PrettyJsonTranslator { )) private def service_message(evt: ServiceMessage): Json = - Json.obj(EventName.ServiceMessage.camel -> jsonHelper.jsonServiceMessage(evt)) + Json.obj(EventName.ServiceMessage.snake -> jsonHelper.json_service_message(evt)) def apply[F[_]: Applicative]: Translator[F, Json] = Translator diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/Agent.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/Agent.scala index 7ace0cdda..617fd9f33 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/Agent.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/Agent.scala @@ -49,8 +49,8 @@ final private class GeneralAgent[F[_]: Async] private[service] ( new GeneralAgent[F](serviceParams, metricRegistry, channel, Measurement(name)) override def batch(label: String): Batch[F] = { - val metricLabel = MetricLabel(serviceParams, measurement, label) - new Batch[F](new Metrics.Impl[F](metricLabel, metricRegistry, isEnabled = true)) + val metricLabel = MetricLabel(label, measurement) + new Batch[F](new Metrics.Impl[F](metricLabel, metricRegistry)) } override def ticks(policy: Policy): Stream[F, Tick] = @@ -62,8 +62,8 @@ final private class GeneralAgent[F[_]: Async] private[service] ( Resource.pure(new Retry.Impl[F](serviceParams.initialStatus.renewPolicy(policy))) override def facilitate[A](label: String)(f: Metrics[F] => A): A = { - val metricLabel = MetricLabel(serviceParams, measurement, label) - f(new Metrics.Impl[F](metricLabel, metricRegistry, isEnabled = true)) + val metricLabel = MetricLabel(label, measurement) + f(new Metrics.Impl[F](metricLabel, metricRegistry)) } override val herald: Herald[F] = diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/HttpRouter.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/HttpRouter.scala index 25ba166e5..0ffa7bc2c 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/HttpRouter.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/HttpRouter.scala @@ -117,6 +117,10 @@ private class HttpRouter[F[_]]( val json = new SnapshotPolyglot(MetricSnapshot(metricRegistry)).toPrettyJson Ok(json) + case GET -> Root / "metrics" / "raw" => + val json = MetricSnapshot(metricRegistry).asJson + Ok(json) + case GET -> Root / "metrics" / "reset" => for { ts <- serviceParams.zonedNow diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/MetricsReport.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/MetricsReport.scala index a7d779a5b..2c60b6aa0 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/MetricsReport.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/MetricsReport.scala @@ -5,7 +5,7 @@ import cats.effect.kernel.Clock import cats.implicits.{toFlatMapOps, toFunctorOps} import com.codahale.metrics.MetricRegistry import com.github.chenharryhua.nanjin.guard.config.ServiceParams -import com.github.chenharryhua.nanjin.guard.event.{MetricIndex, NJEvent} +import com.github.chenharryhua.nanjin.guard.event.{MetricIndex, MetricSnapshot, NJEvent} import fs2.concurrent.Channel abstract class MetricsReport[F[_]] private[service] ( @@ -27,7 +27,9 @@ abstract class MetricsReport[F[_]] private[service] ( .metricReport( channel = channel, serviceParams = serviceParams, - metricRegistry = metricRegistry, - index = MetricIndex.Adhoc(serviceParams.toZonedDateTime(ts))) + previous = MetricSnapshot.empty, + snapshot = MetricSnapshot(metricRegistry), + index = MetricIndex.Adhoc(serviceParams.toZonedDateTime(ts)) + ) .void) } diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/ServiceGuard.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/ServiceGuard.scala index aa132036b..cf14b7c4c 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/ServiceGuard.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/ServiceGuard.scala @@ -62,18 +62,32 @@ final class ServiceGuard[F[_]: Network] private[guard] (serviceName: ServiceName event <- Stream.eval(Channel.unbounded[F, NJEvent]).flatMap { channel => val metricRegistry: MetricRegistry = new MetricRegistry() - val metrics_report: Stream[F, Nothing] = - tickStream[F]( - TickStatus(serviceParams.zerothTick).renewPolicy(serviceParams.servicePolicies.metricReport)) - .evalMap(tick => - publisher - .metricReport( - channel = channel, - serviceParams = serviceParams, - metricRegistry = metricRegistry, - index = MetricIndex.Periodic(tick)) - .flatMap(mr => metricsHistory.modify(queue => (queue, queue.add(mr))))) + val metrics_report: Stream[F, Nothing] = { + val head_snapshot: Stream[F, (Tick, MetricSnapshot)] = + Stream((serviceParams.zerothTick, MetricSnapshot.empty)).covary[F] + + val tail_snapshots: Stream[F, (Tick, MetricSnapshot)] = + tickStream[F]( + TickStatus(serviceParams.zerothTick).renewPolicy(serviceParams.servicePolicies.metricReport)) + .map((_, MetricSnapshot(metricRegistry))) // capture current snapshot + + (head_snapshot ++ tail_snapshots) + .sliding(2) // previous snapshot and current snapshot, respectively + .evalMap(_.toList match { + case previous :: current :: Nil => + publisher + .metricReport( + channel = channel, + serviceParams = serviceParams, + index = MetricIndex.Periodic(current._1), + previous = previous._2, + snapshot = current._2) + .flatMap(mr => metricsHistory.modify(queue => (queue, queue.add(mr)))) + .void + case _ => F.unit + }) .drain + } val metrics_reset: Stream[F, Nothing] = tickStream[F]( diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/objectNameFactory.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/objectNameFactory.scala index c07482126..fc98d6648 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/objectNameFactory.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/objectNameFactory.scala @@ -14,8 +14,7 @@ private object objectNameFactory extends ObjectNameFactory { properties.put("label", mId.metricLabel.label) properties.put("name", mId.metricName.name) properties.put("type", mId.category.productPrefix) - properties.put("digest", mId.metricLabel.digest) - val dm = s"$domain.${mId.metricLabel.measurement}" + val dm = s"$domain.${mId.metricLabel.measurement.value}" new ObjectName(dm, properties) }.toOption.orNull } diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/publisher.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/publisher.scala index 1178652ae..fbd8ccda7 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/publisher.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/service/publisher.scala @@ -22,12 +22,17 @@ private object publisher { def metricReport[F[_]: Clock]( channel: Channel[F, NJEvent], serviceParams: ServiceParams, - metricRegistry: MetricRegistry, - index: MetricIndex)(implicit F: Monad[F]): F[MetricReport] = + index: MetricIndex, + previous: MetricSnapshot, + snapshot: MetricSnapshot)(implicit F: Monad[F]): F[MetricReport] = for { - ss <- F.pure(MetricSnapshot(metricRegistry)) now <- serviceParams.zonedNow[F] - mr = MetricReport(index, serviceParams, ss, now) + mr = MetricReport( + index = index, + serviceParams = serviceParams, + previous = previous, + snapshot = snapshot, + timestamp = now) _ <- channel.send(mr) } yield mr diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/SnapshotPolyglot.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/SnapshotPolyglot.scala index 647138c82..909389bdc 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/SnapshotPolyglot.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/SnapshotPolyglot.scala @@ -2,11 +2,12 @@ package com.github.chenharryhua.nanjin.guard.translator import cats.data.NonEmptyList import cats.implicits.{catsSyntaxEq, catsSyntaxOptionId, none, showInterpolator, toFunctorFilterOps} -import com.github.chenharryhua.nanjin.guard.config.{Category, MetricID} +import com.github.chenharryhua.nanjin.guard.config.MetricID import com.github.chenharryhua.nanjin.guard.event.MeasurementUnit.* import com.github.chenharryhua.nanjin.guard.event.{MeasurementUnit, MetricSnapshot} import io.circe.Json import io.circe.syntax.EncoderOps +import org.apache.commons.lang3.StringUtils import squants.time.TimeConversions final class SnapshotPolyglot(snapshot: MetricSnapshot) { @@ -19,86 +20,83 @@ final class SnapshotPolyglot(snapshot: MetricSnapshot) { case unit: NJDimensionlessUnit => s"${decimal_fmt.format(unit.mUnit(data).value.toLong)} ${unit.symbol}" } - private def meters: List[(MetricID, NonEmptyList[(String, Json)])] = + private def meters: List[(MetricID, NonEmptyList[(String, String)])] = snapshot.meters.map { m => m.metricId -> NonEmptyList.of( - "sum" -> Json.fromString(normalize(m.meter.unit, m.meter.sum)), - "mean_rate" -> Json.fromString(s"${normalize(m.meter.unit, m.meter.mean_rate.toHertz)}/s"), - "m1_rate" -> Json.fromString(s"${normalize(m.meter.unit, m.meter.m1_rate.toHertz)}/s"), - "m5_rate" -> Json.fromString(s"${normalize(m.meter.unit, m.meter.m5_rate.toHertz)}/s"), - "m15_rate" -> Json.fromString(s"${normalize(m.meter.unit, m.meter.m15_rate.toHertz)}/s") + "aggregate" -> normalize(m.meter.unit, m.meter.sum), + "mean_rate" -> s"${normalize(m.meter.unit, m.meter.mean_rate.toHertz)}/s", + "m1_rate" -> s"${normalize(m.meter.unit, m.meter.m1_rate.toHertz)}/s", + "m5_rate" -> s"${normalize(m.meter.unit, m.meter.m5_rate.toHertz)}/s", + "m15_rate" -> s"${normalize(m.meter.unit, m.meter.m15_rate.toHertz)}/s" ) } - private def timers: List[(MetricID, NonEmptyList[(String, Json)])] = + private def timers: List[(MetricID, NonEmptyList[(String, String)])] = snapshot.timers.map { t => val unit = s"calls/${NJTimeUnit.SECONDS.symbol}" t.metricId -> NonEmptyList.of( - "calls" -> Json.fromString(decimal_fmt.format(t.timer.calls)), - "mean_rate" -> Json.fromString(s"${decimal_fmt.format(t.timer.mean_rate.toHertz)} $unit"), - "m1_rate" -> Json.fromString(s"${decimal_fmt.format(t.timer.m1_rate.toHertz)} $unit"), - "m5_rate" -> Json.fromString(s"${decimal_fmt.format(t.timer.m5_rate.toHertz)} $unit"), - "m15_rate" -> Json.fromString(s"${decimal_fmt.format(t.timer.m15_rate.toHertz)} $unit"), - "min" -> Json.fromString(fmt.format(t.timer.min)), - "max" -> Json.fromString(fmt.format(t.timer.max)), - "mean" -> Json.fromString(fmt.format(t.timer.mean)), - "stddev" -> Json.fromString(fmt.format(t.timer.stddev)), - "p50" -> Json.fromString(fmt.format(t.timer.p50)), - "p75" -> Json.fromString(fmt.format(t.timer.p75)), - "p95" -> Json.fromString(fmt.format(t.timer.p95)), - "p98" -> Json.fromString(fmt.format(t.timer.p98)), - "p99" -> Json.fromString(fmt.format(t.timer.p99)), - "p999" -> Json.fromString(fmt.format(t.timer.p999)) + "invocations" -> decimal_fmt.format(t.timer.calls), + "mean_rate" -> s"${decimal_fmt.format(t.timer.mean_rate.toHertz)} $unit", + "m1_rate" -> s"${decimal_fmt.format(t.timer.m1_rate.toHertz)} $unit", + "m5_rate" -> s"${decimal_fmt.format(t.timer.m5_rate.toHertz)} $unit", + "m15_rate" -> s"${decimal_fmt.format(t.timer.m15_rate.toHertz)} $unit", + "min" -> fmt.format(t.timer.min), + "max" -> fmt.format(t.timer.max), + "mean" -> fmt.format(t.timer.mean), + "stddev" -> fmt.format(t.timer.stddev), + "p50" -> fmt.format(t.timer.p50), + "p75" -> fmt.format(t.timer.p75), + "p95" -> fmt.format(t.timer.p95), + "p98" -> fmt.format(t.timer.p98), + "p99" -> fmt.format(t.timer.p99), + "p999" -> fmt.format(t.timer.p999) ) } - private def histograms: List[(MetricID, NonEmptyList[(String, Json)])] = + private def histograms: List[(MetricID, NonEmptyList[(String, String)])] = snapshot.histograms.map { h => val unit = h.histogram.unit val histo = h.histogram h.metricId -> NonEmptyList.of( - "updates" -> Json.fromString(decimal_fmt.format(histo.updates)), - "min" -> Json.fromString(normalize(unit, histo.min)), - "max" -> Json.fromString(normalize(unit, histo.max)), - "mean" -> Json.fromString(normalize(unit, histo.mean)), - "stddev" -> Json.fromString(normalize(unit, histo.stddev)), - "p50" -> Json.fromString(normalize(unit, histo.p50)), - "p75" -> Json.fromString(normalize(unit, histo.p75)), - "p95" -> Json.fromString(normalize(unit, histo.p95)), - "p98" -> Json.fromString(normalize(unit, histo.p98)), - "p99" -> Json.fromString(normalize(unit, histo.p99)), - "p999" -> Json.fromString(normalize(unit, histo.p999)) + "updates" -> decimal_fmt.format(histo.updates), + "min" -> normalize(unit, histo.min), + "max" -> normalize(unit, histo.max), + "mean" -> normalize(unit, histo.mean), + "stddev" -> normalize(unit, histo.stddev), + "p50" -> normalize(unit, histo.p50), + "p75" -> normalize(unit, histo.p75), + "p95" -> normalize(unit, histo.p95), + "p98" -> normalize(unit, histo.p98), + "p99" -> normalize(unit, histo.p99), + "p999" -> normalize(unit, histo.p999) ) } - private def json_list(lst: List[(MetricID, NonEmptyList[(String, Json)])]): List[(MetricID, Json)] = + private def json_list(lst: List[(MetricID, NonEmptyList[(String, String)])]): List[(MetricID, Json)] = lst.map { case (id, items) => - id -> items.map { case (key, js) => Json.obj(key -> js) }.reduce[Json]((a, b) => b.deepMerge(a)) + id -> items.map { case (key, js) => Json.obj(key -> Json.fromString(js)) }.reduce[Json]((a, b) => + b.deepMerge(a)) } private def group_json(pairs: List[(MetricID, Json)]): Json = pairs .groupBy(_._1.metricLabel.measurement) // measurement group .toList - .sortBy(_._1) + .sortBy(_._1.value) // sort by measurement name. .map { case (measurement, lst) => val arr: List[Json] = lst .groupBy(_._1.metricLabel) // metric-name group .toList - .sortBy(_._1.label) - .map { case (name, items) => + .map { case (label, items) => val inner: Json = items .sortBy(_._1.metricName) - .map { case (mId, js) => - Json.obj(mId.metricName.name -> js) - } + .map { case (mId, js) => Json.obj(mId.metricName.name -> js) } .reduce((a, b) => b.deepMerge(a)) - name -> inner.asJson + Json.obj(label.label -> inner.asJson) } - .map { case (n, js) => Json.obj("digest" -> Json.fromString(n.digest), n.label -> js) } - Json.obj(measurement -> Json.arr(arr*)) + Json.obj(measurement.value -> Json.arr(arr*)) } .asJson @@ -117,7 +115,7 @@ final class SnapshotPolyglot(snapshot: MetricSnapshot) { val counters: List[(MetricID, Json)] = snapshot.counters.map(c => c.metricId -> Json.fromString(decimal_fmt.format(c.count))) val gauges: List[(MetricID, Json)] = - snapshot.gauges.mapFilter(g => if (g.value === Json.Null) Some(g.metricId -> g.value) else None) + snapshot.gauges.mapFilter(g => if (g.value === Json.Null) None else Some(g.metricId -> g.value)) val lst: List[(MetricID, Json)] = counters ::: gauges ::: json_list(meters ::: histograms ::: timers) @@ -145,88 +143,41 @@ final class SnapshotPolyglot(snapshot: MetricSnapshot) { content.map(str => g.metricId -> List(show"${g.metricId.metricName.name}: $str")) } + private val space: String = StringUtils.SPACE + + private def padded(kv: (String, String)): String = + s"${space * 2}${StringUtils.leftPad(kv._1, 11)}: ${kv._2}" + + private def named(id: MetricID, data: NonEmptyList[String]): List[String] = + s"${id.metricName.name}:" :: data.toList + private def meter_str: List[(MetricID, List[String])] = - snapshot.meters.map { m => - m.metricId -> List( - show"sum: ${normalize(m.meter.unit, m.meter.sum)}", - show"mean_rate: ${normalize(m.meter.unit, m.meter.mean_rate.toHertz)}/s", - show" m1_rate: ${normalize(m.meter.unit, m.meter.m1_rate.toHertz)}/s", - show" m5_rate: ${normalize(m.meter.unit, m.meter.m5_rate.toHertz)}/s", - show" m15_rate: ${normalize(m.meter.unit, m.meter.m15_rate.toHertz)}/s" - ) - } + meters.map { case (id, data) => id -> named(id, data.map(padded)) } private def timer_str: List[(MetricID, List[String])] = - snapshot.timers.map { t => - val unit = s"calls/${NJTimeUnit.SECONDS.symbol}" - t.metricId -> List( - show"calls: ${decimal_fmt.format(t.timer.calls)}", - show"mean_rate: ${decimal_fmt.format(t.timer.mean_rate.toHertz)} $unit", - show" m1_rate: ${decimal_fmt.format(t.timer.m1_rate.toHertz)} $unit", - show" m5_rate: ${decimal_fmt.format(t.timer.m5_rate.toHertz)} $unit", - show" m15_rate: ${decimal_fmt.format(t.timer.m15_rate.toHertz)} $unit", - show" min: ${fmt.format(t.timer.min)}", - show" max: ${fmt.format(t.timer.max)}", - show" mean: ${fmt.format(t.timer.mean)}", - show" stddev: ${fmt.format(t.timer.stddev)}", - show" p50: ${fmt.format(t.timer.p50)}", - show" p75: ${fmt.format(t.timer.p75)}", - show" p95: ${fmt.format(t.timer.p95)}", - show" p98: ${fmt.format(t.timer.p98)}", - show" p99: ${fmt.format(t.timer.p99)}", - show" p999: ${fmt.format(t.timer.p999)}" - ) - } + timers.map { case (id, data) => id -> named(id, data.map(padded)) } private def histogram_str: List[(MetricID, List[String])] = - snapshot.histograms.map { h => - val unit = h.histogram.unit - val histo = h.histogram - h.metricId -> List( - show"updates: ${decimal_fmt.format(histo.updates)}", - show" min: ${normalize(unit, histo.min)}", - show" max: ${normalize(unit, histo.max)}", - show" mean: ${normalize(unit, histo.mean)}", - show" stddev: ${normalize(unit, histo.stddev)}", - show" p50: ${normalize(unit, histo.p50)}", - show" p75: ${normalize(unit, histo.p75)}", - show" p95: ${normalize(unit, histo.p95)}", - show" p98: ${normalize(unit, histo.p98)}", - show" p99: ${normalize(unit, histo.p99)}", - show" p999: ${normalize(unit, histo.p999)}" - ) - } - - private val space: String = " " + histograms.map { case (id, data) => id -> named(id, data.map(padded)) } private def group_yaml(pairs: List[(MetricID, List[String])]): List[String] = pairs .groupBy(_._1.metricLabel.measurement) // measurement group .toList - .sortBy(_._1) + .sortBy(_._1.value) .flatMap { case (measurement, measurements) => val arr: List[String] = measurements .groupBy(_._1.metricLabel) // metric-name group .toList .map { case (name, items) => val oldest = items.map(_._1.metricName.order).min - (oldest, name) -> items.sortBy(_._1.metricName).flatMap { case (id, lst) => - @inline def others: List[String] = - List(id.metricName.name + ":").map(space * 4 + _) ::: lst.map(space * 6 + _) - id.category match { - case _: Category.Gauge => lst.map(space * 4 + _) - case _: Category.Counter => lst.map(space * 4 + _) - case _: Category.Timer => others - case _: Category.Meter => others - case _: Category.Histogram => others - } - } + (oldest, name) -> items.sortBy(_._1.metricName).flatMap(_._2.map(space * 4 + _)) } .sortBy(_._1._1) .flatMap { case ((_, n), items) => - s"${space * 2}[${n.digest}][${n.label}]:" :: items + s"${space * 2}- ${n.label}:" :: items } - show"- $measurement:" :: arr + show"[$measurement]:" :: arr } // for screen display diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/constants.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/constants.scala index 7724ea0a7..8bd7dd0aa 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/constants.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/constants.scala @@ -44,7 +44,6 @@ object textConstants { @inline final val CONSTANT_SNOOZED: String = "Snoozed" @inline final val CONSTANT_ACTIVE: String = "Active" - @inline final val CONSTANT_DIGEST: String = "Digest" @inline final val CONSTANT_LABEL: String = "Label" @inline final val CONSTANT_LAUNCH_TIME: String = "LaunchTime" diff --git a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/jsonHelper.scala b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/jsonHelper.scala index ced82bff1..f348ef264 100644 --- a/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/jsonHelper.scala +++ b/guard/src/main/scala/com/github/chenharryhua/nanjin/guard/translator/jsonHelper.scala @@ -2,7 +2,7 @@ package com.github.chenharryhua.nanjin.guard.translator import cats.implicits.toShow import com.github.chenharryhua.nanjin.common.chrono.{Policy, Tick} -import com.github.chenharryhua.nanjin.guard.config.{MetricLabel, ServiceParams} +import com.github.chenharryhua.nanjin.guard.config.ServiceParams import com.github.chenharryhua.nanjin.guard.event.NJEvent.ServiceMessage import com.github.chenharryhua.nanjin.guard.event.{MetricIndex, NJError, NJEvent, ServiceStopCause} import io.circe.Json @@ -10,11 +10,11 @@ import io.circe.syntax.EncoderOps object jsonHelper { - def timestamp(evt: NJEvent): (String, Json) = "timestamp" -> evt.timestamp.asJson - def serviceId(sp: ServiceParams): (String, Json) = "serviceId" -> sp.serviceId.asJson - def serviceParams(sp: ServiceParams): (String, Json) = "params" -> sp.asJson - def exitCode(sc: ServiceStopCause): (String, Json) = "exitCode" -> Json.fromInt(sc.exitCode) - def exitCause(sc: ServiceStopCause): (String, Json) = "exitCause" -> sc.asJson + def timestamp(evt: NJEvent): (String, Json) = "timestamp" -> evt.timestamp.asJson + def service_id(sp: ServiceParams): (String, Json) = "service_id" -> sp.serviceId.asJson + def service_params(sp: ServiceParams): (String, Json) = "params" -> sp.asJson + def exit_code(sc: ServiceStopCause): (String, Json) = "exit_code" -> Json.fromInt(sc.exitCode) + def exit_cause(sc: ServiceStopCause): (String, Json) = "exit_cause" -> sc.asJson def index(tick: Tick): (String, Json) = "index" -> Json.fromLong(tick.index) @@ -22,23 +22,21 @@ object jsonHelper { def stack(err: NJError): (String, Json) = "stack" -> err.stack.asJson - def metricName(mn: MetricLabel): (String, Json) = "name" -> Json.fromString(mn.label) - - def jsonServiceMessage(sm: ServiceMessage): Json = + def json_service_message(sm: ServiceMessage): Json = sm.error .map(err => Json.obj(stack(err))) .asJson .deepMerge( Json.obj( - serviceName(sm.serviceParams), - serviceId(sm.serviceParams), + service_name(sm.serviceParams), + service_id(sm.serviceParams), sm.level.entryName -> sm.message )) - def serviceName(sp: ServiceParams): (String, Json) = - "serviceName" -> Json.fromString(sp.serviceName.value) + def service_name(sp: ServiceParams): (String, Json) = + "service_name" -> Json.fromString(sp.serviceName.value) - def metricIndex(index: MetricIndex): (String, Json) = index match { + def metric_index(index: MetricIndex): (String, Json) = index match { case MetricIndex.Adhoc(_) => "index" -> Json.Null case MetricIndex.Periodic(tick) => "index" -> Json.fromLong(tick.index) } diff --git a/guard/src/test/scala/mtest/guard/ConsoleLogTest.scala b/guard/src/test/scala/mtest/guard/ConsoleLogTest.scala index 9fe19e3ec..fedc216e3 100644 --- a/guard/src/test/scala/mtest/guard/ConsoleLogTest.scala +++ b/guard/src/test/scala/mtest/guard/ConsoleLogTest.scala @@ -23,14 +23,14 @@ class ConsoleLogTest extends AnyFunSuite { val mtx = agent.facilitate("job") { mtx => for { retry <- agent.createRetry(_.fixedDelay(1.second)) - _ <- mtx.gauge("1").register(IO(1000000000)) - _ <- mtx.healthCheck("2").register(IO(true)) - _ <- mtx.timer("3").evalMap(_.update(10.second).replicateA(100)) + _ <- mtx.gauge("7").register(IO(1000000000)) + _ <- mtx.healthCheck("6").register(IO(true)) + _ <- mtx.timer("5").evalMap(_.update(10.second).replicateA(100)) _ <- mtx.meter("4", _.withUnit(_.COUNT)).evalMap(_.update(10000).replicateA(100)) - _ <- mtx.counter("5", _.asRisk).evalMap(_.inc(1000)) - _ <- mtx.histogram("6", _.withUnit(_.BYTES)).evalMap(_.update(10000L).replicateA(100)) + _ <- mtx.counter("3", _.asRisk).evalMap(_.inc(1000)) + _ <- mtx.histogram("2", _.withUnit(_.BYTES)).evalMap(_.update(10000L).replicateA(100)) _ <- mtx - .ratio("7") + .ratio("1") .evalMap(f => f.incDenominator(500) >> f.incNumerator(60) >> f.incBoth(299, 500)) } yield Kleisli((_: Int) => retry(IO.unit)) } @@ -62,6 +62,6 @@ class ConsoleLogTest extends AnyFunSuite { .lastOrError .unsafeRunSync() val tags = mr.snapshot.metricIDs.sortBy(_.metricName.order).map(_.metricName.name.toInt) - assert(tags == List(1, 2, 3, 4, 5, 6, 7)) + assert(tags == List(7, 6, 5, 4, 3, 2, 1)) } } diff --git a/guard/src/test/scala/mtest/guard/GaugeTest.scala b/guard/src/test/scala/mtest/guard/GaugeTest.scala index 9e390667d..5ef84a60a 100644 --- a/guard/src/test/scala/mtest/guard/GaugeTest.scala +++ b/guard/src/test/scala/mtest/guard/GaugeTest.scala @@ -7,6 +7,7 @@ import cats.implicits.toFunctorFilterOps import com.github.chenharryhua.nanjin.guard.TaskGuard import com.github.chenharryhua.nanjin.guard.config.MetricID import com.github.chenharryhua.nanjin.guard.event.{retrieveGauge, retrieveHealthChecks} +import com.github.chenharryhua.nanjin.guard.observers.console import io.circe.Json import org.scalatest.funsuite.AnyFunSuite @@ -18,10 +19,7 @@ class GaugeTest extends AnyFunSuite { test("1.gauge") { val mr = service.eventStream { agent => agent - .facilitate("gauge")( - _.gauge("gauge", _.withTimeout(1.second).enable(true)) - .register(IO(1)) - .map(_ => Kleisli((_: Unit) => IO.unit))) + .facilitate("gauge")(_.gauge("gauge").register(IO(1)).map(_ => Kleisli((_: Unit) => IO.unit))) .surround(agent.adhoc.report) }.map(checkJson).mapFilter(metricReport).compile.lastOrError.unsafeRunSync() val gauge = retrieveGauge[Int](mr.snapshot.gauges) @@ -67,4 +65,28 @@ class GaugeTest extends AnyFunSuite { assert(mr.snapshot.nonEmpty) assert(permanent.values.head.as[String].toOption.get == "1,999") } + + test("6.gauge timeout") { + val mr = service.eventStream { agent => + agent.facilitate("timeout.gauge")( + _.gauge("gauge", _.withTimeout(1.second).enable(true)) + .register(IO.never[Int]) + .surround(agent.adhoc.report)) + }.map(checkJson).evalTap(console.text[IO]).mapFilter(metricReport).compile.lastOrError.unsafeRunSync() + val gauge = retrieveGauge[Int](mr.snapshot.gauges) + assert(mr.snapshot.nonEmpty) + assert(gauge.isEmpty) + } + + test("7.gauge exception") { + val mr = service.eventStream { agent => + agent.facilitate("timeout.gauge")( + _.gauge("gauge", _.withTimeout(1.second).enable(true)) + .register(IO.raiseError[Int](new Exception("oops"))) + .surround(agent.adhoc.report)) + }.map(checkJson).evalTap(console.text[IO]).mapFilter(metricReport).compile.lastOrError.unsafeRunSync() + val gauge = retrieveGauge[Int](mr.snapshot.gauges) + assert(mr.snapshot.nonEmpty) + assert(gauge.isEmpty) + } } diff --git a/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/CloudWatchObserver.scala b/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/CloudWatchObserver.scala index 73fe5795d..863645363 100644 --- a/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/CloudWatchObserver.scala +++ b/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/CloudWatchObserver.scala @@ -18,6 +18,7 @@ import software.amazon.awssdk.services.cloudwatch.model.{ import java.time.Instant import java.util +import java.util.UUID import scala.jdk.CollectionConverters.* object CloudWatchObserver { @@ -56,9 +57,7 @@ final class CloudWatchObserver[F[_]: Sync] private ( unitBuilder(new UnitBuilder( UnitNormalization(timeUnit = CloudWatchTimeUnit.MILLISECONDS, infoUnit = None, rateUnit = None))).build - private def computeDatum( - report: MetricReport, - last: Map[MetricKey, Long]): (List[MetricDatum], Map[MetricKey, Long]) = { + private def computeDatum(report: MetricReport): List[MetricDatum] = { val timer_histo: List[MetricDatum] = for { hf <- histogramB.build @@ -69,8 +68,7 @@ final class CloudWatchObserver[F[_]: Sync] private ( serviceParams = report.serviceParams, metricLabel = timer.metricId.metricLabel, metricName = s"${timer.metricId.metricName.name}_$category", - standardUnit = CloudWatchTimeUnit.toStandardUnit(unitNormalization.timeUnit), - storageResolution = storageResolution + standardUnit = CloudWatchTimeUnit.toStandardUnit(unitNormalization.timeUnit) ).metricDatum(report.timestamp.toInstant, unitNormalization.normalize(dur).value) } @@ -84,75 +82,61 @@ final class CloudWatchObserver[F[_]: Sync] private ( serviceParams = report.serviceParams, metricLabel = histo.metricId.metricLabel, metricName = s"${histo.metricId.metricName.name}_$category", - standardUnit = CloudWatchTimeUnit.toStandardUnit(unit), - storageResolution = storageResolution + standardUnit = CloudWatchTimeUnit.toStandardUnit(unit) ).metricDatum(report.timestamp.toInstant, data) } - val timer_calls: Map[MetricKey, Long] = + val lookup: Map[UUID, Long] = report.previous.lookupCount + + val timer_count: List[MetricDatum] = report.snapshot.timers.map { timer => + val calls: Long = timer.timer.calls + val delta: Long = lookup.get(timer.metricId.metricName.uuid).fold(calls)(calls - _) MetricKey( serviceParams = report.serviceParams, metricLabel = timer.metricId.metricLabel, metricName = timer.metricId.metricName.name, - standardUnit = StandardUnit.COUNT, - storageResolution = storageResolution - ) -> timer.timer.calls - }.toMap + standardUnit = StandardUnit.COUNT + ).metricDatum(report.timestamp.toInstant, delta.toDouble) + } - val meter_sum: Map[MetricKey, Long] = + val meter_count: List[MetricDatum] = report.snapshot.meters.map { meter => - val Normalized(data, unit) = unitNormalization.normalize(meter.meter.unit, meter.meter.sum) + val sum: Long = meter.meter.sum + val value: Long = lookup.get(meter.metricId.metricName.uuid).fold(sum)(sum - _) + val Normalized(delta, unit) = unitNormalization.normalize(meter.meter.unit, value) MetricKey( serviceParams = report.serviceParams, metricLabel = meter.metricId.metricLabel, metricName = meter.metricId.metricName.name, - standardUnit = CloudWatchTimeUnit.toStandardUnit(unit), - storageResolution = storageResolution - ) -> data.toLong - }.toMap + standardUnit = CloudWatchTimeUnit.toStandardUnit(unit) + ).metricDatum(report.timestamp.toInstant, delta) + } - val histogram_updates: Map[MetricKey, Long] = + val histogram_count: List[MetricDatum] = if (histogramB.includeUpdate) report.snapshot.histograms.map { histo => + val updates: Long = histo.histogram.updates + val delta: Long = lookup.get(histo.metricId.metricName.uuid).fold(updates)(updates - _) MetricKey( serviceParams = report.serviceParams, metricLabel = histo.metricId.metricLabel, metricName = histo.metricId.metricName.name, - standardUnit = StandardUnit.COUNT, - storageResolution = storageResolution - ) -> histo.histogram.updates - }.toMap - else Map.empty - - val counterKeyMap: Map[MetricKey, Long] = timer_calls ++ meter_sum ++ histogram_updates - - val (counters, lastUpdates) = counterKeyMap.foldLeft((List.empty[MetricDatum], last)) { - case ((mds, newLast), (key, count)) => - newLast.get(key) match { - // counters of timer/meter increase monotonically. - // however, service may be crushed and restart - // such that counters are reset to zero. - case Some(old) => - val nc = if (count > old) count - old else 0 - (key.metricDatum(report.timestamp.toInstant, nc.toDouble) :: mds, newLast.updated(key, count)) - case None => - (key.metricDatum(report.timestamp.toInstant, count.toDouble) :: mds, newLast.updated(key, count)) + standardUnit = StandardUnit.COUNT + ).metricDatum(report.timestamp.toInstant, delta.toDouble) } - } - (counters ::: timer_histo ::: histograms, lastUpdates) + else Nil + + timer_count ::: meter_count ::: histogram_count ::: timer_histo ::: histograms } def observe(namespace: CloudWatchNamespace): Pipe[F, NJEvent, NJEvent] = (es: Stream[F, NJEvent]) => { - def go(cwc: CloudWatch[F], ss: Stream[F, NJEvent], last: Map[MetricKey, Long]): Pull[F, NJEvent, Unit] = + def go(cwc: CloudWatch[F], ss: Stream[F, NJEvent]): Pull[F, NJEvent, Unit] = ss.pull.uncons.flatMap { case Some((events, tail)) => - val (mds, next) = - events.collect { case mr: MetricReport => mr }.foldLeft((List.empty[MetricDatum], last)) { - case ((lmd, last), mr) => - val (mds, newLast) = computeDatum(mr, last) - (mds ::: lmd, newLast) - } + val mds: List[MetricDatum] = events.toList.collect { case mr: MetricReport => + computeDatum(mr) + }.flatten val publish: F[List[Either[Throwable, PutMetricDataResponse]]] = mds // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html @@ -160,29 +144,26 @@ final class CloudWatchObserver[F[_]: Sync] private ( .toList .traverse(ds => cwc.putMetricData(_.namespace(namespace.value).metricData(ds.asJava)).attempt) - Pull.eval(publish) >> Pull.output(events) >> go(cwc, tail, next) + Pull.eval(publish) >> Pull.output(events) >> go(cwc, tail) case None => Pull.done } - Stream.resource(client).flatMap(cw => go(cw, es, Map.empty).stream) + Stream.resource(client).flatMap(cw => go(cw, es).stream) } private case class MetricKey( serviceParams: ServiceParams, metricLabel: MetricLabel, metricName: String, - standardUnit: StandardUnit, - storageResolution: Int) { + standardUnit: StandardUnit) { + + private val permanent: Map[String, String] = Map( + textConstants.CONSTANT_LABEL -> metricLabel.label, + textConstants.CONSTANT_MEASUREMENT -> metricLabel.measurement.value) private val dimensions: util.List[Dimension] = - dimensionBuilder( - new DimensionBuilder( - serviceParams, - metricLabel, - Map( - textConstants.CONSTANT_LABEL -> metricLabel.label, - textConstants.CONSTANT_MEASUREMENT -> metricLabel.measurement))).build + dimensionBuilder(new DimensionBuilder(serviceParams, permanent)).build def metricDatum(ts: Instant, value: Double): MetricDatum = MetricDatum diff --git a/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/DimensionBuilder.scala b/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/DimensionBuilder.scala index a598ad283..4e48f3119 100644 --- a/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/DimensionBuilder.scala +++ b/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/cloudwatch/DimensionBuilder.scala @@ -1,7 +1,7 @@ package com.github.chenharryhua.nanjin.guard.observers.cloudwatch import cats.implicits.toShow -import com.github.chenharryhua.nanjin.guard.config.{MetricLabel, ServiceParams} +import com.github.chenharryhua.nanjin.guard.config.ServiceParams import com.github.chenharryhua.nanjin.guard.translator.textConstants import org.typelevel.cats.time.instances.localdate import software.amazon.awssdk.services.cloudwatch.model.Dimension @@ -9,14 +9,11 @@ import software.amazon.awssdk.services.cloudwatch.model.Dimension import java.util import scala.jdk.CollectionConverters.SeqHasAsJava -final class DimensionBuilder private[cloudwatch] ( - serviceParams: ServiceParams, - metricLabel: MetricLabel, - map: Map[String, String]) +final class DimensionBuilder private[cloudwatch] (serviceParams: ServiceParams, map: Map[String, String]) extends localdate { private def add(key: String, value: String): DimensionBuilder = - new DimensionBuilder(serviceParams, metricLabel, map.updated(key, value)) + new DimensionBuilder(serviceParams, map.updated(key, value)) def withTaskName: DimensionBuilder = add(textConstants.CONSTANT_TASK, serviceParams.taskName.value) @@ -33,9 +30,6 @@ final class DimensionBuilder private[cloudwatch] ( def withServiceID: DimensionBuilder = add(textConstants.CONSTANT_SERVICE_ID, serviceParams.serviceId.show) - def withDigest: DimensionBuilder = - add(textConstants.CONSTANT_DIGEST, metricLabel.digest) - private[cloudwatch] def build: util.List[Dimension] = map.map { case (k, v) => Dimension.builder().name(k).value(v).build() diff --git a/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/sns/SlackTranslator.scala b/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/sns/SlackTranslator.scala index 5044357c9..2124c973a 100644 --- a/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/sns/SlackTranslator.scala +++ b/observers/aws/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/sns/SlackTranslator.scala @@ -168,7 +168,8 @@ private object SlackTranslator extends all { HeaderSection(eventTitle(evt)), host_service_section(evt.serviceParams), metrics_index_section(evt), - MarkdownSection(show"*$CONSTANT_SERVICE_ID:* ${evt.serviceParams.serviceId}"), + MarkdownSection(show"""|*$CONSTANT_POLICY:* ${evt.serviceParams.servicePolicies.metricReport} + |*$CONSTANT_SERVICE_ID:* ${evt.serviceParams.serviceId}""".stripMargin), metrics_section(evt.snapshot) ) ), @@ -187,7 +188,8 @@ private object SlackTranslator extends all { HeaderSection(eventTitle(evt)), host_service_section(evt.serviceParams), metrics_index_section(evt), - MarkdownSection(s"*$CONSTANT_SERVICE_ID:* ${evt.serviceParams.serviceId.show}"), + MarkdownSection(show"""|*$CONSTANT_POLICY:* ${evt.serviceParams.servicePolicies.metricReset} + |*$CONSTANT_SERVICE_ID:* ${evt.serviceParams.serviceId}""".stripMargin), metrics_section(evt.snapshot) ) )) diff --git a/observers/database/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/postgres/JsonTranslator.scala b/observers/database/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/postgres/JsonTranslator.scala index 7ee418c6d..4c2ff2b6c 100644 --- a/observers/database/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/postgres/JsonTranslator.scala +++ b/observers/database/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/postgres/JsonTranslator.scala @@ -18,60 +18,60 @@ private object JsonTranslator { private def service_started(evt: ServiceStart): Json = Json.obj( - "event" -> EventName.ServiceStart.camelJson, + "event" -> EventName.ServiceStart.snakeJson, jsonHelper.index(evt.tick), - jsonHelper.serviceParams(evt.serviceParams), + jsonHelper.service_params(evt.serviceParams), jsonHelper.timestamp(evt)) private def service_panic(evt: ServicePanic): Json = Json.obj( - "event" -> EventName.ServicePanic.camelJson, + "event" -> EventName.ServicePanic.snakeJson, jsonHelper.index(evt.tick), - jsonHelper.serviceName(evt.serviceParams), + jsonHelper.service_name(evt.serviceParams), jsonHelper.policy(evt.serviceParams.servicePolicies.restart), jsonHelper.stack(evt.error), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), jsonHelper.timestamp(evt) ) private def service_stopped(evt: ServiceStop): Json = Json.obj( - "event" -> EventName.ServiceStop.camelJson, - jsonHelper.serviceName(evt.serviceParams), - jsonHelper.exitCode(evt.cause), - jsonHelper.exitCause(evt.cause), + "event" -> EventName.ServiceStop.snakeJson, + jsonHelper.service_name(evt.serviceParams), + jsonHelper.exit_code(evt.cause), + jsonHelper.exit_cause(evt.cause), jsonHelper.policy(evt.serviceParams.servicePolicies.restart), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), jsonHelper.timestamp(evt) ) private def metric_report(evt: MetricReport): Json = Json.obj( - "event" -> EventName.MetricReport.camelJson, - jsonHelper.metricIndex(evt.index), - jsonHelper.serviceName(evt.serviceParams), + "event" -> EventName.MetricReport.snakeJson, + jsonHelper.metric_index(evt.index), + jsonHelper.service_name(evt.serviceParams), took(evt.took), metrics(evt.snapshot), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), jsonHelper.timestamp(evt) ) private def metric_reset(evt: MetricReset): Json = Json.obj( - "event" -> EventName.MetricReset.camelJson, - jsonHelper.metricIndex(evt.index), - jsonHelper.serviceName(evt.serviceParams), + "event" -> EventName.MetricReset.snakeJson, + jsonHelper.metric_index(evt.index), + jsonHelper.service_name(evt.serviceParams), took(evt.took), metrics(evt.snapshot), - jsonHelper.serviceId(evt.serviceParams), + jsonHelper.service_id(evt.serviceParams), jsonHelper.timestamp(evt) ) private def service_message(evt: ServiceMessage): Json = Json.obj( - "event" -> EventName.ServiceMessage.camelJson, - "message" -> jsonHelper.jsonServiceMessage(evt), - jsonHelper.serviceId(evt.serviceParams), + "event" -> EventName.ServiceMessage.snakeJson, + "message" -> jsonHelper.json_service_message(evt), + jsonHelper.service_id(evt.serviceParams), jsonHelper.timestamp(evt) ) diff --git a/observers/influxdb/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/influxdb/InfluxdbObserver.scala b/observers/influxdb/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/influxdb/InfluxdbObserver.scala index 582403afc..b714052f0 100644 --- a/observers/influxdb/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/influxdb/InfluxdbObserver.scala +++ b/observers/influxdb/src/main/scala/com/github/chenharryhua/nanjin/guard/observers/influxdb/InfluxdbObserver.scala @@ -6,15 +6,7 @@ import cats.implicits.toShow import com.github.chenharryhua.nanjin.guard.config.ServiceParams import com.github.chenharryhua.nanjin.guard.event.{NJEvent, Snapshot} import com.github.chenharryhua.nanjin.guard.translator.metricConstants -import com.github.chenharryhua.nanjin.guard.translator.textConstants.{ - CONSTANT_DIGEST, - CONSTANT_HOST, - CONSTANT_LABEL, - CONSTANT_LAUNCH_TIME, - CONSTANT_SERVICE, - CONSTANT_SERVICE_ID, - CONSTANT_TASK -} +import com.github.chenharryhua.nanjin.guard.translator.textConstants.* import com.influxdb.client.domain.WritePrecision import com.influxdb.client.write.Point import com.influxdb.client.{InfluxDBClient, WriteOptions} @@ -79,18 +71,18 @@ final class InfluxdbObserver[F[_]]( ) private def dimension(ms: Snapshot): Map[String, String] = - Map(CONSTANT_DIGEST -> ms.metricId.metricLabel.digest, CONSTANT_LABEL -> ms.metricId.metricLabel.label) + Map(CONSTANT_LABEL -> ms.metricId.metricLabel.label) val observe: Pipe[F, NJEvent, NJEvent] = (events: Stream[F, NJEvent]) => for { writer <- Stream.resource(client.map(_.makeWriteApi(writeOptions(WriteOptions.builder()).build()))) event <- events.evalTap { - case NJEvent.MetricReport(_, sp, snapshot, timestamp) => + case NJEvent.MetricReport(_, sp, _, snapshot, timestamp) => val spDimensions: Map[String, String] = dimension(sp) val timers: List[Point] = snapshot.timers.map { timer => val tagToAdd = dimension(timer) ++ spDimensions ++ tags Point - .measurement(timer.metricId.metricLabel.measurement) + .measurement(timer.metricId.metricLabel.measurement.value) .time(timestamp.toInstant, writePrecision) .addTag("label", timer.metricId.metricLabel.label) .addTag("name", timer.metricId.metricName.name) @@ -116,7 +108,7 @@ final class InfluxdbObserver[F[_]]( val meters: List[Point] = snapshot.meters.map { meter => val tagToAdd = dimension(meter) ++ spDimensions ++ tags Point - .measurement(meter.metricId.metricLabel.measurement) + .measurement(meter.metricId.metricLabel.measurement.value) .time(timestamp.toInstant, writePrecision) .addTag("label", meter.metricId.metricLabel.label) .addTag("name", meter.metricId.metricName.name) @@ -132,7 +124,7 @@ final class InfluxdbObserver[F[_]]( val counters: List[Point] = snapshot.counters.map { counter => val tagToAdd = dimension(counter) ++ spDimensions ++ tags Point - .measurement(counter.metricId.metricLabel.measurement) + .measurement(counter.metricId.metricLabel.measurement.value) .time(timestamp.toInstant, writePrecision) .addTag("label", counter.metricId.metricLabel.label) .addTag("name", counter.metricId.metricName.name) @@ -144,7 +136,7 @@ final class InfluxdbObserver[F[_]]( val tagToAdd = dimension(histo) ++ spDimensions ++ tags val unitName = s"(${histo.histogram.unit.symbol})" Point - .measurement(histo.metricId.metricLabel.measurement) + .measurement(histo.metricId.metricLabel.measurement.value) .time(timestamp.toInstant, writePrecision) .addTag("label", histo.metricId.metricLabel.label) .addTag("name", histo.metricId.metricName.name)