Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue #757 Initialization failure handling control #758

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.harvester.conf;

public enum InitFailureHandlingMode {

// Log errors and continue without lineage tracking
LOG,

// Propagate errors to the Spark process
BREAK,
}
5 changes: 5 additions & 0 deletions core/src/main/resources/spline.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ spline:
# (DON'T MODIFY UNLESS YOU UNDERSTAND THE IMPLICATIONS)
internal.execPlan.uuid.version: 5

# How the agent should respond to initialization errors:
# - LOG (log the error and disable the agent. Spark job continues unaffected without lineage tracking.)
# - BREAK (propagate the error to the Spark process.)
onInitFailure: LOG

# Should the agent capture failed executions:
# - NONE (only capture successful executions)
# - NON_FATAL (capture successful executions, and failed executions, but only when the error is non-fatal)
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.SparkSession
import za.co.absa.spline.HierarchicalObjectFactory
import za.co.absa.spline.agent.AgentConfig.ConfProperty
import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
import za.co.absa.spline.harvester.conf.{InitFailureHandlingMode, SQLFailureCaptureMode, SplineMode}
import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}
Expand All @@ -32,6 +32,7 @@ import scala.reflect.ClassTag

private[spline] trait AgentBOM {
def splineMode: SplineMode
def initFailureHandlingMode: InitFailureHandlingMode
def sqlFailureCaptureMode: SQLFailureCaptureMode
def postProcessingFilter: Option[PostProcessingFilter]
def lineageDispatcher: LineageDispatcher
Expand All @@ -58,6 +59,10 @@ object AgentBOM {
mergedConfig.getRequiredEnum[SQLFailureCaptureMode](ConfProperty.SQLFailureCaptureMode)
}

override def initFailureHandlingMode: InitFailureHandlingMode = {
mergedConfig.getRequiredEnum[InitFailureHandlingMode](ConfProperty.InitFailureHandlingMode)
}

override def execPlanUUIDVersion: UUIDVersion = {
mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object AgentConfig {

def from(configuration: Configuration): AgentConfig = from(
configuration
.getKeys.asScala.toSeq.asInstanceOf[Seq[String]]
.getKeys.asScala.toSeq
.map(k => k -> configuration.getProperty(k)))

def from(options: Iterable[(String, Any)]): AgentConfig =
Expand Down Expand Up @@ -95,10 +95,17 @@ object AgentConfig {
*/
val Mode = "spline.mode"

/**
* How Spline should handle initialization errors.
*
* @see [[za.co.absa.spline.harvester.conf.InitFailureHandlingMode]]
*/
val InitFailureHandlingMode = "spline.onInitFailure"

/**
* How Spline should handle failed SQL executions.
*
* @see [[SQLFailureCaptureMode]]
* @see [[za.co.absa.spline.harvester.conf.SQLFailureCaptureMode]]
*/
val SQLFailureCaptureMode = "spline.sql.failure.capture"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
logInfo("initialization aborted")
None
}
else withErrorHandling {
else withErrorHandling(bom.initFailureHandlingMode) {
if (isCodelessInit)
Some(createListener(bom))
else
Expand Down Expand Up @@ -171,11 +171,11 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
}
}

private def withErrorHandling(body: => Option[QueryExecutionListener]) = {
private def withErrorHandling(initFailureMode: InitFailureHandlingMode)(body: => Option[QueryExecutionListener]) = {
try {
body
} catch {
case NonFatal(e) =>
case NonFatal(e) if initFailureMode == InitFailureHandlingMode.LOG =>
logError(s"Spline initialization failed! Spark Lineage tracking is DISABLED.", e)
None
}
Expand Down
1 change: 1 addition & 0 deletions examples/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ RUN chmod a+x /entrypoint.sh
# Bind environment variables
ENV SPLINE_PRODUCER_URL=
ENV SPLINE_MODE=ENABLED
ENV ON_INIT_FAILURE=BREAK
ENV DISABLE_SSL_VALIDATION=false

ENV HTTP_PROXY_HOST=
Expand Down
1 change: 1 addition & 0 deletions examples/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ exec ./run.sh \
-Dspline.lineageDispatcher=http \
-Dspline.lineageDispatcher.http.producer.url="$SPLINE_PRODUCER_URL" \
-Dspline.lineageDispatcher.http.disableSslValidation="$DISABLE_SSL_VALIDATION" \
-Dspline.onInitFailure="$ON_INIT_FAILURE" \
-Dspline.mode="$SPLINE_MODE" \
-Dhttp.proxyHost="$HTTP_PROXY_HOST" \
-Dhttp.proxyPort="$HTTP_PROXY_PORT" \
Expand Down
Loading