From 037e5dfa6e81ef0701ef4692e0b7e019c3027b79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Wed, 30 Oct 2024 14:52:05 +0200 Subject: [PATCH] Fix. --- TODO.md | 2 - .../TwTasksEnvironmentPostProcessor.java | 39 ++++ .../main/resources/META-INF/spring.factories | 1 + .../tasks/helpers/CoreMetricsTemplate.java | 166 +++++++++--------- 4 files changed, 123 insertions(+), 85 deletions(-) create mode 100644 tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java diff --git a/TODO.md b/TODO.md index 1887d44e..3aef293a 100644 --- a/TODO.md +++ b/TODO.md @@ -28,8 +28,6 @@ topics configurations. Could refactor the properties to more hierarhical structu 23. Start using Avro or other binary messages for triggering queue. This Json crap is expensive? -24. Add local extremums for most gauges. - 25. Check automatically if the concurrency policy returned is the same instance. Unfortunately, it is quite common for services to create a new instance everytime we ask it, for let's say SimpleConcurrencyPolicy, and with doing that, losing any concurrency control. diff --git a/tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java b/tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java new file mode 100644 index 00000000..5a8a1e2a --- /dev/null +++ b/tw-tasks-core-spring-boot-starter/src/main/java/com/transferwise/tasks/core/autoconfigure/TwTasksEnvironmentPostProcessor.java @@ -0,0 +1,39 @@ +package com.transferwise.tasks.core.autoconfigure; + +import com.transferwise.tasks.helpers.CoreMetricsTemplate; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.env.EnvironmentPostProcessor; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.PropertySource; + +@Slf4j +public class TwTasksEnvironmentPostProcessor implements EnvironmentPostProcessor { + + private static final String PROPERTY_SOURCE_KEY = TwTasksEnvironmentPostProcessor.class.getName(); + static final String TW_OBS_BASE_EXTREMUM_CONFIG_PATH = "transferwise.observability.base.metrics.local-extremum-gauge-names.tw-tasks"; + + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { + PropertySource propertySource = environment.getPropertySources().get(PROPERTY_SOURCE_KEY); + if (propertySource == null) { + final HashMap map = new HashMap<>(); + + // Calculate last minute min/max using tw-observability-base local extremums. + Set gaugeNames = new HashSet<>(); + gaugeNames.add(CoreMetricsTemplate.GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT); + gaugeNames.add(CoreMetricsTemplate.GAUGE_PROCESSING_RUNNING_TASKS_COUNT); + gaugeNames.add(CoreMetricsTemplate.GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT); + gaugeNames.add(CoreMetricsTemplate.GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT); + + map.put(TW_OBS_BASE_EXTREMUM_CONFIG_PATH, gaugeNames); + + MapPropertySource mapPropertySource = new MapPropertySource(PROPERTY_SOURCE_KEY, map); + environment.getPropertySources().addLast(mapPropertySource); + } + } +} diff --git a/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories b/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories index 46ceed1a..bbc40bdf 100644 --- a/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/tw-tasks-core-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -1,2 +1,3 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.transferwise.tasks.core.autoconfigure.TwTasksCoreAutoConfiguration org.springframework.context.ApplicationListener=com.transferwise.tasks.TwTasksApplicationListener +org.springframework.boot.env.EnvironmentPostProcessor=com.transferwise.tasks.core.autoconfigure.TwTasksEnvironmentPostProcessor diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 4d601e4d..70dcae0c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -29,70 +29,70 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { - private static final String METRIC_LIBRARY_INFO = "tw.library.info"; - - private static final String METRIC_TASKS_MARKED_AS_ERROR_COUNT = METRIC_PREFIX + "tasks.markedAsErrorCount"; - private static final String METRIC_TASKS_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.processingsCount"; - private static final String METRIC_TASKS_ONGOING_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.ongoingProcessingsCount"; - private static final String METRIC_TASKS_PROCESSED_COUNT = METRIC_PREFIX + "tasks.processedCount"; - private static final String METRIC_TASKS_PROCESSING_TIME = METRIC_PREFIX + "tasks.processingTime"; - private static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount"; - private static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck"; - private static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing"; - private static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType"; - private static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty"; - private static final String METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount"; - private static final String METRIC_TASKS_RESUMER_SCHEDULED_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.scheduledTasks.resumedCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_FAILED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markFailedCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_IGNORED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.ignoredCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.resumedCount"; - private static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_ERROR_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markErrorCount"; - private static final String METRIC_TASKS_FAILED_GRABBINGS_COUNT = METRIC_PREFIX + "tasks.failedGrabbingsCount"; - private static final String METRIC_TASKS_RETRIES_COUNT = METRIC_PREFIX + "tasks.retriesCount"; - private static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; - private static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; - private static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; - private static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; - private static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; - private static final String METRIC_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; - private static final String METRIC_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.activeTriggeringsCount"; - private static final String METRIC_BUCKETS_MANAGER_BUCKETS_COUNT = METRIC_PREFIX + "bucketsManager.bucketsCount"; - private static final String METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT = METRIC_PREFIX + "processing.ongoingTasksGrabbingsCount"; - private static final String METRIC_TASKS_CLEANER_DELETABLE_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletableTasksCount"; - private static final String METRIC_TASKS_CLEANER_DELETED_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTasksCount"; - private static final String METRIC_TASKS_CLEANER_DELETED_UNIQUE_KEYS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedUniqueKeysCount"; - private static final String METRIC_TASKS_CLEANER_DELETED_TASK_DATAS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTaskDatasCount"; - private static final String METRIC_TASKS_CLEANER_DELETE_LAG_SECONDS = METRIC_PREFIX + "tasksCleaner.deleteLagSeconds"; - private static final String METRIC_DAO_DATA_SIZE = METRIC_PREFIX + "dao.data.size"; - private static final String METRIC_DAO_DATA_SERIALIZED_SIZE = METRIC_PREFIX + "dao.data.serialized.size"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT = + private static final String GAUGE_LIBRARY_INFO = "tw.library.info"; + + public static final String METRIC_TASKS_MARKED_AS_ERROR_COUNT = METRIC_PREFIX + "tasks.markedAsErrorCount"; + public static final String METRIC_TASKS_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.processingsCount"; + public static final String GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT = METRIC_PREFIX + "tasks.ongoingProcessingsCount"; + public static final String METRIC_TASKS_PROCESSED_COUNT = METRIC_PREFIX + "tasks.processedCount"; + public static final String METRIC_TASKS_PROCESSING_TIME = METRIC_PREFIX + "tasks.processingTime"; + public static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount"; + public static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck"; + public static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing"; + public static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType"; + public static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty"; + public static final String METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount"; + public static final String METRIC_TASKS_RESUMER_SCHEDULED_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.scheduledTasks.resumedCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_FAILED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markFailedCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_IGNORED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.ignoredCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_RESUMED_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.resumedCount"; + public static final String METRIC_TASKS_RESUMER_STUCK_TASKS_MARK_ERROR_COUNT = METRIC_PREFIX + "tasksResumer.stuckTasks.markErrorCount"; + public static final String METRIC_TASKS_FAILED_GRABBINGS_COUNT = METRIC_PREFIX + "tasks.failedGrabbingsCount"; + public static final String METRIC_TASKS_RETRIES_COUNT = METRIC_PREFIX + "tasks.retriesCount"; + public static final String METRIC_TASKS_RESUMINGS_COUNT = METRIC_PREFIX + "tasks.resumingsCount"; + public static final String METRIC_TASKS_MARKED_AS_FAILED_COUNT = METRIC_PREFIX + "tasks.markedAsFailedCount"; + public static final String METRIC_TASKS_RESCHEDULED_COUNT = METRIC_PREFIX + "tasks.rescheduledCount"; + public static final String METRIC_TASKS_FAILED_NEXT_EVENT_TIME_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedNextEventTimeChangeCount"; + public static final String METRIC_TASKS_ADDINGS_COUNT = METRIC_PREFIX + "task.addings.count"; + public static final String GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.inProgressTriggeringsCount"; + public static final String GAUGE_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT = METRIC_PREFIX + "tasksService.activeTriggeringsCount"; + public static final String METRIC_BUCKETS_MANAGER_BUCKETS_COUNT = METRIC_PREFIX + "bucketsManager.bucketsCount"; + public static final String GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT = METRIC_PREFIX + "processing.ongoingTasksGrabbingsCount"; + public static final String METRIC_TASKS_CLEANER_DELETABLE_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletableTasksCount"; + public static final String METRIC_TASKS_CLEANER_DELETED_TASKS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTasksCount"; + public static final String METRIC_TASKS_CLEANER_DELETED_UNIQUE_KEYS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedUniqueKeysCount"; + public static final String METRIC_TASKS_CLEANER_DELETED_TASK_DATAS_COUNT = METRIC_PREFIX + "tasksCleaner.deletedTaskDatasCount"; + public static final String GAUGE_TASKS_CLEANER_DELETE_LAG_SECONDS = METRIC_PREFIX + "tasksCleaner.deleteLagSeconds"; + public static final String METRIC_DAO_DATA_SIZE = METRIC_PREFIX + "dao.data.size"; + public static final String METRIC_DAO_DATA_SERIALIZED_SIZE = METRIC_PREFIX + "dao.data.serialized.size"; + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.pollingBucketsCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_RECEIVED_TRIGGERS_COUNT = + public static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_RECEIVED_TRIGGERS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.receivedTriggersCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_COMMITS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.commitsCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSET_ALREADY_COMMITTED = + public static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_COMMITS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.commitsCount"; + public static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSET_ALREADY_COMMITTED = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetAlreadyCommitted"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetsToBeCommitedCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetsCompletedCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.unprocessedFetchedRecordsCount"; - private static final String METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT = + public static final String GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT = METRIC_PREFIX + "kafkaTasksExecutionTriggerer.offsetsCount"; - private static final String METRIC_HEALTH_TASKS_IN_ERROR_COUNT = METRIC_PREFIX + "health.tasksInErrorCount"; - private static final String METRIC_HEALTH_STUCK_TASKS_COUNT = METRIC_PREFIX + "health.stuckTasksCount"; - private static final String METRIC_HEALTH_TASK_HISTORY_LENGTH_SECONDS = METRIC_PREFIX + "health.tasksHistoryLengthSeconds"; - private static final String METRIC_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE = METRIC_PREFIX + "health.tasksInErrorCountPerType"; - private static final String METRIC_HEALTH_STUCK_TASKS_COUNT_PER_TYPE = METRIC_PREFIX + "health.stuckTasksCountPerType"; - private static final String METRIC_STATE_APPROXIMATE_TASKS = METRIC_PREFIX + "state.approximateTasks"; - private static final String METRIC_STATE_APPROXIMATE_UNIQUE_KEYS = METRIC_PREFIX + "state.approximateUniqueKeys"; - private static final String METRIC_STATE_APPROXIMATE_TASK_DATAS = METRIC_PREFIX + "state.approximateTaskDatas"; - private static final String METRIC_PROCESSING_TYPE_TRIGGERS_COUNT = METRIC_PREFIX + "processing.typeTriggersCount"; - private static final String METRIC_PROCESSING_RUNNING_TASKS_COUNT = METRIC_PREFIX + "processing.runningTasksCount"; - private static final String METRIC_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT = METRIC_PREFIX + "processing.inProgressTasksGrabbingCount"; - private static final String METRIC_PROCESSING_TRIGGERS_COUNT = METRIC_PREFIX + "processing.triggersCount"; - private static final String METRIC_PROCESSING_STATE_VERSION = METRIC_PREFIX + "processing.stateVersion"; + public static final String GAUGE_METRIC_HEALTH_TASKS_IN_ERROR_COUNT = METRIC_PREFIX + "health.tasksInErrorCount"; + public static final String GAUGE_HEALTH_STUCK_TASKS_COUNT = METRIC_PREFIX + "health.stuckTasksCount"; + public static final String GAUGE_HEALTH_TASK_HISTORY_LENGTH_SECONDS = METRIC_PREFIX + "health.tasksHistoryLengthSeconds"; + public static final String GAUGE_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE = METRIC_PREFIX + "health.tasksInErrorCountPerType"; + public static final String GAUGE_HEALTH_STUCK_TASKS_COUNT_PER_TYPE = METRIC_PREFIX + "health.stuckTasksCountPerType"; + public static final String GAUGE_METRIC_STATE_APPROXIMATE_TASKS = METRIC_PREFIX + "state.approximateTasks"; + public static final String GAUGE_STATE_APPROXIMATE_UNIQUE_KEYS = METRIC_PREFIX + "state.approximateUniqueKeys"; + public static final String GAUGE_STATE_APPROXIMATE_TASK_DATAS = METRIC_PREFIX + "state.approximateTaskDatas"; + public static final String GAUGE_PROCESSING_TYPE_TRIGGERS_COUNT = METRIC_PREFIX + "processing.typeTriggersCount"; + public static final String GAUGE_PROCESSING_RUNNING_TASKS_COUNT = METRIC_PREFIX + "processing.runningTasksCount"; + public static final String GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT = METRIC_PREFIX + "processing.inProgressTasksGrabbingCount"; + public static final String GAUGE_PROCESSING_TRIGGERS_COUNT = METRIC_PREFIX + "processing.triggersCount"; + public static final String GAUGE_PROCESSING_STATE_VERSION = METRIC_PREFIX + "processing.stateVersion"; private static final String TAG_PROCESSING_RESULT = "processingResult"; private static final String TAG_FROM_STATUS = "fromStatus"; @@ -137,10 +137,10 @@ public void registerTaskProcessingStart(String bucketId, String taskType) { meterCache.counter(METRIC_TASKS_PROCESSINGS_COUNT, TagsSet.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType)) .increment(); - taskProcessingGauges.computeIfAbsent(Triple.of(METRIC_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType), (t) -> { + taskProcessingGauges.computeIfAbsent(Triple.of(GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType), (t) -> { AtomicInteger counter = new AtomicInteger(0); meterCache.getMeterRegistry() - .gauge(METRIC_TASKS_ONGOING_PROCESSINGS_COUNT, Tags.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType), counter); + .gauge(GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT, Tags.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType), counter); return counter; }).incrementAndGet(); } @@ -154,7 +154,7 @@ public void registerTaskProcessingEnd(String bucketId, String taskType, long pro meterCache.timer(METRIC_TASKS_PROCESSING_TIME, TagsSet.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType, TAG_PROCESSING_RESULT, processingResult)) .record(TwContextClockHolder.getClock().millis() - processingStartTimeMs, TimeUnit.MILLISECONDS); - taskProcessingGauges.get(Triple.of(METRIC_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType)) + taskProcessingGauges.get(Triple.of(GAUGE_TASKS_ONGOING_PROCESSINGS_COUNT, resolvedBucketId, taskType)) .decrementAndGet(); } @@ -291,25 +291,25 @@ TAG_DATA_SIZE, getDataSizeBucket(data) @Override public void registerInProgressTriggeringsCount(AtomicInteger count) { - Gauge.builder(METRIC_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT, count::get) + Gauge.builder(GAUGE_TASKS_SERVICE_IN_PROGRESS_TRIGGERINGS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @Override public void registerActiveTriggeringsCount(AtomicInteger count) { - Gauge.builder(METRIC_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT, count::get) + Gauge.builder(GAUGE_TASKS_SERVICE_ACTIVE_TRIGGERINGS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @Override public void registerOngoingTasksGrabbingsCount(AtomicInteger count) { - Gauge.builder(METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT, count::get) + Gauge.builder(GAUGE_METRIC_PROCESSING_ONGOING_TASKS_GRABBINGS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @Override public void registerPollingBucketsCount(AtomicInteger count) { - Gauge.builder(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT, count::get) + Gauge.builder(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_POLLING_BUCKETS_COUNT, count::get) .register(meterCache.getMeterRegistry()); } @@ -358,7 +358,7 @@ public void registerKafkaTasksExecutionTriggererAlreadyCommitedOffset(String buc @Override public Object registerTasksCleanerDeleteLagSeconds(TaskStatus status, AtomicLong lagInSeconds) { - return registerGauge(METRIC_TASKS_CLEANER_DELETE_LAG_SECONDS, lagInSeconds::get, TAG_TASK_STATUS, status.name()); + return registerGauge(GAUGE_TASKS_CLEANER_DELETE_LAG_SECONDS, lagInSeconds::get, TAG_TASK_STATUS, status.name()); } @Override @@ -373,90 +373,90 @@ public void unregisterMetric(Object rawMetricHandle) { @Override public Object registerTasksInErrorCount(AtomicInteger erroneousTasksCount) { - return registerGauge(METRIC_HEALTH_TASKS_IN_ERROR_COUNT, erroneousTasksCount::get); + return registerGauge(GAUGE_METRIC_HEALTH_TASKS_IN_ERROR_COUNT, erroneousTasksCount::get); } @Override public Object registerTasksInErrorCount(String taskType, AtomicInteger count) { - return registerGauge(METRIC_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE, count::get, TAG_TASK_TYPE, taskType); + return registerGauge(GAUGE_HEALTH_TASKS_IN_ERROR_COUNT_PER_TYPE, count::get, TAG_TASK_TYPE, taskType); } @Override public Object registerStuckTasksCount(AtomicInteger stuckTasksCount) { - return registerGauge(METRIC_HEALTH_STUCK_TASKS_COUNT, stuckTasksCount::get); + return registerGauge(GAUGE_HEALTH_STUCK_TASKS_COUNT, stuckTasksCount::get); } @Override public Object registerStuckTasksCount(TaskStatus status, String type, AtomicInteger count) { - return registerGauge(METRIC_HEALTH_STUCK_TASKS_COUNT_PER_TYPE, count::get, TAG_TASK_STATUS, status.name(), TAG_TASK_TYPE, type); + return registerGauge(GAUGE_HEALTH_STUCK_TASKS_COUNT_PER_TYPE, count::get, TAG_TASK_STATUS, status.name(), TAG_TASK_TYPE, type); } @Override public Object registerApproximateTasksCount(AtomicLong approximateTasksCount) { - return registerGauge(METRIC_STATE_APPROXIMATE_TASKS, approximateTasksCount::get); + return registerGauge(GAUGE_METRIC_STATE_APPROXIMATE_TASKS, approximateTasksCount::get); } @Override public Object registerApproximateUniqueKeysCount(AtomicLong approximateUniqueKeysCount) { - return registerGauge(METRIC_STATE_APPROXIMATE_UNIQUE_KEYS, approximateUniqueKeysCount::get); + return registerGauge(GAUGE_STATE_APPROXIMATE_UNIQUE_KEYS, approximateUniqueKeysCount::get); } @Override public Object registerApproximateTaskDatasCount(AtomicLong approximateTaskDatasCount) { - return registerGauge(METRIC_STATE_APPROXIMATE_TASK_DATAS, approximateTaskDatasCount::get); + return registerGauge(GAUGE_STATE_APPROXIMATE_TASK_DATAS, approximateTaskDatasCount::get); } @Override public Object registerTaskHistoryLength(TaskStatus status, AtomicLong lengthInSeconds) { - return registerGauge(METRIC_HEALTH_TASK_HISTORY_LENGTH_SECONDS, lengthInSeconds::get, TAG_TASK_STATUS, status.name()); + return registerGauge(GAUGE_HEALTH_TASK_HISTORY_LENGTH_SECONDS, lengthInSeconds::get, TAG_TASK_STATUS, status.name()); } @Override public Object registerProcessingTriggersCount(String bucketId, String taskType, Supplier countSupplier) { - return registerGauge(METRIC_PROCESSING_TYPE_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId), + return registerGauge(GAUGE_PROCESSING_TYPE_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId), TAG_TASK_TYPE, taskType); } @Override public Object registerProcessingTriggersCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_PROCESSING_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerRunningTasksCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_PROCESSING_RUNNING_TASKS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_RUNNING_TASKS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerInProgressTasksGrabbingCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerProcessingStateVersion(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_PROCESSING_STATE_VERSION, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_PROCESSING_STATE_VERSION, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererOffsetsToBeCommitedCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT, countSupplier, TAG_BUCKET_ID, + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_TO_BE_COMMITED_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererOffsetsCompletedCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COMPLETED_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererUnprocessedFetchedRecordsCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT, countSupplier, TAG_BUCKET_ID, + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_UNPROCESSED_FETCHED_RECORDS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override public Object registerKafkaTasksExecutionTriggererOffsetsCount(String bucketId, Supplier countSupplier) { - return registerGauge(METRIC_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + return registerGauge(GAUGE_KAFKA_TASKS_EXECUTION_TRIGGERER_OFFSETS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } @Override @@ -471,7 +471,7 @@ public void registerLibrary() { version = "Unknown"; } - Gauge.builder(METRIC_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tasks-core") + Gauge.builder(GAUGE_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tasks-core") .description("Provides metadata about the library, for example the version.") .register(meterCache.getMeterRegistry());