diff --git a/kustomize/crd/kafkapodautoscaler.brandwatch.com-v1alpha1.yaml b/kustomize/crd/kafkapodautoscaler.brandwatch.com-v1alpha1.yaml index 3085da7..7391d0e 100644 --- a/kustomize/crd/kafkapodautoscaler.brandwatch.com-v1alpha1.yaml +++ b/kustomize/crd/kafkapodautoscaler.brandwatch.com-v1alpha1.yaml @@ -27,8 +27,11 @@ spec: name: Replicas priority: 0 type: integer - - description: The date and time that this message was added - jsonPath: .status.lastScale + - jsonPath: .status.lastNotReady + name: Last Not Ready + priority: 0 + type: string + - jsonPath: .status.lastScale name: Last Scale priority: 0 type: string @@ -98,8 +101,12 @@ spec: type: integer finalReplicaCount: type: integer + lastNotReady: + description: The date and time that the object was last seen as not + ready + type: string lastScale: - description: The date and time that this message was added + description: The date and time that the last scale happened type: string message: type: string diff --git a/src/main/java/com/brandwatch/kafka_pod_autoscaler/KafkaPodAutoscalerReconciler.java b/src/main/java/com/brandwatch/kafka_pod_autoscaler/KafkaPodAutoscalerReconciler.java index 958b348..40dae40 100644 --- a/src/main/java/com/brandwatch/kafka_pod_autoscaler/KafkaPodAutoscalerReconciler.java +++ b/src/main/java/com/brandwatch/kafka_pod_autoscaler/KafkaPodAutoscalerReconciler.java @@ -86,6 +86,7 @@ public UpdateControl reconcile(KafkaPodAutoscaler kafkaPodAu if (!resource.isReady()) { statusLogger.setScaleable(false); + statusLogger.recordNotReady(); statusLogger.log(targetKind + " is not ready. Skipping scale"); return UpdateControl.patchStatus(kafkaPodAutoscaler) .rescheduleAfter(Duration.ofSeconds(10)); @@ -119,10 +120,14 @@ public UpdateControl reconcile(KafkaPodAutoscaler kafkaPodAu statusLogger.recordCalculatedReplicaCount(calculatedReplicaCount); statusLogger.recordFinalReplicaCount(finalReplicaCount); - if (recentlyScaled(kafkaPodAutoscaler, statusLogger)) { + var scaledRecently = recentlyScaled(kafkaPodAutoscaler, statusLogger); + var notReadyRecently = recentlyNotReady(kafkaPodAutoscaler, statusLogger); + if (scaledRecently || notReadyRecently) { if (currentReplicaCount == finalReplicaCount) { // Reset to the correct message here statusLogger.log(targetKind + " is correctly scaled to " + finalReplicaCount + " replicas"); + } else if (notReadyRecently) { + statusLogger.log(targetKind + " has been not-ready recently. Skipping scale"); } else { statusLogger.log(targetKind + " has been scaled recently. Skipping scale"); } @@ -156,10 +161,12 @@ public UpdateControl reconcile(KafkaPodAutoscaler kafkaPodAu private boolean recentlyScaled(KafkaPodAutoscaler kafkaPodAutoscaler, StatusLogger statusLogger) { var rescaleWindow = Instant.now(clock).minus(Duration.ofSeconds(kafkaPodAutoscaler.getSpec().getCooloffSeconds())); - if (statusLogger.getLastScale() != null && statusLogger.getLastScale().isAfter(rescaleWindow)) { - return true; - } - return false; + return statusLogger.getLastScale() != null && statusLogger.getLastScale().isAfter(rescaleWindow); + } + + private boolean recentlyNotReady(KafkaPodAutoscaler kafkaPodAutoscaler, StatusLogger statusLogger) { + var rescaleWindow = Instant.now(clock).minus(Duration.ofSeconds(kafkaPodAutoscaler.getSpec().getCooloffSeconds())); + return statusLogger.getLastNotReady() != null && statusLogger.getLastNotReady().isAfter(rescaleWindow); } private ScaledResource getScaledResource(KubernetesClient client, String namespace, ScaleTargetRef scaleTargetRef) { @@ -263,7 +270,9 @@ static class StatusLogger { private final Clock clock; private final String name; @Getter - private final Instant lastScale; + private Instant lastNotReady; + @Getter + private Instant lastScale; private final KafkaPodAutoscalerStatus status; private final ScalerMetrics scalerMetrics; @@ -272,11 +281,16 @@ public StatusLogger(KubernetesClient client, KafkaPodAutoscaler kafkaPodAutoscal this.kafkaPodAutoscaler = kafkaPodAutoscaler; this.clock = clock; this.name = kafkaPodAutoscaler.getMetadata().getName(); + this.lastNotReady = Optional.ofNullable(kafkaPodAutoscaler.getStatus()) + .map(KafkaPodAutoscalerStatus::getLastNotReady) + .map(DATE_TIME_FORMATTER::parse) + .map(Instant::from) + .orElse(null); this.lastScale = Optional.ofNullable(kafkaPodAutoscaler.getStatus()) - .map(KafkaPodAutoscalerStatus::getLastScale) - .map(DATE_TIME_FORMATTER::parse) - .map(Instant::from) - .orElse(null); + .map(KafkaPodAutoscalerStatus::getLastScale) + .map(DATE_TIME_FORMATTER::parse) + .map(Instant::from) + .orElse(null); this.scalerMetrics = ScalerMetrics.getOrCreate(kafkaPodAutoscaler); this.status = Optional.ofNullable(kafkaPodAutoscaler.getStatus()) .orElseGet(KafkaPodAutoscalerStatus::new); @@ -360,7 +374,14 @@ public void setDryRunReplicas(Integer dryRunReplicas) { status.setDryRunReplicas(dryRunReplicas); } + public void recordNotReady() { + lastNotReady = Instant.now(clock); + scalerMetrics.setNotReady(lastNotReady.toEpochMilli()); + status.setLastNotReady(DATE_TIME_FORMATTER.format(lastNotReady.atZone(ZoneOffset.UTC))); + } + public void recordLastScale() { + lastScale = Instant.now(clock); scalerMetrics.setLastScale(Instant.now(clock).toEpochMilli()); status.setLastScale(DATE_TIME_FORMATTER.format(Instant.now(clock).atZone(ZoneOffset.UTC))); } diff --git a/src/main/java/com/brandwatch/kafka_pod_autoscaler/metrics/ScalerMetrics.java b/src/main/java/com/brandwatch/kafka_pod_autoscaler/metrics/ScalerMetrics.java index 77ee234..99d6ea2 100644 --- a/src/main/java/com/brandwatch/kafka_pod_autoscaler/metrics/ScalerMetrics.java +++ b/src/main/java/com/brandwatch/kafka_pod_autoscaler/metrics/ScalerMetrics.java @@ -23,6 +23,7 @@ public class ScalerMetrics { private final AtomicInteger calculatedReplicaCount; private final AtomicInteger finalReplicaCount; private final AtomicInteger dryRunReplicas; + private final AtomicLong notReady; private final AtomicLong lastScale; private final AtomicInteger scalable; private final Map triggerValueMetrics = new ConcurrentHashMap<>(); @@ -45,6 +46,7 @@ public ScalerMetrics(@NonNull KafkaPodAutoscaler kpa) { calculatedReplicaCount = Metrics.gauge("kpa_calculated_replica_count", tags, new AtomicInteger()); finalReplicaCount = Metrics.gauge("kpa_final_replica_count", tags, new AtomicInteger()); dryRunReplicas = Metrics.gauge("kpa_dry_run_replicas", tags, new AtomicInteger()); + notReady = Metrics.gauge("kpa_not_ready", tags, new AtomicLong()); lastScale = Metrics.gauge("kpa_last_scale", tags, new AtomicLong()); scalable = Metrics.gauge("kpa_scaleable", tags, new AtomicInteger()); } @@ -74,6 +76,10 @@ public void setDryRunReplicas(Integer dryRunReplicas) { this.dryRunReplicas.set(Optional.ofNullable(dryRunReplicas).orElse(-1)); } + public void setNotReady(long notReady) { + this.notReady.set(notReady); + } + public void setLastScale(long lastScale) { this.lastScale.set(lastScale); } diff --git a/src/main/java/com/brandwatch/kafka_pod_autoscaler/v1alpha1/KafkaPodAutoscalerStatus.java b/src/main/java/com/brandwatch/kafka_pod_autoscaler/v1alpha1/KafkaPodAutoscalerStatus.java index 5851630..a622a0e 100644 --- a/src/main/java/com/brandwatch/kafka_pod_autoscaler/v1alpha1/KafkaPodAutoscalerStatus.java +++ b/src/main/java/com/brandwatch/kafka_pod_autoscaler/v1alpha1/KafkaPodAutoscalerStatus.java @@ -44,13 +44,17 @@ public class KafkaPodAutoscalerStatus implements KubernetesResource { @JsonProperty("finalReplicaCount") @JsonSetter(nulls = Nulls.SKIP) private Integer finalReplicaCount; - /** - * The date and time that this message was added - */ + @Getter + @Setter + @JsonProperty("lastNotReady") + @JsonPropertyDescription("The date and time that the object was last seen as not ready") + @JsonSetter(nulls = Nulls.SKIP) + @PrinterColumn(name = "Last Not Ready") + private String lastNotReady; @Getter @Setter @JsonProperty("lastScale") - @JsonPropertyDescription("The date and time that this message was added") + @JsonPropertyDescription("The date and time that the last scale happened") @JsonSetter(nulls = Nulls.SKIP) @PrinterColumn(name = "Last Scale") private String lastScale;