diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java index 81ab379..781bd2f 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.sink.buffered; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.protobuf.Descriptors; @@ -26,8 +27,8 @@ protected ApiFuture append(String traceId, Rows rows) { try { return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset()); - } catch (IOException | Descriptors.DescriptorValidationException e) { - throw new RuntimeException(e); + } catch (Throwable t) { + return ApiFutures.immediateFailedFuture(t); } } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java index da801c9..a1ad7a0 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.sink.buffered; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.StreamWriter; @@ -39,6 +40,11 @@ protected ApiFuture append(String traceId, Rows rows) { logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); - return writer.append(prows, rows.getOffset()); + try { + return writer.append(prows, rows.getOffset()); + } catch (Throwable t) { + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + return ApiFutures.immediateFailedFuture(t); + } } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java index 436851a..47f3d73 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.sink.defaultStream; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.protobuf.Descriptors; @@ -36,9 +37,9 @@ protected ApiFuture append(String traceId, Rows rows) { try { return writer.append(rowArray); - } catch (IOException | Descriptors.DescriptorValidationException e) { - logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, e.getMessage()); - throw new RuntimeException(e); + } catch (Throwable t) { + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + return ApiFutures.immediateFailedFuture(t); } } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java index 89e44ca..9351637 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.sink.defaultStream; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.StreamWriter; @@ -44,6 +45,11 @@ protected ApiFuture append(String traceId, Rows rows) { } logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); - return writer.append(prows); + try { + return writer.append(prows); + } catch (Throwable t) { + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + return ApiFutures.immediateFailedFuture(t); + } } }