Skip to content

Commit

Permalink
Add BigQuery Error Details Provider
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Nov 13, 2024
1 parent 601f62b commit 5c897be
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.bigquery.common;

import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;

/**
* A custom ErrorDetailsProvider for BigQuery plugins.
*/
public class BigQueryErrorDetailsProvider extends GCPErrorDetailsProvider {

@Override
protected String getExternalDocumentationLink() {
return "https://cloud.google.com/bigquery/docs/error-messages";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
Expand Down Expand Up @@ -116,6 +118,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
storage, bucket, bucketName,
config.getLocation(), cmekKeyName);
}
// set error details provider
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
prepareRunInternal(context, bigQuery, bucketName);
}

Expand All @@ -124,9 +128,9 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
String gcsPath;
String bucket = getConfig().getBucket();
if (bucket == null) {
gcsPath = String.format("gs://%s", runUUID.toString());
gcsPath = String.format("gs://%s", runUUID);
} else {
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
gcsPath = String.format(gcsPathFormat, bucket, runUUID);
}
try {
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
Expand Down Expand Up @@ -327,9 +331,8 @@ private void validateRecordDepth(@Nullable Schema schema, FailureCollector colle
*
* @return Hadoop configuration
*/
protected Configuration getOutputConfiguration() throws IOException {
Configuration configuration = new Configuration(baseConfiguration);
return configuration;
protected Configuration getOutputConfiguration() {
return new Configuration(baseConfiguration);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.common.RecordConverter;
import org.apache.avro.generic.GenericRecord;

Expand Down Expand Up @@ -90,11 +93,11 @@ protected Object convertField(Object field, Schema fieldSchema) throws IOExcepti
try {
LocalDateTime.parse(field.toString());
} catch (DateTimeParseException exception) {
throw new UnexpectedFormatException(
String.format("Datetime field with value '%s' is not in ISO-8601 format.",
fieldSchema.getDisplayName(),
field.toString()),
exception);
String errorMessage = String.format("Datetime field %s with value '%s' is not in ISO-8601 format.",
fieldSchema.getDisplayName(), field);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, "DataError"),
errorMessage, exception.getMessage(), ErrorType.USER, true, exception);
}
//If properly formatted return the string
return field.toString();
Expand All @@ -110,7 +113,9 @@ protected Object convertField(Object field, Schema fieldSchema) throws IOExcepti
}
}
} catch (ArithmeticException e) {
throw new IOException("Field type %s has value that is too large." + fieldType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN,
"DataError"),
"Field type %s has value that is too large.", e.getMessage(), ErrorType.USER, true, e);
}

// Complex types like maps and unions are not supported in BigQuery plugins.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
Expand Down Expand Up @@ -135,7 +137,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {

// Create BigQuery client
String serviceAccount = config.getServiceAccount();
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
Credentials credentials = null;
try {
credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}


BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
Expand All @@ -144,19 +156,30 @@ public void prepareRun(BatchSourceContext context) throws Exception {
bucketPath = UUID.randomUUID().toString();
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
config.getServiceAccountType());
try {
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
config.getServiceAccountType());
} catch (Exception e) {
String errorReason = "Failed to create BigQuery configuration.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), null,
dataset, config.getBucket());

// Configure GCS Bucket to use
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
storage,
bucketName,
dataset,
bucketPath,
cmekKeyName);
String bucket = null;
try {
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
cmekKeyName);
} catch (Exception e) {
String errorReason = "Failed to create bucket.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

// Configure Service account credentials
BigQuerySourceUtils.configureServiceAccount(configuration, config.getConnection());
Expand All @@ -166,10 +189,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {

// Configure BigQuery input format.
String temporaryGcsPath = BigQuerySourceUtils.getTemporaryGcsPath(bucket, bucketPath, bucketPath);
BigQuerySourceUtils.configureBigQueryInput(configuration,
DatasetId.of(config.getDatasetProject(), config.getDataset()),
config.getTable(),
temporaryGcsPath);
try {
BigQuerySourceUtils.configureBigQueryInput(configuration,
DatasetId.of(config.getDatasetProject(), config.getDataset()),
config.getTable(),
temporaryGcsPath);
} catch (Exception e) {
String errorReason = "Failed to configure BigQuery input.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
// We call emitLineage before since it creates the dataset with schema.
Expand All @@ -178,6 +208,10 @@ public void prepareRun(BatchSourceContext context) throws Exception {
.setFqn(BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()))
.setLocation(dataset.getLocation())
.build();

// set error details provider
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));

emitLineage(context, configuredSchema, sourceTableType, config.getTable(), asset);
setInputFormat(context, configuredSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
Expand Down Expand Up @@ -93,10 +96,12 @@ public static String getOrCreateBucket(Configuration configuration,
// Ignore this and move on, since all that matters is that the bucket exists.
return bucket;
}
throw new IOException(String.format("Unable to create Cloud Storage bucket '%s' in the same " +
String errorMessage = String.format("Unable to create Cloud Storage bucket '%s' in the same " +
"location ('%s') as BigQuery dataset '%s'. " + "Please use a bucket " +
"that is in the same location as the dataset.",
bucket, dataset.getLocation(), dataset.getDatasetId().getDataset()), e);
bucket, dataset.getLocation(), dataset.getDatasetId().getDataset());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage, e.getMessage(), ErrorType.USER, true, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
Expand Down Expand Up @@ -110,7 +113,8 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
try {
bigQueryHelper = getBigQueryHelper(configuration);
} catch (GeneralSecurityException gse) {
throw new IOException("Failed to create BigQuery client", gse);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
"Failed to create BigQuery client", gse.getMessage(), ErrorType.UNKNOWN, true, gse);
}

List<HadoopConfigurationProperty<?>> hadoopConfigurationProperties = new ArrayList<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -41,7 +43,6 @@ public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
* @param e The Throwable to get the error information from.
* @return A ProgramFailureException with the given error information, otherwise null.
*/
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
Expand All @@ -52,6 +53,15 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
if (t instanceof HttpResponseException) {
return getProgramFailureException((HttpResponseException) t, errorContext);
}
if (t instanceof IllegalArgumentException) {
return getProgramFailureException((IllegalArgumentException) t, errorContext);
}
if (t instanceof IllegalStateException) {
return getProgramFailureException((IllegalStateException) t, errorContext);
}
if (t instanceof IOException) {
return getProgramFailureException((IOException) t, errorContext);
}
}
return null;
}
Expand All @@ -63,8 +73,7 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
Expand Down Expand Up @@ -93,6 +102,52 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
pair.getErrorType(), true, e);
}


/**
* Get a ProgramFailureException with the given error
* information from {@link IllegalArgumentException}.
*
* @param e The IllegalArgumentException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String subCategory = "IllegalArgument";
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
}

/**
* Get a ProgramFailureException with the given error
* information from {@link IllegalStateException}.
*
* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String subCategory = "IllegalState";
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, true, e);
}

/**
* Get a ProgramFailureException with the given error
* information from {@link IOException}.
*
* @param e The IOException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IOException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String subCategory = "IOException";
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN, subCategory), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
}

/**
* Get the external documentation link for the client errors if available.
*
Expand Down

0 comments on commit 5c897be

Please sign in to comment.