Skip to content

Commit

Permalink
fix: Correct _default stream path
Browse files Browse the repository at this point in the history
  • Loading branch information
s-gelazevicius committed Oct 3, 2023
1 parent 9f00f96 commit 5eba60c
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.vinted.flink.bigquery.schema.SchemaTransformer;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;

public class BigQueryJsonClientProvider implements ClientProvider<JsonStreamWriter> {
Expand Down Expand Up @@ -61,17 +62,18 @@ public WriterSettings writeSettings() {
return this.writerSettings;
}

TableSchema getTableSchema(TableId table) {
var schema = BigQueryOptions
TableSchema getTableSchema(TableId tableId) {
var table = BigQueryOptions
.newBuilder()
.setProjectId(table.getProject())
.setProjectId(tableId.getProject())
.setCredentials(credentials.getCredentials())
.build()
.getService()
.getTable(table.getDataset(), table.getTable())
.getTable(tableId.getDataset(), tableId.getTable());
var schema = Optional.ofNullable(table)
.orElseThrow(() -> new IllegalArgumentException("Non existing table: " + tableId))
.getDefinition()
.getSchema();

return SchemaTransformer.convertTableSchema(schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public BigQueryWriteClient getClient() {
throw new RuntimeException(e);
}
}

return bigQueryWriteClient;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/vinted/flink/bigquery/model/Rows.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public Rows<A> updateBatch(List<A> data, long offset) {

public static <A> Rows<A> defaultStream(List<A> data, TableId table) {
var fullPath = TableName.of(table.getProject(), table.getDataset(), table.getTable()).toString();
return new Rows<>(data, -1, String.format("%s/_default", fullPath), table);
return new Rows<>(data, -1, String.format("%s/streams/_default", fullPath), table);
}

public Rows() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class RowBatcher<A extends BigQueryRecord, K, W extends Window> extends ProcessWindowFunction<A, Rows<A>, K, W> {
private static final Logger logger = LoggerFactory.getLogger(RowBatcher.class);

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Expand Down

0 comments on commit 5eba60c

Please sign in to comment.