Skip to content

Commit

Permalink
Merge pull request #8 from vinted/fix/handle-exception
Browse files Browse the repository at this point in the history
fix: handle immediate append error
  • Loading branch information
gintarasm authored Nov 29, 2023
2 parents 4cbde99 + 4d92459 commit 014a8f1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.sink.buffered;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors;
Expand All @@ -26,8 +27,8 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {

try {
return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset());
} catch (IOException | Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (Throwable t) {
return ApiFutures.immediateFailedFuture(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.sink.buffered;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
Expand Down Expand Up @@ -39,6 +40,11 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());

return writer.append(prows, rows.getOffset());
try {
return writer.append(prows, rows.getOffset());
} catch (Throwable t) {
logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage());
return ApiFutures.immediateFailedFuture(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.sink.defaultStream;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors;
Expand Down Expand Up @@ -36,9 +37,9 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {

try {
return writer.append(rowArray);
} catch (IOException | Descriptors.DescriptorValidationException e) {
logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, e.getMessage());
throw new RuntimeException(e);
} catch (Throwable t) {
logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage());
return ApiFutures.immediateFailedFuture(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.sink.defaultStream;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
Expand Down Expand Up @@ -44,6 +45,11 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
return writer.append(prows);
try {
return writer.append(prows);
} catch (Throwable t) {
logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage());
return ApiFutures.immediateFailedFuture(t);
}
}
}

0 comments on commit 014a8f1

Please sign in to comment.