diff --git a/src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java b/src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java index c210a14..724710e 100644 --- a/src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java +++ b/src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java @@ -13,9 +13,9 @@ import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSink; import org.apache.flink.connector.base.DeliveryGuarantee; -public class BigQueryStreamSink { +public class BigQueryStreamSink { private RowValueSerializer rowValueSerializer = new NoOpRowSerializer<>(); - private ClientProvider clientProvider = null; + private ClientProvider clientProvider = null; private ExecutorProvider executorProvider = MoreExecutors::directExecutor; @@ -23,30 +23,26 @@ public class BigQueryStreamSink { private BigQueryStreamSink() { } - public static BigQueryStreamSink newProto() { + public static BigQueryStreamSink newBuilder() { return new BigQueryStreamSink<>(); } - public static BigQueryStreamSink newJson() { - return new BigQueryStreamSink<>(); - } - - public BigQueryStreamSink withRowValueSerializer(RowValueSerializer serializer) { + public BigQueryStreamSink withRowValueSerializer(RowValueSerializer serializer) { this.rowValueSerializer = serializer; return this; } - public BigQueryStreamSink withClientProvider(ClientProvider clientProvider) { + public BigQueryStreamSink withClientProvider(ClientProvider clientProvider) { this.clientProvider = clientProvider; return this; } - public BigQueryStreamSink withExecutorProvider(ExecutorProvider executorProvider) { + public BigQueryStreamSink withExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = executorProvider; return this; } - public BigQueryStreamSink withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + public BigQueryStreamSink withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { this.deliveryGuarantee = deliveryGuarantee; return this; } diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java index dbe6b0f..0590b7a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java @@ -11,12 +11,13 @@ import com.vinted.flink.bigquery.model.config.Credentials; import com.vinted.flink.bigquery.model.config.WriterSettings; import com.vinted.flink.bigquery.schema.SchemaTransformer; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; import java.io.IOException; import java.util.Optional; import java.util.concurrent.Executors; -public class BigQueryJsonClientProvider implements ClientProvider { +public class BigQueryJsonClientProvider implements ClientProvider { private Credentials credentials; private WriterSettings writerSettings; @@ -42,16 +43,18 @@ public BigQueryWriteClient getClient() { } @Override - public JsonStreamWriter getWriter(String streamName, TableId table) { + public BigQueryStreamWriter getWriter(String streamName, TableId table, RowValueSerializer serializer) { try { var executorProvider = this.writerSettings.getWriterThreads() > 1 ? FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) : BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); - return JsonStreamWriter + var writer = JsonStreamWriter .newBuilder(streamName, getTableSchema(table), this.getClient()) .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) .setExecutorProvider(executorProvider) .build(); + + return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer); } catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index b3049cd..85b1759 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -8,13 +8,14 @@ import com.vinted.flink.bigquery.model.config.Credentials; import com.vinted.flink.bigquery.model.config.WriterSettings; import com.vinted.flink.bigquery.schema.SchemaTransformer; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; import java.io.IOException; import java.util.concurrent.Executors; -public class BigQueryProtoClientProvider implements ClientProvider { - private Credentials credentials; - private WriterSettings writerSettings; +public class BigQueryProtoClientProvider implements ClientProvider { + private final Credentials credentials; + private final WriterSettings writerSettings; private transient BigQueryWriteClient bigQueryWriteClient; @@ -37,7 +38,7 @@ public BigQueryWriteClient getClient() { } @Override - public StreamWriter getWriter(String streamName, TableId table) { + public BigQueryStreamWriter getWriter(String streamName, TableId table, RowValueSerializer serializer) { try { var descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(getTableSchema(table)); var protoSchema = ProtoSchemaConverter.convert(descriptor); @@ -53,7 +54,8 @@ public StreamWriter getWriter(String streamName, TableId table) { .setExecutorProvider(executorProvider) .setLocation(table.getProject()) .setWriterSchema(protoSchema); - return streamWriterBuilder.build(); + + return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build()); } catch (IOException | Descriptors.DescriptorValidationException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java new file mode 100644 index 0000000..9485adb --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java @@ -0,0 +1,15 @@ +package com.vinted.flink.bigquery.client; + +import com.google.api.core.ApiFuture; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.vinted.flink.bigquery.model.Rows; + +public interface BigQueryStreamWriter extends AutoCloseable { + ApiFuture append(Rows data); + ApiFuture append(Rows data, long offset); + + String getStreamName(); + + String getWriterId(); + boolean isClosed(); +} diff --git a/src/main/java/com/vinted/flink/bigquery/client/ClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/ClientProvider.java index c922f63..edea1fd 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/ClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/ClientProvider.java @@ -3,13 +3,14 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.vinted.flink.bigquery.model.config.WriterSettings; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; import java.io.Serializable; public interface ClientProvider extends Serializable { BigQueryWriteClient getClient(); - A getWriter(String streamName, TableId table); + BigQueryStreamWriter getWriter(String streamName, TableId table, RowValueSerializer serializer); WriterSettings writeSettings(); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java new file mode 100644 index 0000000..c26983b --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java @@ -0,0 +1,64 @@ +package com.vinted.flink.bigquery.client; + +import com.google.api.core.ApiFuture; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.protobuf.Descriptors; +import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import org.json.JSONArray; +import org.json.JSONObject; + +import java.io.IOException; + +public class JsonStreamWriter implements BigQueryStreamWriter{ + private final RowValueSerializer rowSerializer; + + private final com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer; + + public JsonStreamWriter(RowValueSerializer rowSerializer, com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer) { + this.rowSerializer = rowSerializer; + this.writer = writer; + } + + @Override + public ApiFuture append(Rows data) { + var rowArray = new JSONArray(); + data.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); + try { + return writer.append(rowArray); + } catch (IOException | Descriptors.DescriptorValidationException e) { + throw new RuntimeException(e); + } + } + + @Override + public ApiFuture append(Rows data, long offset) { + var rowArray = new JSONArray(); + data.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); + try { + return writer.append(rowArray, offset); + } catch (IOException | Descriptors.DescriptorValidationException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getStreamName() { + return writer.getStreamName(); + } + + @Override + public String getWriterId() { + return writer.getWriterId(); + } + + @Override + public boolean isClosed() { + return writer.isClosed() || writer.isUserClosed(); + } + + @Override + public void close() throws Exception { + writer.close(); + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java new file mode 100644 index 0000000..f651476 --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java @@ -0,0 +1,60 @@ +package com.vinted.flink.bigquery.client; + +import com.google.api.core.ApiFuture; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.protobuf.ByteString; +import com.vinted.flink.bigquery.model.Rows; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; + +import java.util.stream.Collectors; + +public class ProtoStreamWriter implements BigQueryStreamWriter{ + private final RowValueSerializer rowSerializer; + + private final StreamWriter writer; + + public ProtoStreamWriter(RowValueSerializer rowSerializer, StreamWriter writer) { + this.rowSerializer = rowSerializer; + this.writer = writer; + } + + @Override + public ApiFuture append(Rows data) { + var prows = ProtoRows + .newBuilder() + .addAllSerializedRows(data.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList())) + .build(); + return writer.append(prows); + } + + @Override + public ApiFuture append(Rows data, long offset) { + var prows = ProtoRows + .newBuilder() + .addAllSerializedRows(data.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList())) + .build(); + return writer.append(prows, offset); + } + + @Override + public String getStreamName() { + return writer.getStreamName(); + } + + @Override + public String getWriterId() { + return writer.getWriterId(); + } + + @Override + public boolean isClosed() { + return writer.isClosed() || writer.isUserClosed(); + } + + @Override + public void close() throws Exception { + writer.close(); + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java index 110c0a9..35bdc17 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java @@ -1,10 +1,12 @@ package com.vinted.flink.bigquery.sink; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.common.collect.Iterators; +import com.vinted.flink.bigquery.client.BigQueryStreamWriter; import com.vinted.flink.bigquery.client.ClientProvider; import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics; import com.vinted.flink.bigquery.model.Rows; @@ -28,13 +30,13 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -public abstract class BigQuerySinkWriter implements SinkWriter> { +public abstract class BigQuerySinkWriter implements SinkWriter> { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Iterator streamIndexIterator; private final SinkWriterMetricGroup metricGroup; - protected ClientProvider clientProvider; - protected transient Map streamMap = new ConcurrentHashMap<>(); + protected ClientProvider clientProvider; + protected transient Map> streamMap = new ConcurrentHashMap<>(); protected Sink.InitContext sinkInitContext; protected RowValueSerializer rowSerializer; @@ -44,12 +46,12 @@ public abstract class BigQuerySinkWriter imple protected Counter numRecordsOutCounter; protected transient Map metrics = new HashMap<>(); - protected abstract ApiFuture append(String traceId, Rows rows); + protected abstract AppendResult append(String traceId, Rows rows); public BigQuerySinkWriter( Sink.InitContext sinkInitContext, RowValueSerializer rowSerializer, - ClientProvider clientProvider, + ClientProvider clientProvider, ExecutorProvider executorProvider) { this.sinkInitContext = sinkInitContext; @@ -67,28 +69,28 @@ public BigQuerySinkWriter( } - protected final StreamT streamWriter(String traceId, String streamName, TableId table) { + protected final BigQueryStreamWriter streamWriter(String traceId, String streamName, TableId table) { var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next()); return streamMap.computeIfAbsent(streamWithIndex, name -> { logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex); // Stream name can't contain index - return this.clientProvider.getWriter(streamName, table); + return this.clientProvider.getWriter(streamName, table, rowSerializer); }); } - protected final void recreateAllStreamWriters(String traceId, String streamName, TableId table) { + protected final void recreateStreamWriter(String traceId, String streamName, String writerId, TableId table) { logger.info("Trace-id {} Closing all writers for {}", traceId, streamName); try { flush(true); streamMap.replaceAll((key, writer) -> { var newWriter = writer; - if (key.startsWith(streamName)) { + if (writer.getWriterId().equals(writerId)) { try { writer.close(); } catch (Exception e) { logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); } - newWriter = this.clientProvider.getWriter(streamName, table); + newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer); } return newWriter; }); @@ -136,4 +138,17 @@ protected String createLogMessage(String title, String errorTraceId, Status stat ); } + public static class AppendResult { + public final ApiFuture response; + public final String writerId; + + public AppendResult(ApiFuture response, String writerId) { + this.response = response; + this.writerId = writerId; + } + + public static AppendResult failure(Throwable t, String writerId) { + return new AppendResult(ApiFutures.immediateFailedFuture(t), writerId); + } + } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSink.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSink.java index 411ca9f..5c68f06 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSink.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSink.java @@ -14,13 +14,13 @@ import java.io.IOException; -public class BigQueryBufferedSink implements TwoPhaseCommittingSink, BigQueryCommittable> { +public class BigQueryBufferedSink implements TwoPhaseCommittingSink, BigQueryCommittable> { private final RowValueSerializer rowValueSerializer; - private final ClientProvider clientProvider; + private final ClientProvider clientProvider; private final ExecutorProvider executorProvider; - public BigQueryBufferedSink(RowValueSerializer rowValueSerializer, ClientProvider clientProvider, ExecutorProvider executorProvider) { + public BigQueryBufferedSink(RowValueSerializer rowValueSerializer, ClientProvider clientProvider, ExecutorProvider executorProvider) { this.rowValueSerializer = rowValueSerializer; this.clientProvider = clientProvider; this.executorProvider = executorProvider; @@ -28,13 +28,7 @@ public BigQueryBufferedSink(RowValueSerializer rowValueSerializer, ClientProv @Override public PrecommittingSinkWriter, BigQueryCommittable> createWriter(InitContext context) throws IOException { - if (rowValueSerializer instanceof JsonRowValueSerializer) { - return new BigQueryJsonBufferedSinkWriter<>(context, rowValueSerializer, (ClientProvider) clientProvider, executorProvider); - } else if (rowValueSerializer instanceof ProtoValueSerializer) { - return new BigQueryProtoBufferedSinkWriter<>(context, rowValueSerializer, (ClientProvider) clientProvider, executorProvider); - } else { - throw new RuntimeException("Not supported serializer"); - } + return new BigQueryBufferedSinkWriter<>(context, rowValueSerializer, clientProvider, executorProvider); } @Override diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java index c233837..89aafd1 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java @@ -5,6 +5,8 @@ import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.protobuf.ByteString; import io.grpc.Status; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; @@ -23,9 +25,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.stream.Collectors; -public abstract class BigQueryBufferedSinkWriter - extends BigQuerySinkWriter +public class BigQueryBufferedSinkWriter + extends BigQuerySinkWriter implements TwoPhaseCommittingSink.PrecommittingSinkWriter, BigQueryCommittable> { private static final Logger logger = LoggerFactory.getLogger(BigQueryBufferedSinkWriter.class); private Map streamOffsets = new ConcurrentHashMap<>(); @@ -33,12 +36,37 @@ public abstract class BigQueryBufferedSinkWriter rowSerializer, - ClientProvider clientProvider, + ClientProvider clientProvider, ExecutorProvider executorProvider) { super(sinkInitContext, rowSerializer, clientProvider, executorProvider); } - protected abstract ApiFuture append(String traceId, Rows rows); + @Override + protected AppendResult append(String traceId, Rows rows) { + var prows = ProtoRows + .newBuilder() + .addAllSerializedRows(rows.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList())) + .build(); + var size = prows.getSerializedSize(); + numBytesOutCounter.inc(size); + Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size)); + var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + + if (writer.isClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); + recreateStreamWriter(traceId, rows.getStream(), writer.getWriterId(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } + + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); + + try { + return new AppendResult(writer.append(rows, rows.getOffset()), writer.getWriterId()); + } catch (Throwable t) { + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + return AppendResult.failure(t, writer.getWriterId()); + } + } @Override protected void writeWithRetry(String traceId, Rows rows, int retryCount) throws Throwable { @@ -47,9 +75,9 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro "Trace-id: {} Appending rows \nstream: {}\ntable: {}\noffset: {}\nsize: {}\nretries: {}", traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount ); - var response = append(traceId, rows); + var result = append(traceId, rows); var callback = new AppendCallBack<>(this, rows, retryCount, traceId); - ApiFutures.addCallback(response, callback, appendExecutor); + ApiFutures.addCallback(result.response, callback, appendExecutor); try { callback.future.get(); } catch (ExecutionException e) { @@ -191,14 +219,14 @@ public void flush(boolean endOfInput) { } static class AppendCallBack implements ApiFutureCallback { - private final BigQueryBufferedSinkWriter parent; + private final BigQueryBufferedSinkWriter parent; private final Rows rows; private final String traceId; private final int retryCount; private final CompletableFuture future = new CompletableFuture<>(); - public AppendCallBack(BigQueryBufferedSinkWriter parent, Rows rows, int retryCount, String traceId) { + public AppendCallBack(BigQueryBufferedSinkWriter parent, Rows rows, int retryCount, String traceId) { this.parent = parent; this.rows = rows; this.traceId = traceId; diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java deleted file mode 100644 index 929d384..0000000 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.vinted.flink.bigquery.sink.buffered; - -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.protobuf.Descriptors; -import org.apache.flink.api.connector.sink2.Sink; -import com.vinted.flink.bigquery.client.ClientProvider; -import com.vinted.flink.bigquery.model.Rows; -import com.vinted.flink.bigquery.serializer.RowValueSerializer; -import com.vinted.flink.bigquery.sink.ExecutorProvider; -import org.json.JSONArray; -import org.json.JSONObject; - -import java.io.IOException; - -public class BigQueryJsonBufferedSinkWriter extends BigQueryBufferedSinkWriter { - public BigQueryJsonBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValueSerializer rowSerializer, ClientProvider clientProvider, ExecutorProvider executorProvider) { - super(sinkInitContext, rowSerializer, clientProvider, executorProvider); - } - - @Override - protected ApiFuture append(String traceId, Rows rows) { - var rowArray = new JSONArray(); - rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); - var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - - if (writer.isClosed() || writer.isUserClosed()) { - recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); - writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - } - - try { - return writer.append(rowArray, rows.getOffset()); - } catch (Throwable t) { - return ApiFutures.immediateFailedFuture(t); - } - } -} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java deleted file mode 100644 index db41a64..0000000 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.vinted.flink.bigquery.sink.buffered; - -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.ProtoRows; -import com.google.cloud.bigquery.storage.v1.StreamWriter; -import com.google.protobuf.ByteString; -import org.apache.flink.api.connector.sink2.Sink; -import com.vinted.flink.bigquery.client.ClientProvider; -import com.vinted.flink.bigquery.model.Rows; -import com.vinted.flink.bigquery.serializer.RowValueSerializer; -import com.vinted.flink.bigquery.sink.ExecutorProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.stream.Collectors; - -public class BigQueryProtoBufferedSinkWriter extends BigQueryBufferedSinkWriter { - private static final Logger logger = LoggerFactory.getLogger(BigQueryProtoBufferedSinkWriter.class); - public BigQueryProtoBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValueSerializer rowSerializer, ClientProvider clientProvider, ExecutorProvider executorProvider) { - super(sinkInitContext, rowSerializer, clientProvider, executorProvider); - } - - @Override - protected ApiFuture append(String traceId, Rows rows) { - var prows = ProtoRows - .newBuilder() - .addAllSerializedRows(rows.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList())) - .build(); - var size = prows.getSerializedSize(); - numBytesOutCounter.inc(size); - Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size)); - var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - - if (writer.isClosed() || writer.isUserClosed()) { - logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); - recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); - writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - } - - logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); - - try { - return writer.append(prows, rows.getOffset()); - } catch (Throwable t) { - logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); - return ApiFutures.immediateFailedFuture(t); - } - } -} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQuerySinkCommitter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQuerySinkCommitter.java index 6edd310..736c9eb 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQuerySinkCommitter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQuerySinkCommitter.java @@ -13,9 +13,9 @@ public class BigQuerySinkCommitter implements Committer { private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkCommitter.class); - private ClientProvider clientProvider; + private ClientProvider clientProvider; - public BigQuerySinkCommitter(ClientProvider clientProvider) { + public BigQuerySinkCommitter(ClientProvider clientProvider) { this.clientProvider = clientProvider; } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java deleted file mode 100644 index e011d51..0000000 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.vinted.flink.bigquery.sink.defaultStream; - -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.protobuf.Descriptors; -import org.apache.flink.api.connector.sink2.Sink; -import com.vinted.flink.bigquery.client.ClientProvider; -import com.vinted.flink.bigquery.model.Rows; -import com.vinted.flink.bigquery.serializer.RowValueSerializer; -import com.vinted.flink.bigquery.sink.ExecutorProvider; -import org.json.JSONArray; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class BigQueryDefaultJsonSinkWriter extends BigQueryDefaultSinkWriter { - private static final Logger logger = LoggerFactory.getLogger(BigQueryDefaultJsonSinkWriter.class); - public BigQueryDefaultJsonSinkWriter(Sink.InitContext sinkInitContext, RowValueSerializer rowSerializer, ClientProvider clientProvider, ExecutorProvider executorProvider) { - super(sinkInitContext, rowSerializer, clientProvider, executorProvider); - } - - @Override - protected ApiFuture append(String traceId, Rows rows) { - var rowArray = new JSONArray(); - rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); - var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - - if (writer.isClosed() || writer.isUserClosed()) { - logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); - recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); - writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - } - - logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); - - try { - return writer.append(rowArray); - } catch (Throwable t) { - logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); - return ApiFutures.immediateFailedFuture(t); - } - } -} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java deleted file mode 100644 index f124e7f..0000000 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.vinted.flink.bigquery.sink.defaultStream; - -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.ProtoRows; -import com.google.cloud.bigquery.storage.v1.StreamWriter; -import com.google.protobuf.ByteString; -import org.apache.flink.api.connector.sink2.Sink; -import com.vinted.flink.bigquery.client.ClientProvider; -import com.vinted.flink.bigquery.model.Rows; -import com.vinted.flink.bigquery.serializer.RowValueSerializer; -import com.vinted.flink.bigquery.sink.ExecutorProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.stream.Collectors; - -public class BigQueryDefaultProtoSinkWriter extends BigQueryDefaultSinkWriter { - private static final Logger logger = LoggerFactory.getLogger(BigQueryDefaultProtoSinkWriter.class); - - public BigQueryDefaultProtoSinkWriter( - Sink.InitContext sinkInitContext, - RowValueSerializer rowSerializer, - ClientProvider clientProvider, - ExecutorProvider executorProvider) { - super(sinkInitContext, rowSerializer, clientProvider, executorProvider); - } - - @Override - protected ApiFuture append(String traceId, Rows rows) { - var prows = ProtoRows - .newBuilder() - .addAllSerializedRows(rows.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList())) - .build(); - var size = prows.getSerializedSize(); - numBytesOutCounter.inc(size); - numRecordsOutCounter.inc(rows.getData().size()); - Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size)); - - var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - - if (writer.isClosed() || writer.isUserClosed()) { - logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); - recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); - writer = streamWriter(traceId, rows.getStream(), rows.getTable()); - } - - logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); - try { - return writer.append(prows); - } catch (Throwable t) { - logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); - return ApiFutures.immediateFailedFuture(t); - } - } -} diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSink.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSink.java index 85baddf..3a3fa57 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSink.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSink.java @@ -13,14 +13,14 @@ import java.io.IOException; -public class BigQueryDefaultSink implements Sink> { +public class BigQueryDefaultSink implements Sink> { private final RowValueSerializer rowValueSerializer; - private final ClientProvider clientProvider; + private final ClientProvider clientProvider; private final ExecutorProvider executorProvider; public BigQueryDefaultSink( RowValueSerializer rowValueSerializer, - ClientProvider clientProvider, + ClientProvider clientProvider, ExecutorProvider executorProvider) { this.rowValueSerializer = rowValueSerializer; this.clientProvider = clientProvider; @@ -29,13 +29,7 @@ public BigQueryDefaultSink( @Override public SinkWriter> createWriter(InitContext context) throws IOException { - if (rowValueSerializer instanceof JsonRowValueSerializer) { - return new BigQueryDefaultJsonSinkWriter(context, rowValueSerializer, (ClientProvider) clientProvider, executorProvider); - } else if (rowValueSerializer instanceof ProtoValueSerializer) { - return new BigQueryDefaultProtoSinkWriter(context, rowValueSerializer, (ClientProvider) clientProvider, executorProvider); - } else { - throw new RuntimeException("Not supported serializer"); - } + return new BigQueryDefaultSinkWriter(context, rowValueSerializer, clientProvider, executorProvider); } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index b1a3d51..a7de3a0 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -1,5 +1,6 @@ package com.vinted.flink.bigquery.sink.defaultStream; +import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; @@ -19,8 +20,8 @@ import java.util.Optional; import java.util.concurrent.Phaser; -public abstract class BigQueryDefaultSinkWriter - extends BigQuerySinkWriter { +public class BigQueryDefaultSinkWriter + extends BigQuerySinkWriter { private static final Logger logger = LoggerFactory.getLogger(BigQueryDefaultSinkWriter.class); private final Phaser inflightRequestCount = new Phaser(1); @@ -29,7 +30,7 @@ public abstract class BigQueryDefaultSinkWriter rowSerializer, - ClientProvider clientProvider, + ClientProvider clientProvider, ExecutorProvider executorProvider) { super(sinkInitContext, rowSerializer, clientProvider, executorProvider); @@ -56,6 +57,28 @@ private void checkAsyncException() { } } + protected AppendResult append(String traceId, Rows rows) { + var size = 0L; + numRecordsOutCounter.inc(rows.getData().size()); + Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size)); + + var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + + if (writer.isClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); + recreateStreamWriter(traceId, rows.getStream(), writer.getWriterId(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } + + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); + try { + return new AppendResult(writer.append(rows), writer.getWriterId()); + } catch (Throwable t) { + logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); + return AppendResult.failure(t, writer.getWriterId()); + } + } + @Override protected void writeWithRetry(String traceId, Rows rows, int retryCount) throws Throwable { try { @@ -64,9 +87,9 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro "Trace-id: {} Appending rows \nstream: {}\ntable: {}\noffset: {}\nsize: {}\nretries: {}", traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount ); - var response = append(traceId, rows); - var callback = new AppendCallBack<>(this, traceId, rows, retryCount); - ApiFutures.addCallback(response, callback, appendExecutor); + var result = append(traceId, rows); + var callback = new AppendCallBack<>(this, result.writerId, traceId, rows, retryCount); + ApiFutures.addCallback(result.response, callback, appendExecutor); inflightRequestCount.register(); } catch (AppendException exception) { var error = exception.getError(); @@ -104,14 +127,17 @@ public void flush(boolean endOfInput) { } static class AppendCallBack implements ApiFutureCallback { - private final BigQueryDefaultSinkWriter parent; + private final BigQueryDefaultSinkWriter parent; private final Rows rows; + + private final String writerId; private final String traceId; private final int retryCount; - public AppendCallBack(BigQueryDefaultSinkWriter parent, String traceId, Rows rows, int retryCount) { + public AppendCallBack(BigQueryDefaultSinkWriter parent, String writerId, String traceId, Rows rows, int retryCount) { this.parent = parent; + this.writerId = writerId; this.traceId = traceId; this.rows = rows; this.retryCount = retryCount; @@ -140,7 +166,7 @@ public void onFailure(Throwable t) { break; case ABORTED: case UNAVAILABLE: { - this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable()); retryWrite(t, retryCount - 1); break; } @@ -165,7 +191,7 @@ public void onFailure(Throwable t) { case UNKNOWN: if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); - this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable()); retryWrite(t, retryCount - 1); } else { logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java index 1c1986c..d69c14e 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java @@ -99,8 +99,8 @@ private Function>> pipeline( return env -> env.fromCollection(data); } - private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) { - var sink = BigQueryStreamSink.newJson() + private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) { + var sink = BigQueryStreamSink.newBuilder() .withClientProvider(mockClientProvider) .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .withRowValueSerializer((JsonRowValueSerializer) String::getBytes) diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java index fba9609..982a449 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java @@ -4,6 +4,7 @@ import com.google.cloud.bigquery.storage.v1.Exceptions; import com.vinted.flink.bigquery.model.Rows; import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; import com.vinted.flink.bigquery.util.FlinkTest; import com.vinted.flink.bigquery.util.MockJsonClientProvider; import io.grpc.Status; @@ -128,7 +129,7 @@ public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTe } @Test - public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { + public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { mockClientProvider.givenStreamIsFinalized(stream); assertThatThrownBy(() -> { @@ -186,11 +187,11 @@ private Function>> pipeline( return env -> env.fromCollection(data); } - private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) { - var sink = BigQueryStreamSink.newJson() + private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) { + var sink = BigQueryStreamSink.newBuilder() .withClientProvider(mockClientProvider) .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .withRowValueSerializer((JsonRowValueSerializer) String::getBytes) + .withRowValueSerializer((RowValueSerializer) String::getBytes) .build(); return pipeline.andThen(s -> s.sinkTo(sink)); diff --git a/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java b/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java index 077270a..a262014 100644 --- a/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java +++ b/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java @@ -4,8 +4,11 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.*; import com.google.protobuf.Descriptors; +import com.vinted.flink.bigquery.client.BigQueryJsonClientProvider; +import com.vinted.flink.bigquery.client.BigQueryStreamWriter; import com.vinted.flink.bigquery.client.ClientProvider; import com.vinted.flink.bigquery.model.config.WriterSettings; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; import io.grpc.Status; import io.grpc.StatusException; import org.mockito.Mockito; @@ -13,9 +16,10 @@ import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -public class MockJsonClientProvider implements ClientProvider, Serializable { +public class MockJsonClientProvider implements ClientProvider, Serializable { private static BigQueryWriteClient mockClient = Mockito.mock(BigQueryWriteClient.class); private static JsonStreamWriter writer = Mockito.mock(JsonStreamWriter.class); @@ -160,9 +164,10 @@ public BigQueryWriteClient getClient() { } @Override - public JsonStreamWriter getWriter(String streamName, TableId table) { + public BigQueryStreamWriter getWriter(String streamName, TableId table, RowValueSerializer serializer) { numOfCreatedWriters.incrementAndGet(); - return MockJsonClientProvider.writer; + Mockito.when(MockJsonClientProvider.writer.getWriterId()).thenReturn(UUID.randomUUID().toString()); + return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, MockJsonClientProvider.writer); } diff --git a/src/test/java/com/vinted/flink/bigquery/util/MockProtoClientProvider.java b/src/test/java/com/vinted/flink/bigquery/util/MockProtoClientProvider.java index 36eb555..4763334 100644 --- a/src/test/java/com/vinted/flink/bigquery/util/MockProtoClientProvider.java +++ b/src/test/java/com/vinted/flink/bigquery/util/MockProtoClientProvider.java @@ -4,8 +4,11 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.*; import com.google.protobuf.Descriptors; +import com.vinted.flink.bigquery.client.BigQueryStreamWriter; import com.vinted.flink.bigquery.client.ClientProvider; +import com.vinted.flink.bigquery.client.ProtoStreamWriter; import com.vinted.flink.bigquery.model.config.WriterSettings; +import com.vinted.flink.bigquery.serializer.RowValueSerializer; import io.grpc.Status; import io.grpc.StatusException; import org.mockito.Mockito; @@ -13,6 +16,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.UUID; public class MockProtoClientProvider implements ClientProvider, Serializable { private static BigQueryWriteClient mockClient = Mockito.mock(BigQueryWriteClient.class); @@ -149,11 +153,11 @@ public BigQueryWriteClient getClient() { } @Override - public StreamWriter getWriter(String streamName, TableId table) { - return MockProtoClientProvider.protoWriter; + public BigQueryStreamWriter getWriter(String streamName, TableId table, RowValueSerializer serializer) { + Mockito.when(MockProtoClientProvider.protoWriter.getWriterId()).thenReturn(UUID.randomUUID().toString()); + return new ProtoStreamWriter<>(serializer, MockProtoClientProvider.protoWriter); } - @Override public WriterSettings writeSettings() { return WriterSettings.newBuilder().build();