diff --git a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/exceptions/BigQueryConnectorException.java b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/exceptions/BigQueryConnectorException.java new file mode 100644 index 00000000..d4559549 --- /dev/null +++ b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/exceptions/BigQueryConnectorException.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.flink.bigquery.common.exceptions; + +/** Represents a general error during the execution of the connector's code. */ +public class BigQueryConnectorException extends RuntimeException { + + public BigQueryConnectorException(String message) { + super(message); + } + + public BigQueryConnectorException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java new file mode 100644 index 00000000..bb0d8630 --- /dev/null +++ b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java @@ -0,0 +1,254 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.flink.bigquery.source.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import com.google.auto.value.AutoValue; +import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; +import com.google.cloud.flink.bigquery.common.config.CredentialsOptions; +import org.threeten.bp.Instant; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** The options available to read data from BigQuery. 
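+ *
+ * <p>Instances are created through {@link #builder()}, which pre-populates the defaults: an empty
+ * row restriction, no column projection, a max stream count of 0 (the service decides how many
+ * streams to use) and 10000 records per split fetch.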
*/ +@AutoValue +@PublicEvolving +public abstract class BigQueryReadOptions implements Serializable { + + public abstract ImmutableList getColumnNames(); + + public abstract String getRowRestriction(); + + public abstract Optional getSnapshotTimestampInMillis(); + + public abstract Optional getQuery(); + + public abstract Optional getQueryExecutionProject(); + + public abstract Integer getMaxStreamCount(); + + public abstract Integer getMaxRecordsPerSplitFetch(); + + public abstract BigQueryConnectOptions getBigQueryConnectOptions(); + + @Override + public final int hashCode() { + return Objects.hash( + getColumnNames(), + getRowRestriction(), + getSnapshotTimestampInMillis(), + getMaxStreamCount(), + getBigQueryConnectOptions()); + } + + @Override + public final boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final BigQueryReadOptions other = (BigQueryReadOptions) obj; + return Objects.equals(this.getColumnNames(), other.getColumnNames()) + && Objects.equals(this.getRowRestriction(), other.getRowRestriction()) + && Objects.equals( + this.getSnapshotTimestampInMillis(), other.getSnapshotTimestampInMillis()) + && Objects.equals(this.getMaxStreamCount(), other.getMaxStreamCount()) + && Objects.equals( + this.getBigQueryConnectOptions(), other.getBigQueryConnectOptions()); + } + + /** + * Transforms the instance into a builder instance for property modification. + * + * @return A {@link Builder} instance for the type. + */ + public abstract Builder toBuilder(); + + /** + * Creates a builder instance with all the default values set. + * + * @return A {@link Builder} for the type. + */ + public static Builder builder() { + return new AutoValue_BigQueryReadOptions.Builder() + .setRowRestriction("") + .setColumnNames(new ArrayList<>()) + .setMaxStreamCount(0) + .setMaxRecordsPerSplitFetch(10000) + .setSnapshotTimestampInMillis(null); + } + + /** Builder class for {@link BigQueryReadOptions}. */ + @AutoValue.Builder + public abstract static class Builder { + + /** + * Prepares this builder to execute a query driven read using the default credentials + * configuration. + * + * @param query A BigQuery standard SQL query. + * @param projectId A GCP project where the query will run. + * @return This {@link Builder} instance. + * @throws IOException In case of problems while setting up the credentials options. + */ + public Builder setQueryAndExecutionProject(String query, String projectId) + throws IOException { + return setQueryWithExecutionProjectAndCredentialsOptions( + query, projectId, CredentialsOptions.builder().build()); + } + + /** + * Prepares this builder to execute a query driven read. + * + * @param query A BigQuery standard SQL query. + * @param projectId A GCP project where the query will run. + * @param credentialsOptions The GCP credentials options. + * @return This {@link Builder} instance. + * @throws IOException In case of problems while setting up the credentials options. 
+ */ + public Builder setQueryWithExecutionProjectAndCredentialsOptions( + String query, String projectId, CredentialsOptions credentialsOptions) + throws IOException { + this.setQuery(query); + this.setQueryExecutionProject(projectId); + this.setBigQueryConnectOptions( + BigQueryConnectOptions.builderForQuerySource() + .setCredentialsOptions(credentialsOptions) + .build()); + return this; + } + + /** + * Sets a BigQuery query which will be run first, storing its result in a temporary table, + * and Flink will read the query results from that temporary table. This is an optional + * argument. + * + * @param query A BigQuery standard SQL query. + * @return This {@link Builder} instance. + */ + public abstract Builder setQuery(@Nullable String query); + + /** + * Sets the GCP project where the configured query will be run. In case the query + * configuration is not set this configuration is discarded. + * + * @param projectId A GCP project. + * @return This {@link Builder} instance. + */ + public abstract Builder setQueryExecutionProject(@Nullable String projectId); + + /** + * Sets the restriction the rows in the BigQuery table must comply to be returned by the + * source. + * + * @param rowRestriction A {@link String} containing the row restrictions. + * @return This {@link Builder} instance. + */ + public abstract Builder setRowRestriction(String rowRestriction); + + /** + * Sets the column names that will be projected from the table's retrieved data. + * + * @param colNames The names of the columns as a list of strings. + * @return This {@link Builder} instance. + */ + public abstract Builder setColumnNames(List colNames); + + /** + * Sets the snapshot time (in milliseconds since epoch) for the BigQuery table, if not set + * {@code now()} is used. + * + * @param snapshotTs The snapshot's time in milliseconds since epoch. + * @return This {@link Builder} instance. + */ + public abstract Builder setSnapshotTimestampInMillis(@Nullable Long snapshotTs); + + /** + * Sets the maximum number of read streams that BigQuery should create to retrieve data from + * the table. BigQuery can return a lower number than the specified. + * + * @param maxStreamCount The maximum number of read streams. + * @return This {@link Builder} instance. + */ + public abstract Builder setMaxStreamCount(Integer maxStreamCount); + + /** + * Sets the maximum number of records to read from a streams once a fetch has been requested + * from a particular split. Configuring this number too high may cause memory pressure in + * the task manager, depending on the BigQuery record's size and total rows on the stream. + * + * @param maxStreamCount The maximum number records to read from a split at a time. + * @return This {@link Builder} instance. + */ + public abstract Builder setMaxRecordsPerSplitFetch(Integer maxStreamCount); + + /** + * Sets the {@link BigQueryConnectOptions} instance. + * + * @param connect The {@link BigQueryConnectOptions} instance. + * @return This {@link Builder} instance. + */ + public abstract Builder setBigQueryConnectOptions(BigQueryConnectOptions connect); + + abstract BigQueryReadOptions autoBuild(); + + /** + * A fully initialized {@link BigQueryReadOptions} instance. + * + * @return A {@link BigQueryReadOptions} instance. 
+ */ + public final BigQueryReadOptions build() { + BigQueryReadOptions readOptions = autoBuild(); + Preconditions.checkState( + readOptions.getMaxStreamCount() >= 0, + "The max number of streams should be zero or positive."); + Preconditions.checkState( + !readOptions + .getSnapshotTimestampInMillis() + // see if the value is lower than the epoch + .filter(timeInMillis -> timeInMillis < Instant.EPOCH.toEpochMilli()) + // if present, then fail + .isPresent(), + "The oldest timestamp should be equal or bigger than epoch."); + Preconditions.checkState( + !Optional.ofNullable(readOptions.getQuery()) + // if the project was not configured + .filter(q -> readOptions.getQueryExecutionProject() == null) + // if present fail + .isPresent(), + "If a query is configured, then a GCP project should be provided."); + + return readOptions; + } + } +} diff --git a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/BigQuerySourceReaderContext.java b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/BigQuerySourceReaderContext.java new file mode 100644 index 00000000..d0e75096 --- /dev/null +++ b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/BigQuerySourceReaderContext.java @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.flink.bigquery.source.reader; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +import java.util.concurrent.atomic.AtomicInteger; + +/** A {@link SourceReaderContext} proxy that adds limit and counts for state management. 
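+ *
+ * <p>All calls are delegated to the wrapped {@link SourceReaderContext}; in addition, the proxy
+ * keeps a running count of the records emitted so far, letting readers honor an optional limit
+ * push-down ({@code limit > 0}) and stop once it has been reached.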
*/ +@Internal +public class BigQuerySourceReaderContext implements SourceReaderContext { + + private final SourceReaderContext readerContext; + private final AtomicInteger readCount = new AtomicInteger(0); + private final int limit; + + public BigQuerySourceReaderContext(SourceReaderContext readerContext, int limit) { + this.readerContext = readerContext; + this.limit = limit; + } + + @Override + public SourceReaderMetricGroup metricGroup() { + return readerContext.metricGroup(); + } + + @Override + public Configuration getConfiguration() { + return readerContext.getConfiguration(); + } + + @Override + public String getLocalHostName() { + return readerContext.getLocalHostName(); + } + + @Override + public int getIndexOfSubtask() { + return readerContext.getIndexOfSubtask(); + } + + @Override + public void sendSplitRequest() { + readerContext.sendSplitRequest(); + } + + @Override + public void sendSourceEventToCoordinator(SourceEvent sourceEvent) { + readerContext.sendSourceEventToCoordinator(sourceEvent); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return readerContext.getUserCodeClassLoader(); + } + + public int updateReadCount(Integer newReads) { + return readCount.addAndGet(newReads); + } + + public int currentReadCount() { + return readCount.get(); + } + + public boolean isLimitPushedDown() { + return limit > 0; + } + + public boolean willExceedLimit(int newReads) { + return limit > 0 && (readCount.get() + newReads) >= limit; + } + + public int getLimit() { + return limit; + } +} diff --git a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java new file mode 100644 index 00000000..62291805 --- /dev/null +++ b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java @@ -0,0 +1,252 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.flink.bigquery.source.split; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; +import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions; +import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; +import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException; +import com.google.cloud.flink.bigquery.services.BigQueryServices; +import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory; +import com.google.cloud.flink.bigquery.services.QueryResultInfo; +import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions; +import com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumState; +import com.google.protobuf.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A simple split assigner based on the BigQuery {@link ReadSession} streams. */ +@Internal +public class BigQuerySourceSplitAssigner { + + private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceSplitAssigner.class); + + private final BigQueryReadOptions readOptions; + + private final ArrayDeque remainingTableStreams; + private final List alreadyProcessedTableStreams; + private final ArrayDeque remainingSourceSplits; + private final Map assignedSourceSplits; + private boolean initialized; + + public BigQuerySourceSplitAssigner( + BigQueryReadOptions readOptions, BigQuerySourceEnumState sourceEnumState) { + this.readOptions = readOptions; + this.remainingTableStreams = new ArrayDeque<>(sourceEnumState.getRemaniningTableStreams()); + this.alreadyProcessedTableStreams = sourceEnumState.getCompletedTableStreams(); + this.remainingSourceSplits = new ArrayDeque<>(sourceEnumState.getRemainingSourceSplits()); + this.assignedSourceSplits = sourceEnumState.getAssignedSourceSplits(); + this.initialized = sourceEnumState.isInitialized(); + } + + /** + * Reviews the read options argument and see if a query has been configured, in that case run + * that query and then return a modified version of the connect options pointing to the + * temporary location (project, dataset and table) of the query results. + * + * @return The BigQuery connect options with the right project, dataset and table given the + * specified configuration. 
+ */ + Optional fetchOptionsFromQueryRun() { + return this.readOptions + .getQuery() + // if query is available, execute it using the configured GCP project and gather the + // results + .flatMap(query -> runQuery(query)) + // with the query results return the new connection options, fail if the query + // failed + .map( + result -> { + if (result.getStatus().equals(QueryResultInfo.Status.FAILED)) { + throw new IllegalStateException( + "The BigQuery query execution failed with errors: " + + result.getErrorMessages() + .orElse(Lists.newArrayList())); + } + String projectId = result.getDestinationProject().get(); + String dataset = result.getDestinationDataset().get(); + String table = result.getDestinationTable().get(); + LOG.info( + "After BigQuery query execution, switching connect options" + + " to read from table {}.{}.{}", + projectId, + dataset, + table); + return this.readOptions + .getBigQueryConnectOptions() + .toBuilder() + .setProjectId(projectId) + .setDataset(dataset) + .setTable(table) + .build(); + }); + } + + private Optional runQuery(String query) { + return this.readOptions + .getQueryExecutionProject() + .flatMap( + gcpProject -> + BigQueryServicesFactory.instance( + this.readOptions.getBigQueryConnectOptions()) + .queryClient() + .runQuery(gcpProject, query)); + } + + public void open() { + LOG.info("BigQuery source split assigner is opening."); + if (!initialized) { + BigQueryConnectOptions connectionOptions = + fetchOptionsFromQueryRun().orElse(this.readOptions.getBigQueryConnectOptions()); + try (BigQueryServices.StorageReadClient client = + BigQueryServicesFactory.instance(connectionOptions).storageRead()) { + String parent = String.format("projects/%s", connectionOptions.getProjectId()); + + String srcTable = + String.format( + "projects/%s/datasets/%s/tables/%s", + connectionOptions.getProjectId(), + connectionOptions.getDataset(), + connectionOptions.getTable()); + + // We specify the columns to be projected by adding them to the selected fields, + // and set a simple filter to restrict which rows are transmitted. + TableReadOptions.Builder optionsBuilder = TableReadOptions.newBuilder(); + + readOptions + .getColumnNames() + .forEach(name -> optionsBuilder.addSelectedFields(name)); + optionsBuilder.setRowRestriction(readOptions.getRowRestriction()); + + TableReadOptions options = optionsBuilder.build(); + + // Start specifying the read session we want created. + ReadSession.Builder sessionBuilder = + ReadSession.newBuilder() + .setTable(srcTable) + .setDataFormat(DataFormat.AVRO) + .setReadOptions(options); + + // Optionally specify the snapshot time. When unspecified, snapshot time is "now". + if (readOptions.getSnapshotTimestampInMillis().isPresent()) { + long snapshotTimestampInMillis = + readOptions.getSnapshotTimestampInMillis().get(); + Timestamp t = + Timestamp.newBuilder() + .setSeconds(snapshotTimestampInMillis / 1000) + .setNanos((int) ((snapshotTimestampInMillis % 1000) * 1000000)) + .build(); + TableModifiers modifiers = + TableModifiers.newBuilder().setSnapshotTime(t).build(); + sessionBuilder.setTableModifiers(modifiers); + } + + // Begin building the session creation request. 
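+                // Note: a max stream count of 0 asks the service to decide how many read streams
+                // to create; a positive value is only an upper bound and the service may return
+                // fewer streams than requested.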
+ CreateReadSessionRequest.Builder builder = + CreateReadSessionRequest.newBuilder() + .setParent(parent) + .setReadSession(sessionBuilder) + .setMaxStreamCount(readOptions.getMaxStreamCount()); + + // request the session + ReadSession session = client.createReadSession(builder.build()); + LOG.info( + "BigQuery Storage Read session, name: {}," + + " estimated row count {}, estimated scanned bytes {}," + + " streams count {}, expired time {} (seconds after epoch).", + session.getName(), + session.getEstimatedRowCount(), + session.getEstimatedTotalBytesScanned(), + session.getStreamsCount(), + session.getExpireTime().getSeconds()); + // get all the stream names added to the initialized state + remainingTableStreams.addAll( + session.getStreamsList().stream() + .map(stream -> stream.getName()) + .collect(Collectors.toList())); + initialized = true; + } catch (IOException ex) { + throw new BigQueryConnectorException( + "Problems creating the BigQuery Storage Read session.", ex); + } + } + } + + public void addSplitsBack(List splits) { + for (BigQuerySourceSplit split : splits) { + remainingSourceSplits.add((BigQuerySourceSplit) split); + // we should remove the add-backed splits from the assigned list, + // because they are failed + assignedSourceSplits.remove(split.splitId()); + } + } + + public BigQuerySourceEnumState snapshotState(long checkpointId) { + return new BigQuerySourceEnumState( + Lists.newArrayList(remainingTableStreams), + alreadyProcessedTableStreams, + Lists.newArrayList(remainingSourceSplits), + assignedSourceSplits, + initialized); + } + + public void close() { + // so far not much to be done here + LOG.info("BigQuery source split assigner is closed."); + } + + public Optional getNext() { + if (!remainingSourceSplits.isEmpty()) { + // return remaining splits firstly + BigQuerySourceSplit split = remainingSourceSplits.poll(); + assignedSourceSplits.put(split.splitId(), split); + return Optional.of(split); + } else { + // it's turn for next collection + String nextStream = remainingTableStreams.poll(); + if (nextStream != null) { + BigQuerySourceSplit split = new BigQuerySourceSplit(nextStream); + remainingSourceSplits.add(split); + alreadyProcessedTableStreams.add(nextStream); + return getNext(); + } else { + return Optional.empty(); + } + } + } + + public boolean noMoreSplits() { + checkState(initialized, "The noMoreSplits method was called but not initialized."); + return remainingTableStreams.isEmpty() && remainingSourceSplits.isEmpty(); + } +} diff --git a/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader.java b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader.java new file mode 100644 index 00000000..a9e0476f --- /dev/null +++ b/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader.java @@ -0,0 +1,338 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.flink.bigquery.source.split.reader; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.util.Preconditions; + +import com.codahale.metrics.SlidingWindowReservoir; +import com.google.cloud.bigquery.storage.v1.AvroRows; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.flink.bigquery.services.BigQueryServices; +import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory; +import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions; +import com.google.cloud.flink.bigquery.source.reader.BigQuerySourceReaderContext; +import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.stream.Collectors; + +/** A split reader for {@link BigQuerySourceSplit}. 
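+ *
+ * <p>A fetch reads at most {@code maxRecordsPerSplitFetch} records from the current split's
+ * stream, returning partial results when that limit is hit so the runtime can checkpoint the
+ * split offset; the split is reported as finished once its stream is exhausted (or the overall
+ * limit push-down has been satisfied).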
*/ +@Internal +public class BigQuerySourceSplitReader implements SplitReader { + private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceSplitReader.class); + + private final BigQueryReadOptions readOptions; + private final BigQuerySourceReaderContext readerContext; + private final transient Optional readSplitTimeMetric; + private final Queue assignedSplits = new ArrayDeque<>(); + + private Boolean closed = false; + private Schema avroSchema = null; + private Long readSoFar = 0L; + private Long splitStartFetch; + private Iterator readStreamIterator = null; + + public BigQuerySourceSplitReader( + BigQueryReadOptions readOptions, BigQuerySourceReaderContext readerContext) { + this.readOptions = readOptions; + this.readerContext = readerContext; + this.readSplitTimeMetric = + Optional.ofNullable(readerContext.metricGroup()) + .map( + mgroup -> + mgroup.histogram( + "bq.split.read.time.ms", + new DropwizardHistogramWrapper( + new com.codahale.metrics.Histogram( + new SlidingWindowReservoir(500))))); + } + + Long offsetToFetch(BigQuerySourceSplit split) { + // honor what is coming as checkpointed + if (split.getOffset() > 0) { + readSoFar = split.getOffset(); + splitStartFetch = System.currentTimeMillis(); + } else if (readSoFar == 0) { + // will start reading the stream from the beginning + splitStartFetch = System.currentTimeMillis(); + } + LOG.debug( + "[subtask #{}] Offset to fetch from {} for stream {}.", + readerContext.getIndexOfSubtask(), + readSoFar, + split.getStreamName()); + return readSoFar; + } + + BigQueryServices.BigQueryServerStream retrieveReadStream( + BigQuerySourceSplit split) throws IOException { + try (BigQueryServices.StorageReadClient client = + BigQueryServicesFactory.instance(readOptions.getBigQueryConnectOptions()) + .storageRead()) { + ReadRowsRequest readRequest = + ReadRowsRequest.newBuilder() + .setReadStream(split.getStreamName()) + .setOffset(offsetToFetch(split)) + .build(); + + return client.readRows(readRequest); + } catch (Exception ex) { + throw new IOException( + String.format( + "[subtask #%d] Problems while opening the stream %s from BigQuery" + + " with connection info %s. 
Current split offset %d," + + " reader offset %d.", + readerContext.getIndexOfSubtask(), + Optional.ofNullable(split.getStreamName()).orElse("NA"), + readOptions.toString(), + split.getOffset(), + readSoFar), + ex); + } + } + + @Override + public RecordsWithSplitIds fetch() throws IOException { + if (closed) { + throw new IllegalStateException("Can't fetch records from a closed split reader."); + } + + RecordsBySplits.Builder respBuilder = new RecordsBySplits.Builder<>(); + + // nothing to read has been assigned + if (assignedSplits.isEmpty()) { + return respBuilder.build(); + } + + // return when current read count is already over limit + if (readerContext.willExceedLimit(0)) { + LOG.info( + "Completing reading because we are over limit (context reader count {}).", + readerContext.currentReadCount()); + respBuilder.addFinishedSplits( + assignedSplits.stream() + .map(split -> split.splitId()) + .collect(Collectors.toList())); + assignedSplits.clear(); + return respBuilder.build(); + } + + BigQuerySourceSplit assignedSplit = assignedSplits.peek(); + int maxRecordsPerSplitFetch = readOptions.getMaxRecordsPerSplitFetch(); + int read = 0; + Long fetchStartTime = System.currentTimeMillis(); + Boolean truncated = false; + + try { + if (readStreamIterator == null) { + readStreamIterator = retrieveReadStream(assignedSplit).iterator(); + } + Long itStartTime = System.currentTimeMillis(); + while (readStreamIterator.hasNext()) { + ReadRowsResponse response = readStreamIterator.next(); + if (!response.hasAvroRows()) { + LOG.info( + "[subtask #{}] The response contained no avro records for stream {}.", + readerContext.getIndexOfSubtask(), + assignedSplit.getStreamName()); + } + if (avroSchema == null) { + if (response.hasAvroSchema()) { + // this will happen only the first time we read from a particular stream + avroSchema = + new Schema.Parser().parse(response.getAvroSchema().getSchema()); + } else { + throw new IllegalArgumentException( + "Avro schema not initialized and not available in the response."); + } + } + Long decodeStart = System.currentTimeMillis(); + List recordList = + GenericRecordReader.create(avroSchema).processRows(response.getAvroRows()); + Long decodeTimeMS = System.currentTimeMillis() - decodeStart; + LOG.debug( + "[subtask #{}] Iteration decoded records in {}ms from stream {}.", + readerContext.getIndexOfSubtask(), + decodeTimeMS, + assignedSplit.getStreamName()); + + for (GenericRecord record : recordList) { + respBuilder.add(assignedSplit, record); + read++; + // check if the read count will be over the limit + if (readerContext.willExceedLimit(read)) { + break; + } + } + // check if the read count will be over the limit + if (readerContext.willExceedLimit(read)) { + break; + } + Long itTimeMs = System.currentTimeMillis() - itStartTime; + LOG.debug( + "[subtask #{}] Completed reading iteration in {}ms," + + " so far read {} from stream {}.", + readerContext.getIndexOfSubtask(), + itTimeMs, + readSoFar + read, + assignedSplit.getStreamName()); + itStartTime = System.currentTimeMillis(); + /** + * Assuming the record list from the read session have the same size (true in most + * cases but the last one in the response stream) we check if we will be going over + * the per fetch limit, in that case we break the loop and return the partial + * results (enabling the checkpointing of the partial retrieval if wanted by the + * runtime). The read response record count has been observed to have 1024 elements. 
+ */ + if (read + recordList.size() > maxRecordsPerSplitFetch) { + truncated = true; + break; + } + } + readSoFar += read; + // check if we finished to read the stream to finalize the split + if (!truncated) { + readerContext.updateReadCount(read); + Long splitTimeMs = System.currentTimeMillis() - splitStartFetch; + this.readSplitTimeMetric.ifPresent(m -> m.update(splitTimeMs)); + LOG.info( + "[subtask #{}] Completed reading split, {} records in {}ms on stream {}.", + readerContext.getIndexOfSubtask(), + readSoFar, + splitTimeMs, + assignedSplit.splitId()); + readSoFar = 0L; + assignedSplits.poll(); + readStreamIterator = null; + respBuilder.addFinishedSplit(assignedSplit.splitId()); + } else { + Long fetchTimeMs = System.currentTimeMillis() - fetchStartTime; + LOG.debug( + "[subtask #{}] Completed a partial fetch in {}ms," + + " so far read {} from stream {}.", + readerContext.getIndexOfSubtask(), + fetchTimeMs, + readSoFar, + assignedSplit.getStreamName()); + } + return respBuilder.build(); + } catch (Exception ex) { + throw new IOException( + String.format( + "[subtask #%d] Problems while reading stream %s from BigQuery" + + " with connection info %s. Current split offset %d," + + " reader offset %d.", + readerContext.getIndexOfSubtask(), + Optional.ofNullable(assignedSplit.getStreamName()).orElse("NA"), + readOptions.toString(), + assignedSplit.getOffset(), + readSoFar), + ex); + } + } + + @Override + public void handleSplitsChanges(SplitsChange splitsChanges) { + LOG.debug("Handle split changes {}.", splitsChanges); + + if (!(splitsChanges instanceof SplitsAddition)) { + throw new UnsupportedOperationException( + String.format( + "The SplitChange type of %s is not supported.", + splitsChanges.getClass())); + } + + assignedSplits.addAll(splitsChanges.splits()); + } + + @Override + public void wakeUp() { + LOG.debug("[subtask #{}] Wake up called.", readerContext.getIndexOfSubtask()); + // do nothing, for now + } + + @Override + public void close() throws Exception { + LOG.debug( + "[subtask #{}] Close called, assigned splits {}.", + readerContext.getIndexOfSubtask(), + assignedSplits.toString()); + if (!closed) { + closed = true; + readSoFar = 0L; + readStreamIterator = null; + // complete closing with what may be needed + } + } + + static class GenericRecordReader { + + private final Schema schema; + + private GenericRecordReader(Schema schema) { + Preconditions.checkNotNull(schema, "The provided avro schema reference is null."); + this.schema = schema; + } + + public static GenericRecordReader create(Schema schema) { + return new GenericRecordReader(schema); + } + + /** + * Method for processing AVRO rows which only validates decoding. + * + * @param avroRows object returned from the ReadRowsResponse. 
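+         * @return The list of {@link GenericRecord}s decoded from the serialized Avro rows.
+         * @throws IOException If the serialized rows cannot be decoded with the configured schema.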
+ */ + public List processRows(AvroRows avroRows) throws IOException { + BinaryDecoder decoder = + DecoderFactory.get() + .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), null); + DatumReader datumReader = new GenericDatumReader<>(schema); + List records = new ArrayList<>(); + GenericRecord row; + while (!decoder.isEnd()) { + // Reusing object row + row = datumReader.read(null, decoder); + records.add(row); + } + return records; + } + } +} diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/BigQueryStateSerdeTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/BigQueryStateSerdeTest.java index 9382b7cf..46604b3c 100644 --- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/BigQueryStateSerdeTest.java +++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/BigQueryStateSerdeTest.java @@ -19,7 +19,6 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; -import org.assertj.core.api.Assertions; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -31,6 +30,8 @@ import java.util.List; import java.util.Map; +import static com.google.common.truth.Truth.assertThat; + /** */ public class BigQueryStateSerdeTest { @@ -50,7 +51,7 @@ public void testListSerDe() throws IOException { List deserialized = BigQueryStateSerde.deserializeList(in, DataInput::readUTF); - Assertions.assertThat(original).isEqualTo(deserialized); + assertThat(original).isEqualTo(deserialized); } } } @@ -76,7 +77,7 @@ public void testMapSerDe() throws IOException { BigQueryStateSerde.deserializeMap( in, DataInput::readUTF, DataInput::readUTF); - Assertions.assertThat(original).isEqualTo(deserialized); + assertThat(original).isEqualTo(deserialized); } } } diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/SchemaTransformTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/SchemaTransformTest.java index c0976952..9fc22daf 100644 --- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/SchemaTransformTest.java +++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/common/utils/SchemaTransformTest.java @@ -26,11 +26,12 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.assertj.core.api.Assertions; import org.junit.Test; import java.util.List; +import static com.google.common.truth.Truth.assertThat; + /** */ public class SchemaTransformTest { private final List subFields = @@ -108,70 +109,70 @@ public void testConvertBigQuerySchemaToAvroSchema() { Schema avroSchema = SchemaTransform.toGenericAvroSchema("testSchema", tableSchema.getFields()); - Assertions.assertThat(avroSchema.getField("number").schema()) + assertThat(avroSchema.getField("number").schema()) .isEqualTo(Schema.create(Schema.Type.LONG)); - Assertions.assertThat(avroSchema.getField("species").schema()) + assertThat(avroSchema.getField("species").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - Assertions.assertThat(avroSchema.getField("quality").schema()) + assertThat(avroSchema.getField("quality").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE))); - 
Assertions.assertThat(avroSchema.getField("quantity").schema()) + assertThat(avroSchema.getField("quantity").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))); - Assertions.assertThat(avroSchema.getField("birthday").schema()) + assertThat(avroSchema.getField("birthday").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), LogicalTypes.timestampMicros() .addToSchema(Schema.create(Schema.Type.LONG)))); - Assertions.assertThat(avroSchema.getField("birthdayMoney").schema()) + assertThat(avroSchema.getField("birthdayMoney").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), LogicalTypes.decimal(38, 9) .addToSchema(Schema.create(Schema.Type.BYTES)))); - Assertions.assertThat(avroSchema.getField("lotteryWinnings").schema()) + assertThat(avroSchema.getField("lotteryWinnings").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), LogicalTypes.decimal(77, 38) .addToSchema(Schema.create(Schema.Type.BYTES)))); - Assertions.assertThat(avroSchema.getField("flighted").schema()) + assertThat(avroSchema.getField("flighted").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN))); - Assertions.assertThat(avroSchema.getField("sound").schema()) + assertThat(avroSchema.getField("sound").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES))); - Assertions.assertThat(avroSchema.getField("anniversaryDate").schema()) + assertThat(avroSchema.getField("anniversaryDate").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - Assertions.assertThat(avroSchema.getField("anniversaryDatetime").schema()) + assertThat(avroSchema.getField("anniversaryDatetime").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); - Assertions.assertThat(avroSchema.getField("anniversaryTime").schema()) + assertThat(avroSchema.getField("anniversaryTime").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); Schema geoSchema = Schema.create(Schema.Type.STRING); geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt"); - Assertions.assertThat(avroSchema.getField("geoPositions").schema()) + assertThat(avroSchema.getField("geoPositions").schema()) .isEqualTo(Schema.createUnion(Schema.create(Schema.Type.NULL), geoSchema)); - Assertions.assertThat(avroSchema.getField("scion").schema()) + assertThat(avroSchema.getField("scion").schema()) .isEqualTo( Schema.createUnion( Schema.create(Schema.Type.NULL), @@ -188,7 +189,7 @@ public void testConvertBigQuerySchemaToAvroSchema() { Schema.create(Schema.Type.STRING)), null, (Object) null))))); - Assertions.assertThat(avroSchema.getField("associates").schema()) + assertThat(avroSchema.getField("associates").schema()) .isEqualTo( Schema.createArray( Schema.createRecord( @@ -226,6 +227,6 @@ public void testBQSchemaToTableSchema() { TableSchema transformed = SchemaTransform.bigQuerySchemaToTableSchema(schema); - Assertions.assertThat(transformed).isEqualTo(expected); + assertThat(transformed).isEqualTo(expected); } } diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java new file mode 100644 index 00000000..aa0b09a6 --- /dev/null +++ 
b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java @@ -0,0 +1,336 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.flink.bigquery.fakes; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.function.SerializableFunction; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.AvroRows; +import com.google.cloud.bigquery.storage.v1.AvroSchema; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.ReadStream; +import com.google.cloud.bigquery.storage.v1.StreamStats; +import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; +import com.google.cloud.flink.bigquery.common.config.CredentialsOptions; +import com.google.cloud.flink.bigquery.services.BigQueryServices; +import com.google.cloud.flink.bigquery.services.QueryResultInfo; +import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions; +import com.google.protobuf.ByteString; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.util.RandomData; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** Utility class to generate mocked objects for the BQ storage client. */ +public class StorageClientFaker { + + /** Implementation for the BigQuery services for testing purposes. 
*/ + public static class FakeBigQueryServices implements BigQueryServices { + + private final FakeBigQueryStorageReadClient storageReadClient; + + public FakeBigQueryServices(FakeBigQueryStorageReadClient storageReadClient) { + this.storageReadClient = storageReadClient; + } + + @Override + public StorageReadClient getStorageClient(CredentialsOptions readOptions) + throws IOException { + return storageReadClient; + } + + @Override + public QueryDataClient getQueryDataClient(CredentialsOptions readOptions) { + return new QueryDataClient() { + @Override + public List retrieveTablePartitions( + String project, String dataset, String table) { + return new ArrayList<>(); + } + + @Override + public Optional> retrievePartitionColumnName( + String project, String dataset, String table) { + return Optional.empty(); + } + + @Override + public TableSchema getTableSchema(String project, String dataset, String table) { + return new TableSchema(); + } + + @Override + public Optional runQuery(String projectId, String query) { + return Optional.of(QueryResultInfo.succeed("", "", "")); + } + + @Override + public Job dryRunQuery(String projectId, String query) { + return null; + } + }; + } + + /** Implementation of the server stream for testing purposes. */ + public static class FakeBigQueryServerStream + implements BigQueryServices.BigQueryServerStream { + + private final List toReturn; + + public FakeBigQueryServerStream( + SerializableFunction> dataGenerator, + String schema, + String dataPrefix, + Long size, + Long offset) { + this.toReturn = + createResponse( + schema, + dataGenerator + .apply(new RecordGenerationParams(schema, size.intValue())) + .stream() + .skip(offset) + .collect(Collectors.toList()), + 0, + size); + } + + @Override + public Iterator iterator() { + return toReturn.iterator(); + } + + @Override + public void cancel() {} + } + + /** Implementation for the storage read client for testing purposes. */ + public static class FakeBigQueryStorageReadClient implements StorageReadClient { + + private final ReadSession session; + private final SerializableFunction> + dataGenerator; + + public FakeBigQueryStorageReadClient( + ReadSession session, + SerializableFunction> + dataGenerator) { + this.session = session; + this.dataGenerator = dataGenerator; + } + + @Override + public ReadSession createReadSession(CreateReadSessionRequest request) { + return session; + } + + @Override + public BigQueryServerStream readRows(ReadRowsRequest request) { + try { + // introduce some random delay + Thread.sleep(new Random().nextInt(500)); + } catch (InterruptedException ex) { + } + return new FakeBigQueryServerStream( + dataGenerator, + session.getAvroSchema().getSchema(), + request.getReadStream(), + session.getEstimatedRowCount(), + request.getOffset()); + } + + @Override + public void close() {} + } + } + + public static final String SIMPLE_AVRO_SCHEMA_STRING = + "{\"namespace\": \"example.avro\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"RowRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"number\", \"type\": \"long\"}\n" + + " ]\n" + + "}"; + + public static final Schema SIMPLE_AVRO_SCHEMA = + new Schema.Parser().parse(SIMPLE_AVRO_SCHEMA_STRING); + + /** Represents the parameters needed for the Avro data generation. 
*/ + public static class RecordGenerationParams implements Serializable { + private final String avroSchemaString; + private final Integer recordCount; + + public RecordGenerationParams(String avroSchemaString, Integer recordCount) { + this.avroSchemaString = avroSchemaString; + this.recordCount = recordCount; + } + + public String getAvroSchemaString() { + return avroSchemaString; + } + + public Integer getRecordCount() { + return recordCount; + } + } + + public static ReadSession fakeReadSession( + Integer expectedRowCount, Integer expectedReadStreamCount, String avroSchemaString) { + // setup the response for read session request + List readStreams = + IntStream.range(0, expectedReadStreamCount) + .mapToObj(i -> ReadStream.newBuilder().setName("stream" + i).build()) + .collect(Collectors.toList()); + return ReadSession.newBuilder() + .addAllStreams(readStreams) + .setEstimatedRowCount(expectedRowCount) + .setDataFormat(DataFormat.AVRO) + .setAvroSchema(AvroSchema.newBuilder().setSchema(avroSchemaString)) + .build(); + } + + public static List createRecordList(RecordGenerationParams params) { + Schema schema = new Schema.Parser().parse(params.getAvroSchemaString()); + return IntStream.range(0, params.getRecordCount()) + .mapToObj(i -> createRecord(schema)) + .collect(Collectors.toList()); + } + + public static GenericRecord createRecord(Schema schema) { + return (GenericRecord) new RandomData(schema, 0).iterator().next(); + } + + private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); + + @SuppressWarnings("deprecation") + public static List createResponse( + String schemaString, + List genericRecords, + double progressAtResponseStart, + double progressAtResponseEnd) { + // BigQuery delivers the data in 1024 elements chunks, so we partition the generated list + // into multiple ones with that size max. 
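+        // Each partition then becomes a single ReadRowsResponse carrying its own serialized Avro
+        // block and row count, mimicking how the real service pages results back to the reader.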
+ List> responsesData = Lists.partition(genericRecords, 1024); + + return responsesData.stream() + // for each data response chunk we generate a read response object + .map( + genRecords -> { + try { + Schema schema = new Schema.Parser().parse(schemaString); + GenericDatumWriter writer = + new GenericDatumWriter<>(schema); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Encoder binaryEncoder = + ENCODER_FACTORY.binaryEncoder(outputStream, null); + for (GenericRecord genericRecord : genRecords) { + writer.write(genericRecord, binaryEncoder); + } + + binaryEncoder.flush(); + + return ReadRowsResponse.newBuilder() + .setAvroRows( + AvroRows.newBuilder() + .setSerializedBinaryRows( + ByteString.copyFrom( + outputStream.toByteArray())) + .setRowCount(genRecords.size())) + .setAvroSchema( + AvroSchema.newBuilder() + .setSchema(schema.toString()) + .build()) + .setRowCount(genRecords.size()) + .setStats( + StreamStats.newBuilder() + .setProgress( + StreamStats.Progress.newBuilder() + .setAtResponseStart( + progressAtResponseStart) + .setAtResponseEnd( + progressAtResponseEnd))) + .build(); + } catch (Exception ex) { + throw new RuntimeException( + "Problems generating faked response.", ex); + } + }) + .collect(Collectors.toList()); + } + + public static BigQueryReadOptions createReadOptions( + Integer expectedRowCount, Integer expectedReadStreamCount, String avroSchemaString) + throws IOException { + return createReadOptions( + expectedRowCount, + expectedReadStreamCount, + avroSchemaString, + params -> StorageClientFaker.createRecordList(params)); + } + + public static BigQueryReadOptions createReadOptions( + Integer expectedRowCount, + Integer expectedReadStreamCount, + String avroSchemaString, + SerializableFunction> dataGenerator) + throws IOException { + return BigQueryReadOptions.builder() + .setBigQueryConnectOptions( + BigQueryConnectOptions.builder() + .setDataset("dataset") + .setProjectId("project") + .setTable("table") + .setCredentialsOptions(null) + .setTestingBigQueryServices( + () -> { + return new StorageClientFaker.FakeBigQueryServices( + new StorageClientFaker.FakeBigQueryServices + .FakeBigQueryStorageReadClient( + StorageClientFaker.fakeReadSession( + expectedRowCount, + expectedReadStreamCount, + avroSchemaString), + dataGenerator)); + }) + .build()) + .build(); + } +} diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java index 6a2ceb6f..6e818258 100644 --- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java +++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java @@ -20,11 +20,12 @@ import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; import com.google.cloud.flink.bigquery.common.config.CredentialsOptions; -import org.assertj.core.api.Assertions; import org.junit.Test; import java.io.IOException; +import static com.google.common.truth.Truth.assertThat; + /** */ public class BigQueryServicesTest { @Test @@ -50,14 +51,14 @@ public BigQueryServices.StorageReadClient getStorageClient( .setTestingBigQueryServices(dummyServices) .build()); - Assertions.assertThat(original.getIsTestingEnabled()).isTrue(); - Assertions.assertThat(original.getTestingServices()).isNotNull(); - Assertions.assertThat(original.queryClient()).isNull(); - 
Assertions.assertThat(original.storageRead()).isNull(); + assertThat(original.getIsTestingEnabled()).isTrue(); + assertThat(original.getTestingServices()).isNotNull(); + assertThat(original.queryClient()).isNull(); + assertThat(original.storageRead()).isNull(); original.defaultImplementation(); - Assertions.assertThat(original.getIsTestingEnabled()).isFalse(); - Assertions.assertThat(original.getTestingServices()).isNull(); + assertThat(original.getIsTestingEnabled()).isFalse(); + assertThat(original.getTestingServices()).isNull(); } } diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/QueryResultInfoTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/QueryResultInfoTest.java index e3f7a87d..929c8c50 100644 --- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/QueryResultInfoTest.java +++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/services/QueryResultInfoTest.java @@ -18,36 +18,38 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; -import org.assertj.core.api.Assertions; +import com.google.common.truth.Truth8; import org.junit.Test; +import static com.google.common.truth.Truth.assertThat; + /** */ public class QueryResultInfoTest { @Test public void testQueryResultInfoFailed() { QueryResultInfo qri = QueryResultInfo.failed(Lists.newArrayList()); - Assertions.assertThat(qri.getStatus()).isEqualTo(QueryResultInfo.Status.FAILED); - Assertions.assertThat(qri.getDestinationProject()).isEmpty(); - Assertions.assertThat(qri.getDestinationDataset()).isEmpty(); - Assertions.assertThat(qri.getDestinationTable()).isEmpty(); - Assertions.assertThat(qri.getErrorMessages()).isNotEmpty(); + assertThat(qri.getStatus()).isEqualTo(QueryResultInfo.Status.FAILED); + Truth8.assertThat(qri.getDestinationProject()).isEmpty(); + Truth8.assertThat(qri.getDestinationDataset()).isEmpty(); + Truth8.assertThat(qri.getDestinationTable()).isEmpty(); + Truth8.assertThat(qri.getErrorMessages()).isPresent(); } @Test public void testQueryResultInfoSucceeded() { QueryResultInfo qri = QueryResultInfo.succeed("", "", ""); - Assertions.assertThat(qri.getStatus()).isEqualTo(QueryResultInfo.Status.SUCCEED); - Assertions.assertThat(qri.getDestinationProject()).isNotEmpty(); - Assertions.assertThat(qri.getDestinationDataset()).isNotEmpty(); - Assertions.assertThat(qri.getDestinationTable()).isNotEmpty(); - Assertions.assertThat(qri.getErrorMessages().get()).isEmpty(); + assertThat(qri.getStatus()).isEqualTo(QueryResultInfo.Status.SUCCEED); + Truth8.assertThat(qri.getDestinationProject()).isPresent(); + Truth8.assertThat(qri.getDestinationDataset()).isPresent(); + Truth8.assertThat(qri.getDestinationTable()).isPresent(); + assertThat(qri.getErrorMessages().get()).isEmpty(); } @Test public void testNotEquals() { QueryResultInfo succeed = QueryResultInfo.succeed("", "", ""); QueryResultInfo failed = QueryResultInfo.failed(Lists.newArrayList()); - Assertions.assertThat(succeed).isNotEqualTo(failed); + assertThat(succeed).isNotEqualTo(failed); } } diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/enumerator/BigQuerySourceEnumStateSerializerTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/enumerator/BigQuerySourceEnumStateSerializerTest.java index ae96a191..e116cf74 100644 --- 
a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/enumerator/BigQuerySourceEnumStateSerializerTest.java +++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/enumerator/BigQuerySourceEnumStateSerializerTest.java @@ -18,7 +18,6 @@ import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit; import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplitSerializer; -import org.junit.Assert; import org.junit.Test; import java.io.IOException; @@ -27,6 +26,8 @@ import java.util.Map; import java.util.TreeMap; +import static com.google.common.truth.Truth.assertThat; + /** */ public class BigQuerySourceEnumStateSerializerTest { @@ -65,8 +66,8 @@ public void testEnumStateSerializerInitialState() throws IOException { BigQuerySourceEnumStateSerializer.INSTANCE.deserialize( BigQuerySourceSplitSerializer.VERSION, serialized); - Assert.assertEquals(initialState, enumState1); - Assert.assertEquals(initialState.hashCode(), enumState1.hashCode()); + assertThat(initialState).isEqualTo(enumState1); + assertThat(initialState.hashCode()).isEqualTo(enumState1.hashCode()); } @Test @@ -79,7 +80,7 @@ public void testEnumStateSerializer() throws IOException { BigQuerySourceEnumStateSerializer.INSTANCE.deserialize( BigQuerySourceSplitSerializer.VERSION, serialized); - Assert.assertEquals(enumState, enumState1); + assertThat(enumState).isEqualTo(enumState1); } @Test(expected = IllegalArgumentException.class) @@ -91,6 +92,6 @@ public void testWrongSerializerVersion() throws IOException { BigQuerySourceEnumStateSerializer.INSTANCE.deserialize(1000, serialized); // should never reach here - Assert.fail(); + assertThat(false).isTrue(); } } diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssignerTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssignerTest.java new file mode 100644 index 00000000..fa77a546 --- /dev/null +++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssignerTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2023 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package com.google.cloud.flink.bigquery.source.split;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import com.google.cloud.flink.bigquery.fakes.StorageClientFaker;
+import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
+import com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumState;
+import com.google.common.truth.Truth8;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** */
+public class BigQuerySourceSplitAssignerTest {
+
+    private BigQueryReadOptions readOptions;
+
+    @Before
+    public void beforeTest() throws IOException {
+        this.readOptions =
+                StorageClientFaker.createReadOptions(
+                        0, 2, StorageClientFaker.SIMPLE_AVRO_SCHEMA_STRING);
+    }
+
+    @Test
+    public void testAssignment() {
+        // initialize the assigner with default options since we are faking the bigquery services
+        BigQuerySourceSplitAssigner assigner =
+                new BigQuerySourceSplitAssigner(
+                        this.readOptions, BigQuerySourceEnumState.initialState());
+        // request the retrieval of the bigquery table info
+        assigner.open();
+
+        // should retrieve the first split representing the first stream
+        Optional<BigQuerySourceSplit> maybeSplit = assigner.getNext();
+        Truth8.assertThat(maybeSplit).isPresent();
+        // should retrieve the second split representing the second stream
+        maybeSplit = assigner.getNext();
+        Truth8.assertThat(maybeSplit).isPresent();
+        BigQuerySourceSplit split = maybeSplit.get();
+        // no more splits should be available
+        maybeSplit = assigner.getNext();
+        Truth8.assertThat(maybeSplit).isEmpty();
+        assertThat(assigner.noMoreSplits()).isTrue();
+        // let's check on the enum state
+        BigQuerySourceEnumState state = assigner.snapshotState(0);
+        assertThat(state.getRemaniningTableStreams()).isEmpty();
+        assertThat(state.getRemainingSourceSplits()).isEmpty();
+        // add some splits back
+        assigner.addSplitsBack(Lists.newArrayList(split));
+        // check again on the enum state
+        state = assigner.snapshotState(0);
+        assertThat(state.getRemaniningTableStreams()).isEmpty();
+        assertThat(state.getRemainingSourceSplits()).isNotEmpty();
+        // empty it again and check
+        assigner.getNext();
+        maybeSplit = assigner.getNext();
+        Truth8.assertThat(maybeSplit).isEmpty();
+        assertThat(assigner.noMoreSplits()).isTrue();
+    }
+}
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitSerializerTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitSerializerTest.java
index 7c8a92e3..c3811bc1 100644
--- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitSerializerTest.java
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitSerializerTest.java
@@ -16,11 +16,12 @@
 package com.google.cloud.flink.bigquery.source.split;

-import org.junit.Assert;
 import org.junit.Test;

 import java.io.IOException;

+import static com.google.common.truth.Truth.assertThat;
+
 /** */
 public class BigQuerySourceSplitSerializerTest {

@@ -34,7 +35,7 @@ public void testSplitSerializer() throws IOException {
                 BigQuerySourceSplitSerializer.INSTANCE.deserialize(
                         BigQuerySourceSplitSerializer.VERSION, serialized);

-        Assert.assertEquals(split, split1);
+        assertThat(split).isEqualTo(split1);
     }

     @Test(expected = IllegalArgumentException.class)
@@ -46,6 +47,6 @@ public void testWrongSerializerVersion() throws IOException {
         BigQuerySourceSplitSerializer.INSTANCE.deserialize(1000, serialized);

         // should never reach here
-        assert (true);
+        assertThat(true).isFalse();
     }
 }
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java
index 39ec5e59..e71a4b82 100644
--- a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitStateTest.java
@@ -16,9 +16,10 @@
 package com.google.cloud.flink.bigquery.source.split;

-import org.assertj.core.api.Assertions;
 import org.junit.Test;

+import static com.google.common.truth.Truth.assertThat;
+
 /** */
 public class BigQuerySourceSplitStateTest {

@@ -27,10 +28,10 @@ public void testSplitStateTransformation() {
         String streamName = "somestream";
         BigQuerySourceSplit originalSplit = new BigQuerySourceSplit(streamName, 10L);

-        Assertions.assertThat(originalSplit.splitId()).isEqualTo(streamName);
+        assertThat(originalSplit.splitId()).isEqualTo(streamName);

         BigQuerySourceSplitState splitState = new BigQuerySourceSplitState(originalSplit);
-        Assertions.assertThat(splitState.toBigQuerySourceSplit()).isEqualTo(originalSplit);
+        assertThat(splitState.toBigQuerySourceSplit()).isEqualTo(originalSplit);
     }

     @Test
@@ -40,17 +41,17 @@ public void testSplitsEquals() {
         BigQuerySourceSplit split1 = new BigQuerySourceSplit(streamName1, 10L);
         String streamName2 = "somestream";
         BigQuerySourceSplit split2 = new BigQuerySourceSplit(streamName2, 10L);
-        Assertions.assertThat(split1).isEqualTo(split2);
+        assertThat(split1).isEqualTo(split2);

         BigQuerySourceSplitState splitState1 = new BigQuerySourceSplitState(split1);
         BigQuerySourceSplitState splitState2 = new BigQuerySourceSplitState(split2);
-        Assertions.assertThat(splitState1).isEqualTo(splitState2);
+        assertThat(splitState1).isEqualTo(splitState2);

         BigQuerySourceSplit split3 = new BigQuerySourceSplit(streamName2, 11L);
-        Assertions.assertThat(split1).isNotEqualTo(split3);
+        assertThat(split1).isNotEqualTo(split3);

         BigQuerySourceSplitState splitState3 = new BigQuerySourceSplitState(split3);
-        Assertions.assertThat(splitState1).isNotEqualTo(splitState3);
+        assertThat(splitState1).isNotEqualTo(splitState3);
     }

     @Test
@@ -63,12 +64,11 @@ public void testSplitStateMutation() {
         splitState.updateOffset();
         BigQuerySourceSplit otherSplit = new BigQuerySourceSplit(streamName, 11L);

-        Assertions.assertThat(splitState.toBigQuerySourceSplit()).isEqualTo(otherSplit);
-        Assertions.assertThat(splitState.toBigQuerySourceSplit().hashCode())
-                .isEqualTo(otherSplit.hashCode());
+        assertThat(splitState.toBigQuerySourceSplit()).isEqualTo(otherSplit);
+        assertThat(splitState.toBigQuerySourceSplit().hashCode()).isEqualTo(otherSplit.hashCode());
         // should be different since they started from different splits
-        Assertions.assertThat(splitState).isNotEqualTo(new BigQuerySourceSplitState(otherSplit));
-        Assertions.assertThat(splitState.hashCode())
+        assertThat(splitState).isNotEqualTo(new BigQuerySourceSplitState(otherSplit));
+        assertThat(splitState.hashCode())
                 .isNotEqualTo(new BigQuerySourceSplitState(otherSplit).hashCode());
     }
 }
diff --git a/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReaderTest.java b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReaderTest.java
new file mode 100644
index 00000000..98a04190
--- /dev/null
+++ b/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReaderTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) 2023 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.flink.bigquery.source.split.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import com.google.cloud.flink.bigquery.fakes.StorageClientFaker;
+import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
+import com.google.cloud.flink.bigquery.source.reader.BigQuerySourceReaderContext;
+import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** */
+public class BigQuerySourceSplitReaderTest {
+
+    @Test
+    public void testSplitReaderSmall() throws IOException {
+        // init the read options for BQ
+        BigQueryReadOptions readOptions =
+                StorageClientFaker.createReadOptions(
+                        10, 2, StorageClientFaker.SIMPLE_AVRO_SCHEMA_STRING);
+        SourceReaderContext readerContext = Mockito.mock(SourceReaderContext.class);
+        BigQuerySourceReaderContext context = new BigQuerySourceReaderContext(readerContext, 10);
+        BigQuerySourceSplitReader reader = new BigQuerySourceSplitReader(readOptions, context);
+        // wake the reader up
+        reader.wakeUp();
+
+        String splitName = "stream1";
+        BigQuerySourceSplit split = new BigQuerySourceSplit(splitName, 0L);
+        BigQuerySourceSplit split2 = new BigQuerySourceSplit("stream2", 0L);
+        SplitsAddition<BigQuerySourceSplit> change =
+                new SplitsAddition<>(Lists.newArrayList(split, split2));
+
+        // send an assignment
+        reader.handleSplitsChanges(change);
+
+        // this should fetch us some data
+        RecordsWithSplitIds<GenericRecord> records = reader.fetch();
+        // there is one finished split and it is named stream1
+        assertThat(records.finishedSplits()).hasSize(1);
+
+        String firstSplit = records.nextSplit();
+        assertThat(firstSplit).isNotNull();
+        assertThat(firstSplit).isEqualTo(splitName);
+
+        int i = 0;
+        while (records.nextRecordFromSplit() != null) {
+            i++;
+        }
+        // there were 10 generic records read
+        assertThat(i).isEqualTo(10);
+        // there are no more splits
+        assertThat(records.nextSplit()).isNull();
+
+        // now there should be another split to process
+        records = reader.fetch();
+        assertThat(records.finishedSplits()).isNotEmpty();
+
+        // after processing no more splits can be retrieved
+        records = reader.fetch();
+        assertThat(records.finishedSplits()).isEmpty();
+    }
+
+    @Test
+    public void testSplitReaderMultipleFetch() throws IOException {
+        Integer totalRecordCount = 15000;
+        // init the read options for BQ
+        BigQueryReadOptions readOptions =
+                StorageClientFaker.createReadOptions(
+                        totalRecordCount, 1, StorageClientFaker.SIMPLE_AVRO_SCHEMA_STRING);
+        SourceReaderContext readerContext = Mockito.mock(SourceReaderContext.class);
+        // no limits in the read
+        BigQuerySourceReaderContext context = new BigQuerySourceReaderContext(readerContext, -1);
+        BigQuerySourceSplitReader reader = new BigQuerySourceSplitReader(readOptions, context);
+        // wake the reader up
+        reader.wakeUp();
+
+        String splitName = "stream1";
+        BigQuerySourceSplit split = new BigQuerySourceSplit(splitName, 0L);
+        SplitsAddition<BigQuerySourceSplit> change =
+                new SplitsAddition<>(Lists.newArrayList(split));
+
+        // send an assignment
+        reader.handleSplitsChanges(change);
+
+        // this should fetch us some data
+        RecordsWithSplitIds<GenericRecord> records = reader.fetch();
+        // there shouldn't be a finished split yet
+        assertThat(records.finishedSplits()).isEmpty();
+
+        String firstPartialSplit = records.nextSplit();
+        assertThat(firstPartialSplit).isNotNull();
+        assertThat(firstPartialSplit).isEqualTo(splitName);
+
+        int i = 0;
+        while (records.nextRecordFromSplit() != null) {
+            i++;
+        }
+        // at most 10000 generic records, the max per fetch, should have been read
+        assertThat(i).isLessThan(10001);
+        // there are no more splits
+        assertThat(records.nextSplit()).isNull();
+
+        // now there should be more data in the split, and the reader should be able to finalize it
+        records = reader.fetch();
+        assertThat(records.finishedSplits()).isNotEmpty();
+
+        // after processing no more splits can be retrieved
+        records = reader.fetch();
+        assertThat(records.finishedSplits()).isEmpty();
+    }
+}
diff --git a/pom.xml b/pom.xml
index 90bc2911..00f2d1ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@ under the License.
         17.0
         5.9.3
-        <assertj.version>3.24.2</assertj.version>
+        <google-truth.version>1.1.4</google-truth.version>
         4.11.0
         false
@@ -114,8 +114,8 @@ under the License.
-            <groupId>org.assertj</groupId>
-            <artifactId>assertj-core</artifactId>
+            <groupId>com.google.truth.extensions</groupId>
+            <artifactId>truth-java8-extension</artifactId>
             <scope>test</scope>
@@ -314,9 +314,9 @@ under the License.
-                <groupId>org.assertj</groupId>
-                <artifactId>assertj-core</artifactId>
-                <version>${assertj.version}</version>
+                <groupId>com.google.truth.extensions</groupId>
+                <artifactId>truth-java8-extension</artifactId>
+                <version>${google-truth.version}</version>
                 <scope>test</scope>
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 5d96995b..5138e27c 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -23,4 +23,6 @@ under the License.
        "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+