Skip to content

Commit

Permalink
async container launch kubernetes "process" (airbytehq#9242)
Browse files Browse the repository at this point in the history
* add misc todos

* save work so far

* configure async pod processing

* remove comment

* fmt

* working except logging propagation?

* add comment

* add logging and misc configuration fixes

* add output propagation

* fix state reading

* logging is working (but background highlighting is not)

* fix log highlighting

* use sys instead of ctx

* comment

* clean up and test state management

* clean up orchestrator app construction

* unify launcher workers and handle resuming

* respond to comments

* misc

* disable

* fix comment

* respond to comments
  • Loading branch information
jrhizor authored Jan 20, 2022
1 parent 9afbbff commit db40932
Show file tree
Hide file tree
Showing 48 changed files with 1,496 additions and 680 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.1.001


### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001
Expand Down
2 changes: 1 addition & 1 deletion airbyte-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}">
<PatternLayout pattern="${default-pattern}"/>
<PatternLayout pattern="${simple-pattern}"/>
</Log4j2Appender>
</Route>
</Routes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ public class EnvConfigs implements Configs {
private static final String JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS = "JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS";
private static final String CONTAINER_ORCHESTRATOR_ENABLED = "CONTAINER_ORCHESTRATOR_ENABLED";

private static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
private static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
private static final String STATE_STORAGE_S3_ACCESS_KEY = "STATE_STORAGE_S3_ACCESS_KEY";
private static final String STATE_STORAGE_S3_SECRET_ACCESS_KEY = "STATE_STORAGE_S3_SECRET_ACCESS_KEY";
private static final String STATE_STORAGE_MINIO_BUCKET_NAME = "STATE_STORAGE_MINIO_BUCKET_NAME";
private static final String STATE_STORAGE_MINIO_ENDPOINT = "STATE_STORAGE_MINIO_ENDPOINT";
private static final String STATE_STORAGE_MINIO_ACCESS_KEY = "STATE_STORAGE_MINIO_ACCESS_KEY";
private static final String STATE_STORAGE_MINIO_SECRET_ACCESS_KEY = "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY";
private static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME";
private static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS";
public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
public static final String STATE_STORAGE_S3_ACCESS_KEY = "STATE_STORAGE_S3_ACCESS_KEY";
public static final String STATE_STORAGE_S3_SECRET_ACCESS_KEY = "STATE_STORAGE_S3_SECRET_ACCESS_KEY";
public static final String STATE_STORAGE_MINIO_BUCKET_NAME = "STATE_STORAGE_MINIO_BUCKET_NAME";
public static final String STATE_STORAGE_MINIO_ENDPOINT = "STATE_STORAGE_MINIO_ENDPOINT";
public static final String STATE_STORAGE_MINIO_ACCESS_KEY = "STATE_STORAGE_MINIO_ACCESS_KEY";
public static final String STATE_STORAGE_MINIO_SECRET_ACCESS_KEY = "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY";
public static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME";
public static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
Expand Down Expand Up @@ -172,18 +172,18 @@ private Optional<CloudStorageConfigs> getStateStorageConfiguration() {
if (getEnv(STATE_STORAGE_GCS_BUCKET_NAME) != null) {
return Optional.of(CloudStorageConfigs.gcs(new GcsConfig(
getEnvOrDefault(STATE_STORAGE_GCS_BUCKET_NAME, ""),
getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, ""))));
getEnvOrDefault(STATE_STORAGE_GCS_APPLICATION_CREDENTIALS, ""))));
} else if (getEnv(STATE_STORAGE_MINIO_ENDPOINT) != null) {
return Optional.of(CloudStorageConfigs.minio(new MinioConfig(
getEnvOrDefault(STATE_STORAGE_MINIO_BUCKET_NAME, ""),
getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_MINIO_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_MINIO_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_MINIO_ENDPOINT, ""))));
} else if (getEnv(STATE_STORAGE_S3_REGION) != null) {
return Optional.of(CloudStorageConfigs.s3(new S3Config(
getEnvOrDefault(STATE_STORAGE_S3_BUCKET_NAME, ""),
getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_S3_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_S3_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_S3_REGION, ""))));
} else {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
package io.airbyte.config.storage;

import com.google.api.client.util.Preconditions;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/**
Expand All @@ -16,8 +19,11 @@
*/
public class DefaultGcsClientFactory implements Supplier<Storage> {

private final GcsConfig config;

public DefaultGcsClientFactory(final GcsConfig config) {
validate(config);
this.config = config;
}

private static void validate(final GcsConfig config) {
Expand All @@ -27,7 +33,13 @@ private static void validate(final GcsConfig config) {

@Override
public Storage get() {
return StorageOptions.getDefaultInstance().getService();
try {
final var credentialsByteStream = new ByteArrayInputStream(config.getGoogleApplicationCredentials().getBytes(StandardCharsets.UTF_8));
final var credentials = ServiceAccountCredentials.fromStream(credentialsByteStream);
return StorageOptions.newBuilder().setCredentials(credentials).build().getService();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.config.storage.CloudStorageConfigs.S3ApiWorkerStorageConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.util.function.Supplier;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

Expand Down Expand Up @@ -41,6 +42,7 @@ static void validateBase(final S3ApiWorkerStorageConfig s3BaseConfig) {
/**
 * Builds an S3 client from the stored S3 configuration: credentials are created from the
 * configured access key / secret access key each time the provider is asked to resolve them, and
 * the region is taken from the configured region name.
 */
@Override
public S3Client get() {
  return S3Client.builder()
      .credentialsProvider(() -> AwsBasicCredentials.create(s3Config.getAwsAccessKey(), s3Config.getAwsSecretAccessKey()))
      .region(Region.of(s3Config.getRegion()))
      .build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Supplier;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

Expand Down Expand Up @@ -40,6 +41,7 @@ public S3Client get() {
final var minioEndpoint = minioConfig.getMinioEndpoint();
try {
final var minioUri = new URI(minioEndpoint);
builder.credentialsProvider(() -> AwsBasicCredentials.create(minioConfig.getAwsAccessKey(), minioConfig.getAwsSecretAccessKey()));
builder.endpointOverride(minioUri);
builder.region(Region.US_EAST_1); // Although this is not used, the S3 client will error out if this is not set. Set a stub value.
} catch (final URISyntaxException e) {
Expand Down
7 changes: 7 additions & 0 deletions airbyte-container-orchestrator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ RUN add-apt-repository \
stable"
RUN apt-get update && apt-get install -y docker-ce-cli jq

# Install kubectl for copying files to kube pods. Eventually should be replaced with a kube java client.
# See https://github.com/airbytehq/airbyte/issues/8643 for more information on why we are using kubectl for copying.
# The following commands were taken from https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/#install-using-native-package-management
RUN curl -fsSLo /usr/share/keyrings/kubernetes-archive-keyring.gpg https://packages.cloud.google.com/apt/doc/apt-key.gpg
RUN echo "deb [signed-by=/usr/share/keyrings/kubernetes-archive-keyring.gpg] https://apt.kubernetes.io/ kubernetes-xenial main" | tee /etc/apt/sources.list.d/kubernetes.list
RUN apt-get update && apt-get install -y kubectl

ENV APPLICATION airbyte-container-orchestrator
ENV AIRBYTE_ENTRYPOINT "/app/${APPLICATION}-0.35.6-alpha/bin/${APPLICATION}"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.container_orchestrator;

import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.KubePodInfo;

/**
 * The state manager writes the "truth" for states of the async pod process. If the store isn't
 * updated by the underlying pod, it will appear as failed.
 *
 * It doesn't have a single value for a state. Instead, in a location on cloud storage or disk, it
 * writes every state it's encountered. The current status is therefore derived from the set of
 * everything written so far (see {@link #getStatus}) rather than read from one mutable field.
 */
public interface AsyncStateManager {

/**
 * Writes a file containing a string value to a location designated by the input status.
 *
 * @param kubePodInfo identifies the pod whose state is being recorded
 * @param status the status being recorded; it designates where the file is written
 * @param value the contents of the file
 */
void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value);

/**
 * Writes an empty file to a location designated by the input status.
 *
 * @param kubePodInfo identifies the pod whose state is being recorded
 * @param status the status being recorded; it designates where the file is written
 */
void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status);

/**
 * Interprets the state given all written state messages for the pod.
 *
 * @param kubePodInfo identifies the pod whose state is being queried
 * @return the status derived from the state messages written for that pod
 */
AsyncKubePodStatus getStatus(final KubePodInfo kubePodInfo);

/**
 * Reads the output recorded alongside a successful run.
 *
 * @param kubePodInfo identifies the pod whose output is being read
 * @return the output stored in the success file. This can be an empty string.
 * @throws IllegalArgumentException if no success file exists
 */
String getOutput(final KubePodInfo kubePodInfo) throws IllegalArgumentException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,25 @@

package io.airbyte.container_orchestrator;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.storage.StateClients;
import io.airbyte.workers.temporal.sync.DbtLauncherWorker;
import io.airbyte.workers.temporal.sync.NormalizationLauncherWorker;
import io.airbyte.workers.temporal.sync.OrchestratorConstants;
Expand All @@ -22,9 +31,9 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -41,54 +50,129 @@
@Slf4j
public class ContainerOrchestratorApp {

public static void main(final String[] args) throws Exception {
WorkerHeartbeatServer heartbeatServer = null;
private final String application;
private final Map<String, String> envMap;
private final JobRunConfig jobRunConfig;
private final KubePodInfo kubePodInfo;
private final Configs configs;

try {
// read files that contain all necessary configuration
final String application = Files.readString(Path.of(OrchestratorConstants.INIT_FILE_APPLICATION));
final Map<String, String> envMap =
(Map<String, String>) Jsons.deserialize(Files.readString(Path.of(OrchestratorConstants.INIT_FILE_ENV_MAP)), Map.class);
public ContainerOrchestratorApp(
final String application,
final Map<String, String> envMap,
final JobRunConfig jobRunConfig,
final KubePodInfo kubePodInfo) {
this.application = application;
this.envMap = envMap;
this.jobRunConfig = jobRunConfig;
this.kubePodInfo = kubePodInfo;
this.configs = new EnvConfigs(envMap);
}

final Configs configs = new EnvConfigs(envMap);
private void configureLogging() {
for (String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) {
if (envMap.containsKey(envVar)) {
System.setProperty(envVar, envMap.get(envVar));
}
}

heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
heartbeatServer.startBackground();
final var logClient = LogClientSingleton.getInstance();
logClient.setJobMdc(
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId()));
}

/**
* Handles state updates (including writing failures) and running the job orchestrator. As much of
* the initialization as possible should go in here so it's logged properly and the state storage is
* updated appropriately.
*/
private void runInternal(final DefaultAsyncStateManager asyncStateManager) {
try {
asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.INITIALIZING);

final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
final ProcessFactory processFactory = getProcessBuilderFactory(configs, workerConfigs);
final JobOrchestrator<?> jobOrchestrator = getJobOrchestrator(configs, workerConfigs, processFactory, application);

log.info("Starting {} orchestrator...", jobOrchestrator.getOrchestratorName());
jobOrchestrator.runJob();
log.info("{} orchestrator complete!", jobOrchestrator.getOrchestratorName());
} finally {
if (heartbeatServer != null) {
log.info("Shutting down heartbeat server...");
heartbeatServer.stop();
if (jobOrchestrator == null) {
throw new IllegalStateException("Could not find job orchestrator for application: " + application);
}

final var heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
heartbeatServer.startBackground();

asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.RUNNING);

final Optional<String> output = jobOrchestrator.runJob();

asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.SUCCEEDED, output.orElse(""));

// required to kill clients with thread pools
System.exit(0);
} catch (Throwable t) {
asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.FAILED);
System.exit(1);
}
}

// required to kill kube client
log.info("Runner closing...");
System.exit(0);
/**
 * Top-level driver for the orchestrator: sets up logging and the MDC scope, builds the objects
 * needed to record state updates, and then hands control to
 * {@link ContainerOrchestratorApp#runInternal}, which does everything else.
 */
public void run() {
  configureLogging();

  final var scopeBuilder = new MdcScope.Builder()
      .setLogPrefix(application)
      .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND);

  // the mdc scope stays open for the rest of the execution
  try (final var mdcScope = scopeBuilder.build()) {

    // IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
    // version is deployed!
    final var asyncStateManager = new DefaultAsyncStateManager(
        StateClients.create(configs.getStateStorageCloudConfigs(), WorkerApp.STATE_STORAGE_PREFIX));

    runInternal(asyncStateManager);
  }
}

/**
 * Process entry point for the orchestrator container.
 *
 * Polls once per second until the success marker file
 * ({@link KubePodProcess#SUCCESS_FILE_NAME} under {@link KubePodProcess#CONFIG_DIR}) exists, then
 * reads the application name, environment map, job run config and pod info through
 * {@link JobOrchestrator} and runs a {@link ContainerOrchestratorApp} built from them.
 *
 * Anything thrown along the way is logged at ERROR level (it is the only record of why startup
 * failed) and the JVM exits with status 1.
 *
 * @param args command line arguments; not read
 */
public static void main(final String[] args) {
  try {
    // wait for config files to be copied
    final var successFile = Path.of(KubePodProcess.CONFIG_DIR, KubePodProcess.SUCCESS_FILE_NAME);

    while (!successFile.toFile().exists()) {
      log.info("Waiting for config file transfers to complete...");
      Thread.sleep(1000);
    }

    final var applicationName = JobOrchestrator.readApplicationName();
    final var envMap = JobOrchestrator.readEnvMap();
    final var jobRunConfig = JobOrchestrator.readJobRunConfig();
    final var kubePodInfo = JobOrchestrator.readKubePodInfo();

    final var app = new ContainerOrchestratorApp(applicationName, envMap, jobRunConfig, kubePodInfo);
    app.run();
  } catch (final Throwable t) {
    // a fatal failure must not be logged at INFO: it would be dropped by level filters and missed
    // by anything that alerts on errors
    log.error("Orchestrator failed...", t);
    System.exit(1);
  }
}

private static JobOrchestrator<?> getJobOrchestrator(final Configs configs,
final WorkerConfigs workerConfigs,
final ProcessFactory processFactory,
final String application) {
if (application.equals(ReplicationLauncherWorker.REPLICATION)) {
return new ReplicationJobOrchestrator(configs, workerConfigs, processFactory);
} else if (application.equals(NormalizationLauncherWorker.NORMALIZATION)) {
return new NormalizationJobOrchestrator(configs, workerConfigs, processFactory);
} else if (application.equals(DbtLauncherWorker.DBT)) {
return new DbtJobOrchestrator(configs, workerConfigs, processFactory);
} else {
log.error("Runner failed", new IllegalStateException("Unexpected value: " + application));
System.exit(1);
throw new IllegalStateException(); // should never be reached, but necessary to compile
}

return switch (application) {
case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(configs, workerConfigs, processFactory);
case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(configs, workerConfigs, processFactory);
case DbtLauncherWorker.DBT -> new DbtJobOrchestrator(configs, workerConfigs, processFactory);
case AsyncOrchestratorPodProcess.NO_OP -> new NoOpOrchestrator();
default -> null;
};
}

/**
Expand Down
Loading

0 comments on commit db40932

Please sign in to comment.