From 19ea1b66be1727d3d3632162e03c072ff7b98fb6 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 21 Jan 2022 15:02:15 -0800 Subject: [PATCH] Update the cancellation (#9705) --- .../scheduling/ConnectionManagerWorkflowImpl.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 058bb927188d..d07189309050 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -167,6 +167,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } else if (workflowState.isCancelled()) { jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput( maybeJobId.get())); + resetNewConnectionInput(connectionUpdaterInput); } else if (workflowState.isFailed()) { reportFailure(connectionUpdaterInput); } else { @@ -188,9 +189,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) maybeAttemptId.get(), standardSyncOutput.orElse(null))); - connectionUpdaterInput.setJobId(null); - connectionUpdaterInput.setAttemptNumber(1); - connectionUpdaterInput.setFromFailure(false); + resetNewConnectionInput(connectionUpdaterInput); } private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) { @@ -212,12 +211,16 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) Workflow.await(Duration.ofMinutes(1), () -> skipScheduling()); - connectionUpdaterInput.setJobId(null); - connectionUpdaterInput.setAttemptNumber(1); - connectionUpdaterInput.setFromFailure(false); + resetNewConnectionInput(connectionUpdaterInput); } } + private void resetNewConnectionInput(ConnectionUpdaterInput connectionUpdaterInput) { + connectionUpdaterInput.setJobId(null); + connectionUpdaterInput.setAttemptNumber(1); + connectionUpdaterInput.setFromFailure(false); + } + @Override public void submitManualSync() { if (workflowState.isRunning()) {