Skip to content

Commit

Permalink
Add streaming test for Write API sink (apache#21903)
Browse files Browse the repository at this point in the history
* Add streaming test for Write API sink

* Update BigQueryIOStorageWriteIT.java

* Update BigQueryIOStorageWriteIT.java

* Update BigQueryIOStorageWriteIT.java

* Update streaming source type

Change streaming source type from TestStream to GenerateSequence

* Update BigQueryIOStorageWriteIT.java

Try improved formatting

* Update BigQueryIOStorageWriteIT.java

Formatting
  • Loading branch information
AlexZMLyu authored Sep 13, 2022
1 parent 2a9327d commit 6f25fba
Showing 1 changed file with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.TableFieldSchema;
Expand All @@ -33,24 +34,24 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Integration tests for {@link
* org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
* 30MB data to BQ and verify the written row count.
* Integration tests for {@link BigQueryIO#write()}. The batch-mode tests write 30MB of data to
* BQ and verify the written row count; the streaming-mode tests write 3k rows to BQ and verify
* the written row count.
*/
@RunWith(JUnit4.class)
public class BigQueryIOStorageWriteIT {

private enum WriteMode {
EXACT_ONCE,
AT_LEAST_ONCE
};
}

private String project;
private static final String DATASET_ID = "big_query_storage";
Expand All @@ -73,14 +74,26 @@ private void setUpTestEnvironment(WriteMode writeMode) {
}

/**
 * Converts each input {@code Long} into a {@link TableRow} with two fields: {@code "number"},
 * holding the input element, and {@code "str"}, holding a fixed 10-character filler string.
 */
static class FillRowFn extends DoFn<Long, TableRow> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Build the row step by step rather than through a fluent chain; field order is unchanged.
    TableRow row = new TableRow();
    row.set("number", c.element());
    row.set("str", "aaaaaaaaaa");
    c.output(row);
  }
}

private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) {
String tableName = TABLE_PREFIX + System.currentTimeMillis();
/**
 * Builds the rate-limited {@link GenerateSequence} used as input by the streaming-mode tests.
 *
 * @param rowCount upper bound passed to {@code GenerateSequence#to}; the sequence starts at 0
 * @return a sequence transform configured with a rate of 1 element per 10 milliseconds
 */
private GenerateSequence stream(int rowCount) {
  // Period between elements; combined with a count of 1 this yields one element per 10 ms.
  Duration emitPeriod = Duration.millis(10);
  GenerateSequence boundedRange = GenerateSequence.from(0).to(rowCount);
  return boundedRange.withRate(1, emitPeriod);
}

private void runBigQueryIOStorageWritePipeline(
int rowCount, WriteMode writeMode, Boolean isStreaming) {
String tableName =
isStreaming
? TABLE_PREFIX + "streaming_" + System.currentTimeMillis()
: TABLE_PREFIX + System.currentTimeMillis();
TableSchema schema =
new TableSchema()
.setFields(
Expand All @@ -89,7 +102,7 @@ private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode
new TableFieldSchema().setName("str").setType("STRING")));

Pipeline p = Pipeline.create(bqOptions);
p.apply("Input", GenerateSequence.from(0).to(rowCount))
p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount))
.apply("GenerateMessage", ParDo.of(new FillRowFn()))
.apply(
"WriteToBQ",
Expand All @@ -109,22 +122,32 @@ private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode
assertTrue(
Integer.parseInt((String) response.getRows().get(0).getF().get(0).getV()) >= rowCount);
}
} catch (IOException e) {
assertTrue("Unexpected exception: " + e.toString(), false);
} catch (InterruptedException e) {
assertTrue("Unexpected exception: " + e.toString(), false);
} catch (IOException | InterruptedException e) {
fail("Unexpected exception: " + e);
}
}

@Test
public void testBigQueryStorageWrite30MProto() throws Exception {
public void testBigQueryStorageWrite30MProto() {
setUpTestEnvironment(WriteMode.EXACT_ONCE);
runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE, false);
}

/**
 * Batch-mode run in {@code AT_LEAST_ONCE} write mode: sets up the test environment for that
 * mode, then runs the write pipeline with 3,000,000 rows and streaming disabled.
 */
@Test
public void testBigQueryStorageWrite30MProtoALO() {
  final WriteMode mode = WriteMode.AT_LEAST_ONCE;
  final int batchRowCount = 3000000;

  setUpTestEnvironment(mode);
  runBigQueryIOStorageWritePipeline(batchRowCount, mode, false);
}

@Test
public void testBigQueryStorageWrite3KProtoStreaming() {
setUpTestEnvironment(WriteMode.EXACT_ONCE);
runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE);
runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true);
}

@Test
public void testBigQueryStorageWrite30MProtoALO() throws Exception {
public void testBigQueryStorageWrite3KProtoALOStreaming() {
setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE);
runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true);
}
}

0 comments on commit 6f25fba

Please sign in to comment.