From 2f955e704cc39fa92f995ec380b056accc2479bc Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Tue, 3 Sep 2024 17:24:59 +0530 Subject: [PATCH 1/7] Add temporal workflow cancel support --- .../launcher/ExecuteGobblinJobLauncher.java | 4 +- .../GenerateWorkUnitsJobLauncher.java | 3 +- .../launcher/ProcessWorkUnitsJobLauncher.java | 3 +- .../GobblinTemporalJobLauncher.java | 42 ++++- .../GobblinTemporalJobLauncherTest.java | 170 ++++++++++++++++++ .../org.mockito.plugins.MockMaker | 1 + 6 files changed, 218 insertions(+), 5 deletions(-) create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java create mode 100644 gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java index 6950e6a6781..53db2c95b08 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java @@ -74,9 +74,11 @@ public ExecuteGobblinJobLauncher( @Override public void submitJob(List workunits) { try { + // Initialize workflowId to be used by cancel workflow. + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps))) + .setWorkflowId(this.workflowId) .build(); ExecuteGobblinWorkflow workflow = this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java index 6ddc96a9778..bb2cb99d61e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java @@ -70,9 +70,10 @@ public GenerateWorkUnitsJobLauncher( @Override public void submitJob(List workunits) { try { + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps))) + .setWorkflowId(this.workflowId) .build(); GenerateWorkUnitsWorkflow workflow = this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java index fcd520714c6..e80da1f0f39 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java @@ -100,9 +100,10 @@ public void submitJob(List workunits) { GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX)); + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps))) + .setWorkflowId(this.workflowId) .build(); Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec))); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 82aeb8b20f0..6a07d9def29 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -25,7 +25,11 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Workflow; @@ -69,6 +73,8 @@ public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher { protected WorkflowServiceStubs workflowServiceStubs; protected WorkflowClient client; protected String queueName; + protected String namespace; + protected String workflowId; public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, List> metadataTags, ConcurrentHashMap runningMap, EventBus eventBus) @@ -79,11 +85,13 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING); this.workflowServiceStubs = createServiceInstance(connectionUri); - String namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); + this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); this.client = createClientInstance(workflowServiceStubs, namespace); this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); + // non-null value indicates job has been submitted + this.workflowId = null; startCancellationExecutor(); } @@ -113,7 +121,37 @@ protected void handleLaunchFinalization() { @Override protected void executeCancellation() { - log.info("Cancel temporal workflow"); + if (this.workflowId == null) { + log.info("Cancellation of temporal workflow attempted without submitting it"); + return; + } + + log.info("Cancelling temporal workflow {}", this.workflowId); + try { + WorkflowStub workflowStub = this.client.newUntypedWorkflowStub(this.workflowId); + + // Describe the workflow execution to get its status + DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace(this.namespace) + .setExecution(workflowStub.getExecution()) + .build(); + DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request); + + // Check if the workflow is not finished + WorkflowExecutionStatus status = response.getWorkflowExecutionInfo().getStatus(); + if (status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED && + status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED && + status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED && + status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) { + workflowStub.cancel(); + log.info("Temporal workflow {} cancelled successfully", this.workflowId); + } else { + log.info("Workflow {} is already finished with status {}", this.workflowId, status); + } + } + catch (Exception e) { + log.error("Exception occurred while cancelling the workflow " + this.workflowId, e); + } } /** No-op: merely logs a warning, since not expected to be invoked */ diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java new file mode 100644 index 00000000000..af356104818 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.joblauncher; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.fs.Path; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.io.Files; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.example.simplejson.SimpleJsonSource; +import org.apache.gobblin.runtime.locks.FileBasedJobLock; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory; +import org.apache.gobblin.util.JobLauncherUtils; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class GobblinTemporalJobLauncherTest { + + private GobblinTemporalJobLauncher jobLauncher; + private MockedStatic mockWorkflowClientFactory; + private WorkflowServiceStubs mockServiceStubs; + private WorkflowClient mockClient; + private WorkflowStub mockStub; + private WorkflowExecutionInfo mockExecutionInfo; + private Properties jobProperties; + + class GobblinTemporalJobLauncherForTest extends GobblinTemporalJobLauncher { + public GobblinTemporalJobLauncherForTest(Properties jobProperties, Path appWorkDir) throws Exception { + super(jobProperties, appWorkDir, new ArrayList<>(), new ConcurrentHashMap<>(), null); + } + + @Override + protected void submitJob(List workUnits) + throws Exception { + this.workflowId = "someWorkflowId"; + } + } + + + @BeforeClass + public void setUp() throws Exception { + mockServiceStubs = mock(WorkflowServiceStubs.class); + mockClient = mock(WorkflowClient.class); + mockStub = mock(WorkflowStub.class); + mockExecutionInfo = mock(WorkflowExecutionInfo.class); + DescribeWorkflowExecutionResponse mockResponse = mock(DescribeWorkflowExecutionResponse.class); + WorkflowServiceGrpc.WorkflowServiceBlockingStub mockBlockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(mockServiceStubs.blockingStub()).thenReturn(mockBlockingStub); + when(mockBlockingStub.describeWorkflowExecution(Mockito.any())).thenReturn(mockResponse); + when(mockResponse.getWorkflowExecutionInfo()).thenReturn(mockExecutionInfo); + + mockWorkflowClientFactory = Mockito.mockStatic(TemporalWorkflowClientFactory.class); + mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString())) + .thenReturn(mockServiceStubs); + mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), Mockito.anyString())) + .thenReturn(mockClient); + when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub); + when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance()); + + jobProperties = new Properties(); + jobProperties.setProperty(ConfigurationKeys.FS_URI_KEY, "file:///"); + jobProperties.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING, "someConnString"); + jobProperties.setProperty(ConfigurationKeys.JOB_LOCK_TYPE, FileBasedJobLock.class.getName()); + jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, SimpleJsonSource.class.getName()); + } + + @BeforeMethod + public void methodSetUp() throws Exception { + File tmpDir = Files.createTempDir(); + String basePath = tmpDir.getAbsolutePath(); + Path appWorkDir = new Path(basePath, "testAppWorkDir"); + String jobLockDir = new Path(basePath, "jobLockDir").toString(); + String stateStoreDir = new Path(basePath, "stateStoreDir").toString(); + String jobName = "testJob"; + String jobId = JobLauncherUtils.newJobId(jobName); + jobProperties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName); + jobProperties.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId); + jobProperties.setProperty(FileBasedJobLock.JOB_LOCK_DIR, jobLockDir); + jobProperties.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, stateStoreDir); + + jobLauncher = new GobblinTemporalJobLauncherForTest(jobProperties, appWorkDir); + } + + @Test + public void testCancelWorkflowIfFailed() throws Exception { + // For workflowId to be generated + jobLauncher.submitJob(null); + + // Mock the workflow status to be failed + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(0)).cancel(); + } + + @Test + public void testCancelWorkflowIfCompleted() throws Exception { + // For workflowId to be generated + jobLauncher.submitJob(null); + + // Mock the workflow status to be completed + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(0)).cancel(); + } + + @Test + public void testCancelWorkflowIfRunning() throws Exception { + // Mock the workflow status to be running + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING); + + jobLauncher.executeCancellation(); + + // Verify that the cancel method was not called without job submission + verify(mockStub, times(0)).cancel(); + + jobLauncher.submitJob(null); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(1)).cancel(); + } +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000000..1f0955d450f --- /dev/null +++ b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline From f3f61d62f6466af7df1e5e813fd1a462d13ce7a0 Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Wed, 4 Sep 2024 16:47:14 +0530 Subject: [PATCH 2/7] Ignore exceptions while fetching workflow status --- .../joblauncher/GobblinTemporalJobLauncher.java | 13 ++++++++++++- .../joblauncher/GobblinTemporalJobLauncherTest.java | 12 ++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 6a07d9def29..5f3b27c5b06 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -137,8 +137,19 @@ protected void executeCancellation() { .build(); DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request); + WorkflowExecutionStatus status; + try { + status = response.getWorkflowExecutionInfo().getStatus(); + } + catch (Exception e) { + log.warn("Exception occurred while getting status of the workflow " + this.workflowId + + ". We would still attempt the cancellation", e); + workflowStub.cancel(); + log.info("Temporal workflow {} cancelled successfully", this.workflowId); + return; + } + // Check if the workflow is not finished - WorkflowExecutionStatus status = response.getWorkflowExecutionInfo().getStatus(); if (status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED && status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED && status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED && diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java index af356104818..85bd5a2bb03 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -167,4 +167,16 @@ public void testCancelWorkflowIfRunning() throws Exception { verify(mockStub, times(1)).cancel(); } + + @Test + public void testCancelWorkflowFetchStatusThrowsException() throws Exception { + // Mock the get workflow status to throw an exception + Mockito.doThrow(new RuntimeException("Some exception occurred")).when(mockExecutionInfo).getStatus(); + + jobLauncher.submitJob(null); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(1)).cancel(); + } } \ No newline at end of file From 39b48cbf72d08821beb1814c68c5bb0ffb8eb9e9 Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Thu, 5 Sep 2024 09:33:28 +0530 Subject: [PATCH 3/7] Update mock code for exception test --- .../joblauncher/GobblinTemporalJobLauncherTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java index 85bd5a2bb03..b45dafc6054 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -82,7 +82,6 @@ protected void submitJob(List workUnits) public void setUp() throws Exception { mockServiceStubs = mock(WorkflowServiceStubs.class); mockClient = mock(WorkflowClient.class); - mockStub = mock(WorkflowStub.class); mockExecutionInfo = mock(WorkflowExecutionInfo.class); DescribeWorkflowExecutionResponse mockResponse = mock(DescribeWorkflowExecutionResponse.class); WorkflowServiceGrpc.WorkflowServiceBlockingStub mockBlockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); @@ -95,8 +94,6 @@ public void setUp() throws Exception { .thenReturn(mockServiceStubs); mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), Mockito.anyString())) .thenReturn(mockClient); - when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub); - when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance()); jobProperties = new Properties(); jobProperties.setProperty(ConfigurationKeys.FS_URI_KEY, "file:///"); @@ -107,6 +104,10 @@ public void setUp() throws Exception { @BeforeMethod public void methodSetUp() throws Exception { + mockStub = mock(WorkflowStub.class); + when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub); + when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance()); + File tmpDir = Files.createTempDir(); String basePath = tmpDir.getAbsolutePath(); Path appWorkDir = new Path(basePath, "testAppWorkDir"); @@ -178,5 +179,7 @@ public void testCancelWorkflowFetchStatusThrowsException() throws Exception { jobLauncher.executeCancellation(); verify(mockStub, times(1)).cancel(); + + Mockito.reset(mockExecutionInfo); } } \ No newline at end of file From f5c0cd608cda5fbd1d74949dd853e78f3c368bef Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Wed, 18 Sep 2024 11:04:52 +0530 Subject: [PATCH 4/7] Add shutdown hook and terminate after 3 seconds of cancel --- .../joblauncher/GobblinJobLauncher.java | 25 +++++++++++++++---- .../GobblinTemporalJobLauncher.java | 18 ++++++++++++- .../GobblinTemporalJobScheduler.java | 7 ++++++ 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index 12d8861ecdf..ea2c2ce7c2e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -22,6 +22,10 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -87,7 +91,7 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher { protected final StateStores stateStores; protected JobListener jobListener; protected volatile boolean jobSubmitted = false; - + private final ExecutorService executor; public GobblinJobLauncher(Properties jobProps, Path appWorkDir, List> metadataTags, ConcurrentHashMap runningMap, EventBus eventbus) @@ -122,6 +126,7 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir, this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter, this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository()); + this.executor = Executors.newSingleThreadExecutor(); } @Override @@ -150,17 +155,23 @@ protected void runWorkUnits(List workUnits) throws Exception { // Start the output TaskState collector service this.taskStateCollectorService.startAsync().awaitRunning(); + Future submitJobFuture = null; synchronized (this.cancellationRequest) { if (!this.cancellationRequested) { - submitJob(workUnits); + submitJobFuture = executor.submit(() -> { + try { + submitJob(workUnits); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); log.info(String.format("Submitted job %s", this.jobContext.getJobId())); this.jobSubmitted = true; } else { log.warn("Job {} not submitted as it was requested to be cancelled.", this.jobContext.getJobId()); } } - - waitJob(); + waitJob(submitJobFuture); log.info(String.format("Job %s completed", this.jobContext.getJobId())); } finally { // The last iteration of output TaskState collecting will run when the collector service gets stopped @@ -172,7 +183,11 @@ protected void runWorkUnits(List workUnits) throws Exception { protected void submitJob(List workUnits) throws Exception { } - protected void waitJob() throws InterruptedException { + protected void waitJob(Future submitJobFuture) + throws InterruptedException, ExecutionException { + if (submitJobFuture != null) { + submitJobFuture.get(); + } } @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 5f3b27c5b06..30dc38da07e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; @@ -29,6 +31,7 @@ import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Workflow; @@ -148,13 +151,26 @@ protected void executeCancellation() { log.info("Temporal workflow {} cancelled successfully", this.workflowId); return; } - + // Check if the workflow is not finished if (status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED && status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED && status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED && status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) { workflowStub.cancel(); + try { + // Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException + workflowStub.getResult(3, TimeUnit.SECONDS, String.class, String.class); + } + catch (TimeoutException te) { + // Workflow is still running, terminate it. + log.info("Workflow is still running, will attempt termination", te); + workflowStub.terminate("Job cancel invoked"); + } + catch (WorkflowFailedException wfe) { + // Do nothing as exception is expected. + log.info("Workflow cancellation successful", wfe); + } log.info("Temporal workflow {} cancelled successfully", this.workflowId); } else { log.info("Workflow {} is already finished with status {}", this.workflowId, status); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index 76629aa68dc..b0c50c62310 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -210,6 +210,13 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { LOGGER.info("No job schedule found, so running job " + jobUri); GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics); JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig()); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + launcher.cancelJob(listener); + } catch (JobException e) { + throw new RuntimeException(e); + } + })); launcher.launchJob(listener); } } catch (Exception je) { From 20f3ed92c02eb2618649d252aa8eb267d184ef04 Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Mon, 30 Sep 2024 18:03:39 +0530 Subject: [PATCH 5/7] resolve comments --- .../launcher/ExecuteGobblinJobLauncher.java | 2 +- .../GobblinTemporalJobLauncher.java | 3 ++- .../GobblinTemporalJobScheduler.java | 1 + .../GobblinTemporalJobLauncherTest.java | 23 +++++++++++++++++++ 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java index 6f7eae1d4fe..3b01b18fc99 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java @@ -84,7 +84,7 @@ public void submitJob(List workunits) { try { Properties finalProps = adjustJobProperties(this.jobProps); // Initialize workflowId to be used by cancel workflow. - this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)); + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) .setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps)) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 30dc38da07e..ce7c413d4b3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -72,6 +72,7 @@ @Alpha public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher { private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class); + private static final int TERMINATION_TIMEOUT_SECONDS = 3; protected WorkflowServiceStubs workflowServiceStubs; protected WorkflowClient client; @@ -160,7 +161,7 @@ protected void executeCancellation() { workflowStub.cancel(); try { // Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException - workflowStub.getResult(3, TimeUnit.SECONDS, String.class, String.class); + workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS, String.class, String.class); } catch (TimeoutException te) { // Workflow is still running, terminate it. diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index b0c50c62310..34a1cec4dc2 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -214,6 +214,7 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { try { launcher.cancelJob(listener); } catch (JobException e) { + LOGGER.error("Failed to cancel the job during shutdown", e); throw new RuntimeException(e); } })); diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java index b45dafc6054..8d399e5b930 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -22,10 +22,15 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.Path; +import org.mockito.AdditionalAnswers; import org.mockito.MockedStatic; import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.AnswersWithDelay; +import org.mockito.internal.stubbing.answers.Returns; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -38,6 +43,7 @@ import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -182,4 +188,21 @@ public void testCancelWorkflowFetchStatusThrowsException() throws Exception { Mockito.reset(mockExecutionInfo); } + + @Test + public void testTerminateWorkflow() throws Exception { + // Mock the workflow status to be running + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING); + + // Mock getResult to throw TimeoutException + Mockito.doThrow(new TimeoutException("Workflow still in running")) + .when(mockStub).getResult(3L, TimeUnit.SECONDS, String.class, String.class); + + jobLauncher.submitJob(null); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(1)).terminate("Job cancel invoked"); + } } \ No newline at end of file From 50b9a2ee9b46844c40b61f2fcbc5c4112af70996 Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Wed, 2 Oct 2024 23:12:11 +0530 Subject: [PATCH 6/7] resolve comments --- .../ddm/launcher/ExecuteGobblinJobLauncher.java | 2 +- .../joblauncher/GobblinTemporalJobLauncher.java | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java index 3b01b18fc99..b2f65ccb410 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java @@ -83,7 +83,7 @@ public ExecuteGobblinJobLauncher( public void submitJob(List workunits) { try { Properties finalProps = adjustJobProperties(this.jobProps); - // Initialize workflowId to be used by cancel workflow. + // Initialize workflowId. this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index ce7c413d4b3..2d17fe20a30 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -144,8 +144,7 @@ protected void executeCancellation() { WorkflowExecutionStatus status; try { status = response.getWorkflowExecutionInfo().getStatus(); - } - catch (Exception e) { + } catch (Exception e) { log.warn("Exception occurred while getting status of the workflow " + this.workflowId + ". We would still attempt the cancellation", e); workflowStub.cancel(); @@ -162,22 +161,18 @@ protected void executeCancellation() { try { // Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS, String.class, String.class); - } - catch (TimeoutException te) { + } catch (TimeoutException te) { // Workflow is still running, terminate it. log.info("Workflow is still running, will attempt termination", te); workflowStub.terminate("Job cancel invoked"); - } - catch (WorkflowFailedException wfe) { + } catch (WorkflowFailedException wfe) { // Do nothing as exception is expected. - log.info("Workflow cancellation successful", wfe); } log.info("Temporal workflow {} cancelled successfully", this.workflowId); } else { log.info("Workflow {} is already finished with status {}", this.workflowId, status); } - } - catch (Exception e) { + } catch (Exception e) { log.error("Exception occurred while cancelling the workflow " + this.workflowId, e); } } From 0f127f5221b7a80755f8154341c209724e5824b8 Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Mon, 7 Oct 2024 22:48:10 +0530 Subject: [PATCH 7/7] remove unused imports --- .../temporal/joblauncher/GobblinTemporalJobLauncherTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java index 8d399e5b930..98d0379b651 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -26,11 +26,8 @@ import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.Path; -import org.mockito.AdditionalAnswers; import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.mockito.internal.stubbing.answers.AnswersWithDelay; -import org.mockito.internal.stubbing.answers.Returns; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -43,7 +40,6 @@ import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs;