[PLUGIN-1818] ErrorDetailsProvider - BigQuery Source/Sink plugin #1458

Open
wants to merge 1 commit into
base: develop

psainics
Contributor

@psainics psainics commented Nov 6, 2024

ErrorDetailsProvider - BigQuery Source/Sink plugin

Jira : PLUGIN-1818

Description

Adds an error details provider to the BigQuery source and sink plugins.

CDAP Error logs
  • Test case (invalid JSON input)
2024-11-13 11:19:00,929 - ERROR [Executor task launch worker for task 0.0 in stage 0.0 (TID 0):o.a.s.u.Utils@98] - Aborting task
io.cdap.cdap.api.exception.WrappedStageException: Stage 'BigQuery' encountered : io.cdap.cdap.api.exception.ProgramFailureException: Error occurred in the phase: 'Writing'. Error message: Expected value of Field 'raw' to be a valid JSON object or array.
  at io.cdap.cdap.etl.common.ErrorDetails.handleException(ErrorDetails.java:77)
  at io.cdap.cdap.etl.spark.io.StageTrackingRecordWriter.write(StageTrackingRecordWriter.java:57)
  at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:368)
  at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:138)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
  at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
  at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: io.cdap.cdap.api.exception.ProgramFailureException: Error occurred in the phase: 'Writing'. Error message: Expected value of Field 'raw' to be a valid JSON object or array.
  at io.cdap.cdap.api.exception.ProgramFailureException$Builder.build(ProgramFailureException.java:186)
  at io.cdap.cdap.api.exception.ErrorUtils.getProgramFailureException(ErrorUtils.java:161)
  at io.cdap.plugin.gcp.common.GCPErrorDetailsProvider.getProgramFailureException(GCPErrorDetailsProvider.java:133)
  at io.cdap.plugin.gcp.common.GCPErrorDetailsProvider.getExceptionDetails(GCPErrorDetailsProvider.java:60)
  at io.cdap.cdap.etl.common.ErrorDetails.handleException(ErrorDetails.java:75)
  ... 14 common frames omitted
Caused by: java.lang.IllegalStateException: Expected value of Field 'raw' to be a valid JSON object or array.
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson.writeSimpleTypes(BigQueryRecordToJson.java:192)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson.write(BigQueryRecordToJson.java:96)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordToJson.write(BigQueryRecordToJson.java:71)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter.transform(BigQueryJsonConverter.java:51)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter.transform(BigQueryJsonConverter.java:32)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordWriter.write(BigQueryRecordWriter.java:62)
  at io.cdap.plugin.gcp.bigquery.sink.BigQueryRecordWriter.write(BigQueryRecordWriter.java:33)
  at io.cdap.cdap.etl.spark.io.TrackingRecordWriter.write(TrackingRecordWriter.java:41)
  at io.cdap.cdap.etl.spark.io.StageTrackingRecordWriter.write(StageTrackingRecordWriter.java:55)
  ... 13 common frames omitted
  Suppressed: java.io.IOException: Incomplete document
  	at com.google.gson.internal.bind.JsonTreeWriter.close(JsonTreeWriter.java:196)
  	at io.cdap.plugin.gcp.bigquery.sink.BigQueryJsonConverter.transform(BigQueryJsonConverter.java:56)
  	... 18 common frames omitted

Code change

  • Added BigQueryErrorDetailsProvider.java
  • Modified AbstractBigQuerySink.java
  • Modified BigQueryOutputFormat.java
  • Modified BigQueryRecordToJson.java
  • Modified BigQuerySinkUtils.java
  • Modified BigQueryAvroToStructuredTransformer.java
  • Modified BigQuerySource.java
  • Modified BigQuerySourceUtils.java
  • Modified PartitionedBigQueryInputFormat.java
  • Modified GCPErrorDetailsProvider.java

Unit Tests

  • Modified BigQueryRecordToJsonTest.java

@psainics psainics added the build Trigger unit test build label Nov 6, 2024
@psainics psainics changed the title TEST Error details provider - BigQuerySource Wrap known errors Nov 8, 2024
@psainics psainics changed the title Error details provider - BigQuerySource Wrap known errors [PLUGIN-21080] Error details provider - BigQuerySource Wrap known errors Nov 13, 2024
@psainics psainics changed the title [PLUGIN-21080] Error details provider - BigQuerySource Wrap known errors [PLUGIN-21080] ErrorDetailsProvider - BigQuery Source/Sink plugin Nov 13, 2024
@psainics psainics changed the title [PLUGIN-21080] ErrorDetailsProvider - BigQuery Source/Sink plugin [PLUGIN-1818] ErrorDetailsProvider - BigQuery Source/Sink plugin Nov 13, 2024
@psainics psainics self-assigned this Nov 13, 2024
@psainics psainics marked this pull request as ready for review November 13, 2024 06:57
* @param e The IOException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IOException e, ErrorContext errorContext) {
Member

How is IOException a user error? Even the error messages are not at all actionable.

Member

We need to convert only those exceptions to ProgramFailureException where we can get enough information from the exceptions.

Member

In underlying places, if we are throwing IOException we should change it to throw ProgramFailureException rather than assuming IOException will always be user error.
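
For illustration, a minimal sketch of the approach suggested above: the throw site classifies the failure itself, and the provider passes classified failures through without guessing a type for a bare IOException. `SketchErrorType`, `ClassifiedFailure` and `SourceSideClassification` are hypothetical stand-ins, not the CDAP classes, and treating an existing output path as USER is an assumption made only for this example.

```java
// Hypothetical stand-ins for illustration; these are not the CDAP classes.
enum SketchErrorType { USER, SYSTEM, UNKNOWN }

class ClassifiedFailure extends RuntimeException {
  final SketchErrorType type;

  ClassifiedFailure(String message, SketchErrorType type, Throwable cause) {
    super(message, cause);
    this.type = type;
  }
}

class SourceSideClassification {
  // The throw site knows why it failed, so it attaches the error type itself.
  // USER is an assumption made for this sketch.
  static void checkOutputPath(String outputPath, boolean exists) {
    if (exists) {
      throw new ClassifiedFailure(
        "The output path '" + outputPath + "' already exists.", SketchErrorType.USER, null);
    }
  }

  // Classified failures pass through unchanged. Any other exception carries no
  // evidence of who is at fault, so it is wrapped as UNKNOWN.
  static ClassifiedFailure toFailure(Exception e) {
    if (e instanceof ClassifiedFailure) {
      return (ClassifiedFailure) e;
    }
    return new ClassifiedFailure(e.getMessage(), SketchErrorType.UNKNOWN, e);
  }
}
```

With this shape, the provider never has to infer an error type from the exception class alone.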

Contributor Author

IOExceptions are thrown from BigQueryOutputFormat in the code, e.g.:

throw new IOException("The output path '" + outputPath + "' already exists.");

Contributor Author

> In underlying places, if we are throwing IOException we should change it to throw ProgramFailureException rather than assuming IOException will always be user error.

OK, got it, will make these changes.

Contributor Author

Changed underlying IOException to ProgramFailureException.

* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
Member

Similarly, how is IllegalStateException a user error?

Member

An IllegalStateException usually represents a programming or configuration error in the code. ErrorType SYSTEM might be a better fit, as it suggests something went wrong with the program's state or lifecycle that is under the application's control rather than being an external client issue.

Contributor Author

There are places in the code where incorrect user input can lead to an IllegalStateException; we should change those.

Contributor Author

throw new IllegalStateException(String.format("Expected value of Field '%s' to be a valid JSON " +

Contributor Author

The exception above should be changed to IllegalArgumentException, right?

Contributor Author

Changed the error type for IllegalStateException to SYSTEM.

Member

> The exception above should be changed to IllegalArgumentException, right?

Right.

Contributor Author

Updated !
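
A minimal sketch of the split this thread appears to settle on: invalid input is reported as IllegalArgumentException and treated as a user error, while IllegalStateException is treated as a system error. `ReviewErrorType` and `ExceptionTypeMapping` are hypothetical names, and the JSON check is deliberately simplistic (it only looks at the first character), so it is not the plugin's actual validation.

```java
// Hypothetical stand-in for the error type enum discussed in the thread.
enum ReviewErrorType { USER, SYSTEM, UNKNOWN }

class ExceptionTypeMapping {
  // IllegalArgumentException is assumed to mean bad user input (USER);
  // IllegalStateException is assumed to mean a problem in the program's own state (SYSTEM).
  static ReviewErrorType classify(Throwable t) {
    if (t instanceof IllegalArgumentException) {
      return ReviewErrorType.USER;
    }
    if (t instanceof IllegalStateException) {
      return ReviewErrorType.SYSTEM;
    }
    return ReviewErrorType.UNKNOWN;
  }

  // Simplified version of the JSON field check: it only inspects the first
  // non-blank character, and it throws IllegalArgumentException on bad input.
  static void requireJsonObjectOrArray(String fieldName, String value) {
    String trimmed = value == null ? "" : value.trim();
    boolean looksLikeJson = trimmed.startsWith("{") || trimmed.startsWith("[");
    if (!looksLikeJson) {
      throw new IllegalArgumentException(String.format(
        "Expected value of Field '%s' to be a valid JSON object or array.", fieldName));
    }
  }
}
```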

@@ -64,7 +74,7 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
ErrorContext errorContext) {
Member

nit: indentation.

Contributor Author

fixed !

errorMessage, numOfErrors, jobReference.getJobId());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessageException,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessageException), ErrorType.SYSTEM, true,
Member

I think error type here should be UNKNOWN, how are we sure it is a system issue?

Contributor Author

What criterion can be used to better distinguish between the SYSTEM and UNKNOWN error types?

Here the job failed on the BigQuery side (the "system"), and since the logs are pulled in from BigQuery, the reason will likely be known.

Member

A job can fail on BigQuery for many reasons, such as permission issues, right? So we need to decide based on the error code and error reason; we cannot directly say that it is a system error.

Contributor Author

Updated to type UNKNOWN
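
One way to decide by error reason, as a rough sketch only. The reason strings (`accessDenied`, `invalid`, `invalidQuery`, `notFound`, `backendError`, `internalError`) are taken from BigQuery's documented error table, but how each one maps to an error type is an assumption made here, and `JobErrorType` and `JobErrorReasonClassifier` are hypothetical names. Anything not listed stays UNKNOWN, which matches the outcome of this thread.

```java
// Hypothetical stand-in for the error type enum discussed in the thread.
enum JobErrorType { USER, SYSTEM, UNKNOWN }

class JobErrorReasonClassifier {
  // The reason-to-type mapping below is an assumption made for this sketch.
  static JobErrorType classify(String reason) {
    if (reason == null) {
      return JobErrorType.UNKNOWN;
    }
    switch (reason) {
      case "accessDenied":
      case "invalid":
      case "invalidQuery":
      case "notFound":
        // The request, its permissions or its target is wrong.
        return JobErrorType.USER;
      case "backendError":
      case "internalError":
        // The service itself failed.
        return JobErrorType.SYSTEM;
      default:
        // No evidence either way.
        return JobErrorType.UNKNOWN;
    }
  }
}
```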

, elapsedTime);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
Member

I think error type here should be UNKNOWN, how are we sure it is a system issue?

Contributor Author

Yes, UNKNOWN is more appropriate here, updated!

Member

@itsankit-google itsankit-google left a comment

There are multiple exceptions being thrown in BigQuerySinkUtils; why are they not covered in this PR?

@psainics
Contributor Author

> There are multiple exceptions being thrown in BigQuerySinkUtils; why are they not covered in this PR?

Wrapped the IOException; IllegalArgumentException will be handled by the error details provider.

Member

@itsankit-google itsankit-google left a comment

Please fix the e2e test failure; it might be caused by the change in the error message:
To verify the pipeline is getting failed from GCS Source to GCS Sink On Multiple File having different schemas without connection


@psainics psainics force-pushed the fem/big-query branch 2 times, most recently from 170fc3b to 9e79da9 Compare November 14, 2024 09:27
throw new IOException(errorMessage.get(), e);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Bucket already exists", errorMessage.get(),
ErrorType.UNKNOWN, true, e);
Member

@itsankit-google itsankit-google Nov 14, 2024

We can decide on the error type here based on the status code; see:

ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);

Only 409 means the bucket already exists; any other status code means the request to create the bucket failed.

Contributor Author

updated !

try {
  GCPUtils.createBucket(storage, bucket, location, cmekKeyName);
} catch (StorageException e) {
  // A 409 conflict means the bucket already exists, most likely because multiple stages in the
  // same pipeline are trying to create the same bucket. Ignore it and move on, since all that
  // matters is that the bucket exists. Any other status code means the request failed.
  if (e.getCode() != 409) {
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
    String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
      pair.getErrorType(), true, e);
  }
}

throw new IOException(errorMessage.get(), e);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), "Dataset already exists", errorMessage.get(),
ErrorType.UNKNOWN, true, e);
Member

Similar comment here:

ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);

Only 409 means the dataset already exists; any other status code means the request to create the dataset failed.

Contributor Author

updated !

try {
  bigQuery.create(builder.build());
} catch (BigQueryException e) {
  // A 409 conflict means the dataset already exists (https://cloud.google.com/bigquery/troubleshooting-errors),
  // most likely because multiple stages in the same pipeline are trying to create the same dataset.
  // Ignore it and move on, since all that matters is that the dataset exists.
  // Any other status code means the request failed.
  if (e.getCode() != 409) {
    ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
    String errorReason = String.format("%s %s %s", e.getCode(), e.getMessage(), pair.getCorrectiveAction());
    throw ErrorUtils.getProgramFailureException(
      new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
      pair.getErrorType(), true, e);
  }
}
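
The bucket and dataset snippets share one pattern: attempt the create, treat a 409 conflict as success, and surface every other failure. A self-contained sketch of that pattern follows; `ServiceCallException` is a hypothetical stand-in for StorageException and BigQueryException, and `CreateIfAbsent` is an illustrative name, not a helper from the plugin.

```java
// Hypothetical exception carrying an HTTP status code, standing in for
// StorageException and BigQueryException.
class ServiceCallException extends RuntimeException {
  final int code;

  ServiceCallException(int code, String message) {
    super(message);
    this.code = code;
  }
}

class CreateIfAbsent {
  static final int HTTP_CONFLICT = 409;

  // Runs the create call. Returns true if this call created the resource and
  // false if it already existed (409). Any other failure is rethrown.
  static boolean create(Runnable createCall) {
    try {
      createCall.run();
      return true;
    } catch (ServiceCallException e) {
      if (e.code == HTTP_CONFLICT) {
        // Another stage probably created it first; the resource exists, which is all that matters.
        return false;
      }
      throw e;
    }
  }
}
```

In the real code the rethrow branch is where the status code would be mapped to an error type and corrective action.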

*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String subCategory = "IllegalState";
Member

This doesn't sound like an appropriate category. Categories should not contain low-level exception names, but rather high-level, generic error kinds such as access issues.

Member

Unless we can point out the exact subCategory, we can omit adding it rather than adding generic stuff in subcategories.

Contributor Author

Fair, removed. Also removed String subCategory = "IllegalArgument".

collector.getOrThrowException();
}


Member

nit: remove extra empty line.

Contributor Author

Removed !

@itsankit-google
Member

> > There are multiple exceptions being thrown in BigQuerySinkUtils; why are they not covered in this PR?
>
> Wrapped the IOException; IllegalArgumentException will be handled by the error details provider.

Are we sure that IllegalArgumentException is thrown from methods called from the OutputFormat methods? ErrorDetailsProvider is currently used only there.

throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, ErrorPhase.COMMITTING, errorMessage), ErrorType.SYSTEM, true,
new IOException(errorMessage, e));
Member

There is no need to wrap the actual exception in an IOException here now.

Member

Similar comment in all places.

Contributor Author

Removed!
I have made the changes in a new commit to help with the review; I will squash the commits before merging.

@psainics
Contributor Author

> > > There are multiple exceptions being thrown in BigQuerySinkUtils; why are they not covered in this PR?
> >
> > Wrapped the IOException; IllegalArgumentException will be handled by the error details provider.
>
> Are we sure that IllegalArgumentException is thrown from methods called from the OutputFormat methods? ErrorDetailsProvider is currently used only there.

The two places that mention IllegalArgumentException do so in their description, for the condition where the output schema has more fields than the BQ table, but that case is already handled by adding the failure to a collector:

    for (String field : remainingBQFields) {
      if (bqFields.get(field).getMode() != Field.Mode.NULLABLE) {
        collector.addFailure(String.format("Required Column '%s' is not present in the schema.", field),
                             String.format("Add '%s' to the schema.", field));
      }

Is there somewhere I missed it?
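
For context, a minimal sketch of the collector pattern the snippet above relies on: validation failures are accumulated and reported together instead of being thrown one at a time. `SketchFailureCollector` is a hypothetical stand-in that borrows the method names `addFailure` and `getOrThrowException` seen in this PR; throwing IllegalArgumentException is an assumption of the sketch and not necessarily what the real collector throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the failure collector used in the plugin.
class SketchFailureCollector {
  private final List<String> failures = new ArrayList<>();

  // Records a failure together with its corrective action; nothing is thrown yet.
  void addFailure(String message, String correctiveAction) {
    failures.add(message + " " + correctiveAction);
  }

  int failureCount() {
    return failures.size();
  }

  // Throws once, reporting every collected failure, or returns quietly if there are none.
  // IllegalArgumentException is an assumption made for this sketch.
  void getOrThrowException() {
    if (!failures.isEmpty()) {
      throw new IllegalArgumentException(String.join("\n", failures));
    }
  }
}
```

Because the schema mismatch is reported through the collector, it never reaches the OutputFormat code path as an IllegalArgumentException.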

@itsankit-google
Member

sg.

String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, true, e);
Member

The dependency flag should probably not be true in this case, since the exception is thrown from our code.

Contributor Author

Got it, updated !

String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, true, e);
Member

@itsankit-google itsankit-google Nov 15, 2024

Similarly, here dependency should not be true unless the error is thrown from a dependent service's client.

Contributor Author

Got it, updated !
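
A small sketch of one way to derive the dependency flag instead of hard-coding it: the flag is true only when the failure, or one of its causes, was raised by a dependent service's client. `DependentServiceException` is a hypothetical stand-in for client exceptions, and walking the cause chain is a design choice of this sketch, not something the PR does.

```java
// Hypothetical stand-in for exceptions raised by a dependent service's client.
class DependentServiceException extends RuntimeException {
  DependentServiceException(String message) {
    super(message);
  }
}

class DependencyFlagSketch {
  // True only when the failure, or one of its causes, came from the service client.
  // Exceptions thrown by the plugin's own code yield false.
  static boolean isDependencyFailure(Throwable failure) {
    for (Throwable t = failure; t != null; t = t.getCause()) {
      if (t instanceof DependentServiceException) {
        return true;
      }
    }
    return false;
  }
}
```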

Member

@itsankit-google itsankit-google left a comment

A couple of minor comments. Otherwise, the PR looks good.

Please check once that all the e2e tests pass.
