Skip to content

Commit

Permalink
Add SqlJob and Flink Streaming impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jul 26, 2024
1 parent 701ce75 commit b4e9a94
Show file tree
Hide file tree
Showing 31 changed files with 1,297 additions and 122 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ deploy-dev-environment:
deploy-samples: deploy
kubectl wait --for=condition=Established=True \
crds/subscriptions.hoptimator.linkedin.com \
crds/kafkatopics.hoptimator.linkedin.com
crds/kafkatopics.hoptimator.linkedin.com \
crds/sqljobs.hoptimator.linkedin.com
kubectl apply -f ./deploy/samples

deploy-config:
Expand Down
4 changes: 2 additions & 2 deletions deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ metadata:
name: hoptimator-operator
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["acls", "kafkatopics", "subscriptions"]
resources: ["acls", "kafkatopics", "subscriptions", "sqljobs"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status"]
verbs: ["get", "patch"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
Expand Down
10 changes: 10 additions & 0 deletions deploy/samples/sqljobs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Sample SqlJob: a trivial Flink streaming job that writes one row to a
# blackhole sink. Useful as a smoke test for the SqlJob controller.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: SqlJob
metadata:
  name: hello-world
spec:
  dialect: Flink
  executionMode: Streaming
  sql:
    # NOTE(review): statements here end with ';' while the reconciler joins
    # them with ";\n" — confirm the runner tolerates doubled semicolons.
    - create table bh (text varchar) with ('connector' = 'blackhole');
    - insert into bh values ('hello world');
87 changes: 87 additions & 0 deletions deploy/sqljobs.crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# CustomResourceDefinition for SqlJob: a generic, dialect-tagged SQL script
# that an operator manifests as a concrete job (currently Flink only).
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: sqljobs.hoptimator.linkedin.com
spec:
  group: hoptimator.linkedin.com
  names:
    kind: SqlJob
    listKind: SqlJobList
    plural: sqljobs
    singular: sqljob
    shortNames:
    - sql
    - sj
  preserveUnknownFields: false
  scope: Namespaced
  versions:
  - name: v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        description: Hoptimator generic SQL job
        type: object
        properties:
          apiVersion:
            type: string
          kind:
            type: string
          metadata:
            type: object
          spec:
            description: SQL job spec
            type: object
            properties:
              sql:
                description: SQL script the job should run.
                type: array
                items:
                  type: string
              dialect:
                description: Flink, etc.
                type: string
                enum:
                - Flink
                default: Flink
              executionMode:
                description: Streaming or Batch.
                type: string
                enum:
                - Streaming
                - Batch
                default: Streaming
            required:
            - sql
          status:
            description: Filled in by the operator.
            type: object
            properties:
              ready:
                description: Whether the SqlJob is running or completed.
                type: boolean
              failed:
                description: Whether the SqlJob has failed.
                type: boolean
              message:
                description: Error or success message, for information only.
                type: string
              sql:
                description: The SQL being implemented by this SqlJob.
                type: string
    subresources:
      status: {}
    additionalPrinterColumns:
    - name: DIALECT
      type: string
      description: SQL dialect.
      jsonPath: .spec.dialect
    - name: MODE
      type: string
      description: Execution mode.
      jsonPath: .spec.executionMode
    - name: STATUS
      type: string
      description: Job status.
      jsonPath: .status.message

1 change: 1 addition & 0 deletions hoptimator-flink-adapter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {

dependencies {
implementation project(':hoptimator-catalog')
implementation project(':hoptimator-models')
implementation project(':hoptimator-operator')
implementation libs.kubernetesClient
implementation libs.kubernetesExtendedClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.linkedin.hoptimator.catalog.flink;

import com.linkedin.hoptimator.catalog.Resource;

/**
 * A {@link Resource} describing a Flink streaming SQL job.
 *
 * <p>Exports the variables consumed by the FlinkStreamingSqlJob YAML template
 * ({@code namespace}, {@code name}, {@code sql}), which renders a
 * FlinkDeployment running the given SQL script.
 */
public class FlinkStreamingSqlJob extends Resource {

  /**
   * @param namespace K8s namespace the rendered resources are created in
   * @param name      base name for the rendered resources
   * @param sql       the complete SQL script the job should run
   */
  public FlinkStreamingSqlJob(String namespace, String name, String sql) {
    super("FlinkStreamingSqlJob");  // selects the resource template by name
    export("namespace", namespace);
    export("name", name);
    export("sql", sql);
  }
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.linkedin.hoptimator.operator.flink;

import com.linkedin.hoptimator.models.V1alpha1SqlJob;
import com.linkedin.hoptimator.models.V1alpha1SqlJobList;
import com.linkedin.hoptimator.operator.ControllerProvider;
import com.linkedin.hoptimator.operator.Operator;

import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
import io.kubernetes.client.extended.controller.reconciler.Reconciler;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -16,7 +20,17 @@ public Collection<Controller> controllers(Operator operator) {
operator.registerApi("FlinkDeployment", "flinkdeployment", "flinkdeployments",
"flink.apache.org", "v1beta1");

// We don't need a controller
return Collections.emptyList();
operator.registerApi("SqlJob", "sqljob", "sqljobs",
"hoptimator.linkedin.com", "v1alpha1", V1alpha1SqlJob.class, V1alpha1SqlJobList.class);

Reconciler reconciler = new FlinkStreamingSqlJobReconciler(operator);
Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(reconciler)
.withName("flink-streaming-sql-job-controller")
.withWorkerCount(1)
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1SqlJob.class, x).build())
.build();

return Collections.singleton(controller);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.linkedin.hoptimator.operator.flink;

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob;
import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.models.V1alpha1SqlJob;
import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec.DialectEnum;
import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec.ExecutionModeEnum;
import com.linkedin.hoptimator.models.V1alpha1SqlJobStatus;

import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
import io.kubernetes.client.extended.controller.reconciler.Result;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Manifests streaming SqlJobs as Flink jobs.
 *
 * <p>For each SqlJob with dialect {@code Flink} and execution mode
 * {@code Streaming}, renders a {@link FlinkStreamingSqlJob} template and
 * applies the resulting resources, then mirrors their readiness into the
 * SqlJob's status subresource. SqlJobs with other dialects or modes are
 * skipped without requeue.
 */
public class FlinkStreamingSqlJobReconciler implements Reconciler {
  private static final Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class);
  private static final String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob";

  private final Operator operator;

  public FlinkStreamingSqlJobReconciler(Operator operator) {
    this.operator = operator;
  }

  /**
   * Reconciles one SqlJob: applies the rendered Flink resources and updates
   * the SqlJob status.
   *
   * @param request the namespace/name of the SqlJob to reconcile
   * @return a no-requeue Result on success or skip; a retry Result on error
   */
  @Override
  public Result reconcile(Request request) {
    log.info("Reconciling request {}", request);
    String name = request.getName();
    String namespace = request.getNamespace();

    try {
      V1alpha1SqlJob object = operator.<V1alpha1SqlJob>fetch(SQLJOB, namespace, name);

      if (object == null) {
        // BUG FIX: original passed no arguments for the two {} placeholders,
        // so the deleted object's identity was never logged.
        log.info("Object {}/{} deleted. Skipping.", namespace, name);
        return new Result(false);
      }

      V1alpha1SqlJobStatus status = object.getStatus();
      if (status == null) {
        status = new V1alpha1SqlJobStatus();
        object.setStatus(status);
      }

      // NOTE(review): if statements already end with ';' (as in the sample),
      // this produces ";;" between them — confirm the Flink runner tolerates
      // empty statements.
      List<String> sql = object.getSpec().getSql();
      String script = String.join(";\n", sql);

      DialectEnum dialect = object.getSpec().getDialect();
      if (!DialectEnum.FLINK.equals(dialect)) {
        log.info("Not Flink SQL. Skipping.");
        return new Result(false);
      }

      ExecutionModeEnum mode = object.getSpec().getExecutionMode();
      if (!ExecutionModeEnum.STREAMING.equals(mode)) {
        log.info("Not a streaming job. Skipping.");
        return new Result(false);
      }

      // Render the Flink resources and apply them, tracking aggregate health.
      Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY);
      Resource sqlJob = new FlinkStreamingSqlJob(namespace, name, script);
      boolean allReady = true;
      boolean anyFailed = false;
      for (String yaml : sqlJob.render(templateFactory)) {
        operator.apply(yaml, object);
        if (!operator.isReady(yaml)) {
          allReady = false;
        }
        if (operator.isFailed(yaml)) {
          anyFailed = true;
          allReady = false;  // a failed resource is never "ready"
        }
      }

      object.getStatus().setReady(allReady);
      object.getStatus().setFailed(anyFailed);

      if (allReady) {
        object.getStatus().setMessage("Ready.");
      }
      if (anyFailed) {
        // Failure wins over readiness in the message, matching the flags above.
        object.getStatus().setMessage("Failed.");
      }

      operator.apiFor(SQLJOB).updateStatus(object, x -> object.getStatus())
          .onFailure((x, y) -> log.error("Failed to update status of SqlJob {}: {}.", name, y.getMessage()))
          .throwsApiException();

    } catch (Exception e) {
      log.error("Encountered exception while reconciling Flink streaming SqlJob {}/{}", namespace, name, e);
      return new Result(true, operator.failureRetryDuration());
    }
    return new Result(false);
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Template for the FlinkDeployment backing a streaming SqlJob.
# {{namespace}}, {{name}} and {{sql}} are substituted by the
# FlinkStreamingSqlJob resource at render time.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: {{namespace}}
  name: {{name}}-flink-job
spec:
  image: docker.io/library/hoptimator-flink-runner
  imagePullPolicy: Never  # dev setup: image is loaded into the cluster locally
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 0.1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 0.1
  job:
    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
    args:
    # The full SQL script is passed as a single program argument.
    - {{sql}}
    jarURI: local:///opt/hoptimator-flink-runner.jar
    parallelism: 1
    upgradeMode: stateless
    state: running

1 change: 1 addition & 0 deletions hoptimator-models/generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ docker run \
-u "$(pwd)/deploy/acls.crd.yaml" \
-u "$(pwd)/deploy/kafkatopics.crd.yaml" \
-u "$(pwd)/deploy/subscriptions.crd.yaml" \
-u "$(pwd)/deploy/sqljobs.crd.yaml" \
&& echo "done."
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Loading

0 comments on commit b4e9a94

Please sign in to comment.