
Temporal POC #3778

Status: Open. Wants to merge 4 commits into base: master.
1 change: 1 addition & 0 deletions gobblin-cluster/build.gradle
@@ -47,6 +47,7 @@ dependencies {
compile externalDependency.hadoopCommon
compile externalDependency.avroMapredH2
compile externalDependency.findBugsAnnotations
compile externalDependency."temporal-sdk"
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}
@@ -234,5 +234,7 @@ public class GobblinClusterConfigurationKeys {

public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = "helix.job.scheduling.throttle.timeout.seconds";
public static final long DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = Duration.ofMinutes(40).getSeconds();

public static final String TEMPORAL_WORKER_SIZE = "temporal.worker.size";
Reviewer (Contributor): Let's choose an unambiguous name. Is this the number of workers or the size of each one? If the latter, what units are we discussing (e.g., MB of memory)?
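To illustrate the reviewer's point, one hypothetical disambiguation could encode both the quantity and the unit in the key name. These key names are illustrative only; they do not appear in this PR:

```java
// Hypothetical, clearer alternatives to the ambiguous "temporal.worker.size" key.
// Neither name is from the PR; both are sketches of the reviewer's suggestion.
public class TemporalWorkerKeys {
    // Unambiguous: the number of Temporal workers to start.
    public static final String TEMPORAL_NUM_WORKERS = "temporal.num.workers";
    // Unambiguous: per-worker memory, with the unit encoded in the name.
    public static final String TEMPORAL_WORKER_MEMORY_MBS = "temporal.worker.memory.mbs";

    public static void main(String[] args) {
        System.out.println(TEMPORAL_NUM_WORKERS + " / " + TEMPORAL_WORKER_MEMORY_MBS);
    }
}
```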

public static final String TEMPORAL_ENABLED = "temporal.enabled";
Reviewer (Contributor): So many of the other keys use GOBBLIN_CLUSTER_PREFIX; shouldn't this one?

public static final Boolean DEFAULT_TEMPORAL_ENABLED = false;
}
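For context, a key/default pair like TEMPORAL_ENABLED and DEFAULT_TEMPORAL_ENABLED would typically be consumed along these lines. This is a minimal stdlib sketch; the actual PR presumably routes lookups through Gobblin's config utilities, which are not shown here:

```java
import java.util.Properties;

public class TemporalFlagSketch {
    static final String TEMPORAL_ENABLED = "temporal.enabled";
    static final boolean DEFAULT_TEMPORAL_ENABLED = false;

    // Returns the configured value, falling back to the default when the key is absent.
    static boolean isTemporalEnabled(Properties props) {
        return Boolean.parseBoolean(
            props.getProperty(TEMPORAL_ENABLED, Boolean.toString(DEFAULT_TEMPORAL_ENABLED)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(isTemporalEnabled(props)); // default applies: false
        props.setProperty(TEMPORAL_ENABLED, "true");
        System.out.println(isTemporalEnabled(props)); // true
    }
}
```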
@@ -79,7 +79,8 @@
* The central cluster manager for Gobblin Clusters.
*
* <p>
* This class runs the {@link GobblinHelixJobScheduler} for scheduling and running Gobblin jobs.
* This class runs the {@link GobblinHelixJobScheduler} for scheduling
* and running Gobblin jobs.
Comment on lines +82 to +83
Reviewer (Contributor): If we're not changing anything here, leave this file off the commit. Non-substantive, formatting-only changes are OK, but belong in a separate commit.

* This class serves as the Helix controller and it uses a {@link HelixManager} to work with Helix.
* </p>
*
@@ -33,12 +33,12 @@
* The {@link GobblinHelixJobLauncherMetrics} will always be passed in because
* it will be updated accordingly.
*/
class GobblinHelixJobLauncherListener extends AbstractJobListener {
public class GobblinHelixJobLauncherListener extends AbstractJobListener {

private final GobblinHelixJobLauncherMetrics jobLauncherMetrics;
private static final String JOB_START_TIME = "jobStartTime";

GobblinHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics) {
public GobblinHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics) {
this.jobLauncherMetrics = jobLauncherMetrics;
}

@@ -28,7 +28,7 @@
/**
* Metrics that relate to jobs launched by {@link GobblinHelixJobLauncher}.
*/
class GobblinHelixJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics {
public class GobblinHelixJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics {
Reviewer (Contributor): I understand the need to call from the temporal sub-package you've created... but that sidesteps the potential confusion of using "helix-named" classes when we're not actually involved with Helix at runtime. A good compromise for this PR might be to add a TODO to all three (this, the job scheduler metrics, and the GobblinHelixJobLauncherListener) about renaming to be Helix/Temporal agnostic, or, if merited, to instead create a separate Temporal-specific variant.

private final String metricsName;

final ContextAwareMeter numJobsLaunched;
@@ -33,7 +33,7 @@
import org.apache.gobblin.runtime.api.JobExecutionLauncher;


class GobblinHelixJobSchedulerMetrics extends StandardMetricsBridge.StandardMetrics {
public class GobblinHelixJobSchedulerMetrics extends StandardMetricsBridge.StandardMetrics {
public static final String SCHEDULE_CANCELLATION_START = "scheduleCancellationStart";
public static final String SCHEDULE_CANCELLATION_END = "scheduleCancellationEnd";
public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
@@ -74,21 +74,21 @@ public GobblinHelixJobSchedulerMetrics (final ExecutorService jobExecutor, final
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size()));
}

void updateTimeBeforeJobScheduling (Properties jobProps) {
public void updateTimeBeforeJobScheduling (Properties jobProps) {
long jobCreationTime = Long.parseLong(jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling),
System.currentTimeMillis() - jobCreationTime,
TimeUnit.MILLISECONDS);
}

void updateTimeBeforeJobLaunching (Properties jobProps) {
public void updateTimeBeforeJobLaunching (Properties jobProps) {
long jobCreationTime = Long.parseLong(jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching),
System.currentTimeMillis() - jobCreationTime,
TimeUnit.MILLISECONDS);
}

void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) {
public void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) {
Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching),
launchingTime - scheduledTime,
TimeUnit.MILLISECONDS);
@@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;

import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.rest.LauncherTypeEnum;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.SerializationUtils;

/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using the Temporal task framework.
Reviewer (Contributor): It sounds here like Temporal is the fundamental proposition, but I don't immediately see where it comes in within this class definition. Is the class actually more general? Please update the javadoc to clarify where that stands.

*
* <p>
* Each {@link WorkUnit} of the job is persisted to the {@link FileSystem} of choice and the path to the file
* storing the serialized {@link WorkUnit} is passed to the Temporal task running the {@link WorkUnit} as a
* user-defined property {@link GobblinClusterConfigurationKeys#WORK_UNIT_FILE_PATH}. Upon startup, the gobblin
* task reads the property for the file path and de-serializes the {@link WorkUnit} from the file.
* </p>
*/
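The handoff the javadoc describes (persist each WorkUnit to a file, pass only the file path to the task, deserialize on startup) can be sketched with java.util.Properties standing in for a serialized WorkUnit. The file suffix and the "workunit.file.path" key below are illustrative; the real code uses Gobblin's SerializationUtils and GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class WorkUnitHandoffSketch {

    // Launcher side: persist the "work unit" and return the path to hand to the task.
    static String persistWorkUnit(Properties workUnit) throws IOException {
        Path wuFile = Files.createTempFile("work-unit", ".wu");
        try (OutputStream out = Files.newOutputStream(wuFile)) {
            workUnit.store(out, null);
        }
        return wuFile.toString();
    }

    // Task side: read the path out of the task's properties and deserialize.
    static Properties loadWorkUnit(String wuFilePath) throws IOException {
        Properties restored = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(wuFilePath))) {
            restored.load(in);
        }
        return restored;
    }

    public static void main(String[] args) throws IOException {
        Properties workUnit = new Properties();
        workUnit.setProperty("source.class", "ExampleSource"); // illustrative content
        // Only the file path crosses the process boundary, as a task property.
        Properties taskProps = new Properties();
        taskProps.setProperty("workunit.file.path", persistWorkUnit(workUnit)); // illustrative key
        Properties restored = loadWorkUnit(taskProps.getProperty("workunit.file.path"));
        System.out.println(restored.getProperty("source.class")); // prints ExampleSource
        Files.delete(Paths.get(taskProps.getProperty("workunit.file.path")));
    }
}
```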
@Alpha
@Slf4j
public class GobblinJobLauncher extends AbstractJobLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinJobLauncher.class);
Reviewer (Contributor): Why not rely solely on @Slf4j?

protected static final String WORK_UNIT_FILE_EXTENSION = ".wu";
protected final FileSystem fs;
protected final Path appWorkDir;
protected final Path inputWorkUnitDir;
protected final Path outputTaskStateDir;

// Number of ParallelRunner threads to be used for state serialization/deserialization
protected final int stateSerDeRunnerThreads;

protected final TaskStateCollectorService taskStateCollectorService;
protected final ConcurrentHashMap<String, Boolean> runningMap;
@Getter
protected final StateStores stateStores;
protected JobListener jobListener;
protected volatile boolean jobSubmitted = false;


public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap)
throws Exception {
super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
LOGGER.debug("GobblinJobLauncher: jobProps {}, appWorkDir {}", jobProps, appWorkDir);
this.runningMap = runningMap;
this.appWorkDir = appWorkDir;
this.inputWorkUnitDir = new Path(appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
this.outputTaskStateDir = new Path(this.appWorkDir,
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + Path.SEPARATOR + this.jobContext.getJobId());

this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);

this.stateSerDeRunnerThreads = Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));

Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(
new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(), appWorkDir.toUri().getPort(),
"/", null, null).toString()));

this.stateStores =
new StateStores(stateStoreJobConfig, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir,
GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);

URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI));
this.fs = FileSystem.get(fsUri, new Configuration());

this.taskStateCollectorService =
new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter,
this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());
}

@Override
public void close() throws IOException {
try {
executeCancellation();
} finally {
super.close();
}
}

public String getJobId() {
return this.jobContext.getJobId();
}

@Override
protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
try {
CountEventBuilder countEventBuilder = new CountEventBuilder(JobEvent.WORK_UNITS_CREATED, workUnits.size());
this.eventSubmitter.submit(countEventBuilder);
LOGGER.info("Emitting WorkUnitsCreated Count: " + countEventBuilder.getCount());

long workUnitStartTime = System.currentTimeMillis();
workUnits.forEach((k) -> k.setProp(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, workUnitStartTime));

// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();

TimingEvent jobSubmissionTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);

synchronized (this.cancellationRequest) {
if (!this.cancellationRequested) {
submitJob(workUnits);
jobSubmissionTimer.stop();
LOGGER.info(String.format("Submitted job %s", this.jobContext.getJobId()));
this.jobSubmitted = true;
} else {
LOGGER.warn("Job {} not submitted as it was requested to be cancelled.", this.jobContext.getJobId());
}
}

TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN);
waitJob();
jobRunTimer.stop();
LOGGER.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
// The last iteration of output TaskState collecting will run when the collector service gets stopped
this.taskStateCollectorService.stopAsync().awaitTerminated();
cleanupWorkingDirectory();
}
}

protected void submitJob(List<WorkUnit> workUnits) throws Exception {
}

protected void waitJob() throws InterruptedException {
}

@Override
protected void executeCancellation() {
}

public void launchJob(@Nullable JobListener jobListener) throws JobException {
this.jobListener = jobListener;
LOGGER.info("Launching Job");
boolean isLaunched = false;
this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);

Throwable errorInJobLaunching = null;
try {
if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
LOGGER.info("Job {} will be executed, add into running map.", this.jobContext.getJobId());
isLaunched = true;
launchJobImpl(jobListener);
} else {
LOGGER.warn("Job {} will not be executed because other jobs are still running.", this.jobContext.getJobId());
}

// TODO: Better error handling. The current impl swallows exceptions for jobs that were started by this method call.
// One potential way to improve the error handling is to make this error swallowing configurable
} catch (Throwable t) {
errorInJobLaunching = t;
if (isLaunched) {
// Attempts to cancel workflow if an error occurs during launch
cancelJob(jobListener);
}
} finally {
if (isLaunched) {
if (this.runningMap.replace(this.jobContext.getJobName(), true, false)) {
LOGGER.info("Job {} is done, remove from running map.", this.jobContext.getJobId());
} else {
throw errorInJobLaunching == null ? new IllegalStateException(
"A launched job should have running state equal to true in the running map.")
: new RuntimeException("Failure in launching job:", errorInJobLaunching);
}
}
}
}
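The runningMap bookkeeping in launchJob amounts to a per-job-name mutex built from ConcurrentHashMap's atomic operations: putIfAbsent seeds the entry, replace(name, false, true) wins or loses the launch, and replace(name, true, false) releases it. A self-contained sketch of the same pattern (class and method names here are illustrative, not from the PR):

```java
import java.util.concurrent.ConcurrentHashMap;

public class RunningMapSketch {
    private final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();

    // Returns true if we won the right to run this job name; mirrors
    // putIfAbsent(name, false) followed by replace(name, false, true) in launchJob.
    public boolean tryAcquire(String jobName) {
        runningMap.putIfAbsent(jobName, false);
        return runningMap.replace(jobName, false, true);
    }

    // Marks the job done; mirrors replace(name, true, false) in the finally block.
    public boolean release(String jobName) {
        return runningMap.replace(jobName, true, false);
    }

    public static void main(String[] args) {
        RunningMapSketch guard = new RunningMapSketch();
        System.out.println(guard.tryAcquire("jobA")); // true: first launch wins
        System.out.println(guard.tryAcquire("jobA")); // false: already running
        System.out.println(guard.release("jobA"));    // true: back to idle
    }
}
```

Both replace calls are atomic compare-and-set operations, so two schedulers racing on the same job name cannot both see the false-to-true transition succeed.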

/**
* This method looks silly at first glance but exists for a reason.
*
* The method {@link GobblinJobLauncher#launchJob(JobListener)} contains boilerplate for handling exceptions and
* mutating the runningMap to communicate state back to the {@link GobblinJobScheduler}. The boilerplate swallows
* exceptions when launching the job because many use cases require that one job failure not affect other jobs by
* causing the entire process to fail through an uncaught exception.
*
* This method is useful for unit testing edge cases where we expect {@link JobException}s during the underlying
* launch operation. It would be nice to not swallow exceptions, but doing that will require careful refactoring,
* since {@link GobblinJobLauncher} and {@link GobblinJobScheduler} are shared between two quite different cases,
* GaaS and streaming: GaaS typically requires many short-lived workflows (where a failure is tolerated), while
* streaming requires a small number of long-running workflows (where failure to submit is unexpected and not
* tolerated).
*
* @throws JobException
*/
@VisibleForTesting
void launchJobImpl(@Nullable JobListener jobListener) throws JobException {
super.launchJob(jobListener);
}

/**
* Delete persisted {@link WorkUnit}s and {@link JobState} upon job completion.
*/
protected void cleanupWorkingDirectory() throws IOException {
LOGGER.info("Deleting persisted work units for job " + this.jobContext.getJobId());
stateStores.getWuStateStore().delete(this.jobContext.getJobId());

// delete the directory that stores the task state files
stateStores.getTaskStateStore().delete(outputTaskStateDir.getName());

LOGGER.info("Deleting job state file for job " + this.jobContext.getJobId());

if (this.stateStores.haveJobStateStore()) {
this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
} else {
Path jobStateFilePath =
GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, this.jobContext.getJobId());
this.fs.delete(jobStateFilePath, false);
}
}
}
