From 7187c862f44a1692e3b7979faa12a1c924ff44c7 Mon Sep 17 00:00:00 2001 From: Simonas Gelazevicius Date: Tue, 3 Oct 2023 17:14:20 +0300 Subject: [PATCH] fix: Correct _default stream path --- .../bigquery/client/BigQueryJsonClientProvider.java | 12 +++++++----- .../bigquery/client/BigQueryProtoClientProvider.java | 1 - .../java/com/vinted/flink/bigquery/model/Rows.java | 2 +- .../vinted/flink/bigquery/process/RowBatcher.java | 4 ---- .../com/vinted/flink/bigquery/RowBatcherTest.java | 4 ++-- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java index 23b2455..dbe6b0f 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java @@ -13,6 +13,7 @@ import com.vinted.flink.bigquery.schema.SchemaTransformer; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.Executors; public class BigQueryJsonClientProvider implements ClientProvider { @@ -61,17 +62,18 @@ public WriterSettings writeSettings() { return this.writerSettings; } - TableSchema getTableSchema(TableId table) { - var schema = BigQueryOptions + TableSchema getTableSchema(TableId tableId) { + var table = BigQueryOptions .newBuilder() - .setProjectId(table.getProject()) + .setProjectId(tableId.getProject()) .setCredentials(credentials.getCredentials()) .build() .getService() - .getTable(table.getDataset(), table.getTable()) + .getTable(tableId.getDataset(), tableId.getTable()); + var schema = Optional.ofNullable(table) + .orElseThrow(() -> new IllegalArgumentException("Non existing table: " + tableId)) .getDefinition() .getSchema(); - return SchemaTransformer.convertTableSchema(schema); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index c4fd18e..b3049cd 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -33,7 +33,6 @@ public BigQueryWriteClient getClient() { throw new RuntimeException(e); } } - return bigQueryWriteClient; } diff --git a/src/main/java/com/vinted/flink/bigquery/model/Rows.java b/src/main/java/com/vinted/flink/bigquery/model/Rows.java index 7796ec5..67c498d 100644 --- a/src/main/java/com/vinted/flink/bigquery/model/Rows.java +++ b/src/main/java/com/vinted/flink/bigquery/model/Rows.java @@ -18,7 +18,7 @@ public Rows updateBatch(List data, long offset) { public static Rows defaultStream(List data, TableId table) { var fullPath = TableName.of(table.getProject(), table.getDataset(), table.getTable()).toString(); - return new Rows<>(data, -1, String.format("%s/_default", fullPath), table); + return new Rows<>(data, -1, String.format("%s/streams/_default", fullPath), table); } public Rows() { diff --git a/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java b/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java index 43ffe2b..65789c8 100644 --- a/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java +++ b/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java @@ -7,15 +7,11 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.stream.Collectors; import java.util.stream.StreamSupport; public class RowBatcher extends ProcessWindowFunction, K, W> { - private static final Logger logger = LoggerFactory.getLogger(RowBatcher.class); - @Override public void open(Configuration parameters) throws Exception { super.open(parameters); diff --git a/src/test/java/com/vinted/flink/bigquery/RowBatcherTest.java b/src/test/java/com/vinted/flink/bigquery/RowBatcherTest.java index 4617b15..acedeaa 100644 --- a/src/test/java/com/vinted/flink/bigquery/RowBatcherTest.java +++ b/src/test/java/com/vinted/flink/bigquery/RowBatcherTest.java @@ -34,8 +34,8 @@ public void shouldReleaseTwoBatches(@FlinkTest.FlinkParam FlinkTest.PipelineRunn assertThat(result) .usingRecursiveComparison() .isEqualTo(List.of( - new Rows<>(List.of(new Record("key", "1"), new Record("key", "2")), -1, "projects/test-project/datasets/test-dataset/tables/test-table/_default", testTable), - new Rows<>(List.of(new Record("key", "3"), new Record("key", "4")), -1, "projects/test-project/datasets/test-dataset/tables/test-table/_default", testTable) + new Rows<>(List.of(new Record("key", "1"), new Record("key", "2")), -1, "projects/test-project/datasets/test-dataset/tables/test-table/streams/_default", testTable), + new Rows<>(List.of(new Record("key", "3"), new Record("key", "4")), -1, "projects/test-project/datasets/test-dataset/tables/test-table/streams/_default", testTable) )); }