Fix record count and add acceptance test to the new scheduler (airbytehq#9487)

* Add a job notification

The new scheduler was missing a notification step after the job is done.

This is needed in order to report the number of records of a sync.

* Acceptance tests with the new scheduler

Add a new GitHub Actions step to run the acceptance tests with the new scheduler.

* Retry on failure

* PR comments
benmoriceau authored Jan 20, 2022
1 parent 97eeb2f commit e7da923
Showing 26 changed files with 437 additions and 59 deletions.
4 changes: 4 additions & 0 deletions .env
@@ -88,3 +88,7 @@ MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
MAX_CHECK_WORKERS=5
MAX_DISCOVER_WORKERS=5


### FEATURE FLAGS ###
NEW_SCHEDULER=false
3 changes: 3 additions & 0 deletions .github/workflows/gradle.yml
@@ -261,6 +261,9 @@ jobs:
- name: Run End-to-End Acceptance Tests
run: ./tools/bin/acceptance_test.sh

- name: Run End-to-End Acceptance Tests with the new scheduler
run: ./tools/bin/acceptance_test_with_new_scheduler.sh

- name: Automatic Migration Acceptance Test
run: SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i

@@ -8,7 +8,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

@Override
public boolean usesNewScheduler() {
- return System.getenv().containsKey("NEW_SCHEDULER");
+ return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER"));
}

}
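Note on the flag check above: .env now ships NEW_SCHEDULER=false by default, and with the old containsKey check the mere presence of the variable would have enabled the flag even when it was set to false. The new Boolean.parseBoolean check only enables it for the literal value "true". A minimal, self-contained illustration using only standard Java (the map stands in for System.getenv()):

    import java.util.Map;

    public class FeatureFlagCheckDemo {

      public static void main(final String[] args) {
        // Stand-in for System.getenv(); the real value comes from the NEW_SCHEDULER entry in .env.
        final Map<String, String> env = Map.of("NEW_SCHEDULER", "false");

        // Old check: true as soon as the variable exists, regardless of its value.
        final boolean oldCheck = env.containsKey("NEW_SCHEDULER");               // true

        // New check: only the literal "true" (case-insensitive) enables the flag;
        // a missing variable yields null, and Boolean.parseBoolean(null) is false.
        final boolean newCheck = Boolean.parseBoolean(env.get("NEW_SCHEDULER")); // false

        System.out.println("containsKey: " + oldCheck + ", parseBoolean: " + newCheck);
      }
    }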
@@ -558,6 +558,10 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ

@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody.getConnectionId()));
}

return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
}

@@ -160,6 +160,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)

if (featureFlags.usesNewScheduler()) {
try {
LOGGER.info("Starting a connection using the new scheduler");
temporalWorkerRunFactory.createNewSchedulerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the temporal connection manager workflow failed", e);
@@ -347,6 +347,18 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ
return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}

final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID connectionId = connectionIdRequestBody.getConnectionId();
@@ -456,13 +468,14 @@ public JobInfoRead createManualRun(final UUID connectionId) throws IOException {
public JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

- final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));
+ final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));

- if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
- throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
+ if (cancellationSubmissionResult.getFailingReason().isPresent()) {
+ throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
}

- return jobConverter.getJobInfoRead(job);
+ final Job cancelledJob = jobPersistence.getJob(id);
+ return jobConverter.getJobInfoRead(cancelledJob);
}

}
@@ -208,7 +208,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest())
.memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit()));

- when(schedulerHandler.resetConnection(any())).thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED)));
+ when(schedulerHandler.resetConnection(any(ConnectionIdRequestBody.class)))
+ .thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED)));
}

@Test
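The typed matcher in the test setup above is needed because SchedulerHandler now has two resetConnection overloads (one taking a ConnectionIdRequestBody, one taking a UUID), so an untyped any() would make the stubbed call ambiguous. A sketch of the stubbing pattern, assuming the Mockito matchers and API model classes already imported by the test:

    // With overloaded methods, pin the matcher to the parameter type of the overload being stubbed.
    when(schedulerHandler.resetConnection(any(ConnectionIdRequestBody.class)))
        .thenReturn(new JobInfoRead().job(new JobRead().status(JobStatus.SUCCEEDED)));

    // Hypothetical: if the UUID overload ever needs its own stub, a typed matcher keeps the two apart.
    // when(schedulerHandler.resetConnection(any(UUID.class))).thenReturn(...);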
3 changes: 3 additions & 0 deletions airbyte-tests/build.gradle
@@ -41,13 +41,16 @@ dependencies {

acceptanceTestsImplementation project(':airbyte-api')
acceptanceTestsImplementation project(':airbyte-commons')
acceptanceTestsImplementation project(':airbyte-config:models')
acceptanceTestsImplementation project(':airbyte-config:persistence')
acceptanceTestsImplementation project(':airbyte-db:lib')
acceptanceTestsImplementation project(':airbyte-tests')
acceptanceTestsImplementation project(':airbyte-test-utils')
acceptanceTestsImplementation project(':airbyte-workers')

acceptanceTestsImplementation 'com.fasterxml.jackson.core:jackson-databind'
acceptanceTestsImplementation 'io.github.cdimascio:java-dotenv:3.0.0'
acceptanceTestsImplementation 'io.temporal:temporal-sdk:1.6.0'
acceptanceTestsImplementation 'org.apache.commons:commons-csv:1.4'
acceptanceTestsImplementation 'org.testcontainers:postgresql:1.15.3'
acceptanceTestsImplementation 'org.postgresql:postgresql:42.2.18'
@@ -68,6 +68,8 @@
import io.airbyte.api.client.model.SourceIdRequestBody;
import io.airbyte.api.client.model.SourceRead;
import io.airbyte.api.client.model.SyncMode;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.resources.MoreResources;
@@ -171,6 +173,8 @@ public class AcceptanceTests {
private List<UUID> destinationIds;
private List<UUID> operationIds;

private static FeatureFlags featureFlags;

@SuppressWarnings("UnstableApiUsage")
@BeforeAll
public static void init() throws URISyntaxException, IOException, InterruptedException {
@@ -203,6 +207,8 @@ public static void init() throws URISyntaxException, IOException, InterruptedExc
} else {
LOGGER.info("Using external deployment of airbyte.");
}

featureFlags = new EnvVariableFeatureFlags();
}

@AfterAll
@@ -467,7 +473,10 @@ public void testManualSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
@@ -486,7 +495,10 @@ public void testCancelSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

@@ -519,7 +531,10 @@ public void testIncrementalSync() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
LOGGER.info("Beginning testIncrementalSync() sync 1");
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -550,21 +565,31 @@ public void testIncrementalSync() throws Exception {
assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME));

// reset back to no data.

LOGGER.info("Starting testIncrementalSync() reset");
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
- waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
+ FeatureFlags featureFlags = new EnvVariableFeatureFlags();
+ if (featureFlags.usesNewScheduler()) {
+ waitForJob(apiClient.getJobsApi(), jobInfoRead.getJob(),
+ Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));
+ } else {
+ waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
+ }

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", STREAM_NAME));
assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public",
STREAM_NAME));

// sync one more time. verify it is the equivalent of a full refresh.
LOGGER.info("Starting testIncrementalSync() sync 3");
- final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi()
- .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ final JobInfoRead connectionSyncRead3 =
+ apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

assertSourceAndDestinationDbInSync(false);

}

@Test
@@ -613,7 +638,10 @@ public void testMultipleSchemasAndTablesSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
@@ -743,7 +771,10 @@ public void testCheckpointing() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

@@ -834,7 +865,10 @@ public void testBackpressure() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

@@ -912,6 +946,10 @@ public void testFailureTimeout() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -1315,4 +1353,17 @@ public enum Type {
DESTINATION
}

private static void waitForTemporalWorkflow(final UUID connectionId) {
/*
* do { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e);
* } } while
* (temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId)));
*/
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
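The fixed 10-second sleep in waitForTemporalWorkflow is a stopgap for the race between creating a connection and its Temporal workflow becoming available; the commented-out block hints at a polling approach instead. Rendered below as runnable code purely for readability — it assumes the test holds a temporalClient with the isWorkflowRunning and getConnectionManagerName helpers named in that comment, and it keeps the loop condition exactly as written there:

    private static void waitForTemporalWorkflow(final UUID connectionId) {
      // Polling variant of the commented-out block above: sleep in one-second steps
      // while the connection manager workflow for this connection is reported as running.
      do {
        try {
          Thread.sleep(1000);
        } catch (final InterruptedException e) {
          throw new RuntimeException(e);
        }
      } while (temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId)));
    }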
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
@@ -21,6 +21,7 @@ dependencies {

implementation project(':airbyte-analytics')
implementation project(':airbyte-api')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db:lib')
32 changes: 30 additions & 2 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -4,6 +4,7 @@

package io.airbyte.workers;

import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
@@ -24,11 +25,14 @@
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubePortManagerSingleton;
@@ -100,6 +104,8 @@ public class WorkerApp {
private final Configs configs;
private final ConnectionHelper connectionHelper;
private final boolean containerOrchestratorEnabled;
private final JobNotifier jobNotifier;
private final JobTracker jobTracker;

public void start() {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
@@ -187,6 +193,8 @@ public void start() {

syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);

final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository);

final Worker connectionUpdaterWorker =
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SyncWorkflowImpl.class);
@@ -198,7 +206,11 @@ public void start() {
jobPersistence,
temporalWorkerRunFactory,
workerEnvironment,
- logConfigs),
+ logConfigs,
+ jobNotifier,
+ jobTracker,
+ configRepository,
+ jobCreator),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper),
replicationActivity,
@@ -345,6 +357,12 @@ public static void main(final String[] args) throws IOException, InterruptedExce
.getInitialized();

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
new Deployment(configs.getDeploymentMode(), jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()),
configs.getAirbyteRole(),
configs.getAirbyteVersion(),
configRepository);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence, configRepository),
@@ -372,6 +390,14 @@ public static void main(final String[] args) throws IOException, InterruptedExce
workspaceHelper,
workerConfigs);

final JobNotifier jobNotifier = new JobNotifier(
configs.getWebappUrl(),
configRepository,
workspaceHelper,
TrackingClientSingleton.get());

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);

new WorkerApp(
workspaceRoot,
jobProcessFactory,
@@ -392,7 +418,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce
temporalWorkerRunFactory,
configs,
connectionHelper,
- configs.getContainerOrchestratorEnabled()).start();
+ configs.getContainerOrchestratorEnabled(),
+ jobNotifier,
+ jobTracker).start();
}

}