Skip to content

Commit

Permalink
metric report
Browse files Browse the repository at this point in the history
  • Loading branch information
Harry Chen committed Nov 15, 2024
1 parent 1a13b90 commit e5e8eb5
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.typelevel.cats.time.instances.duration
import squants.time.{Frequency, Hertz}

import java.time.Duration
import java.util.UUID
import scala.jdk.CollectionConverters.*

sealed trait Snapshot extends Product with Serializable { def metricId: MetricID }
Expand Down Expand Up @@ -97,9 +98,16 @@ final case class MetricSnapshot(
val stable = metricIDs.map(id => (id.metricLabel, id.metricName.name))
stable.distinct.size != stable.size
}

def lookupCount: Map[UUID, Long] = {
meters.map(m => m.metricId.metricName.uuid -> m.meter.sum) :::
timers.map(t => t.metricId.metricName.uuid -> t.timer.calls) :::
histograms.map(h => h.metricId.metricName.uuid -> h.histogram.updates)
}.toMap
}

object MetricSnapshot extends duration {
val empty: MetricSnapshot = MetricSnapshot(Nil, Nil, Nil, Nil, Nil)

def counters(metricRegistry: MetricRegistry): List[Snapshot.Counter] =
metricRegistry.getCounters().asScala.toList.mapFilter { case (name, counter) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ object NJEvent {
final case class MetricReport(
index: MetricIndex,
serviceParams: ServiceParams,
previous: MetricSnapshot,
snapshot: MetricSnapshot,
timestamp: ZonedDateTime) // land time
extends MetricEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object eventFilters {
*/
def sampling(interval: FiniteDuration)(evt: NJEvent): Boolean =
evt match {
case MetricReport(mrt, sp, _, _) =>
case MetricReport(mrt, sp, _, _, _) =>
mrt match {
case MetricIndex.Adhoc(_) => true
case MetricIndex.Periodic(tick) =>
Expand All @@ -42,7 +42,7 @@ object eventFilters {
*/
def sampling(divisor: Refined[Int, Positive])(evt: NJEvent): Boolean =
evt match {
case MetricReport(mrt, _, _, _) =>
case MetricReport(mrt, _, _, _, _) =>
mrt match {
case MetricIndex.Adhoc(_) => true
case MetricIndex.Periodic(tick) => (tick.index % divisor.value) === 0
Expand All @@ -54,7 +54,7 @@ object eventFilters {
*/
def sampling(cronExpr: CronExpr)(evt: NJEvent): Boolean =
evt match {
case MetricReport(mrt, _, _, _) =>
case MetricReport(mrt, _, _, _, _) =>
mrt match {
case MetricIndex.Adhoc(_) => true
case MetricIndex.Periodic(tick) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.effect.kernel.Clock
import cats.implicits.{toFlatMapOps, toFunctorOps}
import com.codahale.metrics.MetricRegistry
import com.github.chenharryhua.nanjin.guard.config.ServiceParams
import com.github.chenharryhua.nanjin.guard.event.{MetricIndex, NJEvent}
import com.github.chenharryhua.nanjin.guard.event.{MetricIndex, MetricSnapshot, NJEvent}
import fs2.concurrent.Channel

abstract class MetricsReport[F[_]] private[service] (
Expand All @@ -27,7 +27,9 @@ abstract class MetricsReport[F[_]] private[service] (
.metricReport(
channel = channel,
serviceParams = serviceParams,
metricRegistry = metricRegistry,
index = MetricIndex.Adhoc(serviceParams.toZonedDateTime(ts)))
previous = MetricSnapshot.empty,
snapshot = MetricSnapshot(metricRegistry),
index = MetricIndex.Adhoc(serviceParams.toZonedDateTime(ts))
)
.void)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,32 @@ final class ServiceGuard[F[_]: Network] private[guard] (serviceName: ServiceName
event <- Stream.eval(Channel.unbounded[F, NJEvent]).flatMap { channel =>
val metricRegistry: MetricRegistry = new MetricRegistry()

val metrics_report: Stream[F, Nothing] =
tickStream[F](
TickStatus(serviceParams.zerothTick).renewPolicy(serviceParams.servicePolicies.metricReport))
.evalMap(tick =>
publisher
.metricReport(
channel = channel,
serviceParams = serviceParams,
metricRegistry = metricRegistry,
index = MetricIndex.Periodic(tick))
.flatMap(mr => metricsHistory.modify(queue => (queue, queue.add(mr)))))
val metrics_report: Stream[F, Nothing] = {
val head_snapshot: Stream[F, (Tick, MetricSnapshot)] =
Stream((serviceParams.zerothTick, MetricSnapshot.empty)).covary[F]

val tail_snapshots: Stream[F, (Tick, MetricSnapshot)] =
tickStream[F](
TickStatus(serviceParams.zerothTick).renewPolicy(serviceParams.servicePolicies.metricReport))
.map((_, MetricSnapshot(metricRegistry))) // capture current snapshot

(head_snapshot ++ tail_snapshots)
.sliding(2) // previous snapshot and current snapshot, respectively
.evalMap(_.toList match {
case previous :: current :: Nil =>
publisher
.metricReport(
channel = channel,
serviceParams = serviceParams,
index = MetricIndex.Periodic(current._1),
previous = previous._2,
snapshot = current._2)
.flatMap(mr => metricsHistory.modify(queue => (queue, queue.add(mr))))
.void
case _ => F.unit
})
.drain
}

val metrics_reset: Stream[F, Nothing] =
tickStream[F](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ private object publisher {
def metricReport[F[_]: Clock](
channel: Channel[F, NJEvent],
serviceParams: ServiceParams,
metricRegistry: MetricRegistry,
index: MetricIndex)(implicit F: Monad[F]): F[MetricReport] =
index: MetricIndex,
previous: MetricSnapshot,
snapshot: MetricSnapshot)(implicit F: Monad[F]): F[MetricReport] =
for {
ss <- F.pure(MetricSnapshot(metricRegistry))
now <- serviceParams.zonedNow[F]
mr = MetricReport(index, serviceParams, ss, now)
mr = MetricReport(
index = index,
serviceParams = serviceParams,
previous = previous,
snapshot = snapshot,
timestamp = now)
_ <- channel.send(mr)
} yield mr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import software.amazon.awssdk.services.cloudwatch.model.{

import java.time.Instant
import java.util
import java.util.UUID
import scala.jdk.CollectionConverters.*

object CloudWatchObserver {
Expand Down Expand Up @@ -56,9 +57,7 @@ final class CloudWatchObserver[F[_]: Sync] private (
unitBuilder(new UnitBuilder(
UnitNormalization(timeUnit = CloudWatchTimeUnit.MILLISECONDS, infoUnit = None, rateUnit = None))).build

private def computeDatum(
report: MetricReport,
last: Map[MetricKey, Long]): (List[MetricDatum], Map[MetricKey, Long]) = {
private def computeDatum(report: MetricReport): List[MetricDatum] = {

val timer_histo: List[MetricDatum] = for {
hf <- histogramB.build
Expand All @@ -69,8 +68,7 @@ final class CloudWatchObserver[F[_]: Sync] private (
serviceParams = report.serviceParams,
metricLabel = timer.metricId.metricLabel,
metricName = s"${timer.metricId.metricName.name}_$category",
standardUnit = CloudWatchTimeUnit.toStandardUnit(unitNormalization.timeUnit),
storageResolution = storageResolution
standardUnit = CloudWatchTimeUnit.toStandardUnit(unitNormalization.timeUnit)
).metricDatum(report.timestamp.toInstant, unitNormalization.normalize(dur).value)
}

Expand All @@ -84,96 +82,81 @@ final class CloudWatchObserver[F[_]: Sync] private (
serviceParams = report.serviceParams,
metricLabel = histo.metricId.metricLabel,
metricName = s"${histo.metricId.metricName.name}_$category",
standardUnit = CloudWatchTimeUnit.toStandardUnit(unit),
storageResolution = storageResolution
standardUnit = CloudWatchTimeUnit.toStandardUnit(unit)
).metricDatum(report.timestamp.toInstant, data)
}

val timer_calls: Map[MetricKey, Long] =
val lookup: Map[UUID, Long] = report.previous.lookupCount

val timer_count: List[MetricDatum] =
report.snapshot.timers.map { timer =>
val calls: Long = timer.timer.calls
val delta: Long = lookup.get(timer.metricId.metricName.uuid).fold(calls)(calls - _)
MetricKey(
serviceParams = report.serviceParams,
metricLabel = timer.metricId.metricLabel,
metricName = timer.metricId.metricName.name,
standardUnit = StandardUnit.COUNT,
storageResolution = storageResolution
) -> timer.timer.calls
}.toMap
standardUnit = StandardUnit.COUNT
).metricDatum(report.timestamp.toInstant, delta.toDouble)
}

val meter_sum: Map[MetricKey, Long] =
val meter_count: List[MetricDatum] =
report.snapshot.meters.map { meter =>
val Normalized(data, unit) = unitNormalization.normalize(meter.meter.unit, meter.meter.sum)
val sum: Long = meter.meter.sum
val value: Long = lookup.get(meter.metricId.metricName.uuid).fold(sum)(sum - _)
val Normalized(delta, unit) = unitNormalization.normalize(meter.meter.unit, value)
MetricKey(
serviceParams = report.serviceParams,
metricLabel = meter.metricId.metricLabel,
metricName = meter.metricId.metricName.name,
standardUnit = CloudWatchTimeUnit.toStandardUnit(unit),
storageResolution = storageResolution
) -> data.toLong
}.toMap
standardUnit = CloudWatchTimeUnit.toStandardUnit(unit)
).metricDatum(report.timestamp.toInstant, delta)
}

val histogram_updates: Map[MetricKey, Long] =
val histogram_count: List[MetricDatum] =
if (histogramB.includeUpdate)
report.snapshot.histograms.map { histo =>
val updates: Long = histo.histogram.updates
val delta: Long = lookup.get(histo.metricId.metricName.uuid).fold(updates)(updates - _)
MetricKey(
serviceParams = report.serviceParams,
metricLabel = histo.metricId.metricLabel,
metricName = histo.metricId.metricName.name,
standardUnit = StandardUnit.COUNT,
storageResolution = storageResolution
) -> histo.histogram.updates
}.toMap
else Map.empty

val counterKeyMap: Map[MetricKey, Long] = timer_calls ++ meter_sum ++ histogram_updates

val (counters, lastUpdates) = counterKeyMap.foldLeft((List.empty[MetricDatum], last)) {
case ((mds, newLast), (key, count)) =>
newLast.get(key) match {
// counters of timer/meter increase monotonically.
// however, service may be crushed and restart
// such that counters are reset to zero.
case Some(old) =>
val nc = if (count > old) count - old else 0
(key.metricDatum(report.timestamp.toInstant, nc.toDouble) :: mds, newLast.updated(key, count))
case None =>
(key.metricDatum(report.timestamp.toInstant, count.toDouble) :: mds, newLast.updated(key, count))
standardUnit = StandardUnit.COUNT
).metricDatum(report.timestamp.toInstant, delta.toDouble)
}
}
(counters ::: timer_histo ::: histograms, lastUpdates)
else Nil

timer_count ::: meter_count ::: histogram_count ::: timer_histo ::: histograms
}

def observe(namespace: CloudWatchNamespace): Pipe[F, NJEvent, NJEvent] = (es: Stream[F, NJEvent]) => {
def go(cwc: CloudWatch[F], ss: Stream[F, NJEvent], last: Map[MetricKey, Long]): Pull[F, NJEvent, Unit] =
def go(cwc: CloudWatch[F], ss: Stream[F, NJEvent]): Pull[F, NJEvent, Unit] =
ss.pull.uncons.flatMap {
case Some((events, tail)) =>
val (mds, next) =
events.collect { case mr: MetricReport => mr }.foldLeft((List.empty[MetricDatum], last)) {
case ((lmd, last), mr) =>
val (mds, newLast) = computeDatum(mr, last)
(mds ::: lmd, newLast)
}
val mds: List[MetricDatum] = events.toList.collect { case mr: MetricReport =>
computeDatum(mr)
}.flatten

val publish: F[List[Either[Throwable, PutMetricDataResponse]]] =
mds // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
.grouped(20)
.toList
.traverse(ds => cwc.putMetricData(_.namespace(namespace.value).metricData(ds.asJava)).attempt)

Pull.eval(publish) >> Pull.output(events) >> go(cwc, tail, next)
Pull.eval(publish) >> Pull.output(events) >> go(cwc, tail)

case None => Pull.done
}

Stream.resource(client).flatMap(cw => go(cw, es, Map.empty).stream)
Stream.resource(client).flatMap(cw => go(cw, es).stream)
}

private case class MetricKey(
serviceParams: ServiceParams,
metricLabel: MetricLabel,
metricName: String,
standardUnit: StandardUnit,
storageResolution: Int) {
standardUnit: StandardUnit) {

private val permanent: Map[String, String] = Map(
textConstants.CONSTANT_LABEL -> metricLabel.label,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class InfluxdbObserver[F[_]](
for {
writer <- Stream.resource(client.map(_.makeWriteApi(writeOptions(WriteOptions.builder()).build())))
event <- events.evalTap {
case NJEvent.MetricReport(_, sp, snapshot, timestamp) =>
case NJEvent.MetricReport(_, sp, _, snapshot, timestamp) =>
val spDimensions: Map[String, String] = dimension(sp)
val timers: List[Point] = snapshot.timers.map { timer =>
val tagToAdd = dimension(timer) ++ spDimensions ++ tags
Expand Down

0 comments on commit e5e8eb5

Please sign in to comment.