diff --git a/src/e2e-test/resources/errorMessage.properties b/src/e2e-test/resources/errorMessage.properties
index 6bf51faf6d..ac54e2b732 100644
--- a/src/e2e-test/resources/errorMessage.properties
+++ b/src/e2e-test/resources/errorMessage.properties
@@ -28,7 +28,7 @@ errorMessageInvalidBucketName=Invalid bucket name in path
 errorMessageInvalidFormat=Input has multi-level structure that cannot be represented appropriately as csv. \
   Consider using json, avro or parquet to write data.
 errorMessageMultipleFileWithFirstRowAsHeaderDisabled=Spark program 'phase-1' failed with error: Found a row with 6 fields when the schema only contains 4 fields. Check that the schema contains the right number of fields.. Please check the system logs for more details.
-errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error: For input string:
+errorMessageMultipleFileWithFirstRowAsHeaderEnabled=Spark program 'phase-1' failed with error:
 errorMessageMultipleFileWithoutClearDefaultSchema=Spark program 'phase-1' failed with error: Found a row with 4 fields when the schema only contains 2 fields.
 errorMessageInvalidSourcePath=Invalid bucket name in path 'abc@'. Bucket name should
 errorMessageInvalidDestPath=Invalid bucket name in path 'abc@'. Bucket name should
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorDetailsProvider.java
new file mode 100644
index 0000000000..e0ec915fb6
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryErrorDetailsProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.bigquery.common;
+
+import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
+
+/**
+ * A custom ErrorDetailsProvider for BigQuery plugins.
+ */
+public class BigQueryErrorDetailsProvider extends GCPErrorDetailsProvider {
+
+  @Override
+  protected String getExternalDocumentationLink() {
+    return "https://cloud.google.com/bigquery/docs/error-messages";
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
index e8490e97e8..b2e309f059 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
@@ -34,7 +34,9 @@
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
 import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
 import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
@@ -116,6 +118,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
         storage, bucket, bucketName, config.getLocation(), cmekKeyName);
     }
 
+    // set error details provider
+    context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
     prepareRunInternal(context, bigQuery, bucketName);
   }
 
@@ -124,9 +128,9 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
     String gcsPath;
     String bucket = getConfig().getBucket();
     if (bucket == null) {
-      gcsPath = String.format("gs://%s", runUUID.toString());
+      gcsPath = String.format("gs://%s", runUUID);
     } else {
-      gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
+      gcsPath = String.format(gcsPathFormat, bucket, runUUID);
     }
     try {
       BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
@@ -327,9 +331,8 @@ private void validateRecordDepth(@Nullable Schema schema, FailureCollector colle
    *
    * @return Hadoop configuration
    */
-  protected Configuration getOutputConfiguration() throws IOException {
-    Configuration configuration = new Configuration(baseConfiguration);
-    return configuration;
+  protected Configuration getOutputConfiguration() {
+    return new Configuration(baseConfiguration);
   }
 
   /**
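Taken together, the two sink-side changes above are the whole opt-in surface: subclass the shared GCP provider once, then hand its class name to the context during prepareRun. A minimal sketch of the pattern, using only types already present in this diff (the MyErrorDetailsProvider name and register method are hypothetical):

    import io.cdap.cdap.etl.api.batch.BatchSinkContext;
    import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
    import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;

    /** Hypothetical provider: inherits GCP error parsing, swaps in a product-specific docs link. */
    class MyErrorDetailsProvider extends GCPErrorDetailsProvider {
      @Override
      protected String getExternalDocumentationLink() {
        return "https://cloud.google.com/bigquery/docs/error-messages";
      }
    }

    // Inside prepareRun: the spec carries the class name, not an instance,
    // so the runtime can instantiate the provider wherever the failure surfaces.
    void register(BatchSinkContext context) {
      context.setErrorDetailsProvider(
        new ErrorDetailsProviderSpec(MyErrorDetailsProvider.class.getName()));
    }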
Error message: %s"; @Override public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) @@ -165,19 +170,31 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, // Error if the output path already exists. FileSystem outputFileSystem = outputPath.getFileSystem(conf); if (outputFileSystem.exists(outputPath)) { - throw new IOException("The output path '" + outputPath + "' already exists."); + String errorMessage = String.format("The output path '%s' already exists.", outputPath); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, + new IOException(errorMessage)); } // Error if compression is set as there's mixed support in BigQuery. if (FileOutputFormat.getCompressOutput(job)) { - throw new IOException("Compression isn't supported for this OutputFormat."); + String errorMessage = "Compression isn't supported for this OutputFormat."; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, + new IOException(errorMessage)); } // Error if unable to create a BigQuery helper. try { new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf); } catch (GeneralSecurityException gse) { - throw new IOException("Failed to create BigQuery client", gse); + String errorMessage = "Failed to create BigQuery client"; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, ErrorPhase.VALIDATING_OUTPUT_SPECS, errorMessage), ErrorType.SYSTEM, true, + new IOException(errorMessage, gse)); } // Let delegate process its checks. @@ -208,7 +225,11 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES); this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration()); } catch (GeneralSecurityException e) { - throw new IOException("Failed to create Bigquery client.", e); + String errorMessage = "Failed to create BigQuery client"; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, + new IOException(errorMessage, e)); } } @@ -266,7 +287,11 @@ public void commitJob(JobContext jobContext) throws IOException { writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField, requirePartitionFilter, clusteringOrderList, tableExists, jobLabelKeyValue, conf); } catch (Exception e) { - throw new IOException("Failed to import GCS into BigQuery. 
", e); + String errorMessage = "Failed to import GCS into BigQuery."; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true, + new IOException(errorMessage, e)); } cleanup(jobContext); @@ -566,26 +591,34 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p int numOfErrors; String errorMessage; if (errors == null || errors.isEmpty()) { - errorMessage = pollJob.getStatus().getErrorResult().getMessage(); + errorMessage = String.format("reason: %s, %s", pollJob.getStatus().getErrorResult().getReason(), + pollJob.getStatus().getErrorResult().getMessage()); numOfErrors = 1; } else { - errorMessage = errors.get(errors.size() - 1).getMessage(); + errorMessage = String.format("reason: %s, %s", errors.get(errors.size() - 1).getReason(), + errors.get(errors.size() - 1).getMessage()); numOfErrors = errors.size(); } // Only add first error message in the exception. For other errors user should look at BigQuery job logs. - throw new IOException(String.format("Error occurred while importing data to BigQuery '%s'." + - " There are total %s error(s) for BigQuery job %s. Please look at " + - "BigQuery job logs for more information.", - errorMessage, numOfErrors, jobReference.getJobId())); + String errorMessageException = String.format("Error occurred while importing data to BigQuery '%s'." + + " There are total %s error(s) for BigQuery job %s. Please look at " + + "BigQuery job logs for more information.", + errorMessage, numOfErrors, jobReference.getJobId()); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException, + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.UNKNOWN, true, + new IOException(errorMessageException)); + } } else { long millisToWait = pollBackOff.nextBackOffMillis(); if (millisToWait == BackOff.STOP) { - throw new IOException( - String.format( - "Job %s failed to complete after %s millis.", - jobReference.getJobId(), - elapsedTime)); + String errorMessage = String.format("Job %s failed to complete after %s millis.", jobReference.getJobId() + , elapsedTime); + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.UNKNOWN, true, + new IOException(errorMessage)); } // Pause execution for the configured duration before polling job status again. 
@@ -621,8 +654,12 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
         TableSchema tableSchema = createTableSchemaFromFields(fieldsJson);
         return Optional.of(tableSchema);
       } catch (IOException e) {
-        throw new IOException(
-          "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e);
+        String errorMessage = String.format("Unable to parse key '%s'.",
+          BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
+        throw ErrorUtils.getProgramFailureException(
+          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
+          String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
+          new IOException(errorMessage, e));
       }
     }
     return Optional.empty();
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
index c397e52c74..81ac25b967 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryRecordToJson.java
@@ -189,7 +189,7 @@ private static void writeSimpleTypes(JsonWriter writer, String name, boolean isA
       } else if (jsonString.startsWith("[") && jsonString.endsWith("]")) {
         writeJsonArrayToWriter(gson.fromJson(jsonString, JsonArray.class), writer);
       } else {
-        throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +
+        throw new IllegalArgumentException(String.format("Expected value of Field '%s' to be a valid JSON " +
                                                          "object or array.", name));
       }
       break;
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
index 4ee209de09..7f91bbc78f 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
@@ -35,8 +35,12 @@
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.exception.ErrorPhase;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.common.LineageRecorder;
@@ -110,11 +114,10 @@ public final class BigQuerySinkUtils {
    * @param bucketName the name of the bucket
    * @param location the location of the resources, this is only applied if both the bucket and dataset do not exist
    * @param cmekKeyName the name of the cmek key
-   * @throws IOException if there was an error creating or fetching any GCP resource
    */
   public static void createResources(BigQuery bigQuery, Storage storage,
                                      DatasetId datasetId, String bucketName, @Nullable String location,
-                                     @Nullable CryptoKeyName cmekKeyName) throws IOException {
+                                     @Nullable CryptoKeyName cmekKeyName) {
     // Get the required Dataset and bucket instances using the supplied clients
     Dataset dataset = bigQuery.getDataset(datasetId);
     Bucket bucket = storage.get(bucketName);
@@ -138,11 +141,10 @@ public static void createResources(BigQuery bigQuery, Storage storage,
    * @param bucketName the name of the bucket
    * @param location the location of the resources, this is only applied if both the bucket and dataset do not exist
    * @param cmekKey the name of the cmek key
-   * @throws IOException
    */
   public static void createResources(BigQuery bigQuery, @Nullable Dataset dataset, DatasetId datasetId,
                                      Storage storage, @Nullable Bucket bucket, String bucketName,
-                                     @Nullable String location, @Nullable CryptoKeyName cmekKey) throws IOException {
+                                     @Nullable String location, @Nullable CryptoKeyName cmekKey) {
     if (dataset == null && bucket == null) {
       createBucket(storage, bucketName, location, cmekKey,
                    () -> String.format("Unable to create Cloud Storage bucket '%s'", bucketName));
@@ -174,11 +176,10 @@ public static void createResources(BigQuery bigQuery, @Nullable Dataset dataset,
    * @param location Location for this dataset.
    * @param cmekKeyName CMEK key to use for this dataset.
    * @param errorMessage Supplier for the error message to output if the dataset could not be created.
-   * @throws IOException if the dataset could not be created.
    */
   private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullable String location,
                                     @Nullable CryptoKeyName cmekKeyName,
-                                    Supplier<String> errorMessage) throws IOException {
+                                    Supplier<String> errorMessage) {
     DatasetInfo.Builder builder = DatasetInfo.newBuilder(dataset);
     if (location != null) {
       builder.setLocation(location);
@@ -194,7 +195,11 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl
       // A conflict means the dataset already exists (https://cloud.google.com/bigquery/troubleshooting-errors)
       // This most likely means multiple stages in the same pipeline are trying to create the same dataset.
       // Ignore this and move on, since all that matters is that the dataset exists.
-      throw new IOException(errorMessage.get(), e);
+      ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
+      String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
+      throw ErrorUtils.getProgramFailureException(
+        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
+        pair.getErrorType(), true, e);
     }
   }
 
@@ -207,11 +212,10 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl
    * @param location Location for this dataset.
    * @param cmekKeyName CMEK key to use for this dataset.
    * @param errorMessage Supplier for the error message to output if the dataset could not be created.
-   * @throws IOException if the dataset could not be created.
    */
   public static void createDatasetIfNotExists(BigQuery bigQuery, DatasetId datasetId, @Nullable String location,
                                               @Nullable CryptoKeyName cmekKeyName,
-                                              Supplier<String> errorMessage) throws IOException {
+                                              Supplier<String> errorMessage) {
     // Check if dataset exists
     Dataset ds = bigQuery.getDataset(datasetId);
     // Create dataset if needed
@@ -228,11 +232,10 @@ public static void createDatasetIfNotExists(BigQuery bigQuery, DatasetId dataset
    * @param location Location for this bucket.
    * @param cmekKeyName CMEK key to use for this bucket.
    * @param errorMessage Supplier for the error message to output if the bucket could not be created.
-   * @throws IOException if the bucket could not be created.
   */
   private static void createBucket(Storage storage, String bucket, @Nullable String location,
                                    @Nullable CryptoKeyName cmekKeyName,
-                                   Supplier<String> errorMessage) throws IOException {
+                                   Supplier<String> errorMessage) {
     try {
       GCPUtils.createBucket(storage, bucket, location, cmekKeyName);
     } catch (StorageException e) {
@@ -240,7 +243,11 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin
       // A conflict means the bucket already exists
       // This most likely means multiple stages in the same pipeline are trying to create the same dataset.
       // Ignore this and move on, since all that matters is that the dataset exists.
-      throw new IOException(errorMessage.get(), e);
+      ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
+      String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
+      throw ErrorUtils.getProgramFailureException(
+        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
+        pair.getErrorType(), true, e);
     }
   }
 
@@ -318,13 +325,12 @@ private static void setJsonStringFields(List<BigQueryTableFieldSchema> fields, S
    * @param tableName name of the table to use
    * @param gcsPath GCS path to use for output
    * @param fields list of BigQuery table fields
-   * @throws IOException if the output cannot be configured
    */
   public static void configureOutput(Configuration configuration,
                                      DatasetId datasetId,
                                      String tableName,
                                      String gcsPath,
-                                     List<BigQueryTableFieldSchema> fields) throws IOException {
+                                     List<BigQueryTableFieldSchema> fields) {
 
     // Set up table schema
     BigQueryTableSchema outputTableSchema = new BigQueryTableSchema();
@@ -337,13 +343,19 @@ public static void configureOutput(Configuration configuration,
     }
 
     BigQueryFileFormat fileFormat = getFileFormat(fields);
-    BigQueryOutputConfiguration.configure(
-      configuration,
-      String.format("%s:%s.%s", datasetId.getProject(), datasetId.getDataset(), tableName),
-      outputTableSchema,
-      gcsPath,
-      fileFormat,
-      getOutputFormat(fileFormat));
+    try {
+      BigQueryOutputConfiguration.configure(
+        configuration,
+        String.format("%s:%s.%s", datasetId.getProject(), datasetId.getDataset(), tableName),
+        outputTableSchema,
+        gcsPath,
+        fileFormat,
+        getOutputFormat(fileFormat));
+    } catch (IOException e) {
+      throw ErrorUtils.getProgramFailureException(
+        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Unable to configure output",
+        "Unable to configure output for BigQuery sink", ErrorType.UNKNOWN, true, e);
+    }
   }
 
   /**
@@ -354,13 +366,12 @@ public static void configureOutput(Configuration configuration,
    * @param tableName name of the table to use
    * @param gcsPath GCS path to use for output
    * @param fields list of BigQuery table fields
-   * @throws IOException if the output cannot be configured
   */
   public static void configureMultiSinkOutput(Configuration configuration,
                                               DatasetId datasetId,
                                               String tableName,
                                               String gcsPath,
-                                              List<BigQueryTableFieldSchema> fields) throws IOException {
+                                              List<BigQueryTableFieldSchema> fields) {
     configureOutput(configuration,
                     datasetId,
                     tableName,
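Every rewritten catch block in BigQuerySinkUtils above funnels through the same three steps: map the HTTP status code to an ErrorType plus a corrective action, build a reason string, and rethrow with the original exception as cause. A sketch of that pattern in isolation, assuming a hypothetical classify helper (ErrorUtils, ErrorCategory, and ProgramFailureException are the CDAP APIs used throughout this diff):

    import com.google.cloud.bigquery.BigQueryException;
    import io.cdap.cdap.api.exception.ErrorCategory;
    import io.cdap.cdap.api.exception.ErrorUtils;
    import io.cdap.cdap.api.exception.ProgramFailureException;

    class ErrorClassifier {
      // Hypothetical helper mirroring the createDataset/createBucket catch blocks.
      static ProgramFailureException classify(BigQueryException e, String errorMessage) {
        // Map the HTTP status code to an ErrorType plus a suggested corrective action.
        ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
        String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
        // Keep the original exception as the cause so the stack trace survives.
        return ErrorUtils.getProgramFailureException(
          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage,
          pair.getErrorType(), true, e);
      }
    }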
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryAvroToStructuredTransformer.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryAvroToStructuredTransformer.java
index adf301c856..75adba1699 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryAvroToStructuredTransformer.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryAvroToStructuredTransformer.java
@@ -20,6 +20,9 @@
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.format.UnexpectedFormatException;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.plugin.common.RecordConverter;
 import org.apache.avro.generic.GenericRecord;
 
@@ -90,11 +93,11 @@ protected Object convertField(Object field, Schema fieldSchema) throws IOExcepti
           try {
             LocalDateTime.parse(field.toString());
           } catch (DateTimeParseException exception) {
-            throw new UnexpectedFormatException(
-              String.format("Datetime field with value '%s' is not in ISO-8601 format.",
-                            fieldSchema.getDisplayName(),
-                            field.toString()),
-              exception);
+            String errorMessage = String.format("Datetime field %s with value '%s' is not in ISO-8601 format.",
+              fieldSchema.getDisplayName(), field);
+            throw ErrorUtils.getProgramFailureException(
+              new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, "DataError"),
+              errorMessage, exception.getMessage(), ErrorType.USER, true, exception);
           }
           //If properly formatted return the string
           return field.toString();
@@ -110,7 +113,9 @@ protected Object convertField(Object field, Schema fieldSchema) throws IOExcepti
         }
       }
     } catch (ArithmeticException e) {
-      throw new IOException("Field type %s has value that is too large." + fieldType);
+      throw ErrorUtils.getProgramFailureException(
+        new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN, "DataError"),
+        String.format("Field type %s has value that is too large.", fieldType), e.getMessage(), ErrorType.USER, true, e);
     }
 
     // Complex types like maps and unions are not supported in BigQuery plugins.
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index acc6018745..77e63ee406 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -48,9 +48,11 @@
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
 import io.cdap.cdap.etl.api.connector.Connector;
 import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
+import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
 import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
 import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
 import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
@@ -135,7 +137,16 @@ public void prepareRun(BatchSourceContext context) throws Exception {
 
     // Create BigQuery client
     String serviceAccount = config.getServiceAccount();
-    Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
+    Credentials credentials = null;
+    try {
+      credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
+    } catch (Exception e) {
+      String errorReason = "Unable to load service account credentials.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
+
     BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
     Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
     Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
@@ -144,19 +155,30 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     bucketPath = UUID.randomUUID().toString();
     CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
     collector.getOrThrowException();
-    configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
-                                                   config.getServiceAccountType());
+    try {
+      configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
+                                                     config.getServiceAccountType());
+    } catch (Exception e) {
+      String errorReason = "Failed to create BigQuery configuration.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
 
     String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), null,
                                                           dataset, config.getBucket());
 
     // Configure GCS Bucket to use
-    String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
-                                                          storage,
-                                                          bucketName,
-                                                          dataset,
-                                                          bucketPath,
-                                                          cmekKeyName);
+    String bucket = null;
+    try {
+      bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
+                                                     cmekKeyName);
+    } catch (Exception e) {
+      String errorReason = "Failed to create bucket.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
 
     // Configure Service account credentials
     BigQuerySourceUtils.configureServiceAccount(configuration, config.getConnection());
@@ -166,10 +188,17 @@ public void prepareRun(BatchSourceContext context) throws Exception {
 
     // Configure BigQuery input format.
     String temporaryGcsPath = BigQuerySourceUtils.getTemporaryGcsPath(bucket, bucketPath, bucketPath);
-    BigQuerySourceUtils.configureBigQueryInput(configuration,
-                                               DatasetId.of(config.getDatasetProject(), config.getDataset()),
-                                               config.getTable(),
-                                               temporaryGcsPath);
+    try {
+      BigQuerySourceUtils.configureBigQueryInput(configuration,
+                                                 DatasetId.of(config.getDatasetProject(), config.getDataset()),
+                                                 config.getTable(),
+                                                 temporaryGcsPath);
+    } catch (Exception e) {
+      String errorReason = "Failed to configure BigQuery input.";
+      collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
+        .withStacktrace(e.getStackTrace());
+      collector.getOrThrowException();
+    }
 
     // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
     // We call emitLineage before since it creates the dataset with schema.
@@ -178,6 +207,10 @@ public void prepareRun(BatchSourceContext context) throws Exception {
       .setFqn(BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), config.getTable()))
       .setLocation(dataset.getLocation())
       .build();
+
+    // set error details provider
+    context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigQueryErrorDetailsProvider.class.getName()));
+
     emitLineage(context, configuredSchema, sourceTableType, config.getTable(), asset);
     setInputFormat(context, configuredSchema);
   }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
index 69804a6cae..c6bc3e04be 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java
@@ -25,6 +25,9 @@
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -93,10 +96,12 @@ public static String getOrCreateBucket(Configuration configuration,
         // Ignore this and move on, since all that matters is that the bucket exists.
         return bucket;
       }
-      throw new IOException(String.format("Unable to create Cloud Storage bucket '%s' in the same " +
-                                            "location ('%s') as BigQuery dataset '%s'. " + "Please use a bucket " +
-                                            "that is in the same location as the dataset.",
-                                          bucket, dataset.getLocation(), dataset.getDatasetId().getDataset()), e);
+      String errorMessage = String.format("Unable to create Cloud Storage bucket '%s' in the same " +
+        "location ('%s') as BigQuery dataset '%s'. " + "Please use a bucket " +
+        "that is in the same location as the dataset.",
+        bucket, dataset.getLocation(), dataset.getDatasetId().getDataset());
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+        errorMessage, e.getMessage(), ErrorType.USER, true, e);
     }
   }
 
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
index 88e8267053..40004feb71 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
@@ -37,6 +37,9 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.GCPUtils;
@@ -110,7 +113,8 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
     try {
       bigQueryHelper = getBigQueryHelper(configuration);
     } catch (GeneralSecurityException gse) {
-      throw new IOException("Failed to create BigQuery client", gse);
+      throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
+        "Failed to create BigQuery client", gse.getMessage(), ErrorType.UNKNOWN, true, gse);
     }
 
     List<Map.Entry<String, String>> hadoopConfigurationProperties = new ArrayList<>(
diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java
index 983561f9f3..a9f4113b43 100644
--- a/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java
@@ -22,11 +22,13 @@
 import com.google.common.base.Throwables;
 import io.cdap.cdap.api.exception.ErrorCategory;
 import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import io.cdap.cdap.api.exception.ErrorType;
 import io.cdap.cdap.api.exception.ErrorUtils;
 import io.cdap.cdap.api.exception.ProgramFailureException;
 import io.cdap.cdap.etl.api.exception.ErrorContext;
 import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -41,7 +43,6 @@ public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
    * @param e The Throwable to get the error information from.
    * @return A ProgramFailureException with the given error information, otherwise null.
    */
-  @Override
   public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
     List<Throwable> causalChain = Throwables.getCausalChain(e);
     for (Throwable t : causalChain) {
@@ -52,6 +53,12 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
       if (t instanceof HttpResponseException) {
         return getProgramFailureException((HttpResponseException) t, errorContext);
       }
+      if (t instanceof IllegalArgumentException) {
+        return getProgramFailureException((IllegalArgumentException) t, errorContext);
+      }
+      if (t instanceof IllegalStateException) {
+        return getProgramFailureException((IllegalStateException) t, errorContext);
+      }
     }
     return null;
   }
@@ -63,8 +70,7 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
    * @param e The HttpResponseException to get the error information from.
    * @return A ProgramFailureException with the given error information.
    */
-  private ProgramFailureException getProgramFailureException(HttpResponseException e,
-                                                             ErrorContext errorContext) {
+  private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) {
     Integer statusCode = e.getStatusCode();
     ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
     String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
@@ -93,6 +99,35 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
       pair.getErrorType(), true, e);
   }
 
+  /**
+   * Get a ProgramFailureException with the given error
+   * information from {@link IllegalArgumentException}.
+   *
+   * @param e The IllegalArgumentException to get the error information from.
+   * @return A ProgramFailureException with the given error information.
+   */
+  private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
+    String errorMessage = e.getMessage();
+    String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
+    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
+      String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
+  }
+
+  /**
+   * Get a ProgramFailureException with the given error
+   * information from {@link IllegalStateException}.
+   *
+   * @param e The IllegalStateException to get the error information from.
+   * @return A ProgramFailureException with the given error information.
+   */
+  private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
+    String errorMessage = e.getMessage();
+    String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
+    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
+      String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, true, e);
+  }
+
   /**
    * Get the external documentation link for the client errors if available.
    *
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java
index e7c1f82aee..6a152e74f6 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQueryRecordToJsonTest.java
@@ -477,9 +477,9 @@ public void testJsonStringWithArrayAndNestedRecord() throws IOException {
 
   /**
    * Empty JSON string is not a valid JSON string and should throw an exception.
-   * @throws IOException
+   * @throws IllegalArgumentException
   */
-  @Test(expected = IllegalStateException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testEmptyJsonString() throws IOException {
     Schema recordSchema = Schema.recordOf(
       "record",
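The USER versus SYSTEM split in the two new provider methods is the main judgment call in this change: IllegalArgumentException (now thrown by BigQueryRecordToJson for invalid JSON values, and pinned down by the updated test) is treated as bad input, while IllegalStateException is treated as a broken internal invariant. A condensed, illustrative sketch of what the two branches produce; the classify helper and phase string are hypothetical, while the CDAP calls are the ones used above:

    import io.cdap.cdap.api.exception.ErrorCategory;
    import io.cdap.cdap.api.exception.ErrorType;
    import io.cdap.cdap.api.exception.ErrorUtils;
    import io.cdap.cdap.api.exception.ProgramFailureException;

    class ProviderSketch {
      // Hypothetical condensation of the two new GCPErrorDetailsProvider branches.
      static ProgramFailureException classify(RuntimeException e, String phase) {
        String errorMessage = e.getMessage();
        String full = String.format("Error occurred in the phase: '%s'. Error message: %s", phase, errorMessage);
        // Bad user input versus a violated internal invariant.
        ErrorType type = e instanceof IllegalArgumentException ? ErrorType.USER : ErrorType.SYSTEM;
        return ErrorUtils.getProgramFailureException(
          new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, full, type, true, e);
      }
    }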