Refactor/simplify structure #20

Merged · 2 commits · Feb 23, 2024
Changes from 1 commit

src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java (7 additions & 11 deletions)
@@ -13,40 +13,36 @@
 import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 
-public class BigQueryStreamSink<A, StreamT> {
+public class BigQueryStreamSink<A> {
     private RowValueSerializer<A> rowValueSerializer = new NoOpRowSerializer<>();
-    private ClientProvider<StreamT> clientProvider = null;
+    private ClientProvider<A> clientProvider = null;
 
     private ExecutorProvider executorProvider = MoreExecutors::directExecutor;
 
     private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
     private BigQueryStreamSink() {
     }
 
-    public static <A> BigQueryStreamSink<A, StreamWriter> newProto() {
+    public static <A> BigQueryStreamSink<A> newBuilder() {
         return new BigQueryStreamSink<>();
     }
 
-    public static <A> BigQueryStreamSink<A, JsonStreamWriter> newJson() {
-        return new BigQueryStreamSink<>();
-    }
-
-    public BigQueryStreamSink<A, StreamT> withRowValueSerializer(RowValueSerializer<A> serializer) {
+    public BigQueryStreamSink<A> withRowValueSerializer(RowValueSerializer<A> serializer) {
         this.rowValueSerializer = serializer;
         return this;
     }
 
-    public BigQueryStreamSink<A, StreamT> withClientProvider(ClientProvider<StreamT> clientProvider) {
+    public BigQueryStreamSink<A> withClientProvider(ClientProvider<A> clientProvider) {
         this.clientProvider = clientProvider;
         return this;
     }
 
-    public BigQueryStreamSink<A, StreamT> withExecutorProvider(ExecutorProvider executorProvider) {
+    public BigQueryStreamSink<A> withExecutorProvider(ExecutorProvider executorProvider) {
         this.executorProvider = executorProvider;
         return this;
     }
 
-    public BigQueryStreamSink<A, StreamT> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
+    public BigQueryStreamSink<A> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
        this.deliveryGuarantee = deliveryGuarantee;
        return this;
     }
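
After this change a pipeline wires a single element type through the whole builder. A minimal usage sketch, assuming placeholder names MyEvent, myEventSerializer, and clientProvider; the terminal call that turns the builder into a Flink sink is outside this hunk and not shown:

    // Sketch only: MyEvent, myEventSerializer, and clientProvider are
    // placeholders, and the final sink-construction step is omitted.
    BigQueryStreamSink<MyEvent> sinkBuilder = BigQueryStreamSink.<MyEvent>newBuilder()
            .withRowValueSerializer(myEventSerializer)   // RowValueSerializer<MyEvent>
            .withClientProvider(clientProvider)          // ClientProvider<MyEvent>
            .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);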

BigQueryJsonClientProvider.java
@@ -11,12 +11,13 @@
 import com.vinted.flink.bigquery.model.config.Credentials;
 import com.vinted.flink.bigquery.model.config.WriterSettings;
 import com.vinted.flink.bigquery.schema.SchemaTransformer;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 
-public class BigQueryJsonClientProvider implements ClientProvider<JsonStreamWriter> {
+public class BigQueryJsonClientProvider<A> implements ClientProvider<A> {
     private Credentials credentials;
     private WriterSettings writerSettings;
 
@@ -42,16 +43,18 @@ public BigQueryWriteClient getClient() {
     }
 
     @Override
-    public JsonStreamWriter getWriter(String streamName, TableId table) {
+    public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
         try {
             var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
                     FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
                     BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
-            return JsonStreamWriter
+            var writer = JsonStreamWriter
                     .newBuilder(streamName, getTableSchema(table), this.getClient())
                     .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
                     .setExecutorProvider(executorProvider)
                     .build();
+
+            return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
         } catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
             throw new RuntimeException(e);
         }
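
The provider now returns the connector's own wrapper instead of Google's JsonStreamWriter, so serialization travels with the writer. A sketch of the new call shape, where MyEvent, the table, and the stream name are placeholders:

    // Sketch: obtaining the wrapped writer from the refactored provider.
    BigQueryStreamWriter<MyEvent> openWriter(ClientProvider<MyEvent> provider,
                                             TableId table,
                                             RowValueSerializer<MyEvent> serializer) {
        // The default-stream name below is a placeholder value.
        return provider.getWriter("projects/p/datasets/d/tables/t/streams/_default", table, serializer);
    }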

BigQueryProtoClientProvider.java
@@ -8,13 +8,14 @@
 import com.vinted.flink.bigquery.model.config.Credentials;
 import com.vinted.flink.bigquery.model.config.WriterSettings;
 import com.vinted.flink.bigquery.schema.SchemaTransformer;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 
 import java.io.IOException;
 import java.util.concurrent.Executors;
 
-public class BigQueryProtoClientProvider implements ClientProvider<StreamWriter> {
-    private Credentials credentials;
-    private WriterSettings writerSettings;
+public class BigQueryProtoClientProvider<A> implements ClientProvider<A> {
+    private final Credentials credentials;
+    private final WriterSettings writerSettings;
 
     private transient BigQueryWriteClient bigQueryWriteClient;

@@ -37,7 +38,7 @@ public BigQueryWriteClient getClient() {
     }
 
     @Override
-    public StreamWriter getWriter(String streamName, TableId table) {
+    public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
         try {
             var descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(getTableSchema(table));
             var protoSchema = ProtoSchemaConverter.convert(descriptor);
@@ -53,7 +54,8 @@ public StreamWriter getWriter(String streamName, TableId table) {
                     .setExecutorProvider(executorProvider)
                     .setLocation(table.getProject())
                     .setWriterSchema(protoSchema);
-            return streamWriterBuilder.build();
+
+            return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
         } catch (IOException | Descriptors.DescriptorValidationException e) {
             throw new RuntimeException(e);
         }

BigQueryStreamWriter.java (new)
@@ -0,0 +1,15 @@
+package com.vinted.flink.bigquery.client;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.vinted.flink.bigquery.model.Rows;
+
+public interface BigQueryStreamWriter<T> extends AutoCloseable {
+    ApiFuture<AppendRowsResponse> append(Rows<T> data);
+    ApiFuture<AppendRowsResponse> append(Rows<T> data, long offset);
+
+    String getStreamName();
+
+    String getWriterId();
+    boolean isClosed();
+}
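
A minimal sketch of consuming this interface; MyEvent and the Rows batch are placeholders, while ApiFutureCallback/ApiFutures come from google-api-core and MoreExecutors from Guava, both already used in this project:

    void appendAndWatch(BigQueryStreamWriter<MyEvent> writer, Rows<MyEvent> batch) {
        ApiFutures.addCallback(writer.append(batch), new ApiFutureCallback<AppendRowsResponse>() {
            @Override
            public void onSuccess(AppendRowsResponse response) {
                // e.g. update metrics and ack the batch
            }

            @Override
            public void onFailure(Throwable t) {
                // e.g. close this writer, recreate it, and retry or fail the batch
            }
        }, MoreExecutors.directExecutor());
    }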

ClientProvider.java
@@ -3,13 +3,14 @@
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.vinted.flink.bigquery.model.config.WriterSettings;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 
 import java.io.Serializable;
 
 public interface ClientProvider<A> extends Serializable {
     BigQueryWriteClient getClient();
 
-    A getWriter(String streamName, TableId table);
+    BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer);
 
     WriterSettings writeSettings();
 }

JsonStreamWriter.java (new)
@@ -0,0 +1,64 @@
+package com.vinted.flink.bigquery.client;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.protobuf.Descriptors;
+import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import java.io.IOException;
+
+public class JsonStreamWriter<A> implements BigQueryStreamWriter<A>{
+    private final RowValueSerializer<A> rowSerializer;
+
+    private final com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer;
+
+    public JsonStreamWriter(RowValueSerializer<A> rowSerializer, com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer) {
+        this.rowSerializer = rowSerializer;
+        this.writer = writer;
+    }
+
+    @Override
+    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
+        var rowArray = new JSONArray();
+        data.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
+        try {
+            return writer.append(rowArray);
+        } catch (IOException | Descriptors.DescriptorValidationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
+        var rowArray = new JSONArray();
+        data.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
+        try {
+            return writer.append(rowArray, offset);
+        } catch (IOException | Descriptors.DescriptorValidationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getStreamName() {
+        return writer.getStreamName();
+    }
+
+    @Override
+    public String getWriterId() {
+        return writer.getWriterId();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return writer.isClosed() || writer.isUserClosed();
+    }
+
+    @Override
+    public void close() throws Exception {
+        writer.close();
+    }
+}
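
This wrapper re-parses each serialized row into a JSONObject, so serializers on the JSON path must emit one UTF-8 encoded JSON object per row. A sketch under that assumption; the RowValueSerializer shape is inferred from its use above, and User is a placeholder type:

    public class UserJsonSerializer implements RowValueSerializer<User> {
        @Override
        public byte[] serialize(User user) {
            // Must be a single UTF-8 JSON object, since the writer does
            // new JSONObject(new String(bytes)) on every row.
            return new JSONObject()
                    .put("id", user.getId())
                    .put("name", user.getName())
                    .toString()
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    }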

ProtoStreamWriter.java (new)
@@ -0,0 +1,60 @@
+package com.vinted.flink.bigquery.client;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.protobuf.ByteString;
+import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
+
+import java.util.stream.Collectors;
+
+public class ProtoStreamWriter<A> implements BigQueryStreamWriter<A>{
+    private final RowValueSerializer<A> rowSerializer;
+
+    private final StreamWriter writer;
+
+    public ProtoStreamWriter(RowValueSerializer<A> rowSerializer, StreamWriter writer) {
+        this.rowSerializer = rowSerializer;
+        this.writer = writer;
+    }
+
+    @Override
+    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
+        var prows = ProtoRows
+                .newBuilder()
+                .addAllSerializedRows(data.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList()))
+                .build();
+        return writer.append(prows);
+    }
+
+    @Override
+    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
+        var prows = ProtoRows
+                .newBuilder()
+                .addAllSerializedRows(data.getData().stream().map(r -> ByteString.copyFrom(rowSerializer.serialize(r))).collect(Collectors.toList()))
+                .build();
+        return writer.append(prows, offset);
+    }
+
+    @Override
+    public String getStreamName() {
+        return writer.getStreamName();
+    }
+
+    @Override
+    public String getWriterId() {
+        return writer.getWriterId();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return writer.isClosed() || writer.isUserClosed();
+    }
+
+    @Override
+    public void close() throws Exception {
+        writer.close();
+    }
+}
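
On the proto path the serialized bytes are wrapped in a ByteString as-is, so the serializer must emit a protobuf message matching the writer schema derived from the table. A sketch, where UserProto is a placeholder compiled protobuf class:

    public class UserProtoSerializer implements RowValueSerializer<UserProto.User> {
        @Override
        public byte[] serialize(UserProto.User row) {
            // toByteArray() yields the wire-format bytes ProtoRows expects.
            return row.toByteArray();
        }
    }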

BigQuerySinkWriter.java
@@ -1,10 +1,12 @@
 package com.vinted.flink.bigquery.sink;
 
 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.google.common.collect.Iterators;
+import com.vinted.flink.bigquery.client.BigQueryStreamWriter;
 import com.vinted.flink.bigquery.client.ClientProvider;
 import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
 import com.vinted.flink.bigquery.model.Rows;
@@ -28,13 +30,13 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public abstract class BigQuerySinkWriter<A, StreamT extends AutoCloseable> implements SinkWriter<Rows<A>> {
+public abstract class BigQuerySinkWriter<A> implements SinkWriter<Rows<A>> {
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Iterator<Integer> streamIndexIterator;
     private final SinkWriterMetricGroup metricGroup;
 
-    protected ClientProvider<StreamT> clientProvider;
-    protected transient Map<String, StreamT> streamMap = new ConcurrentHashMap<>();
+    protected ClientProvider<A> clientProvider;
+    protected transient Map<String, BigQueryStreamWriter<A>> streamMap = new ConcurrentHashMap<>();
     protected Sink.InitContext sinkInitContext;
     protected RowValueSerializer<A> rowSerializer;

@@ -44,12 +46,12 @@ public abstract class BigQuerySinkWriter<A, StreamT extends AutoCloseable> imple
     protected Counter numRecordsOutCounter;
     protected transient Map<String, BigQueryStreamMetrics> metrics = new HashMap<>();
 
-    protected abstract ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows);
+    protected abstract AppendResult append(String traceId, Rows<A> rows);
 
     public BigQuerySinkWriter(
             Sink.InitContext sinkInitContext,
             RowValueSerializer<A> rowSerializer,
-            ClientProvider<StreamT> clientProvider,
+            ClientProvider<A> clientProvider,
             ExecutorProvider executorProvider) {
 
         this.sinkInitContext = sinkInitContext;
@@ -67,28 +69,28 @@ public BigQuerySinkWriter(

     }
 
-    protected final StreamT streamWriter(String traceId, String streamName, TableId table) {
+    protected final BigQueryStreamWriter<A> streamWriter(String traceId, String streamName, TableId table) {
         var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next());
         return streamMap.computeIfAbsent(streamWithIndex, name -> {
             logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex);
             // Stream name can't contain index
-            return this.clientProvider.getWriter(streamName, table);
+            return this.clientProvider.getWriter(streamName, table, rowSerializer);
         });
     }
 
-    protected final void recreateAllStreamWriters(String traceId, String streamName, TableId table) {
+    protected final void recreateStreamWriter(String traceId, String streamName, String writerId, TableId table) {
         logger.info("Trace-id {} Closing all writers for {}", traceId, streamName);
         try {
             flush(true);
             streamMap.replaceAll((key, writer) -> {
                 var newWriter = writer;
-                if (key.startsWith(streamName)) {
+                if (writer.getWriterId().equals(writerId)) {
                     try {
                         writer.close();
                     } catch (Exception e) {
                         logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
                     }
-                    newWriter = this.clientProvider.getWriter(streamName, table);
+                    newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer);
                 }
                 return newWriter;
             });
@@ -136,4 +138,17 @@ protected String createLogMessage(String title, String errorTraceId, Status stat
         );
     }
 
+    public static class AppendResult {
+        public final ApiFuture<AppendRowsResponse> response;
+        public final String writerId;
+
+        public AppendResult(ApiFuture<AppendRowsResponse> response, String writerId) {
+            this.response = response;
+            this.writerId = writerId;
+        }
+
+        public static AppendResult failure(Throwable t, String writerId) {
+            return new AppendResult(ApiFutures.immediateFailedFuture(t), writerId);
+        }
+    }
 }
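
AppendResult pairs each append future with the id of the writer that produced it, which is what lets failure handling recreate only the failing writer via recreateStreamWriter(traceId, streamName, writerId, table). A hypothetical subclass sketch; MyEvent is a placeholder and the Rows accessors for stream and table are assumptions, since they are not shown in this diff:

    @Override
    protected AppendResult append(String traceId, Rows<MyEvent> rows) {
        var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
        try {
            return new AppendResult(writer.append(rows), writer.getWriterId());
        } catch (Throwable t) {
            // Surface the failure through the future so callers handle one path.
            return AppendResult.failure(t, writer.getWriterId());
        }
    }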