diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java index f00d93801d23..db4a1008ecdd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.api.services.bigquery.model.QueryResponse; import com.google.api.services.bigquery.model.TableFieldSchema; @@ -33,16 +34,16 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Integration tests for {@link - * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes - * 30MB data to BQ and verify the written row count. + * Integration tests for {@link BigQueryIO#write()}. The batch mode tests write 30MB data to BQ and + * verify the written row count; the streaming mode tests write 3k rows of data to BQ and verify the + * written row count. */ @RunWith(JUnit4.class) public class BigQueryIOStorageWriteIT { @@ -50,7 +51,7 @@ public class BigQueryIOStorageWriteIT { private enum WriteMode { EXACT_ONCE, AT_LEAST_ONCE - }; + } private String project; private static final String DATASET_ID = "big_query_storage"; @@ -73,14 +74,26 @@ private void setUpTestEnvironment(WriteMode writeMode) { } static class FillRowFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(new TableRow().set("number", c.element()).set("str", "aaaaaaaaaa")); } } - private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) { - String tableName = TABLE_PREFIX + System.currentTimeMillis(); + private GenerateSequence stream(int rowCount) { + int timestampIntervalInMilliseconds = 10; + return GenerateSequence.from(0) + .to(rowCount) + .withRate(1, Duration.millis(timestampIntervalInMilliseconds)); + } + + private void runBigQueryIOStorageWritePipeline( + int rowCount, WriteMode writeMode, Boolean isStreaming) { + String tableName = + isStreaming + ? TABLE_PREFIX + "streaming_" + System.currentTimeMillis() + : TABLE_PREFIX + System.currentTimeMillis(); TableSchema schema = new TableSchema() .setFields( @@ -89,7 +102,7 @@ private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode new TableFieldSchema().setName("str").setType("STRING"))); Pipeline p = Pipeline.create(bqOptions); - p.apply("Input", GenerateSequence.from(0).to(rowCount)) + p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount)) .apply("GenerateMessage", ParDo.of(new FillRowFn())) .apply( "WriteToBQ", @@ -109,22 +122,32 @@ private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode assertTrue( Integer.parseInt((String) response.getRows().get(0).getF().get(0).getV()) >= rowCount); } - } catch (IOException e) { - assertTrue("Unexpected exception: " + e.toString(), false); - } catch (InterruptedException e) { - assertTrue("Unexpected exception: " + e.toString(), false); + } catch (IOException | InterruptedException e) { + fail("Unexpected exception: " + e); } } @Test - public void testBigQueryStorageWrite30MProto() throws Exception { + public void testBigQueryStorageWrite30MProto() { + setUpTestEnvironment(WriteMode.EXACT_ONCE); + runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE, false); + } + + @Test + public void testBigQueryStorageWrite30MProtoALO() { + setUpTestEnvironment(WriteMode.AT_LEAST_ONCE); + runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE, false); + } + + @Test + public void testBigQueryStorageWrite3KProtoStreaming() { setUpTestEnvironment(WriteMode.EXACT_ONCE); - runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE); + runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true); } @Test - public void testBigQueryStorageWrite30MProtoALO() throws Exception { + public void testBigQueryStorageWrite3KProtoALOStreaming() { setUpTestEnvironment(WriteMode.AT_LEAST_ONCE); - runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE); + runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true); } }