From db4093277fc3a358314aca4de441d81f02dea598 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Thu, 20 Jan 2022 07:56:06 -0800 Subject: [PATCH] async container launch kubernetes "process" (#9242) * add misc todos * save work so far * configure async pod processing * remove comment * fmt * working except logging propagation? * add comment * add logging and misc configuration fixes * add output propagation * fix state reading * logging is working (but background highlighting is not) * fix log highlighting * use sys instead of ctx * comment * clean up and test state management * clean up orchestrator app construction * unify launcher workers and handle resuming * respond to comments * misc * disable * fix comment * respond to comments --- .env | 1 - airbyte-commons/src/main/resources/log4j2.xml | 2 +- .../java/io/airbyte/config/EnvConfigs.java | 30 +- .../storage/DefaultGcsClientFactory.java | 14 +- .../storage/DefaultS3ClientFactory.java | 2 + .../config/storage/MinioS3ClientFactory.java | 2 + airbyte-container-orchestrator/Dockerfile | 7 + .../AsyncStateManager.java | 40 ++ .../ContainerOrchestratorApp.java | 150 ++++++-- .../DbtJobOrchestrator.java | 11 +- .../DefaultAsyncStateManager.java | 86 +++++ .../JobOrchestrator.java | 48 ++- .../NoOpOrchestrator.java | 33 ++ .../NormalizationJobOrchestrator.java | 10 +- .../ReplicationJobOrchestrator.java | 18 +- .../DefaultAsyncStateManagerTest.java | 108 ++++++ .../java/io/airbyte/workers/WorkerApp.java | 113 +++--- .../workers/process/AsyncKubePodStatus.java | 14 + .../process/AsyncOrchestratorPodProcess.java | 347 ++++++++++++++++++ .../io/airbyte/workers/process/KubePod.java | 21 ++ .../airbyte/workers/process/KubePodInfo.java | 7 + .../workers/process/KubePodProcess.java | 47 ++- .../workers/process/KubeProcessFactory.java | 20 +- .../airbyte/workers/storage/StateClients.java | 30 ++ .../airbyte/workers/storage/WorkerStore.java | 54 --- .../temporal/sync/DbtLauncherWorker.java | 129 +------ .../sync/DbtTransformationActivityImpl.java | 16 +- .../workers/temporal/sync/LauncherWorker.java | 183 +++++++++ .../sync/NormalizationActivityImpl.java | 16 +- .../sync/NormalizationLauncherWorker.java | 127 +------ .../temporal/sync/OrchestratorConstants.java | 28 +- .../sync/ReplicationActivityImpl.java | 22 +- .../sync/ReplicationLauncherWorker.java | 152 +------- .../resources/entrypoints/{ => sync}/check.sh | 0 .../resources/entrypoints/{ => sync}/main.sh | 0 ...OrchestratorPodProcessIntegrationTest.java | 149 ++++++++ .../workers/storage/WorkerStoreTest.java | 69 ---- charts/airbyte/templates/env-configmap.yaml | 2 + .../airbyte/templates/worker/deployment.yaml | 10 + kube/overlays/dev-integration-test/.env | 6 +- kube/overlays/dev-integration-test/.secrets | 2 + kube/overlays/dev/.env | 5 +- kube/overlays/dev/.secrets | 2 + .../overlays/stable-with-resource-limits/.env | 6 +- .../stable-with-resource-limits/.secrets | 3 +- kube/overlays/stable/.env | 6 +- kube/overlays/stable/.secrets | 2 + kube/resources/worker.yaml | 26 ++ 48 files changed, 1496 insertions(+), 680 deletions(-) create mode 100644 airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java create mode 100644 airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java create mode 100644 airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java create mode 100644 airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java delete mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java rename airbyte-workers/src/main/resources/entrypoints/{ => sync}/check.sh (100%) rename airbyte-workers/src/main/resources/entrypoints/{ => sync}/main.sh (100%) create mode 100644 airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java delete mode 100644 airbyte-workers/src/test/java/io/airbyte/workers/storage/WorkerStoreTest.java diff --git a/.env b/.env index 03d63b19bfa9..4e4bd18d3cc4 100644 --- a/.env +++ b/.env @@ -50,7 +50,6 @@ CONFIG_DATABASE_PASSWORD= CONFIG_DATABASE_URL= CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.1.001 - ### AIRBYTE SERVICES ### TEMPORAL_HOST=airbyte-temporal:7233 INTERNAL_API_HOST=airbyte-server:8001 diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml index ed578354eb79..70cabfd000a7 100644 --- a/airbyte-commons/src/main/resources/log4j2.xml +++ b/airbyte-commons/src/main/resources/log4j2.xml @@ -128,7 +128,7 @@ s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}" s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}"> - + diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 065c50963f90..540bc7b05b74 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -89,16 +89,16 @@ public class EnvConfigs implements Configs { private static final String JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS = "JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS"; private static final String CONTAINER_ORCHESTRATOR_ENABLED = "CONTAINER_ORCHESTRATOR_ENABLED"; - private static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME"; - private static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION"; - private static final String STATE_STORAGE_S3_ACCESS_KEY = "STATE_STORAGE_S3_ACCESS_KEY"; - private static final String STATE_STORAGE_S3_SECRET_ACCESS_KEY = "STATE_STORAGE_S3_SECRET_ACCESS_KEY"; - private static final String STATE_STORAGE_MINIO_BUCKET_NAME = "STATE_STORAGE_MINIO_BUCKET_NAME"; - private static final String STATE_STORAGE_MINIO_ENDPOINT = "STATE_STORAGE_MINIO_ENDPOINT"; - private static final String STATE_STORAGE_MINIO_ACCESS_KEY = "STATE_STORAGE_MINIO_ACCESS_KEY"; - private static final String STATE_STORAGE_MINIO_SECRET_ACCESS_KEY = "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY"; - private static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME"; - private static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS"; + public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME"; + public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION"; + public static final String STATE_STORAGE_S3_ACCESS_KEY = "STATE_STORAGE_S3_ACCESS_KEY"; + public static final String STATE_STORAGE_S3_SECRET_ACCESS_KEY = "STATE_STORAGE_S3_SECRET_ACCESS_KEY"; + public static final String STATE_STORAGE_MINIO_BUCKET_NAME = "STATE_STORAGE_MINIO_BUCKET_NAME"; + public static final String STATE_STORAGE_MINIO_ENDPOINT = "STATE_STORAGE_MINIO_ENDPOINT"; + public static final String STATE_STORAGE_MINIO_ACCESS_KEY = "STATE_STORAGE_MINIO_ACCESS_KEY"; + public static final String STATE_STORAGE_MINIO_SECRET_ACCESS_KEY = "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY"; + public static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME"; + public static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS"; // defaults private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; @@ -172,18 +172,18 @@ private Optional getStateStorageConfiguration() { if (getEnv(STATE_STORAGE_GCS_BUCKET_NAME) != null) { return Optional.of(CloudStorageConfigs.gcs(new GcsConfig( getEnvOrDefault(STATE_STORAGE_GCS_BUCKET_NAME, ""), - getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, "")))); + getEnvOrDefault(STATE_STORAGE_GCS_APPLICATION_CREDENTIALS, "")))); } else if (getEnv(STATE_STORAGE_MINIO_ENDPOINT) != null) { return Optional.of(CloudStorageConfigs.minio(new MinioConfig( getEnvOrDefault(STATE_STORAGE_MINIO_BUCKET_NAME, ""), - getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""), - getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""), + getEnvOrDefault(STATE_STORAGE_MINIO_ACCESS_KEY, ""), + getEnvOrDefault(STATE_STORAGE_MINIO_SECRET_ACCESS_KEY, ""), getEnvOrDefault(STATE_STORAGE_MINIO_ENDPOINT, "")))); } else if (getEnv(STATE_STORAGE_S3_REGION) != null) { return Optional.of(CloudStorageConfigs.s3(new S3Config( getEnvOrDefault(STATE_STORAGE_S3_BUCKET_NAME, ""), - getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""), - getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""), + getEnvOrDefault(STATE_STORAGE_S3_ACCESS_KEY, ""), + getEnvOrDefault(STATE_STORAGE_S3_SECRET_ACCESS_KEY, ""), getEnvOrDefault(STATE_STORAGE_S3_REGION, "")))); } else { return Optional.empty(); diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java index 0c242c9098ec..dbd4b7b6f3e4 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java @@ -5,9 +5,12 @@ package io.airbyte.config.storage; import com.google.api.client.util.Preconditions; +import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; import java.util.function.Supplier; /** @@ -16,8 +19,11 @@ */ public class DefaultGcsClientFactory implements Supplier { + private final GcsConfig config; + public DefaultGcsClientFactory(final GcsConfig config) { validate(config); + this.config = config; } private static void validate(final GcsConfig config) { @@ -27,7 +33,13 @@ private static void validate(final GcsConfig config) { @Override public Storage get() { - return StorageOptions.getDefaultInstance().getService(); + try { + final var credentialsByteStream = new ByteArrayInputStream(config.getGoogleApplicationCredentials().getBytes(StandardCharsets.UTF_8)); + final var credentials = ServiceAccountCredentials.fromStream(credentialsByteStream); + return StorageOptions.newBuilder().setCredentials(credentials).build().getService(); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java index 62b0237009b8..897cc68cd02a 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java @@ -8,6 +8,7 @@ import io.airbyte.config.storage.CloudStorageConfigs.S3ApiWorkerStorageConfig; import io.airbyte.config.storage.CloudStorageConfigs.S3Config; import java.util.function.Supplier; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -41,6 +42,7 @@ static void validateBase(final S3ApiWorkerStorageConfig s3BaseConfig) { @Override public S3Client get() { final var builder = S3Client.builder(); + builder.credentialsProvider(() -> AwsBasicCredentials.create(s3Config.getAwsAccessKey(), s3Config.getAwsSecretAccessKey())); builder.region(Region.of(s3Config.getRegion())); return builder.build(); } diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java b/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java index 31cd86e71db7..5cef8b97ac2f 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java @@ -9,6 +9,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.function.Supplier; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -40,6 +41,7 @@ public S3Client get() { final var minioEndpoint = minioConfig.getMinioEndpoint(); try { final var minioUri = new URI(minioEndpoint); + builder.credentialsProvider(() -> AwsBasicCredentials.create(minioConfig.getAwsAccessKey(), minioConfig.getAwsSecretAccessKey())); builder.endpointOverride(minioUri); builder.region(Region.US_EAST_1); // Although this is not used, the S3 client will error out if this is not set. Set a stub value. } catch (final URISyntaxException e) { diff --git a/airbyte-container-orchestrator/Dockerfile b/airbyte-container-orchestrator/Dockerfile index b805172795d8..4fba8521e028 100644 --- a/airbyte-container-orchestrator/Dockerfile +++ b/airbyte-container-orchestrator/Dockerfile @@ -18,6 +18,13 @@ RUN add-apt-repository \ stable" RUN apt-get update && apt-get install -y docker-ce-cli jq +# Install kubectl for copying files to kube pods. Eventually should be replaced with a kube java client. +# See https://github.com/airbytehq/airbyte/issues/8643 for more information on why we are using kubectl for copying. +# The following commands were taken from https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/#install-using-native-package-management +RUN curl -fsSLo /usr/share/keyrings/kubernetes-archive-keyring.gpg https://packages.cloud.google.com/apt/doc/apt-key.gpg +RUN echo "deb [signed-by=/usr/share/keyrings/kubernetes-archive-keyring.gpg] https://apt.kubernetes.io/ kubernetes-xenial main" | tee /etc/apt/sources.list.d/kubernetes.list +RUN apt-get update && apt-get install -y kubectl + ENV APPLICATION airbyte-container-orchestrator ENV AIRBYTE_ENTRYPOINT "/app/${APPLICATION}-0.35.6-alpha/bin/${APPLICATION}" diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java new file mode 100644 index 000000000000..d05d2f408e1a --- /dev/null +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.container_orchestrator; + +import io.airbyte.workers.process.AsyncKubePodStatus; +import io.airbyte.workers.process.KubePodInfo; + +/** + * The state manager writes the "truth" for states of the async pod process. If the store isn't + * updated by the underlying pod, it will appear as failed. + * + * It doesn't have a single value for a state. Instead, in a location on cloud storage or disk, it + * writes every state it's encountered. + */ +public interface AsyncStateManager { + + /** + * Writes a file containing a string value to a location designated by the input status. + */ + void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value); + + /** + * Writes an empty file to a location designated by the input status. + */ + void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status); + + /** + * Interprets the state given all written state messages for the pod. + */ + AsyncKubePodStatus getStatus(final KubePodInfo kubePodInfo); + + /** + * @return the output stored in the success file. This can be an empty string. + * @throws IllegalArgumentException if no success file exists + */ + String getOutput(final KubePodInfo kubePodInfo) throws IllegalArgumentException; + +} diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java index c75b0e3a65d6..476f049d608f 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java @@ -4,16 +4,25 @@ package io.airbyte.container_orchestrator; -import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper; +import io.airbyte.commons.logging.MdcScope; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.helpers.LogClientSingleton; +import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.process.AsyncKubePodStatus; +import io.airbyte.workers.process.AsyncOrchestratorPodProcess; import io.airbyte.workers.process.DockerProcessFactory; +import io.airbyte.workers.process.KubePodInfo; +import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.process.KubePortManagerSingleton; import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; +import io.airbyte.workers.storage.StateClients; import io.airbyte.workers.temporal.sync.DbtLauncherWorker; import io.airbyte.workers.temporal.sync.NormalizationLauncherWorker; import io.airbyte.workers.temporal.sync.OrchestratorConstants; @@ -22,9 +31,9 @@ import io.fabric8.kubernetes.client.KubernetesClient; import java.io.IOException; import java.net.InetAddress; -import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; /** @@ -41,54 +50,129 @@ @Slf4j public class ContainerOrchestratorApp { - public static void main(final String[] args) throws Exception { - WorkerHeartbeatServer heartbeatServer = null; + private final String application; + private final Map envMap; + private final JobRunConfig jobRunConfig; + private final KubePodInfo kubePodInfo; + private final Configs configs; - try { - // read files that contain all necessary configuration - final String application = Files.readString(Path.of(OrchestratorConstants.INIT_FILE_APPLICATION)); - final Map envMap = - (Map) Jsons.deserialize(Files.readString(Path.of(OrchestratorConstants.INIT_FILE_ENV_MAP)), Map.class); + public ContainerOrchestratorApp( + final String application, + final Map envMap, + final JobRunConfig jobRunConfig, + final KubePodInfo kubePodInfo) { + this.application = application; + this.envMap = envMap; + this.jobRunConfig = jobRunConfig; + this.kubePodInfo = kubePodInfo; + this.configs = new EnvConfigs(envMap); + } - final Configs configs = new EnvConfigs(envMap); + private void configureLogging() { + for (String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) { + if (envMap.containsKey(envVar)) { + System.setProperty(envVar, envMap.get(envVar)); + } + } - heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT); - heartbeatServer.startBackground(); + final var logClient = LogClientSingleton.getInstance(); + logClient.setJobMdc( + configs.getWorkerEnvironment(), + configs.getLogConfigs(), + WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId())); + } + + /** + * Handles state updates (including writing failures) and running the job orchestrator. As much of + * the initialization as possible should go in here so it's logged properly and the state storage is + * updated appropriately. + */ + private void runInternal(final DefaultAsyncStateManager asyncStateManager) { + try { + asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.INITIALIZING); final WorkerConfigs workerConfigs = new WorkerConfigs(configs); final ProcessFactory processFactory = getProcessBuilderFactory(configs, workerConfigs); final JobOrchestrator jobOrchestrator = getJobOrchestrator(configs, workerConfigs, processFactory, application); - log.info("Starting {} orchestrator...", jobOrchestrator.getOrchestratorName()); - jobOrchestrator.runJob(); - log.info("{} orchestrator complete!", jobOrchestrator.getOrchestratorName()); - } finally { - if (heartbeatServer != null) { - log.info("Shutting down heartbeat server..."); - heartbeatServer.stop(); + if (jobOrchestrator == null) { + throw new IllegalStateException("Could not find job orchestrator for application: " + application); } + + final var heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT); + heartbeatServer.startBackground(); + + asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.RUNNING); + + final Optional output = jobOrchestrator.runJob(); + + asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.SUCCEEDED, output.orElse("")); + + // required to kill clients with thread pools + System.exit(0); + } catch (Throwable t) { + asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.FAILED); + System.exit(1); } + } - // required to kill kube client - log.info("Runner closing..."); - System.exit(0); + /** + * Configures logging/mdc scope, and creates all objects necessary to handle state updates. + * Everything else is delegated to {@link ContainerOrchestratorApp#runInternal}. + */ + public void run() { + configureLogging(); + + // set mdc scope for the remaining execution + try (final var mdcScope = new MdcScope.Builder() + .setLogPrefix(application) + .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND) + .build()) { + + // IMPORTANT: Changing the storage location will orphan already existing kube pods when the new + // version is deployed! + final var documentStoreClient = StateClients.create(configs.getStateStorageCloudConfigs(), WorkerApp.STATE_STORAGE_PREFIX); + final var asyncStateManager = new DefaultAsyncStateManager(documentStoreClient); + + runInternal(asyncStateManager); + } + } + + public static void main(final String[] args) { + try { + // wait for config files to be copied + final var successFile = Path.of(KubePodProcess.CONFIG_DIR, KubePodProcess.SUCCESS_FILE_NAME); + + while (!successFile.toFile().exists()) { + log.info("Waiting for config file transfers to complete..."); + Thread.sleep(1000); + } + + final var applicationName = JobOrchestrator.readApplicationName(); + final var envMap = JobOrchestrator.readEnvMap(); + final var jobRunConfig = JobOrchestrator.readJobRunConfig(); + final var kubePodInfo = JobOrchestrator.readKubePodInfo(); + + final var app = new ContainerOrchestratorApp(applicationName, envMap, jobRunConfig, kubePodInfo); + app.run(); + } catch (Throwable t) { + log.info("Orchestrator failed...", t); + System.exit(1); + } } private static JobOrchestrator getJobOrchestrator(final Configs configs, final WorkerConfigs workerConfigs, final ProcessFactory processFactory, final String application) { - if (application.equals(ReplicationLauncherWorker.REPLICATION)) { - return new ReplicationJobOrchestrator(configs, workerConfigs, processFactory); - } else if (application.equals(NormalizationLauncherWorker.NORMALIZATION)) { - return new NormalizationJobOrchestrator(configs, workerConfigs, processFactory); - } else if (application.equals(DbtLauncherWorker.DBT)) { - return new DbtJobOrchestrator(configs, workerConfigs, processFactory); - } else { - log.error("Runner failed", new IllegalStateException("Unexpected value: " + application)); - System.exit(1); - throw new IllegalStateException(); // should never be reached, but necessary to compile - } + + return switch (application) { + case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(configs, workerConfigs, processFactory); + case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(configs, workerConfigs, processFactory); + case DbtLauncherWorker.DBT -> new DbtJobOrchestrator(configs, workerConfigs, processFactory); + case AsyncOrchestratorPodProcess.NO_OP -> new NoOpOrchestrator(); + default -> null; + }; } /** diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java index fc426adb5ed2..a7b043f75a1e 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java @@ -13,9 +13,11 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.normalization.NormalizationRunnerFactory; +import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker; import java.nio.file.Path; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -42,12 +44,13 @@ public Class getInputClass() { } @Override - public void runJob() throws Exception { - final JobRunConfig jobRunConfig = readJobRunConfig(); + public Optional runJob() throws Exception { + final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig(); final OperatorDbtInput dbtInput = readInput(); final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile( - ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class); + Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG), + IntegrationLauncherConfig.class); log.info("Setting up dbt worker..."); final DbtTransformationWorker worker = new DbtTransformationWorker( @@ -65,6 +68,8 @@ public void runJob() throws Exception { log.info("Running dbt worker..."); final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); worker.run(dbtInput, jobRoot); + + return Optional.empty(); } } diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java new file mode 100644 index 000000000000..212d9adef578 --- /dev/null +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.container_orchestrator; + +import io.airbyte.workers.process.AsyncKubePodStatus; +import io.airbyte.workers.process.KubePodInfo; +import io.airbyte.workers.storage.DocumentStoreClient; +import java.util.List; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DefaultAsyncStateManager implements AsyncStateManager { + + private static final List STATUS_CHECK_ORDER = List.of( + // terminal states first + AsyncKubePodStatus.FAILED, + AsyncKubePodStatus.SUCCEEDED, + + // then check in progress state + AsyncKubePodStatus.RUNNING, + + // then check for initialization state + AsyncKubePodStatus.INITIALIZING); + + private final DocumentStoreClient documentStoreClient; + + public DefaultAsyncStateManager(final DocumentStoreClient documentStoreClient) { + this.documentStoreClient = documentStoreClient; + } + + @Override + public void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value) { + final var key = getDocumentStoreKey(kubePodInfo, status); + log.info("Writing async status {} for {}...", status, kubePodInfo); + documentStoreClient.write(key, value); + } + + @Override + public void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status) { + write(kubePodInfo, status, ""); + } + + /** + * Checks terminal states first, then running, then initialized. Defaults to not started. + * + * The order matters here! + */ + @Override + public AsyncKubePodStatus getStatus(KubePodInfo kubePodInfo) { + for (AsyncKubePodStatus status : STATUS_CHECK_ORDER) { + if (statusFileExists(kubePodInfo, status)) { + return status; + } + } + + return AsyncKubePodStatus.NOT_STARTED; + } + + @Override + public String getOutput(KubePodInfo kubePodInfo) throws IllegalArgumentException { + final var key = getDocumentStoreKey(kubePodInfo, AsyncKubePodStatus.SUCCEEDED); + final var output = documentStoreClient.read(key); + + if (output.isPresent()) { + return output.get(); + } else { + throw new IllegalArgumentException("Expected to retrieve output from a successfully completed pod!"); + } + } + + /** + * IMPORTANT: Changing the storage location will orphan already existing kube pods when the new + * version is deployed! + */ + public static String getDocumentStoreKey(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status) { + return kubePodInfo.namespace() + "/" + kubePodInfo.name() + "/" + status.name(); + } + + private boolean statusFileExists(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status) { + final var key = getDocumentStoreKey(kubePodInfo, status); + return documentStoreClient.read(key).isPresent(); + } + +} diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java index d207763a4f28..8c572cd97bf6 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java @@ -6,10 +6,15 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.process.AsyncOrchestratorPodProcess; +import io.airbyte.workers.process.KubePodInfo; +import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.temporal.sync.OrchestratorConstants; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; +import java.util.Optional; /** * The job orchestrator helps abstract over container launcher application differences across @@ -27,19 +32,46 @@ public interface JobOrchestrator { // reads input from a file that was copied to the container launcher default INPUT readInput() throws IOException { - return readAndDeserializeFile(OrchestratorConstants.INIT_FILE_INPUT, getInputClass()); + return readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_INPUT), getInputClass()); } - // reads the job run config from a file that was copied to the container launcher - default JobRunConfig readJobRunConfig() throws IOException { - return readAndDeserializeFile(OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, JobRunConfig.class); + /** + * reads the application name from a file that was copied to the container launcher + */ + static String readApplicationName() throws IOException { + return Files.readString(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_APPLICATION)); } - // the unique logic that belongs to each type of job belongs here - void runJob() throws Exception; + /** + * reads the environment variable map from a file that was copied to the container launcher + */ + static Map readEnvMap() throws IOException { + return (Map) readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_ENV_MAP), Map.class); + } + + /** + * reads the job run config from a file that was copied to the container launcher + */ + static JobRunConfig readJobRunConfig() throws IOException { + return readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG), JobRunConfig.class); + } + + /** + * reads the kube pod info from a file that was copied to the container launcher + */ + static KubePodInfo readKubePodInfo() throws IOException { + return readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, AsyncOrchestratorPodProcess.KUBE_POD_INFO), KubePodInfo.class); + } + + /** + * Contains the unique logic that belongs to each type of job. + * + * @return an optional output value to place within the output document store item. + */ + Optional runJob() throws Exception; - static T readAndDeserializeFile(String path, Class type) throws IOException { - return Jsons.deserialize(Files.readString(Path.of(path)), type); + static T readAndDeserializeFile(Path path, Class type) throws IOException { + return Jsons.deserialize(Files.readString(path), type); } } diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java new file mode 100644 index 000000000000..77181e817644 --- /dev/null +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.container_orchestrator; + +import io.airbyte.workers.process.AsyncOrchestratorPodProcess; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; + +/** + * For testing only. + */ +@Slf4j +public class NoOpOrchestrator implements JobOrchestrator { + + @Override + public String getOrchestratorName() { + return AsyncOrchestratorPodProcess.NO_OP; + } + + @Override + public Class getInputClass() { + return String.class; + } + + @Override + public Optional runJob() throws Exception { + log.info("Running no-op job."); + return Optional.empty(); + } + +} diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java index 6d8251e28947..0462be271ca8 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java @@ -13,9 +13,11 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.normalization.NormalizationRunnerFactory; +import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker; import java.nio.file.Path; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -42,12 +44,13 @@ public Class getInputClass() { } @Override - public void runJob() throws Exception { - final JobRunConfig jobRunConfig = readJobRunConfig(); + public Optional runJob() throws Exception { + final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig(); final NormalizationInput normalizationInput = readInput(); final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile( - ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class); + Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG), + IntegrationLauncherConfig.class); log.info("Setting up normalization worker..."); final NormalizationWorker normalizationWorker = new DefaultNormalizationWorker( @@ -64,6 +67,7 @@ public void runJob() throws Exception { final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); normalizationWorker.run(normalizationInput, jobRoot); + return Optional.empty(); } } diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java index cbcc4338c056..f7f515ce4d18 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java @@ -17,6 +17,7 @@ import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.KubePodProcess; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker; import io.airbyte.workers.protocols.airbyte.AirbyteSource; @@ -26,6 +27,7 @@ import io.airbyte.workers.protocols.airbyte.NamespacingMapper; import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker; import java.nio.file.Path; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -52,15 +54,17 @@ public Class getInputClass() { } @Override - public void runJob() throws Exception { - final JobRunConfig jobRunConfig = readJobRunConfig(); + public Optional runJob() throws Exception { + final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig(); final StandardSyncInput syncInput = readInput(); final IntegrationLauncherConfig sourceLauncherConfig = JobOrchestrator.readAndDeserializeFile( - ReplicationLauncherWorker.INIT_FILE_SOURCE_LAUNCHER_CONFIG, IntegrationLauncherConfig.class); + Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_SOURCE_LAUNCHER_CONFIG), + IntegrationLauncherConfig.class); final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile( - ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class); + Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG), + IntegrationLauncherConfig.class); log.info("Setting up source launcher..."); final IntegrationLauncher sourceLauncher = new AirbyteIntegrationLauncher( @@ -97,10 +101,8 @@ public void runJob() throws Exception { final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); final ReplicationOutput replicationOutput = replicationWorker.run(syncInput, jobRoot); - log.info("Sending output..."); - // this uses stdout directly because it shouldn't have the logging related prefix - // the replication output is read from the container that launched the runner - System.out.println(Jsons.serialize(replicationOutput)); + log.info("Returning output..."); + return Optional.of(Jsons.serialize(replicationOutput)); } } diff --git a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java new file mode 100644 index 000000000000..c543d3d6a823 --- /dev/null +++ b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.container_orchestrator; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; + +import io.airbyte.workers.process.AsyncKubePodStatus; +import io.airbyte.workers.process.KubePodInfo; +import io.airbyte.workers.storage.DocumentStoreClient; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class DefaultAsyncStateManagerTest { + + private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1"); + + private DocumentStoreClient documentStore; + private AsyncStateManager stateManager; + + @BeforeEach + void setup() { + documentStore = mock(DocumentStoreClient.class); + stateManager = new DefaultAsyncStateManager(documentStore); + } + + @Test + void testEmptyWrite() { + stateManager.write(KUBE_POD_INFO, AsyncKubePodStatus.INITIALIZING); + + // test for overwrite (which should be allowed) + stateManager.write(KUBE_POD_INFO, AsyncKubePodStatus.INITIALIZING); + + final var key = getKey(AsyncKubePodStatus.INITIALIZING); + verify(documentStore, times(2)).write(key, ""); + } + + @Test + void testContentfulWrite() { + stateManager.write(KUBE_POD_INFO, AsyncKubePodStatus.SUCCEEDED, "some output value"); + + final var key = getKey(AsyncKubePodStatus.SUCCEEDED); + verify(documentStore, times(1)).write(key, "some output value"); + } + + @Test + void testReadingOutputWhenItExists() { + final var key = getKey(AsyncKubePodStatus.SUCCEEDED); + when(documentStore.read(key)).thenReturn(Optional.of("some output value")); + assertEquals("some output value", stateManager.getOutput(KUBE_POD_INFO)); + } + + @Test + void testReadingOutputWhenItDoesNotExist() { + // getting the output should throw an exception when there is no record in the document store + assertThrows(IllegalArgumentException.class, () -> { + stateManager.getOutput(KUBE_POD_INFO); + }); + } + + @Test + void testSuccessfulStatusRetrievalLifecycle() { + when(documentStore.read(getKey(AsyncKubePodStatus.INITIALIZING))).thenReturn(Optional.empty()); + final var beforeInitializingStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.NOT_STARTED, beforeInitializingStatus); + + when(documentStore.read(getKey(AsyncKubePodStatus.INITIALIZING))).thenReturn(Optional.of("")); + final var initializingStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.INITIALIZING, initializingStatus); + + when(documentStore.read(getKey(AsyncKubePodStatus.RUNNING))).thenReturn(Optional.of("")); + final var runningStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.RUNNING, runningStatus); + + when(documentStore.read(getKey(AsyncKubePodStatus.SUCCEEDED))).thenReturn(Optional.of("output")); + final var succeededStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.SUCCEEDED, succeededStatus); + } + + @Test + void testFailureStatusRetrievalLifecycle() { + when(documentStore.read(getKey(AsyncKubePodStatus.INITIALIZING))).thenReturn(Optional.empty()); + final var beforeInitializingStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.NOT_STARTED, beforeInitializingStatus); + + when(documentStore.read(getKey(AsyncKubePodStatus.INITIALIZING))).thenReturn(Optional.of("")); + final var initializingStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.INITIALIZING, initializingStatus); + + when(documentStore.read(getKey(AsyncKubePodStatus.RUNNING))).thenReturn(Optional.of("")); + final var runningStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.RUNNING, runningStatus); + + when(documentStore.read(getKey(AsyncKubePodStatus.FAILED))).thenReturn(Optional.of("output")); + final var failedStatus = stateManager.getStatus(KUBE_POD_INFO); + assertEquals(AsyncKubePodStatus.FAILED, failedStatus); + } + + private static String getKey(final AsyncKubePodStatus status) { + return DefaultAsyncStateManager.getDocumentStoreKey(KUBE_POD_INFO, status); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 5bf166662218..bc9e89b01e69 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -39,6 +39,8 @@ import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; +import io.airbyte.workers.storage.DocumentStoreClient; +import io.airbyte.workers.storage.StateClients; import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalUtils; @@ -84,9 +86,12 @@ public class WorkerApp { private static final Logger LOGGER = LoggerFactory.getLogger(WorkerApp.class); public static final int KUBE_HEARTBEAT_PORT = 9000; + // IMPORTANT: Changing the storage location will orphan already existing kube pods when the new + // version is deployed! + public static final Path STATE_STORAGE_PREFIX = Path.of("/state"); + private final Path workspaceRoot; private final ProcessFactory jobProcessFactory; - private final ProcessFactory orchestratorProcessFactory; private final SecretsHydrator secretsHydrator; private final WorkflowServiceStubs temporalService; private final ConfigRepository configRepository; @@ -103,7 +108,7 @@ public class WorkerApp { private final TemporalWorkerRunFactory temporalWorkerRunFactory; private final Configs configs; private final ConnectionHelper connectionHelper; - private final boolean containerOrchestratorEnabled; + private final Optional containerOrchestratorConfig; private final JobNotifier jobNotifier; private final JobTracker jobTracker; @@ -147,10 +152,9 @@ public void start() { final NormalizationActivityImpl normalizationActivity = new NormalizationActivityImpl( - containerOrchestratorEnabled, + containerOrchestratorConfig, workerConfigs, jobProcessFactory, - orchestratorProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, @@ -161,10 +165,9 @@ public void start() { airbyteVersion); final DbtTransformationActivityImpl dbtTransformationActivity = new DbtTransformationActivityImpl( - containerOrchestratorEnabled, + containerOrchestratorConfig, workerConfigs, jobProcessFactory, - orchestratorProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, @@ -177,10 +180,9 @@ public void start() { final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository); final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl( - containerOrchestratorEnabled, + containerOrchestratorConfig, workerConfigs, jobProcessFactory, - orchestratorProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, @@ -226,10 +228,9 @@ public void start() { * launching or not. */ private ReplicationActivityImpl getReplicationActivityImpl( - final boolean containerOrchestratorEnabled, + final Optional containerOrchestratorConfig, final WorkerConfigs workerConfigs, final ProcessFactory jobProcessFactory, - final ProcessFactory orchestratorProcessFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, final WorkerEnvironment workerEnvironment, @@ -238,33 +239,19 @@ private ReplicationActivityImpl getReplicationActivityImpl( final String databasePassword, final String databaseUrl, final String airbyteVersion) { - if (containerOrchestratorEnabled) { - return new ReplicationActivityImpl( - containerOrchestratorEnabled, - workerConfigs, - orchestratorProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - databaseUser, - databasePassword, - databaseUrl, - airbyteVersion); - } else { - return new ReplicationActivityImpl( - containerOrchestratorEnabled, - workerConfigs, - jobProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - databaseUser, - databasePassword, - databaseUrl, - airbyteVersion); - } + + return new ReplicationActivityImpl( + containerOrchestratorConfig, + workerConfigs, + jobProcessFactory, + secretsHydrator, + workspaceRoot, + workerEnvironment, + logConfigs, + databaseUser, + databasePassword, + databaseUrl, + airbyteVersion); } private static ProcessFactory getJobProcessFactory(final Configs configs) throws IOException { @@ -287,36 +274,34 @@ private static ProcessFactory getJobProcessFactory(final Configs configs) throws } } - private static ProcessFactory getOrchestratorProcessFactory(final Configs configs) throws IOException { - final WorkerConfigs workerConfigs = new WorkerConfigs(configs); - - if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { - final KubernetesClient fabricClient = new DefaultKubernetesClient(); - final String localIp = InetAddress.getLocalHost().getHostAddress(); - final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; - LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace()); - return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, true); - } else { - return new DockerProcessFactory( - workerConfigs, - configs.getWorkspaceRoot(), - configs.getWorkspaceDockerMount(), - configs.getLocalDockerMount(), - - // this needs to point at the Docker network Airbyte is running on, not the host network or job - // runner network, otherwise it can't talk with the db/minio - "airbyte_default", - - true); - } - } - private static WorkerOptions getWorkerOptions(final int max) { return WorkerOptions.newBuilder() .setMaxConcurrentActivityExecutionSize(max) .build(); } + public static record ContainerOrchestratorConfig( + String namespace, + DocumentStoreClient documentStoreClient, + KubernetesClient kubernetesClient) {} + + static Optional getContainerOrchestratorConfig(Configs configs) { + if (configs.getContainerOrchestratorEnabled()) { + final var kubernetesClient = new DefaultKubernetesClient(); + + final DocumentStoreClient documentStoreClient = StateClients.create( + configs.getStateStorageCloudConfigs(), + STATE_STORAGE_PREFIX); + + return Optional.of(new ContainerOrchestratorConfig( + configs.getJobKubeNamespace(), + documentStoreClient, + kubernetesClient)); + } else { + return Optional.empty(); + } + } + public static void main(final String[] args) throws IOException, InterruptedException { final Configs configs = new EnvConfigs(); @@ -336,7 +321,6 @@ public static void main(final String[] args) throws IOException, InterruptedExce } final ProcessFactory jobProcessFactory = getJobProcessFactory(configs); - final ProcessFactory orchestratorProcessFactory = getOrchestratorProcessFactory(configs); final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); @@ -390,6 +374,8 @@ public static void main(final String[] args) throws IOException, InterruptedExce workspaceHelper, workerConfigs); + final Optional containerOrchestratorConfig = getContainerOrchestratorConfig(configs); + final JobNotifier jobNotifier = new JobNotifier( configs.getWebappUrl(), configRepository, @@ -401,7 +387,6 @@ public static void main(final String[] args) throws IOException, InterruptedExce new WorkerApp( workspaceRoot, jobProcessFactory, - orchestratorProcessFactory, secretsHydrator, temporalService, configRepository, @@ -418,7 +403,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce temporalWorkerRunFactory, configs, connectionHelper, - configs.getContainerOrchestratorEnabled(), + containerOrchestratorConfig, jobNotifier, jobTracker).start(); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java new file mode 100644 index 000000000000..58a4e7de1c22 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +public enum AsyncKubePodStatus { + NOT_STARTED, // Pod hasn't been started yet. + INITIALIZING, // On-start container started but not completed + RUNNING, // Main container posted running + FAILED, // Reported status was "failed" or pod was in Error (or other terminal state) without a reported + // status. + SUCCEEDED; // Reported status was "success" so both main and on-start succeeded. +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java new file mode 100644 index 000000000000..0e0ea440d878 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ResourceRequirements; +import io.airbyte.workers.WorkerApp; +import io.airbyte.workers.storage.DocumentStoreClient; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import java.io.IOException; +import java.nio.file.Path; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; + +/** + * This process allows creating and managing a pod outside the lifecycle of the launching + * application. Unlike {@link KubePodProcess} there is no heartbeat mechanism that requires the + * launching pod and the launched pod to co-exist for the duration of execution for the launched + * pod. + * + * Instead, this process creates the pod and interacts with a document store on cloud storage to + * understand the state of the created pod. + * + * The document store is considered to be the truth when retrieving the status for an async pod + * process. If the store isn't updated by the underlying pod, it will appear as failed. + */ +@Slf4j +public class AsyncOrchestratorPodProcess implements KubePod { + + public static final String KUBE_POD_INFO = "KUBE_POD_INFO"; + public static final String NO_OP = "NO_OP"; + + private final KubePodInfo kubePodInfo; + private final DocumentStoreClient documentStoreClient; + private final KubernetesClient kubernetesClient; + private final AtomicReference> cachedExitValue; + + public AsyncOrchestratorPodProcess( + final KubePodInfo kubePodInfo, + final DocumentStoreClient documentStoreClient, + final KubernetesClient kubernetesClient) { + this.kubePodInfo = kubePodInfo; + this.documentStoreClient = documentStoreClient; + this.kubernetesClient = kubernetesClient; + this.cachedExitValue = new AtomicReference<>(Optional.empty()); + } + + public Optional getOutput() { + final var possibleOutput = getDocument(AsyncKubePodStatus.SUCCEEDED.name()); + + if (possibleOutput.isPresent() && possibleOutput.get().isBlank()) { + return Optional.empty(); + } else { + return possibleOutput; + } + } + + private int computeExitValue() { + final AsyncKubePodStatus docStoreStatus = getDocStoreStatus(); + + // trust the doc store if it's in a terminal state + if (docStoreStatus.equals(AsyncKubePodStatus.FAILED)) { + return 1; + } else if (docStoreStatus.equals(AsyncKubePodStatus.SUCCEEDED)) { + return 0; + } + + final Pod pod = kubernetesClient.pods() + .inNamespace(getInfo().namespace()) + .withName(getInfo().name()) + .get(); + + // Since the pod creation blocks until the pod is created the first time, + // if the pod no longer exists (and we don't have a success/fail document) + // we must be in a failure state. If it wasn't able to write out its status + // we must assume failure, since the document store is the "truth" for + // async pod status. + if (pod == null) { + return 1; + } + + // If the pod does exist, it may be in a terminal (error or completed) state. + final boolean isTerminal = KubePodProcess.isTerminal(pod); + + if (isTerminal) { + // In case the doc store was updated in between when we pulled it and when + // we read the status from the Kubernetes API, we need to check the doc store again. + final AsyncKubePodStatus secondDocStoreStatus = getDocStoreStatus(); + if (secondDocStoreStatus.equals(AsyncKubePodStatus.FAILED)) { + return 1; + } else if (secondDocStoreStatus.equals(AsyncKubePodStatus.SUCCEEDED)) { + return 0; + } else { + // otherwise, the actual pod is terminal when the doc store says it shouldn't be. + return 1; + } + } + + // Otherwise, throw an exception because this is still running, which will be caught in hasExited + switch (docStoreStatus) { + case NOT_STARTED -> throw new IllegalThreadStateException("Pod hasn't started yet."); + case INITIALIZING -> throw new IllegalThreadStateException("Pod is initializing."); + default -> throw new IllegalThreadStateException("Pod is running."); + } + } + + @Override + public int exitValue() { + final var optionalCached = cachedExitValue.get(); + + if (optionalCached.isPresent()) { + return optionalCached.get(); + } else { + final var exitValue = computeExitValue(); + cachedExitValue.set(Optional.of(exitValue)); + return exitValue; + } + } + + @Override + public void destroy() { + final var wasDestroyed = kubernetesClient.pods() + .inNamespace(getInfo().namespace()) + .withName(getInfo().name()) + .delete(); + + if (wasDestroyed) { + log.info("Deleted pod {} in namespace {}", getInfo().name(), getInfo().namespace()); + } else { + log.warn("Wasn't able to delete pod {} from namespace {}", getInfo().name(), getInfo().namespace()); + } + } + + // implementation copied from Process.java since this isn't a real Process + public boolean hasExited() { + try { + exitValue(); + return true; + } catch (IllegalThreadStateException e) { + return false; + } + } + + @Override + public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException { + // implementation copied from Process.java since this isn't a real Process + long remainingNanos = unit.toNanos(timeout); + if (hasExited()) + return true; + if (timeout <= 0) + return false; + + long deadline = System.nanoTime() + remainingNanos; + do { + Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1, 100)); + if (hasExited()) + return true; + remainingNanos = deadline - System.nanoTime(); + } while (remainingNanos > 0); + + return false; + } + + @Override + public int waitFor() throws InterruptedException { + boolean exited = waitFor(10, TimeUnit.DAYS); + + if (exited) { + return exitValue(); + } else { + throw new InterruptedException("Pod did not complete within timeout."); + } + } + + @Override + public KubePodInfo getInfo() { + return kubePodInfo; + } + + private Optional getDocument(final String key) { + return documentStoreClient.read(getInfo().namespace() + "/" + getInfo().name() + "/" + key); + } + + private boolean checkStatus(final AsyncKubePodStatus status) { + return getDocument(status.name()).isPresent(); + } + + /** + * Checks terminal states first, then running, then initialized. Defaults to not started. + * + * The order matters here! + */ + public AsyncKubePodStatus getDocStoreStatus() { + if (checkStatus(AsyncKubePodStatus.FAILED)) { + return AsyncKubePodStatus.FAILED; + } else if (checkStatus(AsyncKubePodStatus.SUCCEEDED)) { + return AsyncKubePodStatus.SUCCEEDED; + } else if (checkStatus(AsyncKubePodStatus.RUNNING)) { + return AsyncKubePodStatus.RUNNING; + } else if (checkStatus(AsyncKubePodStatus.INITIALIZING)) { + return AsyncKubePodStatus.INITIALIZING; + } else { + return AsyncKubePodStatus.NOT_STARTED; + } + } + + // but does that mean there won't be a docker equivalent? + public void create(final String airbyteVersion, + final Map allLabels, + final ResourceRequirements resourceRequirements, + final Map fileMap, + final Map portMap) { + final Volume configVolume = new VolumeBuilder() + .withName("airbyte-config") + .withNewEmptyDir() + .withMedium("Memory") + .endEmptyDir() + .build(); + + final VolumeMount configVolumeMount = new VolumeMountBuilder() + .withName("airbyte-config") + .withMountPath(KubePodProcess.CONFIG_DIR) + .build(); + + final List containerPorts = KubePodProcess.createContainerPortList(portMap); + + final var mainContainer = new ContainerBuilder() + .withName("main") + .withImage("airbyte/container-orchestrator:" + airbyteVersion) + .withResources(KubePodProcess.getResourceRequirementsBuilder(resourceRequirements).build()) + .withPorts(containerPorts) + .withPorts(new ContainerPort(WorkerApp.KUBE_HEARTBEAT_PORT, null, null, null, null)) + .withVolumeMounts(configVolumeMount) + .build(); + + final Pod pod = new PodBuilder() + .withApiVersion("v1") + .withNewMetadata() + .withName(getInfo().name()) + .withNamespace(getInfo().namespace()) + .withLabels(allLabels) + .endMetadata() + .withNewSpec() + .withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true) + .withRestartPolicy("Never") + .withContainers(mainContainer) + .withVolumes(configVolume) + .endSpec() + .build(); + + // should only create after the kubernetes API creates the pod + final var createdPod = kubernetesClient.pods().createOrReplace(pod); + + log.info("Waiting for pod to be running..."); + try { + kubernetesClient.pods() + .inNamespace(kubePodInfo.namespace()) + .withName(kubePodInfo.name()) + .waitUntilCondition(p -> { + return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null; + }, 5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + final var containerState = kubernetesClient.pods() + .inNamespace(kubePodInfo.namespace()) + .withName(kubePodInfo.name()) + .get() + .getStatus() + .getContainerStatuses() + .get(0) + .getState(); + + if (containerState.getRunning() == null) { + throw new RuntimeException("Pod was not running, state was: " + containerState); + } + + final var updatedFileMap = new HashMap<>(fileMap); + updatedFileMap.put(KUBE_POD_INFO, Jsons.serialize(kubePodInfo)); + + copyFilesToKubeConfigVolumeMain(createdPod, updatedFileMap); + } + + public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map files) { + final List> fileEntries = new ArrayList<>(files.entrySet()); + + // copy this file last to indicate that the copy has completed + fileEntries.add(new AbstractMap.SimpleEntry<>(KubePodProcess.SUCCESS_FILE_NAME, "")); + + Path tmpFile = null; + Process proc = null; + for (final Map.Entry file : fileEntries) { + try { + tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue())); + + log.info("Uploading file: " + file.getKey()); + final var containerPath = Path.of(KubePodProcess.CONFIG_DIR + "/" + file.getKey()); + + // using kubectl cp directly here, because both fabric and the official kube client APIs have + // several issues with copying files. See https://github.com/airbytehq/airbyte/issues/8643 for + // details. + final String command = String.format("kubectl cp %s %s/%s:%s -c %s", tmpFile, podDefinition.getMetadata().getNamespace(), + podDefinition.getMetadata().getName(), containerPath, "main"); + log.info(command); + + proc = Runtime.getRuntime().exec(command); + log.info("Waiting for kubectl cp to complete"); + final int exitCode = proc.waitFor(); + + if (exitCode != 0) { + throw new IOException("kubectl cp failed with exit code " + exitCode); + } + + log.info("kubectl cp complete, closing process"); + } catch (final IOException | InterruptedException e) { + throw new RuntimeException(e); + } finally { + if (tmpFile != null) { + tmpFile.toFile().delete(); + } + if (proc != null) { + proc.destroy(); + } + } + } + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java new file mode 100644 index 000000000000..2cd4640eb185 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +import java.util.concurrent.TimeUnit; + +public interface KubePod { + + int exitValue(); + + void destroy(); + + boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException; + + int waitFor() throws InterruptedException; + + KubePodInfo getInfo(); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java new file mode 100644 index 000000000000..0a0ab5949a51 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java @@ -0,0 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +public record KubePodInfo(String namespace, String name) {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 0274f42126e5..c0b6f83401eb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -90,7 +90,7 @@ */ // TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700. -public class KubePodProcess extends Process { +public class KubePodProcess extends Process implements KubePod { private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class); @@ -105,11 +105,11 @@ public class KubePodProcess extends Process { private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin"; private static final String STDOUT_PIPE_FILE = PIPES_DIR + "/stdout"; private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr"; - private static final String CONFIG_DIR = "/config"; + public static final String CONFIG_DIR = "/config"; private static final String TERMINATION_DIR = "/termination"; private static final String TERMINATION_FILE_MAIN = TERMINATION_DIR + "/main"; private static final String TERMINATION_FILE_CHECK = TERMINATION_DIR + "/check"; - private static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING"; + public static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING"; // 143 is the typical SIGTERM exit code. private static final int KILLED_EXIT_CODE = 143; @@ -182,7 +182,7 @@ private static Container getMain(final String image, // communicates its completion to the heartbeat check via a file and closes itself if the heartbeat // fails - final var mainCommand = MoreResources.readResource("entrypoints/main.sh") + final var mainCommand = MoreResources.readResource("entrypoints/sync/main.sh") .replaceAll("TERMINATION_FILE_CHECK", TERMINATION_FILE_CHECK) .replaceAll("TERMINATION_FILE_MAIN", TERMINATION_FILE_MAIN) .replaceAll("OPTIONAL_STDIN", optionalStdin) @@ -191,11 +191,7 @@ private static Container getMain(final String image, .replaceAll("STDERR_PIPE_FILE", STDERR_PIPE_FILE) .replaceAll("STDOUT_PIPE_FILE", STDOUT_PIPE_FILE); - final List containerPorts = internalToExternalPorts.keySet().stream() - .map(integer -> new ContainerPortBuilder() - .withContainerPort(integer) - .build()) - .collect(Collectors.toList()); + final List containerPorts = createContainerPortList(internalToExternalPorts); final List envVars = envMap.entrySet().stream() .map(entry -> new EnvVar(entry.getKey(), entry.getValue(), null)) @@ -218,9 +214,17 @@ private static Container getMain(final String image, return containerBuilder.build(); } - private static void copyFilesToKubeConfigVolume(final KubernetesClient client, - final Pod podDefinition, - final Map files) { + public static List createContainerPortList(final Map internalToExternalPorts) { + return internalToExternalPorts.keySet().stream() + .map(integer -> new ContainerPortBuilder() + .withContainerPort(integer) + .build()) + .collect(Collectors.toList()); + } + + public static void copyFilesToKubeConfigVolume(final KubernetesClient client, + final Pod podDefinition, + final Map files) { final List> fileEntries = new ArrayList<>(files.entrySet()); // copy this file last to indicate that the copy has completed @@ -262,7 +266,11 @@ private static void copyFilesToKubeConfigVolume(final KubernetesClient client, throw new RuntimeException(e); } finally { if (tmpFile != null) { - tmpFile.toFile().delete(); + try { + tmpFile.toFile().delete(); + } catch (Exception e) { + LOGGER.info("Caught exception when deleting temp file but continuing to allow process deletion.", e); + } } if (proc != null) { proc.destroy(); @@ -429,7 +437,7 @@ public KubePodProcess(final boolean isOrchestrator, // communicates via a file if it isn't able to reach the heartbeating server and succeeds if the // main container completes - final String heartbeatCommand = MoreResources.readResource("entrypoints/check.sh") + final String heartbeatCommand = MoreResources.readResource("entrypoints/sync/check.sh") .replaceAll("TERMINATION_FILE_CHECK", TERMINATION_FILE_CHECK) .replaceAll("TERMINATION_FILE_MAIN", TERMINATION_FILE_MAIN) .replaceAll("HEARTBEAT_URL", kubeHeartbeatUrl); @@ -552,7 +560,7 @@ public InputStream getErrorStream() { public int waitFor() throws InterruptedException { final Pod refreshedPod = fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get(); - fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS); + fabricClient.resource(refreshedPod).waitUntilCondition(KubePodProcess::isTerminal, 10, TimeUnit.DAYS); wasKilled.set(true); return exitValue(); } @@ -586,6 +594,11 @@ public Info info() { return new KubePodProcessInfo(podDefinition.getMetadata().getName()); } + @Override + public KubePodInfo getInfo() { + return new KubePodInfo(podDefinition.getMetadata().getNamespace(), podDefinition.getMetadata().getName()); + } + /** * Close all open resource in the opposite order of resource creation. * @@ -620,7 +633,7 @@ private void close() { LOGGER.debug("Closed {}", podDefinition.getMetadata().getName()); } - private boolean isTerminal(final Pod pod) { + public static boolean isTerminal(final Pod pod) { if (pod.getStatus() != null) { // Check if "main" container has terminated, as that defines whether the parent process has // terminated. @@ -698,7 +711,7 @@ public int exitValue() { return returnCode; } - private static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) { + public static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) { if (resourceRequirements != null) { final Map requestMap = new HashMap<>(); // if null then use unbounded resource allocation diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 5323d9e2e4e7..c6b18cfce653 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -117,12 +117,7 @@ public Process create(final String jobId, final int stderrLocalPort = KubePortManagerSingleton.getInstance().take(); LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort); - final var allLabels = new HashMap<>(customLabels); - final var generalKubeLabels = Map.of( - JOB_LABEL_KEY, jobId, - ATTEMPT_LABEL_KEY, String.valueOf(attempt), - WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE); - allLabels.putAll(generalKubeLabels); + final var allLabels = getLabels(jobId, attempt, customLabels); return new KubePodProcess( isOrchestrator, @@ -155,6 +150,19 @@ public Process create(final String jobId, } } + public static Map getLabels(final String jobId, final int attemptId, final Map customLabels) { + final var allLabels = new HashMap<>(customLabels); + + final var generalKubeLabels = Map.of( + JOB_LABEL_KEY, jobId, + ATTEMPT_LABEL_KEY, String.valueOf(attemptId), + WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE); + + allLabels.putAll(generalKubeLabels); + + return allLabels; + } + /** * Docker image names are by convention separated by slashes. The last portion is the image's name. * This is followed by a colon and a version number. e.g. airbyte/scheduler:v1 or diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java b/airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java new file mode 100644 index 000000000000..ce4be0d5e36a --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.storage; + +import io.airbyte.config.storage.CloudStorageConfigs; +import java.nio.file.Path; + +public class StateClients { + + public static DocumentStoreClient create(final CloudStorageConfigs cloudStorageConfigs, final Path prefix) { + DocumentStoreClient documentStoreClient = null; + + switch (cloudStorageConfigs.getType()) { + case S3 -> { + documentStoreClient = S3DocumentStoreClient.s3(cloudStorageConfigs.getS3Config(), prefix); + } + case MINIO -> { + documentStoreClient = S3DocumentStoreClient.minio(cloudStorageConfigs.getMinioConfig(), prefix); + } + case GCS -> { + documentStoreClient = GcsDocumentStoreClient.create(cloudStorageConfigs.getGcsConfig(), prefix); + } + } + + return documentStoreClient; + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java b/airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java deleted file mode 100644 index 2da164ec4008..000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.storage; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.json.Jsons; -import java.util.Optional; -import java.util.UUID; - -/** - * JSON layer over { @link CloudDocumentStore }. - */ -public class WorkerStore { - - private final DocumentStoreClient documentStoreClient; - - public WorkerStore(final DocumentStoreClient documentStoreClient) { - this.documentStoreClient = documentStoreClient; - } - - /** - * Set the document for an id. Overwrites existing document, if present. - * - * @param id - id to associate document with - * @param document - document to persist - */ - void set(final UUID id, final JsonNode document) { - documentStoreClient.write(id.toString(), Jsons.serialize(document)); - } - - /** - * Fetch previously persisted document. - * - * @param id - id that the document is associated with - * @return returns document if present, otherwise empty - */ - Optional get(final UUID id) { - return documentStoreClient.read(id.toString()).map(Jsons::deserialize); - } - - /** - * Delete persisted document. - * - * @param id - id that the document is associated with - * @return true if actually deletes something, otherwise false. (e.g. false if document doest not - * exist). - */ - boolean delete(final UUID id) { - return documentStoreClient.delete(id.toString()); - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java index 4a3eb9eb9efb..46b73467a6d6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java @@ -4,134 +4,35 @@ package io.airbyte.workers.temporal.sync; -import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.logging.LoggingHelper; -import io.airbyte.commons.logging.MdcScope; import io.airbyte.config.OperatorDbtInput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.WorkerException; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.KubeProcessFactory; -import io.airbyte.workers.process.ProcessFactory; -import java.nio.file.Path; import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -// todo: DRY the launchers -public class DbtLauncherWorker implements Worker { +public class DbtLauncherWorker extends LauncherWorker { - private static final Logger LOGGER = LoggerFactory.getLogger(DbtLauncherWorker.class); - - private static final MdcScope.Builder LOG_MDC_BUILDER = new MdcScope.Builder() - .setLogPrefix("dbt-orchestrator") - .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND); - - public static final String DBT = "dbt"; + public static final String DBT = "dbt-orchestrator"; + private static final String POD_NAME_PREFIX = "orchestrator-dbt"; public static final String INIT_FILE_DESTINATION_LAUNCHER_CONFIG = "destinationLauncherConfig.json"; - private final WorkerConfigs workerConfigs; - private final ProcessFactory processFactory; - private final String airbyteVersion; - private final Path workspaceRoot; - private final IntegrationLauncherConfig destinationLauncherConfig; - private final JobRunConfig jobRunConfig; - - private final AtomicBoolean cancelled = new AtomicBoolean(false); - - private Process process; - - public DbtLauncherWorker( - final Path workspaceRoot, - final IntegrationLauncherConfig destinationLauncherConfig, + public DbtLauncherWorker(final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, final WorkerConfigs workerConfigs, - final ProcessFactory processFactory, + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, final String airbyteVersion) { - this.workspaceRoot = workspaceRoot; - this.destinationLauncherConfig = destinationLauncherConfig; - this.jobRunConfig = jobRunConfig; - this.workerConfigs = workerConfigs; - this.processFactory = processFactory; - this.airbyteVersion = airbyteVersion; - } - - @Override - public Void run(OperatorDbtInput operatorDbtInput, Path jobRoot) throws WorkerException { - try { - final Path jobPath = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); - - // we want to filter down to remove secrets, so we aren't writing over a bunch of unnecessary - // secrets - final Map envMap = System.getenv().entrySet().stream() - .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - final Map fileMap = Map.of( - OrchestratorConstants.INIT_FILE_APPLICATION, DBT, - OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig), - OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(operatorDbtInput), - OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap), - INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)); - - process = processFactory.create( - "runner-" + UUID.randomUUID().toString().substring(0, 10), - 0, - jobPath, - "airbyte/container-orchestrator:" + airbyteVersion, - false, - fileMap, - null, - workerConfigs.getResourceRequirements(), - Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_RUNNER), - Map.of( - WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, - OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, - OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, - OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, - OrchestratorConstants.PORT4, OrchestratorConstants.PORT4)); - - LineGobbler.gobble(process.getInputStream(), LOGGER::info, LOG_MDC_BUILDER); - LineGobbler.gobble(process.getErrorStream(), LOGGER::error, LOG_MDC_BUILDER); - - WorkerUtils.wait(process); - - if (process.exitValue() != 0) { - throw new WorkerException("Non-zero exit code!"); - } - } catch (Exception e) { - if (cancelled.get()) { - throw new WorkerException("Sync was cancelled.", e); - } else { - throw new WorkerException("Running the sync attempt failed", e); - } - } - - return null; - } - - @Override - public void cancel() { - cancelled.set(true); - - if (process == null) { - return; - } - - LOGGER.debug("Closing dbt launcher process"); - WorkerUtils.gentleClose(workerConfigs, process, 1, TimeUnit.MINUTES); - if (process.isAlive() || process.exitValue() != 0) { - LOGGER.error("Dbt launcher process wasn't successful"); - } + super( + DBT, + POD_NAME_PREFIX, + jobRunConfig, + Map.of( + INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)), + containerOrchestratorConfig, + airbyteVersion, + workerConfigs.getResourceRequirements(), + Void.class); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index 404eaef20412..425a915c69bb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -18,20 +18,20 @@ import io.airbyte.workers.DbtTransformationRunner; import io.airbyte.workers.DbtTransformationWorker; import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import java.nio.file.Path; +import java.util.Optional; import java.util.function.Supplier; public class DbtTransformationActivityImpl implements DbtTransformationActivity { - private final boolean containerOrchestratorEnabled; private final WorkerConfigs workerConfigs; private final ProcessFactory jobProcessFactory; - private final ProcessFactory orchestratorProcessFactory; private final SecretsHydrator secretsHydrator; private final Path workspaceRoot; private final AirbyteConfigValidator validator; @@ -41,11 +41,11 @@ public class DbtTransformationActivityImpl implements DbtTransformationActivity private final String databasePassword; private final String databaseUrl; private final String airbyteVersion; + private final Optional containerOrchestratorConfig; - public DbtTransformationActivityImpl(final boolean containerOrchestratorEnabled, + public DbtTransformationActivityImpl(final Optional containerOrchestratorConfig, final WorkerConfigs workerConfigs, final ProcessFactory jobProcessFactory, - final ProcessFactory orchestratorProcessFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, final WorkerEnvironment workerEnvironment, @@ -54,10 +54,9 @@ public DbtTransformationActivityImpl(final boolean containerOrchestratorEnabled, final String databasePassword, final String databaseUrl, final String airbyteVersion) { - this.containerOrchestratorEnabled = containerOrchestratorEnabled; + this.containerOrchestratorConfig = containerOrchestratorConfig; this.workerConfigs = workerConfigs; this.jobProcessFactory = jobProcessFactory; - this.orchestratorProcessFactory = orchestratorProcessFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = new AirbyteConfigValidator(); @@ -85,7 +84,7 @@ public Void run(final JobRunConfig jobRunConfig, final CheckedSupplier, Exception> workerFactory; - if (containerOrchestratorEnabled) { + if (containerOrchestratorConfig.isPresent()) { workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig); } else { workerFactory = getLegacyWorkerFactory(destinationLauncherConfig, jobRunConfig, resourceRequirements); @@ -122,11 +121,10 @@ private CheckedSupplier, Exception> getContainerL final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig) { return () -> new DbtLauncherWorker( - workspaceRoot, destinationLauncherConfig, jobRunConfig, workerConfigs, - orchestratorProcessFactory, + containerOrchestratorConfig.get(), airbyteVersion); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java new file mode 100644 index 000000000000..7481a5ae5b85 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.sync; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ResourceRequirements; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerApp; +import io.airbyte.workers.WorkerException; +import io.airbyte.workers.process.AsyncKubePodStatus; +import io.airbyte.workers.process.AsyncOrchestratorPodProcess; +import io.airbyte.workers.process.KubePodInfo; +import io.airbyte.workers.process.KubeProcessFactory; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +/** + * Coordinates configuring and managing the state of an async process. This is tied to the (job_id, + * attempt_id) and will attempt to kill off lower attempt ids. + * + * @param a json-serializable input class for the worker + * @param either {@link Void} or a json-serializable output class for the worker + */ +@Slf4j +public class LauncherWorker implements Worker { + + private final String application; + private final String podNamePrefix; + private final JobRunConfig jobRunConfig; + private final Map additionalFileMap; + private final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig; + private final String airbyteVersion; + private final ResourceRequirements resourceRequirements; + private final Class outputClass; + + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private AsyncOrchestratorPodProcess process; + + public LauncherWorker( + final String application, + final String podNamePrefix, + final JobRunConfig jobRunConfig, + final Map additionalFileMap, + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, + final String airbyteVersion, + final ResourceRequirements resourceRequirements, + final Class outputClass) { + this.application = application; + this.podNamePrefix = podNamePrefix; + this.jobRunConfig = jobRunConfig; + this.additionalFileMap = additionalFileMap; + this.containerOrchestratorConfig = containerOrchestratorConfig; + this.airbyteVersion = airbyteVersion; + this.resourceRequirements = resourceRequirements; + this.outputClass = outputClass; + } + + @Override + public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException { + try { + final Map envMap = System.getenv().entrySet().stream() + .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + final Map fileMap = new HashMap<>(additionalFileMap); + fileMap.putAll(Map.of( + OrchestratorConstants.INIT_FILE_APPLICATION, application, + OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig), + OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(input), + OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap))); + + final Map portMap = Map.of( + WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, + OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, + OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, + OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, + OrchestratorConstants.PORT4, OrchestratorConstants.PORT4); + + final var allLabels = KubeProcessFactory.getLabels( + jobRunConfig.getJobId(), + Math.toIntExact(jobRunConfig.getAttemptId()), + Collections.emptyMap()); + + final var podNameAndJobPrefix = podNamePrefix + "-j-" + jobRunConfig.getJobId() + "-a-"; + killLowerAttemptIdsIfPresent(podNameAndJobPrefix, jobRunConfig.getAttemptId()); + + final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId(); + final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), podName); + + process = new AsyncOrchestratorPodProcess( + kubePodInfo, + containerOrchestratorConfig.documentStoreClient(), + containerOrchestratorConfig.kubernetesClient()); + + if (process.getDocStoreStatus().equals(AsyncKubePodStatus.NOT_STARTED)) { + process.create( + airbyteVersion, + allLabels, + resourceRequirements, + fileMap, + portMap); + } + + // this waitFor can resume if the activity is re-run + process.waitFor(); + + if (process.exitValue() != 0) { + throw new WorkerException("Non-zero exit code!"); + } + + final var output = process.getOutput(); + + if (output.isPresent()) { + return Jsons.deserialize(output.get(), outputClass); + } else { + throw new WorkerException("Running the " + application + " launcher resulted in no readable output!"); + } + } catch (Exception e) { + if (cancelled.get()) { + throw new WorkerException("Launcher " + application + " was cancelled.", e); + } else { + throw new WorkerException("Running the launcher " + application + " failed", e); + } + } + } + + /** + * If the sync workflow has advanced to the next attempt, we don't want to leave a zombie of the + * older job running (if it exists). In order to ensure a consistent state, we should kill the older + * versions. + */ + private void killLowerAttemptIdsIfPresent(final String podNameAndJobPrefix, final long currentAttempt) { + for (long previousAttempt = currentAttempt - 1; previousAttempt >= 0; previousAttempt--) { + final var podName = podNameAndJobPrefix + previousAttempt; + final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), podName); + final var oldProcess = new AsyncOrchestratorPodProcess( + kubePodInfo, + containerOrchestratorConfig.documentStoreClient(), + containerOrchestratorConfig.kubernetesClient()); + + try { + oldProcess.destroy(); + log.info("Found and destroyed a previous attempt: " + previousAttempt); + } catch (Exception e) { + log.warn("Wasn't able to find and destroy a previous attempt: " + previousAttempt); + } + } + } + + @Override + public void cancel() { + cancelled.set(true); + + if (process == null) { + return; + } + + log.debug("Closing sync runner process"); + process.destroy(); + + if (process.hasExited()) { + log.info("Successfully cancelled process."); + } else { + // try again + process.destroy(); + + if (process.hasExited()) { + log.info("Successfully cancelled process."); + } else { + log.error("Unable to cancel process"); + } + } + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index ee01e9075e67..eb82b2c730eb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -16,20 +16,20 @@ import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.DefaultNormalizationWorker; import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import java.nio.file.Path; +import java.util.Optional; import java.util.function.Supplier; public class NormalizationActivityImpl implements NormalizationActivity { - private final boolean containerOrchestratorEnabled; private final WorkerConfigs workerConfigs; private final ProcessFactory jobProcessFactory; - private final ProcessFactory orchestratorProcessFactory; private final SecretsHydrator secretsHydrator; private final Path workspaceRoot; private final AirbyteConfigValidator validator; @@ -39,11 +39,11 @@ public class NormalizationActivityImpl implements NormalizationActivity { private final String databasePassword; private final String databaseUrl; private final String airbyteVersion; + private final Optional containerOrchestratorConfig; - public NormalizationActivityImpl(final boolean containerOrchestratorEnabled, + public NormalizationActivityImpl(final Optional containerOrchestratorConfig, final WorkerConfigs workerConfigs, final ProcessFactory jobProcessFactory, - final ProcessFactory orchestratorProcessFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, final WorkerEnvironment workerEnvironment, @@ -52,10 +52,9 @@ public NormalizationActivityImpl(final boolean containerOrchestratorEnabled, final String databasePassword, final String databaseUrl, final String airbyteVersion) { - this.containerOrchestratorEnabled = containerOrchestratorEnabled; + this.containerOrchestratorConfig = containerOrchestratorConfig; this.workerConfigs = workerConfigs; this.jobProcessFactory = jobProcessFactory; - this.orchestratorProcessFactory = orchestratorProcessFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = new AirbyteConfigValidator(); @@ -82,7 +81,7 @@ public Void normalize(final JobRunConfig jobRunConfig, final CheckedSupplier, Exception> workerFactory; - if (containerOrchestratorEnabled) { + if (containerOrchestratorConfig.isPresent()) { workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig); } else { workerFactory = getLegacyWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig); @@ -118,11 +117,10 @@ private CheckedSupplier, Exception> getContaine final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig) { return () -> new NormalizationLauncherWorker( - workspaceRoot, destinationLauncherConfig, jobRunConfig, workerConfigs, - orchestratorProcessFactory, + containerOrchestratorConfig.get(), airbyteVersion); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java index ade742b135ad..d353f46c52af 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java @@ -4,132 +4,35 @@ package io.airbyte.workers.temporal.sync; -import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.logging.LoggingHelper; -import io.airbyte.commons.logging.MdcScope; import io.airbyte.config.NormalizationInput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.WorkerException; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.KubeProcessFactory; -import io.airbyte.workers.process.ProcessFactory; -import java.nio.file.Path; import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class NormalizationLauncherWorker implements Worker { +public class NormalizationLauncherWorker extends LauncherWorker { - private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationLauncherWorker.class); - - private static final MdcScope.Builder LOG_MDC_BUILDER = new MdcScope.Builder() - .setLogPrefix("normalization-orchestrator") - .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND); - - public static final String NORMALIZATION = "normalization"; + public static final String NORMALIZATION = "normalization-orchestrator"; + private static final String POD_NAME_PREFIX = "orchestrator-norm"; public static final String INIT_FILE_DESTINATION_LAUNCHER_CONFIG = "destinationLauncherConfig.json"; - private final WorkerConfigs workerConfigs; - private final ProcessFactory processFactory; - private final String airbyteVersion; - private final AtomicBoolean cancelled = new AtomicBoolean(false); - private final Path workspaceRoot; - private final IntegrationLauncherConfig destinationLauncherConfig; - private final JobRunConfig jobRunConfig; - - private Process process; - - public NormalizationLauncherWorker( - final Path workspaceRoot, - final IntegrationLauncherConfig destinationLauncherConfig, + public NormalizationLauncherWorker(final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, final WorkerConfigs workerConfigs, - final ProcessFactory processFactory, + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, final String airbyteVersion) { - this.workspaceRoot = workspaceRoot; - this.destinationLauncherConfig = destinationLauncherConfig; - this.jobRunConfig = jobRunConfig; - this.workerConfigs = workerConfigs; - this.processFactory = processFactory; - this.airbyteVersion = airbyteVersion; - } - - @Override - public Void run(NormalizationInput normalizationInput, Path jobRoot) throws WorkerException { - try { - final Path jobPath = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); - - // we want to filter down to remove secrets, so we aren't writing over a bunch of unnecessary - // secrets - final Map envMap = System.getenv().entrySet().stream() - .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - final Map fileMap = Map.of( - OrchestratorConstants.INIT_FILE_APPLICATION, NORMALIZATION, - OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig), - OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(normalizationInput), - OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap), - INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)); - - process = processFactory.create( - "runner-" + UUID.randomUUID().toString().substring(0, 10), - 0, - jobPath, - "airbyte/container-orchestrator:" + airbyteVersion, - false, - fileMap, - null, - workerConfigs.getResourceRequirements(), - Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_RUNNER), - Map.of( - WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, - OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, - OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, - OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, - OrchestratorConstants.PORT4, OrchestratorConstants.PORT4)); - - LineGobbler.gobble(process.getInputStream(), LOGGER::info, LOG_MDC_BUILDER); - LineGobbler.gobble(process.getErrorStream(), LOGGER::error, LOG_MDC_BUILDER); - - WorkerUtils.wait(process); - - if (process.exitValue() != 0) { - throw new WorkerException("Non-zero exit code!"); - } - } catch (Exception e) { - if (cancelled.get()) { - throw new WorkerException("Sync was cancelled.", e); - } else { - throw new WorkerException("Running the sync attempt failed", e); - } - } - - return null; - } - - @Override - public void cancel() { - cancelled.set(true); - - if (process == null) { - return; - } - - LOGGER.debug("Closing normalization launcher process"); - WorkerUtils.gentleClose(workerConfigs, process, 1, TimeUnit.MINUTES); - if (process.isAlive() || process.exitValue() != 0) { - LOGGER.error("Normalization launcher process wasn't successful"); - } + super( + NORMALIZATION, + POD_NAME_PREFIX, + jobRunConfig, + Map.of( + INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)), + containerOrchestratorConfig, + airbyteVersion, + workerConfigs.getResourceRequirements(), + Void.class); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java index ce7b456faf71..85de5c0b6f79 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java @@ -5,10 +5,17 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.helpers.LogClientSingleton; import java.util.Set; public class OrchestratorConstants { + // we want to propagate log level, even if it isn't consumed by EnvConfigs + private static final String LOG_LEVEL = "LOG_LEVEL"; + + // necessary for s3/minio logging. used in the log4j2 configuration. + private static final String S3_PATH_STYLE_ACCESS = "S3_PATH_STYLE_ACCESS"; + // set of env vars necessary for the container orchestrator app to run public static final Set ENV_VARS_TO_TRANSFER = Set.of( EnvConfigs.WORKER_ENVIRONMENT, @@ -29,7 +36,26 @@ public class OrchestratorConstants { EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_REQUEST, EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT, EnvConfigs.JOB_DEFAULT_ENV_MAP, - EnvConfigs.LOCAL_ROOT); + EnvConfigs.LOCAL_ROOT, + LOG_LEVEL, + LogClientSingleton.GCS_LOG_BUCKET, + LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, + LogClientSingleton.S3_MINIO_ENDPOINT, + S3_PATH_STYLE_ACCESS, + LogClientSingleton.S3_LOG_BUCKET, + LogClientSingleton.AWS_ACCESS_KEY_ID, + LogClientSingleton.AWS_SECRET_ACCESS_KEY, + LogClientSingleton.S3_LOG_BUCKET_REGION, + EnvConfigs.STATE_STORAGE_GCS_BUCKET_NAME, + EnvConfigs.STATE_STORAGE_GCS_APPLICATION_CREDENTIALS, + EnvConfigs.STATE_STORAGE_MINIO_ENDPOINT, + EnvConfigs.STATE_STORAGE_MINIO_BUCKET_NAME, + EnvConfigs.STATE_STORAGE_MINIO_ACCESS_KEY, + EnvConfigs.STATE_STORAGE_MINIO_SECRET_ACCESS_KEY, + EnvConfigs.STATE_STORAGE_S3_BUCKET_NAME, + EnvConfigs.STATE_STORAGE_S3_ACCESS_KEY, + EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY, + EnvConfigs.STATE_STORAGE_S3_REGION); public static final String INIT_FILE_ENV_MAP = "envMap.json"; public static final String INIT_FILE_INPUT = "input.json"; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 8eb3533b4e1d..763f6e056e50 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -20,6 +20,7 @@ import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.DefaultReplicationWorker; import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.process.AirbyteIntegrationLauncher; @@ -34,6 +35,7 @@ import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import java.nio.file.Path; +import java.util.Optional; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +44,7 @@ public class ReplicationActivityImpl implements ReplicationActivity { private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationActivityImpl.class); - private final boolean containerOrchestratorEnabled; + private final Optional containerOrchestratorConfig; private final WorkerConfigs workerConfigs; private final ProcessFactory processFactory; private final SecretsHydrator secretsHydrator; @@ -56,7 +58,7 @@ public class ReplicationActivityImpl implements ReplicationActivity { private final String databaseUrl; private final String airbyteVersion; - public ReplicationActivityImpl(final boolean containerOrchestratorEnabled, + public ReplicationActivityImpl(final Optional containerOrchestratorConfig, final WorkerConfigs workerConfigs, final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, @@ -67,13 +69,13 @@ public ReplicationActivityImpl(final boolean containerOrchestratorEnabled, final String databasePassword, final String databaseUrl, final String airbyteVersion) { - this(containerOrchestratorEnabled, workerConfigs, processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, + this(containerOrchestratorConfig, workerConfigs, processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, new AirbyteConfigValidator(), databaseUser, databasePassword, databaseUrl, airbyteVersion); } @VisibleForTesting - ReplicationActivityImpl(final boolean containerOrchestratorEnabled, + ReplicationActivityImpl(final Optional containerOrchestratorConfig, final WorkerConfigs workerConfigs, final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, @@ -85,7 +87,7 @@ public ReplicationActivityImpl(final boolean containerOrchestratorEnabled, final String databasePassword, final String databaseUrl, final String airbyteVersion) { - this.containerOrchestratorEnabled = containerOrchestratorEnabled; + this.containerOrchestratorConfig = containerOrchestratorConfig; this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; @@ -119,8 +121,9 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, final CheckedSupplier, Exception> workerFactory; - if (containerOrchestratorEnabled) { - workerFactory = getContainerLauncherWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput); + if (containerOrchestratorConfig.isPresent()) { + workerFactory = getContainerLauncherWorkerFactory(containerOrchestratorConfig.get(), sourceLauncherConfig, destinationLauncherConfig, + jobRunConfig, syncInput); } else { workerFactory = getLegacyWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput); } @@ -202,17 +205,16 @@ private CheckedSupplier, Exception> } private CheckedSupplier, Exception> getContainerLauncherWorkerFactory( + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, final StandardSyncInput syncInput) { return () -> new ReplicationLauncherWorker( + containerOrchestratorConfig, sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, - syncInput, - workspaceRoot, - processFactory, airbyteVersion, workerConfigs); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java index 6fad439553e8..d8dcadaa0052 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java @@ -4,167 +4,45 @@ package io.airbyte.workers.temporal.sync; -import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.logging.LoggingHelper; -import io.airbyte.commons.logging.MdcScope; import io.airbyte.config.ReplicationOutput; import io.airbyte.config.StandardSyncInput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerApp; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.WorkerException; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.KubeProcessFactory; -import io.airbyte.workers.process.ProcessFactory; -import java.nio.file.Path; import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Launches a container-orchestrator container/pod to manage the message passing for the replication * step. This step configs onto the container-orchestrator and retrieves logs and the output from * the container-orchestrator. */ -public class ReplicationLauncherWorker implements Worker { +public class ReplicationLauncherWorker extends LauncherWorker { - private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationLauncherWorker.class); - - private static final MdcScope.Builder LOG_MDC_BUILDER = new MdcScope.Builder() - .setLogPrefix("replication-orchestrator") - .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND); - - public static final String REPLICATION = "replication"; + public static final String REPLICATION = "replication-orchestrator"; + private static final String POD_NAME_PREFIX = "orchestrator-repl"; public static final String INIT_FILE_SOURCE_LAUNCHER_CONFIG = "sourceLauncherConfig.json"; public static final String INIT_FILE_DESTINATION_LAUNCHER_CONFIG = "destinationLauncherConfig.json"; - private final AtomicBoolean cancelled = new AtomicBoolean(false); - private final IntegrationLauncherConfig sourceLauncherConfig; - private final IntegrationLauncherConfig destinationLauncherConfig; - private final JobRunConfig jobRunConfig; - private final StandardSyncInput syncInput; - private final Path workspaceRoot; - private final ProcessFactory processFactory; - private final String airbyteVersion; - private final WorkerConfigs workerConfigs; - - private Process process; - public ReplicationLauncherWorker( + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, - final StandardSyncInput syncInput, - final Path workspaceRoot, - final ProcessFactory processFactory, final String airbyteVersion, final WorkerConfigs workerConfigs) { - - this.sourceLauncherConfig = sourceLauncherConfig; - this.destinationLauncherConfig = destinationLauncherConfig; - this.jobRunConfig = jobRunConfig; - this.syncInput = syncInput; - this.workspaceRoot = workspaceRoot; - this.processFactory = processFactory; - this.airbyteVersion = airbyteVersion; - this.workerConfigs = workerConfigs; - } - - @Override - public ReplicationOutput run(StandardSyncInput standardSyncInput, Path jobRoot) throws WorkerException { - try { - final Path jobPath = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); - - // we want to filter down to remove secrets, so we aren't writing over a bunch of unnecessary - // secrets - final Map envMap = System.getenv().entrySet().stream() - .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - final Map fileMap = Map.of( - OrchestratorConstants.INIT_FILE_APPLICATION, REPLICATION, - OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig), - OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(syncInput), - OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap), - INIT_FILE_SOURCE_LAUNCHER_CONFIG, Jsons.serialize(sourceLauncherConfig), - INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)); - - process = processFactory.create( - "runner-" + UUID.randomUUID().toString().substring(0, 10), - 0, - jobPath, - "airbyte/container-orchestrator:" + airbyteVersion, - false, - fileMap, - null, - workerConfigs.getResourceRequirements(), - Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_RUNNER), - Map.of( - WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, - OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, - OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, - OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, - OrchestratorConstants.PORT4, OrchestratorConstants.PORT4)); - - final AtomicReference output = new AtomicReference<>(); - - LineGobbler.gobble(process.getInputStream(), line -> { - final Optional maybeOutput = Jsons.tryDeserialize(line, ReplicationOutput.class); - - if (maybeOutput.isPresent()) { - LOGGER.info("Found output!"); - output.set(maybeOutput.get()); - } else { - try (final var mdcScope = LOG_MDC_BUILDER.build()) { - LOGGER.info(line); - } - } - }); - - LineGobbler.gobble(process.getErrorStream(), LOGGER::error, LOG_MDC_BUILDER); - - WorkerUtils.wait(process); - - if (process.exitValue() != 0) { - throw new WorkerException("Non-zero exit code!"); - } - - if (output.get() != null) { - return output.get(); - } else { - throw new WorkerException("Running the sync attempt resulted in no readable output!"); - } - } catch (Exception e) { - if (cancelled.get()) { - throw new WorkerException("Sync was cancelled.", e); - } else { - throw new WorkerException("Running the sync attempt failed", e); - } - } - } - - @Override - public void cancel() { - cancelled.set(true); - - if (process == null) { - return; - } - - LOGGER.debug("Closing replication launcher process"); - WorkerUtils.gentleClose(workerConfigs, process, 1, TimeUnit.MINUTES); - if (process.isAlive() || process.exitValue() != 0) { - LOGGER.error("Replication launcher process wasn't successful"); - } + super( + REPLICATION, + POD_NAME_PREFIX, + jobRunConfig, + Map.of( + INIT_FILE_SOURCE_LAUNCHER_CONFIG, Jsons.serialize(sourceLauncherConfig), + INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)), + containerOrchestratorConfig, + airbyteVersion, + workerConfigs.getResourceRequirements(), + ReplicationOutput.class); } } diff --git a/airbyte-workers/src/main/resources/entrypoints/check.sh b/airbyte-workers/src/main/resources/entrypoints/sync/check.sh similarity index 100% rename from airbyte-workers/src/main/resources/entrypoints/check.sh rename to airbyte-workers/src/main/resources/entrypoints/sync/check.sh diff --git a/airbyte-workers/src/main/resources/entrypoints/main.sh b/airbyte-workers/src/main/resources/entrypoints/sync/main.sh similarity index 100% rename from airbyte-workers/src/main/resources/entrypoints/main.sh rename to airbyte-workers/src/main/resources/entrypoints/sync/main.sh diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java new file mode 100644 index 000000000000..49b648b30299 --- /dev/null +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.EnvConfigs; +import io.airbyte.config.storage.CloudStorageConfigs; +import io.airbyte.config.storage.MinioS3ClientFactory; +import io.airbyte.workers.WorkerApp; +import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.storage.DocumentStoreClient; +import io.airbyte.workers.storage.S3DocumentStoreClient; +import io.airbyte.workers.temporal.sync.OrchestratorConstants; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; + +public class AsyncOrchestratorPodProcessIntegrationTest { + + private static KubernetesClient kubernetesClient; + private static DocumentStoreClient documentStoreClient; + private static Process portForwardProcess; + + @BeforeAll + public static void init() throws Exception { + kubernetesClient = new DefaultKubernetesClient(); + + final var podName = "test-minio-" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + + final var minioContainer = new ContainerBuilder() + .withName("minio") + .withImage("minio/minio:latest") + .withArgs("server", "/home/shared") + .withEnv( + new EnvVar("MINIO_ACCESS_KEY", "minio", null), + new EnvVar("MINIO_SECRET_KEY", "minio123", null)) + .withPorts(new ContainerPort(9000, null, null, null, null)) + .build(); + + final Pod minioPod = new PodBuilder() + .withApiVersion("v1") + .withNewMetadata() + .withName(podName) + .withNamespace("default") + .endMetadata() + .withNewSpec() + .withRestartPolicy("Never") + .withContainers(minioContainer) + .endSpec() + .build(); + + kubernetesClient.pods().inNamespace("default").create(minioPod); + kubernetesClient.resource(minioPod).waitUntilReady(1, TimeUnit.MINUTES); + + portForwardProcess = new ProcessBuilder("kubectl", "port-forward", "pod/" + podName, "9432:9000").start(); + + final var localMinioEndpoint = "http://localhost:9432"; + + final var minioConfig = new CloudStorageConfigs.MinioConfig( + "anything", + "minio", + "minio123", + localMinioEndpoint); + + final var s3Client = new MinioS3ClientFactory(minioConfig).get(); + + final var createBucketRequest = CreateBucketRequest.builder() + .bucket("anything") + .build(); + + s3Client.createBucket(createBucketRequest); + + documentStoreClient = S3DocumentStoreClient.minio( + minioConfig, + Path.of("/")); + } + + @Test + public void test() throws InterruptedException { + final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + + // make kubepodinfo + final var kubePodInfo = new KubePodInfo("default", podName); + + // another activity issues the request to create the pod process -> here we'll just create it + final var asyncProcess = new AsyncOrchestratorPodProcess( + kubePodInfo, + documentStoreClient, + kubernetesClient); + + final Map portMap = Map.of( + WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, + OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, + OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, + OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, + OrchestratorConstants.PORT4, OrchestratorConstants.PORT4); + + final Map envMap = System.getenv().entrySet().stream() + .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + asyncProcess.create("dev", Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of( + OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP, + OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap); + + // a final activity waits until there is output from the kube pod process + asyncProcess.waitFor(10, TimeUnit.SECONDS); + + final var exitValue = asyncProcess.exitValue(); + final var output = asyncProcess.getOutput(); + + assertEquals(0, exitValue); + assertTrue(output.isPresent()); + assertEquals("expected output", output.get()); + } + + @AfterAll + public static void teardown() { + try { + portForwardProcess.destroyForcibly(); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + kubernetesClient.pods().delete(); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/storage/WorkerStoreTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/storage/WorkerStoreTest.java deleted file mode 100644 index 181299de2c9c..000000000000 --- a/airbyte-workers/src/test/java/io/airbyte/workers/storage/WorkerStoreTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.json.Jsons; -import java.util.Optional; -import java.util.UUID; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; - -class WorkerStoreTest { - - private static final UUID ID = UUID.randomUUID(); - private static final JsonNode DOCUMENT = Jsons.jsonNode(ImmutableMap.of("a", 1)); - - private DocumentStoreClient documentStore; - private WorkerStore store; - - @BeforeEach - void setup() { - documentStore = mock(DocumentStoreClient.class); - store = new WorkerStore(documentStore); - } - - @Test - void testWrite() { - store.set(ID, DOCUMENT); - // overwrites are allowed, so test calling it twice. - store.set(ID, DOCUMENT); - verify(documentStore, times(2)).write(ID.toString(), Jsons.serialize(DOCUMENT)); - } - - @Test - void testReadExists() { - when(documentStore.read(ID.toString())).thenReturn(Optional.of(Jsons.serialize(DOCUMENT))); - assertEquals(Optional.of(DOCUMENT), store.get(ID)); - } - - @Test - void testReadNotExists() { - when(documentStore.read(ID.toString())).thenReturn(Optional.empty()); - assertEquals(Optional.empty(), store.get(ID)); - } - - @Test - void testDeleteExists() { - when(documentStore.delete(ID.toString())).thenReturn(true); - assertTrue(store.delete(ID)); - } - - @Test - void testDeleteNotExists() { - when(documentStore.delete(ID.toString())).thenReturn(false); - assertFalse(store.delete(ID)); - } - -} diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index 073b20249ac0..0b4bc1214175 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -38,6 +38,8 @@ data: S3_PATH_STYLE_ACCESS: {{ include "airbyte.s3PathStyleAccess" . | quote }} STATE_STORAGE_MINIO_BUCKET_NAME: airbyte-state-storage STATE_STORAGE_MINIO_ENDPOINT: {{ include "airbyte.minio.endpoint" . | quote }} + STATE_STORAGE_MINIO_ACCESS_KEY: {{ include "airbyte.minio.accessKey.password" . | quote }} + STATE_STORAGE_MINIO_SECRET_ACCESS_KEY: {{ include "airbyte.minio.secretKey.password" . | quote }} SUBMITTER_NUM_THREADS: "10" TEMPORAL_HOST: {{ include "common.names.fullname" . }}-temporal:{{ .Values.temporal.service.port }} TEMPORAL_WORKER_PORTS: 9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030,9031,9032,9033,9034,9035,9036,9037,9038,9039,9040 diff --git a/charts/airbyte/templates/worker/deployment.yaml b/charts/airbyte/templates/worker/deployment.yaml index d62546dd918a..fd6c110f74fd 100644 --- a/charts/airbyte/templates/worker/deployment.yaml +++ b/charts/airbyte/templates/worker/deployment.yaml @@ -187,6 +187,16 @@ spec: configMapKeyRef: name: airbyte-env key: STATE_STORAGE_MINIO_BUCKET_NAME + - name: STATE_STORAGE_MINIO_ACCESS_KEY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_ACCESS_KEY + - name: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY - name: STATE_STORAGE_MINIO_ENDPOINT valueFrom: configMapKeyRef: diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 6cadc72d4a7f..e85cf73e04d3 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -48,7 +48,7 @@ S3_PATH_STYLE_ACCESS=true GCS_LOG_BUCKET= # State Storage Configuration -STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-state-storage +STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-dev-logs STATE_STORAGE_MINIO_ENDPOINT=http://airbyte-minio-svc:9000 # Docker Resource Limits @@ -63,3 +63,7 @@ JOB_KUBE_NODE_SELECTORS= # Job image pull policy JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= + +# Launch a separate pod to orchestrate sync steps +CONTAINER_ORCHESTRATOR_ENABLED=false + diff --git a/kube/overlays/dev-integration-test/.secrets b/kube/overlays/dev-integration-test/.secrets index bf69a08191f3..67e8f4aae977 100644 --- a/kube/overlays/dev-integration-test/.secrets +++ b/kube/overlays/dev-integration-test/.secrets @@ -3,3 +3,5 @@ DATABASE_PASSWORD=docker AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=minio123 GOOGLE_APPLICATION_CREDENTIALS= +STATE_STORAGE_MINIO_ACCESS_KEY=minio +STATE_STORAGE_MINIO_SECRET_ACCESS_KEY=minio123 diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 61acc4aad498..5e4680e590cf 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -50,7 +50,7 @@ S3_PATH_STYLE_ACCESS=true GCS_LOG_BUCKET= # State Storage Configuration -STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-state-storage +STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-dev-logs STATE_STORAGE_MINIO_ENDPOINT=http://airbyte-minio-svc:9000 # Docker Resource Limits @@ -65,3 +65,6 @@ JOB_KUBE_NODE_SELECTORS= # Job image pull policy JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= + +# Launch a separate pod to orchestrate sync steps +CONTAINER_ORCHESTRATOR_ENABLED=false diff --git a/kube/overlays/dev/.secrets b/kube/overlays/dev/.secrets index bf69a08191f3..67e8f4aae977 100644 --- a/kube/overlays/dev/.secrets +++ b/kube/overlays/dev/.secrets @@ -3,3 +3,5 @@ DATABASE_PASSWORD=docker AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=minio123 GOOGLE_APPLICATION_CREDENTIALS= +STATE_STORAGE_MINIO_ACCESS_KEY=minio +STATE_STORAGE_MINIO_SECRET_ACCESS_KEY=minio123 diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 041ebcb9c50b..4d6d391b6315 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -50,7 +50,7 @@ S3_PATH_STYLE_ACCESS=true GCS_LOG_BUCKET= # State Storage Configuration -STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-state-storage +STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-dev-logs STATE_STORAGE_MINIO_ENDPOINT=http://airbyte-minio-svc:9000 # Docker Resource Limits @@ -65,3 +65,7 @@ JOB_KUBE_NODE_SELECTORS= # Job image pull policy JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= + +# Launch a separate pod to orchestrate sync steps +CONTAINER_ORCHESTRATOR_ENABLED=false + diff --git a/kube/overlays/stable-with-resource-limits/.secrets b/kube/overlays/stable-with-resource-limits/.secrets index bf69a08191f3..c9f0964dda5b 100644 --- a/kube/overlays/stable-with-resource-limits/.secrets +++ b/kube/overlays/stable-with-resource-limits/.secrets @@ -2,4 +2,5 @@ DATABASE_USER=docker DATABASE_PASSWORD=docker AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=minio123 -GOOGLE_APPLICATION_CREDENTIALS= +STATE_STORAGE_MINIO_ACCESS_KEY=minio +STATE_STORAGE_MINIO_SECRET_ACCESS_KEY=minio123 diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 041ebcb9c50b..4d6d391b6315 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -50,7 +50,7 @@ S3_PATH_STYLE_ACCESS=true GCS_LOG_BUCKET= # State Storage Configuration -STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-state-storage +STATE_STORAGE_MINIO_BUCKET_NAME=airbyte-dev-logs STATE_STORAGE_MINIO_ENDPOINT=http://airbyte-minio-svc:9000 # Docker Resource Limits @@ -65,3 +65,7 @@ JOB_KUBE_NODE_SELECTORS= # Job image pull policy JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= + +# Launch a separate pod to orchestrate sync steps +CONTAINER_ORCHESTRATOR_ENABLED=false + diff --git a/kube/overlays/stable/.secrets b/kube/overlays/stable/.secrets index bf69a08191f3..67e8f4aae977 100644 --- a/kube/overlays/stable/.secrets +++ b/kube/overlays/stable/.secrets @@ -3,3 +3,5 @@ DATABASE_PASSWORD=docker AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=minio123 GOOGLE_APPLICATION_CREDENTIALS= +STATE_STORAGE_MINIO_ACCESS_KEY=minio +STATE_STORAGE_MINIO_SECRET_ACCESS_KEY=minio123 diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index 0e051e1bfaf8..2c8c5eb92e90 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -184,6 +184,32 @@ spec: configMapKeyRef: name: airbyte-env key: JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY + # todo: add other state storage keys + - name: STATE_STORAGE_MINIO_BUCKET_NAME + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_BUCKET_NAME + - name: STATE_STORAGE_MINIO_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_ENDPOINT + - name: STATE_STORAGE_MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: STATE_STORAGE_MINIO_ACCESS_KEY + - name: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + - name: CONTAINER_ORCHESTRATOR_ENABLED + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONTAINER_ORCHESTRATOR_ENABLED ports: - containerPort: 9000 # for heartbeat server - containerPort: 9001 # start temporal worker port pool