From dbed0351947ed2a5eed54d93bfc0efeeeeab9ef4 Mon Sep 17 00:00:00 2001
From: Oleksandr Vayda
Date: Mon, 23 Oct 2023 17:26:28 +0200
Subject: [PATCH] issue #757 Initialization failure handling control

---
 .../conf/InitFailureHandlingMode.java         | 26 +++++++++++++++++++
 core/src/main/resources/spline.default.yaml   |  5 ++++
 .../za/co/absa/spline/agent/AgentBOM.scala    |  7 ++++-
 .../za/co/absa/spline/agent/AgentConfig.scala | 11 ++++++--
 .../harvester/SparkLineageInitializer.scala   |  6 ++---
 examples/Dockerfile                           |  1 +
 examples/entrypoint.sh                        |  1 +
 7 files changed, 51 insertions(+), 6 deletions(-)
 create mode 100644 core/src/main/java/za/co/absa/spline/harvester/conf/InitFailureHandlingMode.java

diff --git a/core/src/main/java/za/co/absa/spline/harvester/conf/InitFailureHandlingMode.java b/core/src/main/java/za/co/absa/spline/harvester/conf/InitFailureHandlingMode.java
new file mode 100644
index 00000000..20258b03
--- /dev/null
+++ b/core/src/main/java/za/co/absa/spline/harvester/conf/InitFailureHandlingMode.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.spline.harvester.conf;
+
+public enum InitFailureHandlingMode {
+
+    // Log errors and continue without lineage tracking
+    LOG,
+
+    // Propagate errors to the Spark process
+    BREAK,
+}
diff --git a/core/src/main/resources/spline.default.yaml b/core/src/main/resources/spline.default.yaml
index 5115eb72..f7829aac 100644
--- a/core/src/main/resources/spline.default.yaml
+++ b/core/src/main/resources/spline.default.yaml
@@ -26,6 +26,11 @@ spline:
   # (DON'T MODIFY UNLESS YOU UNDERSTAND THE IMPLICATIONS)
   internal.execPlan.uuid.version: 5
 
+  # How the agent should respond to initialization errors:
+  #   - LOG   (log the error and disable the agent. Spark job continues unaffected without lineage tracking.)
+  #   - BREAK (propagate the error to the Spark process.)
+  onInitFailure: LOG
+
   # Should the agent capture failed executions:
   #   - NONE      (only capture successful executions)
   #   - NON_FATAL (capture successful executions, and failed executions, but only when the error is non-fatal)
diff --git a/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala b/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
index bc7da129..124d32d8 100644
--- a/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
+++ b/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.SparkSession
 import za.co.absa.spline.HierarchicalObjectFactory
 import za.co.absa.spline.agent.AgentConfig.ConfProperty
 import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
-import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
+import za.co.absa.spline.harvester.conf.{InitFailureHandlingMode, SQLFailureCaptureMode, SplineMode}
 import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
 import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
 import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}
@@ -32,6 +32,7 @@ import scala.reflect.ClassTag
 
 private[spline] trait AgentBOM {
   def splineMode: SplineMode
+  def initFailureHandlingMode: InitFailureHandlingMode
   def sqlFailureCaptureMode: SQLFailureCaptureMode
   def postProcessingFilter: Option[PostProcessingFilter]
   def lineageDispatcher: LineageDispatcher
@@ -58,6 +59,10 @@ object AgentBOM {
       mergedConfig.getRequiredEnum[SQLFailureCaptureMode](ConfProperty.SQLFailureCaptureMode)
     }
 
+    override def initFailureHandlingMode: InitFailureHandlingMode = {
+      mergedConfig.getRequiredEnum[InitFailureHandlingMode](ConfProperty.InitFailureHandlingMode)
+    }
+
     override def execPlanUUIDVersion: UUIDVersion = {
       mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
     }
diff --git a/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala b/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
index 1e5b992e..183512a7 100644
--- a/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
+++ b/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
@@ -34,7 +34,7 @@ object AgentConfig {
 
   def from(configuration: Configuration): AgentConfig = from(
     configuration
-      .getKeys.asScala.toSeq.asInstanceOf[Seq[String]]
+      .getKeys.asScala.toSeq
       .map(k => k -> configuration.getProperty(k)))
 
   def from(options: Iterable[(String, Any)]): AgentConfig =
@@ -95,10 +95,17 @@ object AgentConfig {
      */
     val Mode = "spline.mode"
 
+    /**
+     * How Spline should handle initialization errors.
+     *
+     * @see [[za.co.absa.spline.harvester.conf.InitFailureHandlingMode]]
+     */
+    val InitFailureHandlingMode = "spline.onInitFailure"
+
     /**
      * How Spline should handle failed SQL executions.
      *
-     * @see [[SQLFailureCaptureMode]]
+     * @see [[za.co.absa.spline.harvester.conf.SQLFailureCaptureMode]]
      */
     val SQLFailureCaptureMode = "spline.sql.failure.capture"
 
diff --git a/core/src/main/scala/za/co/absa/spline/harvester/SparkLineageInitializer.scala b/core/src/main/scala/za/co/absa/spline/harvester/SparkLineageInitializer.scala
index 3872cb5e..38673340 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/SparkLineageInitializer.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/SparkLineageInitializer.scala
@@ -134,7 +134,7 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
         logInfo("initialization aborted")
         None
       }
-      else withErrorHandling {
+      else withErrorHandling(bom.initFailureHandlingMode) {
         if (isCodelessInit)
           Some(createListener(bom))
         else
@@ -171,11 +171,11 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
     }
   }
 
-  private def withErrorHandling(body: => Option[QueryExecutionListener]) = {
+  private def withErrorHandling(initFailureMode: InitFailureHandlingMode)(body: => Option[QueryExecutionListener]) = {
     try {
       body
     } catch {
-      case NonFatal(e) =>
+      case NonFatal(e) if initFailureMode == InitFailureHandlingMode.LOG =>
         logError(s"Spline initialization failed! Spark Lineage tracking is DISABLED.", e)
         None
     }
diff --git a/examples/Dockerfile b/examples/Dockerfile
index 33d4cbe8..92e73e56 100644
--- a/examples/Dockerfile
+++ b/examples/Dockerfile
@@ -44,6 +44,7 @@ RUN chmod a+x /entrypoint.sh
 # Bind environment variables
 ENV SPLINE_PRODUCER_URL=
 ENV SPLINE_MODE=ENABLED
+ENV ON_INIT_FAILURE=BREAK
 ENV DISABLE_SSL_VALIDATION=false
 
 ENV HTTP_PROXY_HOST=
diff --git a/examples/entrypoint.sh b/examples/entrypoint.sh
index e96992bf..a98db660 100644
--- a/examples/entrypoint.sh
+++ b/examples/entrypoint.sh
@@ -21,6 +21,7 @@ exec ./run.sh \
   -Dspline.lineageDispatcher=http \
   -Dspline.lineageDispatcher.http.producer.url="$SPLINE_PRODUCER_URL" \
   -Dspline.lineageDispatcher.http.disableSslValidation="$DISABLE_SSL_VALIDATION" \
+  -Dspline.onInitFailure="$ON_INIT_FAILURE" \
  -Dspline.mode="$SPLINE_MODE" \
  -Dhttp.proxyHost="$HTTP_PROXY_HOST" \
  -Dhttp.proxyPort="$HTTP_PROXY_PORT" \
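
Usage note (not part of the patch): the sketch below shows how the new "spline.onInitFailure" property introduced above could be set programmatically. It assumes the agent's implicit enableLineageTracking(AgentConfig) initializer brought in by SparkLineageInitializer, and uses AgentConfig.from(Iterable[(String, Any)]) as declared in the hunk above; the producer URL, output path, and the choice of BREAK are placeholders for illustration.

// Minimal sketch, assuming the programmatic init API
// (SparkLineageInitializer.enableLineageTracking) accepts an AgentConfig.
import org.apache.spark.sql.SparkSession
import za.co.absa.spline.agent.AgentConfig
import za.co.absa.spline.harvester.SparkLineageInitializer._

object OnInitFailureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("on-init-failure-demo").getOrCreate()

    // With BREAK, a non-fatal error thrown while the agent initializes
    // (e.g. while constructing the lineage dispatcher) propagates to the
    // Spark driver; with LOG (the default) it is only logged and lineage
    // tracking is disabled, as implemented in withErrorHandling above.
    spark.enableLineageTracking(AgentConfig.from(Map(
      "spline.onInitFailure" -> "BREAK",                                          // hypothetical choice for strict pipelines
      "spline.lineageDispatcher" -> "http",
      "spline.lineageDispatcher.http.producer.url" -> "http://localhost:8080/producer" // placeholder URL
    )))

    // Any write action after this point is tracked as usual.
    spark.range(10).write.mode("overwrite").parquet("/tmp/on-init-failure-demo")   // placeholder path
    spark.stop()
  }
}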
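
As the examples/entrypoint.sh change above illustrates, the same property can also be supplied without touching application code, for example as a JVM system property (-Dspline.onInitFailure="$ON_INIT_FAILURE"). Since spline.default.yaml declares LOG as the default, existing deployments keep the previous log-and-disable behaviour unless they explicitly opt in to BREAK.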