From 670b2865d17044c5150ba847bc866a0e3f154ad1 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 8 Feb 2024 14:28:04 +0200 Subject: [PATCH 1/2] feat: handle when writer is closed --- .../sink/defaultStream/BigQueryDefaultJsonSinkWriter.java | 8 ++++++++ .../defaultStream/BigQueryDefaultProtoSinkWriter.java | 6 ++++++ .../sink/defaultStream/BigQueryDefaultSinkWriter.java | 2 +- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java index fb3b408..e011d51 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java @@ -2,6 +2,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.protobuf.Descriptors; @@ -28,6 +29,13 @@ protected ApiFuture append(String traceId, Rows rows) { var rowArray = new JSONArray(); rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + + if (writer.isClosed() || writer.isUserClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); + recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); try { diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java index aa156ed..f124e7f 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java @@ -41,6 +41,12 @@ protected ApiFuture append(String traceId, Rows rows) { var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + if (writer.isClosed() || writer.isUserClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); + recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); try { return writer.append(prows); diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index 5f53a3f..45899b3 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -130,13 +130,13 @@ public void onFailure(Throwable t) { var status = Status.fromThrowable(t); switch (status.getCode()) { case INTERNAL: - case ABORTED: case CANCELLED: case FAILED_PRECONDITION: case DEADLINE_EXCEEDED: doPauseBeforeRetry(); retryWrite(t, retryCount - 1); break; + case ABORTED: case UNAVAILABLE: { this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); retryWrite(t, retryCount - 1); From 3443e8d37892b24898d9f39e12ba82167fdd8985 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 8 Feb 2024 15:14:51 +0200 Subject: [PATCH 2/2] feat: add closed stream handling for buffered stream --- .../sink/buffered/BigQueryBufferedSinkWriter.java | 9 ++++++++- .../sink/buffered/BigQueryJsonBufferedSinkWriter.java | 8 +++++++- .../sink/buffered/BigQueryProtoBufferedSinkWriter.java | 6 ++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java index 1a7badd..c233837 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java @@ -65,7 +65,6 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro switch (status.getCode()) { case INTERNAL: case CANCELLED: - case ABORTED: { logger.warn(createLogMessage.apply("Recoverable error. Retrying.., "), error); try { Thread.sleep(clientProvider.writeSettings().getRetryPause().toMillis()); @@ -73,6 +72,14 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro throw new RuntimeException(e); } + if (retryCount > 0) { + writeWithRetry(errorTraceId, errorRows, retryCount - 1); + } else { + throw error; + } + break; + case UNAVAILABLE: + case ABORTED: { if (retryCount > 0) { writeWithRetry(errorTraceId, errorRows, retryCount - 1); } else { diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java index 781bd2f..929d384 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java @@ -24,9 +24,15 @@ public BigQueryJsonBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValue protected ApiFuture append(String traceId, Rows rows) { var rowArray = new JSONArray(); rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); + var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + + if (writer.isClosed() || writer.isUserClosed()) { + recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } try { - return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset()); + return writer.append(rowArray, rows.getOffset()); } catch (Throwable t) { return ApiFutures.immediateFailedFuture(t); } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java index a45ec4b..db41a64 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java @@ -34,6 +34,12 @@ protected ApiFuture append(String traceId, Rows rows) { Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size)); var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + if (writer.isClosed() || writer.isUserClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); + recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); try {