Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2148] Add temporal workflow cancel support #4045

Merged
merged 9 commits into from
Oct 7, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ public ExecuteGobblinJobLauncher(
public void submitJob(List<WorkUnit> workunits) {
try {
Properties finalProps = adjustJobProperties(this.jobProps);
// Initialize workflowId.
this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps)))
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps))
.setWorkflowId(this.workflowId)
.build();
ExecuteGobblinWorkflow workflow = this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ public GenerateWorkUnitsJobLauncher(
@Override
public void submitJob(List<WorkUnit> workunits) {
try {
this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)))
.setWorkflowId(this.workflowId)
.build();
GenerateWorkUnitsWorkflow workflow = this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ public void submitJob(List<WorkUnit> workunits) {
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));

this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
.setWorkflowId(this.workflowId)
.build();

Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -87,7 +91,7 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
protected final StateStores stateStores;
protected JobListener jobListener;
protected volatile boolean jobSubmitted = false;

private final ExecutorService executor;

public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap, EventBus eventbus)
Expand Down Expand Up @@ -122,6 +126,7 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
this.taskStateCollectorService =
new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter,
this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());
this.executor = Executors.newSingleThreadExecutor();
}

@Override
Expand Down Expand Up @@ -150,17 +155,23 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();

Future<?> submitJobFuture = null;
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved
synchronized (this.cancellationRequest) {
if (!this.cancellationRequested) {
submitJob(workUnits);
submitJobFuture = executor.submit(() -> {
try {
submitJob(workUnits);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.info(String.format("Submitted job %s", this.jobContext.getJobId()));
this.jobSubmitted = true;
} else {
log.warn("Job {} not submitted as it was requested to be cancelled.", this.jobContext.getJobId());
}
}

waitJob();
waitJob(submitJobFuture);
log.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
// The last iteration of output TaskState collecting will run when the collector service gets stopped
Expand All @@ -172,7 +183,11 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
protected void submitJob(List<WorkUnit> workUnits) throws Exception {
}

protected void waitJob() throws InterruptedException {
protected void waitJob(Future<?> submitJobFuture)
throws InterruptedException, ExecutionException {
if (submitJobFuture != null) {
submitJobFuture.get();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowStub;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;

Expand Down Expand Up @@ -65,10 +72,13 @@
@Alpha
public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
private static final int TERMINATION_TIMEOUT_SECONDS = 3;
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved

protected WorkflowServiceStubs workflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
protected String namespace;
protected String workflowId;

public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
Expand All @@ -79,11 +89,13 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
this.workflowServiceStubs = createServiceInstance(connectionUri);

String namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
this.client = createClientInstance(workflowServiceStubs, namespace);

this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);

// non-null value indicates job has been submitted
this.workflowId = null;
startCancellationExecutor();
}

Expand Down Expand Up @@ -113,7 +125,56 @@ protected void handleLaunchFinalization() {

@Override
protected void executeCancellation() {
log.info("Cancel temporal workflow");
if (this.workflowId == null) {
log.info("Cancellation of temporal workflow attempted without submitting it");
return;
}

log.info("Cancelling temporal workflow {}", this.workflowId);
try {
WorkflowStub workflowStub = this.client.newUntypedWorkflowStub(this.workflowId);

// Describe the workflow execution to get its status
DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(this.namespace)
.setExecution(workflowStub.getExecution())
.build();
DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request);

WorkflowExecutionStatus status;
try {
status = response.getWorkflowExecutionInfo().getStatus();
} catch (Exception e) {
log.warn("Exception occurred while getting status of the workflow " + this.workflowId
+ ". We would still attempt the cancellation", e);
workflowStub.cancel();
log.info("Temporal workflow {} cancelled successfully", this.workflowId);
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved
return;
}

// Check if the workflow is not finished
if (status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED &&
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved
status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED &&
status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED &&
status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) {
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved
workflowStub.cancel();
try {
// Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException
workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS, String.class, String.class);
} catch (TimeoutException te) {
// Workflow is still running, terminate it.
log.info("Workflow is still running, will attempt termination", te);
workflowStub.terminate("Job cancel invoked");
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved
} catch (WorkflowFailedException wfe) {
// Do nothing as exception is expected.
}
log.info("Temporal workflow {} cancelled successfully", this.workflowId);
} else {
log.info("Workflow {} is already finished with status {}", this.workflowId, status);
}
} catch (Exception e) {
log.error("Exception occurred while cancelling the workflow " + this.workflowId, e);
}
}

/** No-op: merely logs a warning, since not expected to be invoked */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
LOGGER.info("No job schedule found, so running job " + jobUri);
GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics);
JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
launcher.cancelJob(listener);
} catch (JobException e) {
LOGGER.error("Failed to cancel the job during shutdown", e);
throw new RuntimeException(e);
abhishekmjain marked this conversation as resolved.
Show resolved Hide resolved
}
}));
launcher.launchJob(listener);
}
} catch (Exception je) {
Expand Down
Loading
Loading