refactor: abstract stream writers behind interface
gintarasm committed Feb 20, 2024
1 parent 8689b91 commit cc159ef
Showing 21 changed files with 290 additions and 280 deletions.
18 changes: 7 additions & 11 deletions src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java
@@ -13,40 +13,36 @@
 import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 
-public class BigQueryStreamSink<A, StreamT> {
+public class BigQueryStreamSink<A> {
     private RowValueSerializer<A> rowValueSerializer = new NoOpRowSerializer<>();
-    private ClientProvider<StreamT> clientProvider = null;
+    private ClientProvider<A> clientProvider = null;
 
     private ExecutorProvider executorProvider = MoreExecutors::directExecutor;
 
     private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
     private BigQueryStreamSink() {
     }
 
-    public static <A> BigQueryStreamSink<A, StreamWriter> newProto() {
+    public static <A> BigQueryStreamSink<A> newBuilder() {
         return new BigQueryStreamSink<>();
     }
 
-    public static <A> BigQueryStreamSink<A, JsonStreamWriter> newJson() {
-        return new BigQueryStreamSink<>();
-    }
-
-    public BigQueryStreamSink<A, StreamT> withRowValueSerializer(RowValueSerializer<A> serializer) {
+    public BigQueryStreamSink<A> withRowValueSerializer(RowValueSerializer<A> serializer) {
         this.rowValueSerializer = serializer;
         return this;
     }
 
-    public BigQueryStreamSink<A, StreamT> withClientProvider(ClientProvider<StreamT> clientProvider) {
+    public BigQueryStreamSink<A> withClientProvider(ClientProvider<A> clientProvider) {
         this.clientProvider = clientProvider;
         return this;
     }
 
-    public BigQueryStreamSink<A, StreamT> withExecutorProvider(ExecutorProvider executorProvider) {
+    public BigQueryStreamSink<A> withExecutorProvider(ExecutorProvider executorProvider) {
         this.executorProvider = executorProvider;
         return this;
     }
 
-    public BigQueryStreamSink<A, StreamT> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
+    public BigQueryStreamSink<A> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
         this.deliveryGuarantee = deliveryGuarantee;
         return this;
     }
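
Editor's note: with the second type parameter gone, proto and JSON sinks are configured through the same builder and differ only in the client provider they receive. A minimal usage sketch; MyEvent and MyEventSerializer are hypothetical, and the provider's constructor arguments are assumed from the credentials/writerSettings fields shown below, not confirmed by this diff:

    // Sketch only: MyEvent, MyEventSerializer, credentials, and writerSettings are assumptions.
    BigQueryStreamSink<MyEvent> sink = BigQueryStreamSink.<MyEvent>newBuilder()
            .withRowValueSerializer(new MyEventSerializer())
            .withClientProvider(new BigQueryProtoClientProvider<MyEvent>(credentials, writerSettings))
            .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);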
BigQueryJsonClientProvider.java

@@ -11,12 +11,13 @@
 import com.vinted.flink.bigquery.model.config.Credentials;
 import com.vinted.flink.bigquery.model.config.WriterSettings;
 import com.vinted.flink.bigquery.schema.SchemaTransformer;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 
-public class BigQueryJsonClientProvider implements ClientProvider<JsonStreamWriter> {
+public class BigQueryJsonClientProvider<A> implements ClientProvider<A> {
     private Credentials credentials;
     private WriterSettings writerSettings;
 
@@ -42,16 +43,18 @@ public BigQueryWriteClient getClient() {
     }
 
     @Override
-    public JsonStreamWriter getWriter(String streamName, TableId table) {
+    public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
         try {
             var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
                     FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
                     BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
-            return JsonStreamWriter
+            var writer = JsonStreamWriter
                     .newBuilder(streamName, getTableSchema(table), this.getClient())
                     .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
                     .setExecutorProvider(executorProvider)
                     .build();
+
+            return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
         } catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
             throw new RuntimeException(e);
         }
BigQueryProtoClientProvider.java

@@ -8,13 +8,14 @@
 import com.vinted.flink.bigquery.model.config.Credentials;
 import com.vinted.flink.bigquery.model.config.WriterSettings;
 import com.vinted.flink.bigquery.schema.SchemaTransformer;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 
 import java.io.IOException;
 import java.util.concurrent.Executors;
 
-public class BigQueryProtoClientProvider implements ClientProvider<StreamWriter> {
-    private Credentials credentials;
-    private WriterSettings writerSettings;
+public class BigQueryProtoClientProvider<A> implements ClientProvider<A> {
+    private final Credentials credentials;
+    private final WriterSettings writerSettings;
 
     private transient BigQueryWriteClient bigQueryWriteClient;
 
@@ -37,7 +38,7 @@ public BigQueryWriteClient getClient() {
     }
 
     @Override
-    public StreamWriter getWriter(String streamName, TableId table) {
+    public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
         try {
             var descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(getTableSchema(table));
             var protoSchema = ProtoSchemaConverter.convert(descriptor);
@@ -53,7 +54,8 @@ public StreamWriter getWriter(String streamName, TableId table) {
                     .setExecutorProvider(executorProvider)
                     .setLocation(table.getProject())
                     .setWriterSchema(protoSchema);
-            return streamWriterBuilder.build();
+
+            return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
         } catch (IOException | Descriptors.DescriptorValidationException e) {
             throw new RuntimeException(e);
         }
src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java (new file)

@@ -0,0 +1,15 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.vinted.flink.bigquery.model.Rows;

public interface BigQueryStreamWriter<T> extends AutoCloseable {
    ApiFuture<AppendRowsResponse> append(Rows<T> data);
    ApiFuture<AppendRowsResponse> append(Rows<T> data, long offset);

    String getStreamName();

    String getWriterId();
    boolean isClosed();
}
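
Editor's note: the interface covers exactly the Storage Write API surface the sink uses, so sink code can append rows without knowing whether JSON or proto sits underneath. A caller-side sketch; streamName, table, serializer, and rows are assumed to come from the surrounding sink, and MyEvent is hypothetical:

    // Sketch: obtain a format-agnostic writer and append one batch of Rows<MyEvent>.
    BigQueryStreamWriter<MyEvent> writer = clientProvider.getWriter(streamName, table, serializer);
    ApiFuture<AppendRowsResponse> future = writer.append(rows);
    // The offset-based overload can be used when appends must be idempotent:
    // ApiFuture<AppendRowsResponse> future = writer.append(rows, offset);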
src/main/java/com/vinted/flink/bigquery/client/ClientProvider.java

@@ -3,13 +3,14 @@
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.vinted.flink.bigquery.model.config.WriterSettings;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 
 import java.io.Serializable;
 
 public interface ClientProvider<A> extends Serializable {
     BigQueryWriteClient getClient();
 
-    A getWriter(String streamName, TableId table);
+    BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer);
 
     WriterSettings writeSettings();
 }
src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java (new file)

@@ -0,0 +1,64 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.protobuf.Descriptors;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;

public class JsonStreamWriter<A> implements BigQueryStreamWriter<A> {
    private final RowValueSerializer<A> rowSerializer;

    private final com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer;

    public JsonStreamWriter(RowValueSerializer<A> rowSerializer, com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer) {
        this.rowSerializer = rowSerializer;
        this.writer = writer;
    }

    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
        var rowArray = new JSONArray();
        data.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
        try {
            return writer.append(rowArray);
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
        var rowArray = new JSONArray();
        data.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
        try {
            return writer.append(rowArray, offset);
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getStreamName() {
        return writer.getStreamName();
    }

    @Override
    public String getWriterId() {
        return writer.getWriterId();
    }

    @Override
    public boolean isClosed() {
        return writer.isClosed() || writer.isUserClosed();
    }

    @Override
    public void close() throws Exception {
        writer.close();
    }
}
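
Editor's note: this wrapper re-parses each serialized row into a JSONObject, so the serializer must emit one JSON object per row. A minimal sketch of a matching serializer; it assumes RowValueSerializer exposes a single serialize method returning bytes (consistent with the calls above, but its definition is not part of this diff), and User is hypothetical:

    import org.json.JSONObject;
    import java.nio.charset.StandardCharsets;

    public class UserRowSerializer implements RowValueSerializer<User> {
        @Override
        public byte[] serialize(User user) {
            // Must produce a single JSON object per row; JsonStreamWriter re-parses these bytes.
            return new JSONObject()
                    .put("id", user.id())
                    .put("name", user.name())
                    .toString()
                    .getBytes(StandardCharsets.UTF_8);
        }
    }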
src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java (new file)

@@ -0,0 +1,60 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.ByteString;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.util.stream.Collectors;

public class ProtoStreamWriter<A> implements BigQueryStreamWriter<A> {
    private final RowValueSerializer<A> rowSerializer;

    private final StreamWriter writer;

    public ProtoStreamWriter(RowValueSerializer<A> rowSerializer, StreamWriter writer) {
        this.rowSerializer = rowSerializer;
        this.writer = writer;
    }

    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
        var prows = ProtoRows
                .newBuilder()
                .addAllSerializedRows(data.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList()))
                .build();
        return writer.append(prows);
    }

    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
        var prows = ProtoRows
                .newBuilder()
                .addAllSerializedRows(data.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList()))
                .build();
        return writer.append(prows, offset);
    }

    @Override
    public String getStreamName() {
        return writer.getStreamName();
    }

    @Override
    public String getWriterId() {
        return writer.getWriterId();
    }

    @Override
    public boolean isClosed() {
        return writer.isClosed() || writer.isUserClosed();
    }

    @Override
    public void close() throws Exception {
        writer.close();
    }
}
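
Editor's note: the proto path does no re-parsing; whatever bytes the serializer returns are wrapped unchanged in ByteString.copyFrom(...) and sent as one serialized row, so they must match the writer schema derived from the table. A sketch assuming the row type is a generated protobuf message class (UserProto is hypothetical):

    public class UserProtoSerializer implements RowValueSerializer<UserProto> {
        @Override
        public byte[] serialize(UserProto row) {
            // Generated protobuf messages serialize themselves to wire format.
            return row.toByteArray();
        }
    }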
src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java

@@ -1,10 +1,12 @@
 package com.vinted.flink.bigquery.sink;
 
 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.google.common.collect.Iterators;
+import com.vinted.flink.bigquery.client.BigQueryStreamWriter;
 import com.vinted.flink.bigquery.client.ClientProvider;
 import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
 import com.vinted.flink.bigquery.model.Rows;
@@ -28,13 +30,13 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public abstract class BigQuerySinkWriter<A, StreamT extends AutoCloseable> implements SinkWriter<Rows<A>> {
+public abstract class BigQuerySinkWriter<A> implements SinkWriter<Rows<A>> {
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Iterator<Integer> streamIndexIterator;
     private final SinkWriterMetricGroup metricGroup;
 
-    protected ClientProvider<StreamT> clientProvider;
-    protected transient Map<String, StreamT> streamMap = new ConcurrentHashMap<>();
+    protected ClientProvider<A> clientProvider;
+    protected transient Map<String, BigQueryStreamWriter<A>> streamMap = new ConcurrentHashMap<>();
     protected Sink.InitContext sinkInitContext;
     protected RowValueSerializer<A> rowSerializer;
 
@@ -44,12 +46,12 @@ public abstract class BigQuerySinkWriter<A, StreamT extends AutoCloseable> implements SinkWriter<Rows<A>> {
     protected Counter numRecordsOutCounter;
     protected transient Map<String, BigQueryStreamMetrics> metrics = new HashMap<>();
 
-    protected abstract ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows);
+    protected abstract AppendResult append(String traceId, Rows<A> rows);
 
     public BigQuerySinkWriter(
             Sink.InitContext sinkInitContext,
             RowValueSerializer<A> rowSerializer,
-            ClientProvider<StreamT> clientProvider,
+            ClientProvider<A> clientProvider,
             ExecutorProvider executorProvider) {
 
         this.sinkInitContext = sinkInitContext;
@@ -67,28 +69,28 @@ public BigQuerySinkWriter(
 
     }
 
-    protected final StreamT streamWriter(String traceId, String streamName, TableId table) {
+    protected final BigQueryStreamWriter<A> streamWriter(String traceId, String streamName, TableId table) {
         var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next());
         return streamMap.computeIfAbsent(streamWithIndex, name -> {
             logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex);
             // Stream name can't contain index
-            return this.clientProvider.getWriter(streamName, table);
+            return this.clientProvider.getWriter(streamName, table, rowSerializer);
         });
     }
 
-    protected final void recreateAllStreamWriters(String traceId, String streamName, TableId table) {
+    protected final void recreateStreamWriter(String traceId, String streamName, String writerId, TableId table) {
         logger.info("Trace-id {} Closing all writers for {}", traceId, streamName);
         try {
             flush(true);
             streamMap.replaceAll((key, writer) -> {
                 var newWriter = writer;
-                if (key.startsWith(streamName)) {
+                if (writer.getWriterId().equals(writerId)) {
                     try {
                         writer.close();
                     } catch (Exception e) {
                         logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
                     }
-                    newWriter = this.clientProvider.getWriter(streamName, table);
+                    newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer);
                 }
                 return newWriter;
             });
@@ -136,4 +138,17 @@ protected String createLogMessage(String title, String errorTraceId, Status stat
         );
     }
 
+    public static class AppendResult {
+        public final ApiFuture<AppendRowsResponse> response;
+        public final String writerId;
+
+        public AppendResult(ApiFuture<AppendRowsResponse> response, String writerId) {
+            this.response = response;
+            this.writerId = writerId;
+        }
+
+        public static AppendResult failure(Throwable t, String writerId) {
+            return new AppendResult(ApiFutures.immediateFailedFuture(t), writerId);
+        }
+    }
 }
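
Editor's note: AppendResult ties each append future to the id of the writer that produced it, which is what lets recreateStreamWriter replace only the failed writer instead of every writer for the stream. A sketch of how a concrete subclass might produce one; Rows#getStream and Rows#getTable are assumed accessors (only getData() appears in this diff):

    @Override
    protected AppendResult append(String traceId, Rows<A> rows) {
        // Hypothetical subclass body: pair the future with the writer id so a
        // failure callback can target exactly the writer that failed.
        var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
        try {
            return new AppendResult(writer.append(rows), writer.getWriterId());
        } catch (Throwable t) {
            return AppendResult.failure(t, writer.getWriterId());
        }
    }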