Skip to content

Commit

Permalink
feat: add scalable metrics
Browse files Browse the repository at this point in the history
This should make it easier to see whether the KPA is can be scaled
  • Loading branch information
Steve Mason committed Jul 6, 2023
1 parent fa8dd5d commit a208b40
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,22 @@ public UpdateControl<KafkaPodAutoscaler> reconcile(KafkaPodAutoscaler kafkaPodAu

if (resource == null) {
statusLogger.clearStatus();
statusLogger.setScaleable(false);
statusLogger.log(targetKind + " not found. Skipping scale");
return UpdateControl.patchStatus(kafkaPodAutoscaler)
.rescheduleAfter(Duration.ofSeconds(10));
}

if (resource.getReplicaCount() == 0) {
statusLogger.clearStatus();
statusLogger.setScaleable(false);
statusLogger.log(targetKind + " has been scaled to zero. Skipping scale");
return UpdateControl.patchStatus(kafkaPodAutoscaler)
.rescheduleAfter(Duration.ofSeconds(10));
}

if (!resource.isReady()) {
statusLogger.setScaleable(false);
statusLogger.log(targetKind + " is not ready. Skipping scale");
return UpdateControl.patchStatus(kafkaPodAutoscaler)
.rescheduleAfter(Duration.ofSeconds(10));
Expand Down Expand Up @@ -98,6 +101,7 @@ public UpdateControl<KafkaPodAutoscaler> reconcile(KafkaPodAutoscaler kafkaPodAu
if (currentReplicaCount != finalReplicaCount) {
var rescaleWindow = Instant.now().minus(Duration.ofSeconds(kafkaPodAutoscaler.getSpec().getCooloffSeconds()));
if (statusLogger.getLastScale() != null && statusLogger.getLastScale().isAfter(rescaleWindow)) {
statusLogger.setScaleable(false);
statusLogger.log(targetKind + " has been scaled recently. Skipping scale");
return UpdateControl.patchStatus(kafkaPodAutoscaler)
.rescheduleAfter(Duration.ofSeconds(10));
Expand All @@ -117,6 +121,7 @@ public UpdateControl<KafkaPodAutoscaler> reconcile(KafkaPodAutoscaler kafkaPodAu
} else {
statusLogger.log(targetKind + " is correctly scaled to " + finalReplicaCount + " replicas");
}
statusLogger.setScaleable(true);

return UpdateControl.patchStatus(kafkaPodAutoscaler)
// TODO: Backoff if scaled up/down - allow this to be configurable
Expand Down Expand Up @@ -291,6 +296,10 @@ public void recordLastScale() {
status.setLastScale(DATE_TIME_FORMATTER.format(Instant.now().atZone(ZoneOffset.UTC)));
}

public void setScaleable(boolean scalable) {
scalerMetrics.setScalable(scalable);
}

public void clearStatus() {
status.setLastScale(null);
status.setDryRunReplicas(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class ScalerMetrics {
private final AtomicInteger finalReplicaCount;
private final AtomicInteger dryRunReplicas;
private final AtomicLong lastScale;
private final AtomicInteger scalable;
private final Map<String, AtomicLong> triggerValueMetrics = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> triggerThresholdMetrics = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> triggerReplicaMetrics = new ConcurrentHashMap<>();
Expand All @@ -33,6 +34,7 @@ public ScalerMetrics(@NonNull String namespace, @NonNull String name) {
finalReplicaCount = Metrics.gauge("kpa_final_replica_count", tags, new AtomicInteger());
dryRunReplicas = Metrics.gauge("kpa_dry_run_replicas", tags, new AtomicInteger());
lastScale = Metrics.gauge("kpa_last_scale", tags, new AtomicLong());
scalable = Metrics.gauge("kpa_scaleable", tags, new AtomicInteger());
}

public static ScalerMetrics getOrCreate(String namespace, String name) {
Expand Down Expand Up @@ -64,6 +66,10 @@ public void setLastScale(long lastScale) {
this.lastScale.set(lastScale);
}

public void setScalable(boolean scalable) {
this.scalable.set(scalable ? 1 : 0);
}

public void setTriggerMetrics(TriggerResult result, int recommendedReplicas) {
triggerValueMetrics.computeIfAbsent(result.trigger().getType(),
type -> {
Expand Down

0 comments on commit a208b40

Please sign in to comment.