Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch: add on-demand retry for preemption #7581

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import com.google.cloud.batch.v1.BatchServiceSettings
import com.google.common.collect.ImmutableMap
import com.typesafe.scalalogging.StrictLogging
import cromwell.backend._
import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{
preemptionCountKey
}
import cromwell.backend.google.batch.actors._
import cromwell.backend.google.batch.api.request.{BatchRequestExecutor, RequestHandler}
import cromwell.backend.google.batch.authentication.GcpBatchDockerCredentials
Expand All @@ -30,6 +33,7 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
) extends StandardLifecycleActorFactory
with GcpPlatform {

override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey)
import GcpBatchBackendLifecycleActorFactory._

override def jobIdKey: String = "__gcp_batch"
Expand Down Expand Up @@ -133,6 +137,7 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
}

object GcpBatchBackendLifecycleActorFactory extends StrictLogging {
val preemptionCountKey = "PreemptionCount"

private[batch] def robustBuildAttributes(buildAttributes: () => GcpBatchConfigurationAttributes,
maxAttempts: Int = 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import cromwell.backend.async.{
AbortedExecutionHandle,
ExecutionHandle,
FailedNonRetryableExecutionHandle,
FailedRetryableExecutionHandle,
PendingExecutionHandle
}
import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory
import cromwell.backend.google.batch.api.GcpBatchRequestFactory._
import cromwell.backend.google.batch.io._
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
import cromwell.backend.google.batch.models.GcpBatchJobPaths.GcsTransferLibraryName
import cromwell.backend.google.batch.models.RunStatus.TerminalRunStatus
import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus}
import cromwell.backend.google.batch.models._
import cromwell.backend.google.batch.monitoring.{BatchInstrumentation, CheckpointingConfiguration, MonitoringImage}
import cromwell.backend.google.batch.runnable.WorkflowOptionKeys
Expand All @@ -46,7 +49,7 @@ import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.sra.SraPath
import cromwell.services.instrumentation.CromwellInstrumentation
import cromwell.services.keyvalue.KeyValueServiceActor.KvJobKey
import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey}
import cromwell.services.metadata.CallMetadataKeys
import mouse.all._
import shapeless.Coproduct
Expand Down Expand Up @@ -175,6 +178,15 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

override def dockerImageUsed: Option[String] = Option(jobDockerImage)

override lazy val preemptible: Int = jobDescriptor.prefetchedKvStoreEntries.get(GcpBatchBackendLifecycleActorFactory.preemptionCountKey) match {
case Some(KvPair(_, v)) =>
Try(v.toInt) match {
case Success(m) => m
case Failure(_) => 0
}
case _ => runtimeAttributes.preemptible
}

override def tryAbort(job: StandardAsyncJob): Unit =
abortJob(workflowId = workflowId,
jobName = JobName.parse(job.jobId),
Expand Down Expand Up @@ -619,6 +631,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
projectId = googleProject(jobDescriptor.workflowDescriptor),
computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor),
googleLabels = backendLabels ++ customLabels,
preemptible = preemptible,
batchTimeout = batchConfiguration.batchTimeout,
jobShell = batchConfiguration.jobShell,
privateDockerKeyAndEncryptedToken = dockerKeyAndToken,
Expand Down Expand Up @@ -825,7 +838,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
override def executeAsync(): Future[ExecutionHandle] = {

// Want to force runtimeAttributes to evaluate so we can fail quickly now if we need to:
def evaluateRuntimeAttributes = Future.fromTry(Try(runtimeAttributes))
def evaluateRuntimeAttributes = Future.fromTry(Try(runtimeAttributes.copy(preemptible = preemptible)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why does this need copy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had some issues of getting the attributes updating with the new preemptible value... so I was getting the job respawned, but with original preemption value. This is me probably not understanding the intricacies of the scala class initialization, so when those parameters are updated. Should they be updated without setting the preempt with this copy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah just tested and somehow the preemption value from initialization is not propagated without the copy, so I get spot VM instead of standard...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This occurs because the runtimeAttributes is being updated to set the local "preemptible" value, this is not the right place to do it, unfortunately, I can't recall the right place to set this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I now recall that the runtimeAttributes was some kind of object, that I just could not set the value (without copy). I guess some kind of static object created up somewhere. I think I found it from some test files where this kind of copies where made.

Anyway I see the batch API provision model is calculated from preemptible in runtimeAttributes, so if someone can guide me how to set it and where... So this is currently doing exactly what I want: after preempted job a new job with standard VM is created. Basically I just want to inject the "correct" preemptible value into runtimeAttributes after preempt fail. Of course it is doing "unnecessary" copy in the first time around, but how to get my hands on the runtimeAttributes on the second round?


def generateInputOutputParameters: Future[InputOutputParameters] = Future.fromTry(Try {
val rcFileOutput = GcpBatchFileOutput(
Expand Down Expand Up @@ -896,7 +909,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
})

val runBatchResponse = for {
_ <- evaluateRuntimeAttributes
runtimeAttributes <- evaluateRuntimeAttributes
_ <- uploadScriptFile()
customLabels <- Future.fromTry(GcpLabel.fromWorkflowOptions(workflowDescriptor.workflowOptions))
batchParameters <- generateInputOutputParameters
Expand Down Expand Up @@ -1046,14 +1059,23 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// returnCode is provided by cromwell, so far, this is empty for all the tests I ran
override def handleExecutionFailure(runStatus: RunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = {
def handleFailedRunStatus(runStatus: RunStatus.UnsuccessfulRunStatus): ExecutionHandle =
FailedNonRetryableExecutionHandle(
StandardException(
if (runStatus.exitCode == Some(GcpBatchExitCode.VMPreemption)) {
FailedRetryableExecutionHandle(
StandardException(
message = runStatus.prettyPrintedError,
jobTag = jobTag),
returnCode,
Option(Seq(KvPair(ScopedKey(workflowId, futureKvJobKey, GcpBatchBackendLifecycleActorFactory.preemptionCountKey), "0")))
)
} else {
FailedNonRetryableExecutionHandle(
StandardException(
message = runStatus.prettyPrintedError,
jobTag = jobTag
),
returnCode,
None
)
jobTag = jobTag),
returnCode,
None
)
}

Future.fromTry {
Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
batchConfiguration.runtimeConfig
)

val preemptible: Int

lazy val workingDisk: GcpBatchAttachedDisk = runtimeAttributes.disks.find(_.name == GcpBatchWorkingDisk.Name).get

lazy val callRootPath: Path = gcpBatchCallPaths.callExecutionRoot
Expand Down Expand Up @@ -75,9 +77,10 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
.get(WorkflowOptionKeys.GoogleProject)
.getOrElse(batchAttributes.project)

Map[String, String](
Map[String, Any](
GcpBatchMetadataKeys.GoogleProject -> googleProject,
GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString
GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString,
"preemptible" -> preemptible
) ++ originalLabelEvents
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object GcpBatchRequestFactory {
projectId: String,
computeServiceAccount: String,
googleLabels: Seq[GcpLabel],
preemptible: Int,
batchTimeout: FiniteDuration,
jobShell: String,
privateDockerKeyAndEncryptedToken: Option[CreateBatchDockerKeyAndToken],
Expand Down
Loading