From 0bad09965024329cf47bda43b53989edcb3d20d2 Mon Sep 17 00:00:00 2001 From: Parker Mossman Date: Thu, 20 Jan 2022 18:31:51 -0800 Subject: [PATCH] use attemptNumber instead of attemptId where appropriate (#9671) --- .../scheduler/persistence/JobPersistence.java | 4 ++-- .../ConnectionManagerWorkflowImpl.java | 4 ++-- .../JobCreationAndStatusUpdateActivity.java | 4 ++-- ...obCreationAndStatusUpdateActivityImpl.java | 9 ++++----- ...obCreationAndStatusUpdateActivityTest.java | 19 ++++++++++--------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 6abb06991082..c2d8b46d9e05 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -89,7 +89,7 @@ public interface JobPersistence { * will not be changed if it is already in a terminal state. * * @param jobId job id - * @param attemptNumber attempt id + * @param attemptNumber attempt number * @throws IOException exception due to interaction with persistence */ void failAttempt(long jobId, int attemptNumber) throws IOException; @@ -99,7 +99,7 @@ public interface JobPersistence { * is changed regardless of what state it is in. * * @param jobId job id - * @param attemptNumber attempt id + * @param attemptNumber attempt number * @throws IOException exception due to interaction with persistence */ void succeedAttempt(long jobId, int attemptNumber) throws IOException; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 058bb927188d..d6c53efdf9c7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -185,7 +185,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) { jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput( maybeJobId.get(), - maybeAttemptId.get(), + connectionUpdaterInput.getAttemptNumber(), standardSyncOutput.orElse(null))); connectionUpdaterInput.setJobId(null); @@ -196,7 +196,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) { jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput( connectionUpdaterInput.getJobId(), - connectionUpdaterInput.getAttemptId())); + connectionUpdaterInput.getAttemptNumber())); final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt(); final int attemptNumber = connectionUpdaterInput.getAttemptNumber(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java index 385f952eb56a..285f349d2cdc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java @@ -77,7 +77,7 @@ class AttemptCreationOutput { class JobSuccessInput { private long jobId; - private int attemptId; + private int attemptNumber; private StandardSyncOutput standardSyncOutput; } @@ -110,7 +110,7 @@ class JobFailureInput { class AttemptFailureInput { private long jobId; - private int attemptId; + private int attemptNumber; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index bfd122a8d808..4e9150b8411c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -110,11 +110,11 @@ public void jobSuccess(final JobSuccessInput input) { try { if (input.getStandardSyncOutput() != null) { final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput()); - jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput); + jobPersistence.writeOutput(input.getJobId(), input.getAttemptNumber(), jobOutput); } else { - log.warn("The job {} doesn't have an input for the attempt {}", input.getJobId(), input.getAttemptId()); + log.warn("The job {} doesn't have an input for attempt number {}", input.getJobId(), input.getAttemptNumber()); } - jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptId()); + jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptNumber()); final Job job = jobPersistence.getJob(input.getJobId()); jobNotifier.successJob(job); trackCompletion(job, JobStatus.SUCCEEDED); @@ -138,8 +138,7 @@ public void jobFailure(final JobFailureInput input) { @Override public void attemptFailure(final AttemptFailureInput input) { try { - jobPersistence.failAttempt(input.getJobId(), input.getAttemptId()); - final Job job = jobPersistence.getJob(input.getJobId()); + jobPersistence.failAttempt(input.getJobId(), input.getAttemptNumber()); } catch (final IOException e) { throw new RetryableException(e); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 6cb059dee118..1eb94fcef167 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -72,6 +72,7 @@ public class JobCreationAndStatusUpdateActivityTest { private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final long JOB_ID = 123L; private static final int ATTEMPT_ID = 321; + private static final int ATTEMPT_NUMBER = 2; private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput() .withStandardSyncSummary( new StandardSyncSummary() @@ -146,11 +147,11 @@ class Update { @Test public void setJobSuccess() throws IOException { - jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput)); + jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_NUMBER, standardSyncOutput)); final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput); - Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput); - Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); + Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_NUMBER, jobOutput); + Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_NUMBER); Mockito.verify(mJobNotifier).successJob(Mockito.any()); Mockito.verify(mJobtracker).trackSync(Mockito.any(), Mockito.eq(JobState.SUCCEEDED)); } @@ -158,9 +159,9 @@ public void setJobSuccess() throws IOException { @Test public void setJobSuccessWrapException() throws IOException { Mockito.doThrow(new IOException()) - .when(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); + .when(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_NUMBER); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, null))) + Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_NUMBER, null))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); } @@ -185,17 +186,17 @@ public void setJobFailureWrapException() throws IOException { @Test public void setAttemptFailure() throws IOException { - jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID)); + jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_NUMBER)); - Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID); + Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_NUMBER); } @Test public void setAttemptFailureWrapException() throws IOException { Mockito.doThrow(new IOException()) - .when(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID); + .when(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_NUMBER); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID))) + Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_NUMBER))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); }