From 927057e621391d7a2c3610cb7feb6dba68bb9b40 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Fri, 9 Feb 2024 14:00:32 -0600 Subject: [PATCH] Gather attributes of downstream resources (#61) --- deploy/subscriptions.crd.yaml | 17 ++- .../operator/kafka/KafkaTopicReconciler.java | 12 ++ .../hoptimator/models/V1alpha1Acl.java | 2 +- .../hoptimator/models/V1alpha1AclList.java | 2 +- .../hoptimator/models/V1alpha1AclSpec.java | 2 +- .../models/V1alpha1AclSpecResource.java | 2 +- .../hoptimator/models/V1alpha1AclStatus.java | 2 +- .../hoptimator/models/V1alpha1KafkaTopic.java | 2 +- .../models/V1alpha1KafkaTopicList.java | 2 +- .../models/V1alpha1KafkaTopicSpec.java | 2 +- .../V1alpha1KafkaTopicSpecClientConfigs.java | 2 +- .../V1alpha1KafkaTopicSpecConfigMapRef.java | 2 +- .../models/V1alpha1KafkaTopicStatus.java | 2 +- .../models/V1alpha1Subscription.java | 2 +- .../models/V1alpha1SubscriptionList.java | 2 +- .../models/V1alpha1SubscriptionSpec.java | 2 +- .../models/V1alpha1SubscriptionStatus.java | 121 +++++++++++++++++- .../subscription/SubscriptionReconciler.java | 66 +++++++++- 18 files changed, 223 insertions(+), 21 deletions(-) diff --git a/deploy/subscriptions.crd.yaml b/deploy/subscriptions.crd.yaml index 1ab8a0d..67943d9 100644 --- a/deploy/subscriptions.crd.yaml +++ b/deploy/subscriptions.crd.yaml @@ -68,8 +68,23 @@ spec: type: object additionalProperties: type: string + attributes: + description: Physical attributes of the job and sink/output table. + type: object + additionalProperties: + type: string resources: - description: The YAML generated to implement this pipeline. + description: The yaml generated to implement this pipeline. + type: array + items: + type: string + jobResources: + description: The yaml generated to implement the job. + type: array + items: + type: string + downstreamResources: + description: The yaml generated to implement the sink/output table. type: array items: type: string diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java index 4b88b8c..c6e2f0e 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java @@ -3,6 +3,7 @@ import com.linkedin.hoptimator.operator.Operator; import com.linkedin.hoptimator.operator.ConfigAssembler; import com.linkedin.hoptimator.models.V1alpha1KafkaTopic; +import com.linkedin.hoptimator.models.V1alpha1KafkaTopicStatus; import io.kubernetes.client.extended.controller.reconciler.Reconciler; import io.kubernetes.client.extended.controller.reconciler.Request; @@ -52,6 +53,10 @@ public Result reconcile(Request request) { return new Result(false); } + if (object.getStatus() == null) { + object.setStatus(new V1alpha1KafkaTopicStatus()); + } + String topicName = object.getSpec().getTopicName(); Integer desiredPartitions = object.getSpec().getNumPartitions(); Integer desiredReplicationFactor = object.getSpec().getReplicationFactor(); @@ -72,22 +77,29 @@ public Result reconcile(Request request) { log.info("Found existing topic {}", topicName); int actualPartitions = topicDescription.partitions().size(); + object.getStatus().setNumPartitions(actualPartitions); if (desiredPartitions != null && desiredPartitions > actualPartitions) { log.info("Desired partitions {} > actual partitions {}. Creating additional partitions.", desiredPartitions, actualPartitions); admin.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(desiredPartitions))).all().get(); + object.getStatus().setNumPartitions(desiredPartitions); } } catch(ExecutionException e) { if (e.getCause() instanceof UnknownTopicOrPartitionException ) { log.info("No existing topic {}. Will create it.", topicName); admin.createTopics(Collections.singleton(new NewTopic(topicName, Optional.ofNullable(desiredPartitions), Optional.ofNullable(desiredReplicationFactor).map(x -> x.shortValue())))).all().get(); + object.getStatus().setNumPartitions(desiredPartitions); } else { throw e; } } finally { admin.close(); } + + operator.apiFor(KAFKATOPIC).updateStatus(object, x -> object.getStatus()) + .onFailure((x, y) -> log.error("Failed to update status of KafkaTopic {}/{}: {}.", namespace, name, + y.getMessage())); } catch (Exception e) { log.error("Encountered exception while reconciling KafkaTopic {}/{}", namespace, name, e); return new Result(true, operator.failureRetryDuration()); diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java index cb336b9..9df4d51 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java @@ -31,7 +31,7 @@ * Access control rule (colloquially, an Acl) */ @ApiModel(description = "Access control rule (colloquially, an Acl)") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java index 16df131..8334649 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java @@ -32,7 +32,7 @@ * AclList is a list of Acl */ @ApiModel(description = "AclList is a list of Acl") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java index 7cd96fa..9ee42fc 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java @@ -29,7 +29,7 @@ * A set of related ACL rules. */ @ApiModel(description = "A set of related ACL rules.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1AclSpec { /** * The resource access method. diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java index 7a2659d..522ed41 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java @@ -28,7 +28,7 @@ * The resource being controlled. */ @ApiModel(description = "The resource being controlled.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1AclSpecResource { public static final String SERIALIZED_NAME_KIND = "kind"; @SerializedName(SERIALIZED_NAME_KIND) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java index a0de51e..3d083b8 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java @@ -28,7 +28,7 @@ * Status, as set by the operator. */ @ApiModel(description = "Status, as set by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1AclStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java index 68464b7..61a5b24 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java @@ -31,7 +31,7 @@ * Kafka Topic */ @ApiModel(description = "Kafka Topic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java index b9442b9..0af5478 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java @@ -32,7 +32,7 @@ * KafkaTopicList is a list of KafkaTopic */ @ApiModel(description = "KafkaTopicList is a list of KafkaTopic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java index 937939b..74f2b5c 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java @@ -33,7 +33,7 @@ * Desired Kafka topic configuration. */ @ApiModel(description = "Desired Kafka topic configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1KafkaTopicSpec { public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs"; @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java index 62f99e3..5081585 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java @@ -28,7 +28,7 @@ /** * V1alpha1KafkaTopicSpecClientConfigs */ -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecClientConfigs { public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef"; @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java index 8af64e1..7c8b924 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java @@ -28,7 +28,7 @@ * Reference to a ConfigMap to use for AdminClient configuration. */ @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecConfigMapRef { public static final String SERIALIZED_NAME_NAME = "name"; @SerializedName(SERIALIZED_NAME_NAME) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java index 3fb8eb5..f29d602 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java @@ -28,7 +28,7 @@ * Current state of the topic. */ @ApiModel(description = "Current state of the topic.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1KafkaTopicStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java index 3f95148..4da7849 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java index b8d2538..2d5f0eb 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java index 501afd3..d55e49f 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java @@ -31,7 +31,7 @@ * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java index 4ebef0b..8ed4ef4 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java @@ -32,8 +32,16 @@ * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { + public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes"; + @SerializedName(SERIALIZED_NAME_ATTRIBUTES) + private Map attributes = null; + + public static final String SERIALIZED_NAME_DOWNSTREAM_RESOURCES = "downstreamResources"; + @SerializedName(SERIALIZED_NAME_DOWNSTREAM_RESOURCES) + private List downstreamResources = null; + public static final String SERIALIZED_NAME_FAILED = "failed"; @SerializedName(SERIALIZED_NAME_FAILED) private Boolean failed; @@ -42,6 +50,10 @@ public class V1alpha1SubscriptionStatus { @SerializedName(SERIALIZED_NAME_HINTS) private Map hints = null; + public static final String SERIALIZED_NAME_JOB_RESOURCES = "jobResources"; + @SerializedName(SERIALIZED_NAME_JOB_RESOURCES) + private List jobResources = null; + public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) private String message; @@ -59,6 +71,68 @@ public class V1alpha1SubscriptionStatus { private String sql; + public V1alpha1SubscriptionStatus attributes(Map attributes) { + + this.attributes = attributes; + return this; + } + + public V1alpha1SubscriptionStatus putAttributesItem(String key, String attributesItem) { + if (this.attributes == null) { + this.attributes = new HashMap<>(); + } + this.attributes.put(key, attributesItem); + return this; + } + + /** + * Physical attributes of the job and sink/output table. + * @return attributes + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Physical attributes of the job and sink/output table.") + + public Map getAttributes() { + return attributes; + } + + + public void setAttributes(Map attributes) { + this.attributes = attributes; + } + + + public V1alpha1SubscriptionStatus downstreamResources(List downstreamResources) { + + this.downstreamResources = downstreamResources; + return this; + } + + public V1alpha1SubscriptionStatus addDownstreamResourcesItem(String downstreamResourcesItem) { + if (this.downstreamResources == null) { + this.downstreamResources = new ArrayList<>(); + } + this.downstreamResources.add(downstreamResourcesItem); + return this; + } + + /** + * The yaml generated to implement the sink/output table. + * @return downstreamResources + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "The yaml generated to implement the sink/output table.") + + public List getDownstreamResources() { + return downstreamResources; + } + + + public void setDownstreamResources(List downstreamResources) { + this.downstreamResources = downstreamResources; + } + + public V1alpha1SubscriptionStatus failed(Boolean failed) { this.failed = failed; @@ -113,6 +187,37 @@ public void setHints(Map hints) { } + public V1alpha1SubscriptionStatus jobResources(List jobResources) { + + this.jobResources = jobResources; + return this; + } + + public V1alpha1SubscriptionStatus addJobResourcesItem(String jobResourcesItem) { + if (this.jobResources == null) { + this.jobResources = new ArrayList<>(); + } + this.jobResources.add(jobResourcesItem); + return this; + } + + /** + * The yaml generated to implement the job. + * @return jobResources + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "The yaml generated to implement the job.") + + public List getJobResources() { + return jobResources; + } + + + public void setJobResources(List jobResources) { + this.jobResources = jobResources; + } + + public V1alpha1SubscriptionStatus message(String message) { this.message = message; @@ -174,11 +279,11 @@ public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) { } /** - * The YAML generated to implement this pipeline. + * The yaml generated to implement this pipeline. * @return resources **/ @javax.annotation.Nullable - @ApiModelProperty(value = "The YAML generated to implement this pipeline.") + @ApiModelProperty(value = "The yaml generated to implement this pipeline.") public List getResources() { return resources; @@ -222,8 +327,11 @@ public boolean equals(Object o) { return false; } V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o; - return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) && + return Objects.equals(this.attributes, v1alpha1SubscriptionStatus.attributes) && + Objects.equals(this.downstreamResources, v1alpha1SubscriptionStatus.downstreamResources) && + Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) && Objects.equals(this.hints, v1alpha1SubscriptionStatus.hints) && + Objects.equals(this.jobResources, v1alpha1SubscriptionStatus.jobResources) && Objects.equals(this.message, v1alpha1SubscriptionStatus.message) && Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) && Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) && @@ -232,7 +340,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(failed, hints, message, ready, resources, sql); + return Objects.hash(attributes, downstreamResources, failed, hints, jobResources, message, ready, resources, sql); } @@ -240,8 +348,11 @@ public int hashCode() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("class V1alpha1SubscriptionStatus {\n"); + sb.append(" attributes: ").append(toIndentedString(attributes)).append("\n"); + sb.append(" downstreamResources: ").append(toIndentedString(downstreamResources)).append("\n"); sb.append(" failed: ").append(toIndentedString(failed)).append("\n"); sb.append(" hints: ").append(toIndentedString(hints)).append("\n"); + sb.append(" jobResources: ").append(toIndentedString(jobResources)).append("\n"); sb.append(" message: ").append(toIndentedString(message)).append("\n"); sb.append(" ready: ").append(toIndentedString(ready)).append("\n"); sb.append(" resources: ").append(toIndentedString(resources)).append("\n"); diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index 1db3931..592ee81 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -38,6 +38,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; +import java.util.stream.Stream; public class SubscriptionReconciler implements Reconciler { private final static Logger log = LoggerFactory.getLogger(SubscriptionReconciler.class); @@ -84,6 +85,14 @@ public Result reconcile(Request request) { object.getSpec().setHints(new HashMap<>()); } + if (status.getJobResources() == null) { + status.setJobResources(Collections.emptyList()); + } + + if (status.getDownstreamResources() == null) { + status.setDownstreamResources(Collections.emptyList()); + } + // We deploy in three phases: // 1. Plan a pipeline, and write the plan to Status. // 2. Deploy the pipeline per plan. @@ -123,7 +132,9 @@ public Result reconcile(Request request) { combined.addAll(sqlJob); combined.addAll(downstreamResources); - status.setResources(combined); + status.setResources(combined); + status.setJobResources(new ArrayList<>(sqlJob)); + status.setDownstreamResources(downstreamResources); status.setSql(object.getSpec().getSql()); status.setHints(object.getSpec().getHints()); @@ -172,6 +183,11 @@ public Result reconcile(Request request) { } } + status.setAttributes(Stream.concat(status.getJobResources().stream(), status.getDownstreamResources().stream()) + .map(x -> fetchAttributes(x)) + .flatMap(x -> x.entrySet().stream()) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue(), (x, y) -> y))); + operator.apiFor(SUBSCRIPTION).updateStatus(object, x -> object.getStatus()) .onFailure((x, y) -> log.error("Failed to update status of {}/{}: {}.", kind, name, y.getMessage())); } catch (Exception e) { @@ -303,6 +319,54 @@ private static boolean diverged(V1alpha1SubscriptionSpec spec, V1alpha1Subscript || !status.getHints().equals(spec.getHints()); } + // Fetch attributes from downstream controllers + private Map fetchAttributes(String yaml) { + DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); + String namespace = obj.getMetadata().getNamespace(); + String name = obj.getMetadata().getName(); + String kind = obj.getKind(); + try { + KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); + existing.onFailure((code, status) -> log.info("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); + if (!existing.isSuccess()) { + return Collections.emptyMap(); + } else { + return guessAttributes(existing.getObject()); + } + } catch (Exception e) { + return Collections.emptyMap(); + } + } + + private static Map guessAttributes(DynamicKubernetesObject obj) { + // We make a best effort to guess the attributes of the dynamic object. + if (obj == null || obj.getRaw() == null) { + return Collections.emptyMap(); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("attributes").getAsJsonObject().entrySet().stream() + .filter(x -> x.getValue().isJsonPrimitive()) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().getAsString())); + } catch (Exception e) { + log.debug("Exception looking for .status.attributes. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("jobStatus").getAsJsonObject().entrySet().stream() + .filter(x -> x.getValue().isJsonPrimitive()) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().getAsString())); + } catch (Exception e) { + log.debug("Exception looking for .status.jobStatus. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().entrySet().stream() + .filter(x -> x.getValue().isJsonPrimitive()) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue().getAsString())); + } catch (Exception e) { + log.debug("Exception looking for .status. Swallowing.", e); + } + return Collections.emptyMap(); + } + public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory, Resource.Environment environment) { Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory, environment); return ControllerBuilder.defaultBuilder(operator.informerFactory())