Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/simplify structure #20

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ There are builder classes to simplify constructing a BigQuery sink. The code sni
```java
var credentials = new JsonCredentialsProvider("key");

var clientProvider = new BigQueryProtoClientProvider(credentials,
var clientProvider = new BigQueryProtoClientProvider<String>(credentials,
WriterSettings.newBuilder()
.build()
);

var bigQuerySink = BigQueryStreamSink.<String>newProto()
var bigQuerySink = BigQueryStreamSink.<String>newBuilder()
.withClientProvider(clientProvider)
.withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.withRowValueSerializer(new NoOpRowSerializer<>())
Expand Down Expand Up @@ -73,12 +73,12 @@ BigQuery supports two types of data formats: json and proto. When creating a str
- JSON

```java
var clientProvider = new BigQueryJsonClientProvider(credentials,
var clientProvider = new BigQueryJsonClientProvider<String>(credentials,
WriterSettings.newBuilder()
.build()
);

var bigQuerySink = BigQueryStreamSink.<String>newJson()
var bigQuerySink = BigQueryStreamSink.<String>newBuilder()
```

- Proto
Expand All @@ -89,7 +89,7 @@ var clientProvider = new BigQueryProtoClientProvider(credentials,
.build()
);

var bigQuerySink = BigQueryStreamSink.<String>newProto()
var bigQuerySink = BigQueryStreamSink.<String>newBuilder();
```

# Exactly once
Expand Down
18 changes: 7 additions & 11 deletions src/main/java/com/vinted/flink/bigquery/BigQueryStreamSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,36 @@
import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSink;
import org.apache.flink.connector.base.DeliveryGuarantee;

public class BigQueryStreamSink<A, StreamT> {
public class BigQueryStreamSink<A> {
private RowValueSerializer<A> rowValueSerializer = new NoOpRowSerializer<>();
private ClientProvider<StreamT> clientProvider = null;
private ClientProvider<A> clientProvider = null;

private ExecutorProvider executorProvider = MoreExecutors::directExecutor;

private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
private BigQueryStreamSink() {
}

public static <A> BigQueryStreamSink<A, StreamWriter> newProto() {
public static <A> BigQueryStreamSink<A> newBuilder() {
return new BigQueryStreamSink<>();
}

public static <A> BigQueryStreamSink<A, JsonStreamWriter> newJson() {
return new BigQueryStreamSink<>();
}

public BigQueryStreamSink<A, StreamT> withRowValueSerializer(RowValueSerializer<A> serializer) {
public BigQueryStreamSink<A> withRowValueSerializer(RowValueSerializer<A> serializer) {
this.rowValueSerializer = serializer;
return this;
}

public BigQueryStreamSink<A, StreamT> withClientProvider(ClientProvider<StreamT> clientProvider) {
public BigQueryStreamSink<A> withClientProvider(ClientProvider<A> clientProvider) {
this.clientProvider = clientProvider;
return this;
}

public BigQueryStreamSink<A, StreamT> withExecutorProvider(ExecutorProvider executorProvider) {
public BigQueryStreamSink<A> withExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return this;
}

public BigQueryStreamSink<A, StreamT> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
public BigQueryStreamSink<A> withDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
this.deliveryGuarantee = deliveryGuarantee;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;

public class BigQueryJsonClientProvider implements ClientProvider<JsonStreamWriter> {
public class BigQueryJsonClientProvider<A> implements ClientProvider<A> {
private Credentials credentials;
private WriterSettings writerSettings;

Expand All @@ -42,16 +43,18 @@ public BigQueryWriteClient getClient() {
}

@Override
public JsonStreamWriter getWriter(String streamName, TableId table) {
public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
try {
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
return JsonStreamWriter
var writer = JsonStreamWriter
.newBuilder(streamName, getTableSchema(table), this.getClient())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();

return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.io.IOException;
import java.util.concurrent.Executors;

public class BigQueryProtoClientProvider implements ClientProvider<StreamWriter> {
private Credentials credentials;
private WriterSettings writerSettings;
public class BigQueryProtoClientProvider<A> implements ClientProvider<A> {
private final Credentials credentials;
private final WriterSettings writerSettings;

private transient BigQueryWriteClient bigQueryWriteClient;

Expand All @@ -37,7 +38,7 @@ public BigQueryWriteClient getClient() {
}

@Override
public StreamWriter getWriter(String streamName, TableId table) {
public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer) {
try {
var descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(getTableSchema(table));
var protoSchema = ProtoSchemaConverter.convert(descriptor);
Expand All @@ -53,7 +54,8 @@ public StreamWriter getWriter(String streamName, TableId table) {
.setExecutorProvider(executorProvider)
.setLocation(table.getProject())
.setWriterSchema(protoSchema);
return streamWriterBuilder.build();

return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.vinted.flink.bigquery.model.Rows;

/**
 * Format-agnostic handle for a single BigQuery Storage Write API stream.
 *
 * <p>Implementations own the underlying Google client writer and a
 * {@code RowValueSerializer}, so callers append domain objects ({@code Rows<T>})
 * without knowing whether the wire format is JSON or proto.
 *
 * <p>Extends {@link AutoCloseable}: callers must close the writer to release the
 * underlying gRPC connection.
 */
public interface BigQueryStreamWriter<T> extends AutoCloseable {
    // Append a batch without an explicit offset (at-least-once style writes).
    ApiFuture<AppendRowsResponse> append(Rows<T> data);
    // Append a batch at an explicit stream offset (used for exactly-once writes).
    ApiFuture<AppendRowsResponse> append(Rows<T> data, long offset);

    // Fully qualified name of the BigQuery write stream this writer targets.
    String getStreamName();

    // Identifier of the underlying writer instance (useful for logging/metrics).
    String getWriterId();
    // True once the writer can no longer accept appends (closed by user or connection).
    boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.io.Serializable;

/**
 * Serializable factory for BigQuery Storage Write API clients and stream writers.
 *
 * <p>{@code Serializable} because providers are shipped inside Flink sink operators
 * to task managers; implementations should keep non-serializable client state
 * {@code transient} and build it lazily.
 *
 * @param <A> element type the produced writers accept
 */
public interface ClientProvider<A> extends Serializable {
    // Low-level Storage Write API client (stream creation, finalize, commit).
    BigQueryWriteClient getClient();

// NOTE(review): next line is the pre-refactor signature retained by the diff view; it is removed by this PR.
A getWriter(String streamName, TableId table);
    // Builds a writer bound to the given stream/table that serializes rows with the supplied serializer.
    BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowValueSerializer<A> serializer);

    // Settings (thread counts, connection pooling, …) used when constructing writers.
    WriterSettings writeSettings();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.protobuf.Descriptors;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;

/**
 * {@link BigQueryStreamWriter} that appends rows in JSON format.
 *
 * <p>Each row is serialized to bytes by the configured {@link RowValueSerializer},
 * decoded as UTF-8, parsed into a {@link JSONObject}, and batched into a
 * {@link JSONArray} for the underlying Google {@code JsonStreamWriter}.
 *
 * @param <A> element type accepted by this writer
 */
public class JsonStreamWriter<A> implements BigQueryStreamWriter<A> {
    private final RowValueSerializer<A> rowSerializer;

    private final com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer;

    public JsonStreamWriter(RowValueSerializer<A> rowSerializer, com.google.cloud.bigquery.storage.v1.JsonStreamWriter writer) {
        this.rowSerializer = rowSerializer;
        this.writer = writer;
    }

    /**
     * Serializes every row in the batch and collects the results into a JSONArray.
     *
     * <p>Extracted so both {@code append} overloads share one implementation.
     * Bytes are decoded as UTF-8 explicitly: JSON is UTF-8 by specification, and
     * relying on the platform default charset (the pre-Java-18 behaviour of
     * {@code new String(byte[])}) would corrupt non-ASCII payloads on some hosts.
     */
    private JSONArray toJsonArray(Rows<A> data) {
        var rowArray = new JSONArray();
        data.getData().forEach(row -> rowArray.put(new JSONObject(
                new String(rowSerializer.serialize(row), java.nio.charset.StandardCharsets.UTF_8))));
        return rowArray;
    }

    /** Appends the batch without an explicit offset. */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
        try {
            return writer.append(toJsonArray(data));
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            // Checked exceptions from the Google client cannot cross the interface; rethrow unchecked with cause.
            throw new RuntimeException(e);
        }
    }

    /** Appends the batch at the given stream offset (exactly-once writes). */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
        try {
            return writer.append(toJsonArray(data), offset);
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getStreamName() {
        return writer.getStreamName();
    }

    @Override
    public String getWriterId() {
        return writer.getWriterId();
    }

    /** Closed either by the connection layer or explicitly by the user. */
    @Override
    public boolean isClosed() {
        return writer.isClosed() || writer.isUserClosed();
    }

    @Override
    public void close() throws Exception {
        writer.close();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.vinted.flink.bigquery.client;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.ByteString;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;

import java.util.stream.Collectors;

/**
 * {@link BigQueryStreamWriter} that appends rows in protobuf format.
 *
 * <p>Each row is serialized to bytes by the configured {@link RowValueSerializer}
 * and batched into a {@link ProtoRows} message for the underlying Google
 * {@link StreamWriter}.
 *
 * @param <A> element type accepted by this writer
 */
public class ProtoStreamWriter<A> implements BigQueryStreamWriter<A> {
    private final RowValueSerializer<A> rowSerializer;

    private final StreamWriter writer;

    public ProtoStreamWriter(RowValueSerializer<A> rowSerializer, StreamWriter writer) {
        this.rowSerializer = rowSerializer;
        this.writer = writer;
    }

    /**
     * Serializes every row in the batch into a {@link ProtoRows} message.
     *
     * <p>Extracted so both {@code append} overloads share one implementation.
     */
    private ProtoRows toProtoRows(Rows<A> data) {
        return ProtoRows.newBuilder()
                .addAllSerializedRows(data.getData().stream()
                        .map(row -> ByteString.copyFrom(rowSerializer.serialize(row)))
                        .collect(Collectors.toList()))
                .build();
    }

    /** Appends the batch without an explicit offset. */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data) {
        return writer.append(toProtoRows(data));
    }

    /** Appends the batch at the given stream offset (exactly-once writes). */
    @Override
    public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
        return writer.append(toProtoRows(data), offset);
    }

    @Override
    public String getStreamName() {
        return writer.getStreamName();
    }

    @Override
    public String getWriterId() {
        return writer.getWriterId();
    }

    /** Closed either by the connection layer or explicitly by the user. */
    @Override
    public boolean isClosed() {
        return writer.isClosed() || writer.isUserClosed();
    }

    @Override
    public void close() throws Exception {
        writer.close();
    }
}
Loading
Loading