Skip to content

Commit

Permalink
use attemptNumber instead of attemptId where appropriate (airbytehq#9671
Browse files Browse the repository at this point in the history
)
  • Loading branch information
pmossman authored Jan 21, 2022
1 parent 111131a commit 0bad099
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public interface JobPersistence {
* will not be changed if it is already in a terminal state.
*
* @param jobId job id
* @param attemptNumber attempt id
* @param attemptNumber attempt number
* @throws IOException exception due to interaction with persistence
*/
void failAttempt(long jobId, int attemptNumber) throws IOException;
Expand All @@ -99,7 +99,7 @@ public interface JobPersistence {
* is changed regardless of what state it is in.
*
* @param jobId job id
* @param attemptNumber attempt id
* @param attemptNumber attempt number
* @throws IOException exception due to interaction with persistence
*/
void succeedAttempt(long jobId, int attemptNumber) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput) {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(
maybeJobId.get(),
maybeAttemptId.get(),
connectionUpdaterInput.getAttemptNumber(),
standardSyncOutput.orElse(null)));

connectionUpdaterInput.setJobId(null);
Expand All @@ -196,7 +196,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput)
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput) {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(
connectionUpdaterInput.getJobId(),
connectionUpdaterInput.getAttemptId()));
connectionUpdaterInput.getAttemptNumber()));

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class AttemptCreationOutput {
class JobSuccessInput {

private long jobId;
private int attemptId;
private int attemptNumber;
private StandardSyncOutput standardSyncOutput;

}
Expand Down Expand Up @@ -110,7 +110,7 @@ class JobFailureInput {
class AttemptFailureInput {

private long jobId;
private int attemptId;
private int attemptNumber;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ public void jobSuccess(final JobSuccessInput input) {
try {
if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput);
jobPersistence.writeOutput(input.getJobId(), input.getAttemptNumber(), jobOutput);
} else {
log.warn("The job {} doesn't have an input for the attempt {}", input.getJobId(), input.getAttemptId());
log.warn("The job {} doesn't have an input for attempt number {}", input.getJobId(), input.getAttemptNumber());
}
jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptId());
jobPersistence.succeedAttempt(input.getJobId(), input.getAttemptNumber());
final Job job = jobPersistence.getJob(input.getJobId());
jobNotifier.successJob(job);
trackCompletion(job, JobStatus.SUCCEEDED);
Expand All @@ -138,8 +138,7 @@ public void jobFailure(final JobFailureInput input) {
@Override
public void attemptFailure(final AttemptFailureInput input) {
try {
jobPersistence.failAttempt(input.getJobId(), input.getAttemptId());
final Job job = jobPersistence.getJob(input.getJobId());
jobPersistence.failAttempt(input.getJobId(), input.getAttemptNumber());
} catch (final IOException e) {
throw new RetryableException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class JobCreationAndStatusUpdateActivityTest {
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final long JOB_ID = 123L;
private static final int ATTEMPT_ID = 321;
private static final int ATTEMPT_NUMBER = 2;
private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
Expand Down Expand Up @@ -146,21 +147,21 @@ class Update {

@Test
public void setJobSuccess() throws IOException {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_NUMBER, standardSyncOutput));
final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput);

Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_NUMBER, jobOutput);
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_NUMBER);
Mockito.verify(mJobNotifier).successJob(Mockito.any());
Mockito.verify(mJobtracker).trackSync(Mockito.any(), Mockito.eq(JobState.SUCCEEDED));
}

@Test
public void setJobSuccessWrapException() throws IOException {
Mockito.doThrow(new IOException())
.when(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
.when(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_NUMBER);

Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, null)))
Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_NUMBER, null)))
.isInstanceOf(RetryableException.class)
.hasCauseInstanceOf(IOException.class);
}
Expand All @@ -185,17 +186,17 @@ public void setJobFailureWrapException() throws IOException {

@Test
public void setAttemptFailure() throws IOException {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID));
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_NUMBER));

Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_NUMBER);
}

@Test
public void setAttemptFailureWrapException() throws IOException {
Mockito.doThrow(new IOException())
.when(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
.when(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_NUMBER);

Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID)))
Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_NUMBER)))
.isInstanceOf(RetryableException.class)
.hasCauseInstanceOf(IOException.class);
}
Expand Down

0 comments on commit 0bad099

Please sign in to comment.