Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds implementation for a Split Assigner and a Split Reader #44

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
eb9dc8f
cleaning up the main branch for new version.
prodriguezdefino Jun 27, 2023
0695673
updates in ignore and readme files
prodriguezdefino Jun 27, 2023
e36cc44
prepping the pom addition, added parent's compliance tools
prodriguezdefino Jun 27, 2023
e167803
adding parent pom and the connector impl project pom
prodriguezdefino Jun 28, 2023
8043378
adding common functionalities
prodriguezdefino Jun 28, 2023
b455b4e
added the bigquery services wrapper and factories
prodriguezdefino Jun 28, 2023
27cd837
creates the split, its state and the enumerator state
prodriguezdefino Jun 28, 2023
4d4b60f
added configs, split reader and split assignment impls
prodriguezdefino Jun 28, 2023
0567b58
applying recommendations from sonartype-lift
prodriguezdefino Jun 28, 2023
5743292
merge changes from master (previous pom deletion resolution)
prodriguezdefino Jul 6, 2023
cab9115
fixing the package name for the schema namespace
prodriguezdefino Jul 6, 2023
3375d54
Merge branch 'common_code_source' into bq_services_wrappers
prodriguezdefino Jul 10, 2023
849f769
merged main branch and took care of few lift comments
prodriguezdefino Jul 11, 2023
fd70b95
Merge branch 'bq_services_wrappers' into source_splits
prodriguezdefino Jul 11, 2023
0f18d14
merge from source_split
prodriguezdefino Jul 11, 2023
6b08119
fixing lift recommendations and spotless
prodriguezdefino Jul 11, 2023
f473d57
addressing comments from review
prodriguezdefino Jul 27, 2023
c178f83
merge from main
prodriguezdefino Aug 1, 2023
09eaaa4
merge from master
prodriguezdefino Aug 1, 2023
def3cc4
Merge branch 'source_splits' into split_assigner_and_reader
prodriguezdefino Aug 1, 2023
ceabb12
fixed type reference Int -> Long
prodriguezdefino Aug 1, 2023
1734bac
merge from main
prodriguezdefino Aug 8, 2023
e96ff59
addressing comments from review
prodriguezdefino Aug 8, 2023
c492f02
improved hashcode and equals readability
prodriguezdefino Aug 8, 2023
9aae0af
changed tests to use google-truth instead of junit or assertj asserti…
prodriguezdefino Aug 9, 2023
edded84
addressing comments from review
prodriguezdefino Oct 9, 2023
c00b79c
resolved comments from review (new specific exception and optional o…
prodriguezdefino Oct 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2023 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.common.exceptions;

/** Represents a general error during the execution of the connector's code. */
public class BigQueryConnectorException extends RuntimeException {

    // RuntimeException is Serializable; pin the serial form explicitly so that
    // source-compatible changes to this class do not change the generated UID.
    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception with a descriptive message and no underlying cause.
     *
     * @param message A description of the error condition.
     */
    public BigQueryConnectorException(String message) {
        super(message);
    }

    /**
     * Creates an exception with a descriptive message and the originating cause.
     *
     * @param message A description of the error condition.
     * @param cause The underlying {@link Throwable} that triggered this error.
     */
    public BigQueryConnectorException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Copyright (C) 2023 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.source.config;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import com.google.auto.value.AutoValue;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import org.threeten.bp.Instant;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
 * The options available to read data from BigQuery.
 *
 * <p>Instances are immutable; use {@link #builder()} to create one and {@link #toBuilder()} to
 * derive a modified copy.
 */
@AutoValue
@PublicEvolving
public abstract class BigQueryReadOptions implements Serializable {

    public abstract ImmutableList<String> getColumnNames();

    public abstract String getRowRestriction();

    public abstract Optional<Long> getSnapshotTimestampInMillis();

    public abstract Optional<String> getQuery();

    public abstract Optional<String> getQueryExecutionProject();

    public abstract Integer getMaxStreamCount();

    public abstract Integer getMaxRecordsPerSplitFetch();

    public abstract BigQueryConnectOptions getBigQueryConnectOptions();

    @Override
    public final int hashCode() {
        // Hash every property so hashCode stays consistent with equals below.
        return Objects.hash(
                getColumnNames(),
                getRowRestriction(),
                getSnapshotTimestampInMillis(),
                getQuery(),
                getQueryExecutionProject(),
                getMaxStreamCount(),
                getMaxRecordsPerSplitFetch(),
                getBigQueryConnectOptions());
    }

    @Override
    public final boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final BigQueryReadOptions other = (BigQueryReadOptions) obj;
        // Compare every property; previously query, queryExecutionProject and
        // maxRecordsPerSplitFetch were omitted, so two options differing only in
        // those fields (e.g. different queries) incorrectly compared as equal.
        return Objects.equals(this.getColumnNames(), other.getColumnNames())
                && Objects.equals(this.getRowRestriction(), other.getRowRestriction())
                && Objects.equals(
                        this.getSnapshotTimestampInMillis(), other.getSnapshotTimestampInMillis())
                && Objects.equals(this.getQuery(), other.getQuery())
                && Objects.equals(
                        this.getQueryExecutionProject(), other.getQueryExecutionProject())
                && Objects.equals(this.getMaxStreamCount(), other.getMaxStreamCount())
                && Objects.equals(
                        this.getMaxRecordsPerSplitFetch(), other.getMaxRecordsPerSplitFetch())
                && Objects.equals(
                        this.getBigQueryConnectOptions(), other.getBigQueryConnectOptions());
    }

    /**
     * Transforms the instance into a builder instance for property modification.
     *
     * @return A {@link Builder} instance for the type.
     */
    public abstract Builder toBuilder();

    /**
     * Creates a builder instance with all the default values set.
     *
     * @return A {@link Builder} for the type.
     */
    public static Builder builder() {
        return new AutoValue_BigQueryReadOptions.Builder()
                .setRowRestriction("")
                .setColumnNames(new ArrayList<>())
                .setMaxStreamCount(0)
                .setMaxRecordsPerSplitFetch(10000)
                .setSnapshotTimestampInMillis(null);
    }

    /** Builder class for {@link BigQueryReadOptions}. */
    @AutoValue.Builder
    public abstract static class Builder {

        /**
         * Prepares this builder to execute a query driven read using the default credentials
         * configuration.
         *
         * @param query A BigQuery standard SQL query.
         * @param projectId A GCP project where the query will run.
         * @return This {@link Builder} instance.
         * @throws IOException In case of problems while setting up the credentials options.
         */
        public Builder setQueryAndExecutionProject(String query, String projectId)
                throws IOException {
            return setQueryWithExecutionProjectAndCredentialsOptions(
                    query, projectId, CredentialsOptions.builder().build());
        }

        /**
         * Prepares this builder to execute a query driven read.
         *
         * @param query A BigQuery standard SQL query.
         * @param projectId A GCP project where the query will run.
         * @param credentialsOptions The GCP credentials options.
         * @return This {@link Builder} instance.
         * @throws IOException In case of problems while setting up the credentials options.
         */
        public Builder setQueryWithExecutionProjectAndCredentialsOptions(
                String query, String projectId, CredentialsOptions credentialsOptions)
                throws IOException {
            this.setQuery(query);
            this.setQueryExecutionProject(projectId);
            this.setBigQueryConnectOptions(
                    BigQueryConnectOptions.builderForQuerySource()
                            .setCredentialsOptions(credentialsOptions)
                            .build());
            return this;
        }

        /**
         * Sets a BigQuery query which will be run first, storing its result in a temporary table,
         * and Flink will read the query results from that temporary table. This is an optional
         * argument.
         *
         * @param query A BigQuery standard SQL query.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setQuery(@Nullable String query);

        /**
         * Sets the GCP project where the configured query will be run. In case the query
         * configuration is not set this configuration is discarded.
         *
         * @param projectId A GCP project.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setQueryExecutionProject(@Nullable String projectId);

        /**
         * Sets the restriction the rows in the BigQuery table must comply to be returned by the
         * source.
         *
         * @param rowRestriction A {@link String} containing the row restrictions.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setRowRestriction(String rowRestriction);

        /**
         * Sets the column names that will be projected from the table's retrieved data.
         *
         * @param colNames The names of the columns as a list of strings.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setColumnNames(List<String> colNames);

        /**
         * Sets the snapshot time (in milliseconds since epoch) for the BigQuery table, if not set
         * {@code now()} is used.
         *
         * @param snapshotTs The snapshot's time in milliseconds since epoch.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setSnapshotTimestampInMillis(@Nullable Long snapshotTs);

        /**
         * Sets the maximum number of read streams that BigQuery should create to retrieve data
         * from the table. BigQuery can return a lower number than the specified.
         *
         * @param maxStreamCount The maximum number of read streams.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setMaxStreamCount(Integer maxStreamCount);

        /**
         * Sets the maximum number of records to read from a streams once a fetch has been
         * requested from a particular split. Configuring this number too high may cause memory
         * pressure in the task manager, depending on the BigQuery record's size and total rows on
         * the stream.
         *
         * @param maxRecordsPerSplitFetch The maximum number of records to read from a split at a
         *     time.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setMaxRecordsPerSplitFetch(Integer maxRecordsPerSplitFetch);

        /**
         * Sets the {@link BigQueryConnectOptions} instance.
         *
         * @param connect The {@link BigQueryConnectOptions} instance.
         * @return This {@link Builder} instance.
         */
        public abstract Builder setBigQueryConnectOptions(BigQueryConnectOptions connect);

        abstract BigQueryReadOptions autoBuild();

        /**
         * A fully initialized {@link BigQueryReadOptions} instance.
         *
         * @return A {@link BigQueryReadOptions} instance.
         * @throws IllegalStateException If the stream count is negative, the snapshot timestamp is
         *     before the epoch, or a query is configured without an execution project.
         */
        public final BigQueryReadOptions build() {
            BigQueryReadOptions readOptions = autoBuild();
            Preconditions.checkState(
                    readOptions.getMaxStreamCount() >= 0,
                    "The max number of streams should be zero or positive.");
            Preconditions.checkState(
                    !readOptions
                            .getSnapshotTimestampInMillis()
                            // see if the value is lower than the epoch
                            .filter(timeInMillis -> timeInMillis < Instant.EPOCH.toEpochMilli())
                            // if present, then fail
                            .isPresent(),
                    "The oldest timestamp should be equal or bigger than epoch.");
            // NOTE: getQuery() already returns an Optional; the previous
            // Optional.ofNullable(...) wrapper combined with a null check on
            // getQueryExecutionProject() (also an Optional, hence never null) made
            // this precondition a no-op. Fail when a query is present but no
            // execution project was configured.
            Preconditions.checkState(
                    !readOptions
                            .getQuery()
                            // a query without an execution project is invalid
                            .filter(q -> !readOptions.getQueryExecutionProject().isPresent())
                            .isPresent(),
                    "If a query is configured, then a GCP project should be provided.");

            return readOptions;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (C) 2023 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.source.reader;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * A {@link SourceReaderContext} proxy that adds limit and counts for state management.
 *
 * <p>All {@link SourceReaderContext} calls are forwarded untouched to the wrapped context; this
 * class only layers a record counter and an optional read limit on top of it.
 */
@Internal
public class BigQuerySourceReaderContext implements SourceReaderContext {

    /** The wrapped Flink reader context every interface call is delegated to. */
    private final SourceReaderContext delegate;

    /** Running count of records read so far; atomic for thread-safe accumulation. */
    private final AtomicInteger consumedRecords = new AtomicInteger(0);

    /** Maximum records to read; values <= 0 mean "no limit pushed down". */
    private final int limit;

    public BigQuerySourceReaderContext(SourceReaderContext readerContext, int limit) {
        this.delegate = readerContext;
        this.limit = limit;
    }

    @Override
    public SourceReaderMetricGroup metricGroup() {
        return delegate.metricGroup();
    }

    @Override
    public Configuration getConfiguration() {
        return delegate.getConfiguration();
    }

    @Override
    public String getLocalHostName() {
        return delegate.getLocalHostName();
    }

    @Override
    public int getIndexOfSubtask() {
        return delegate.getIndexOfSubtask();
    }

    @Override
    public void sendSplitRequest() {
        delegate.sendSplitRequest();
    }

    @Override
    public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
        delegate.sendSourceEventToCoordinator(sourceEvent);
    }

    @Override
    public UserCodeClassLoader getUserCodeClassLoader() {
        return delegate.getUserCodeClassLoader();
    }

    /**
     * Adds the given number of newly read records to the running count.
     *
     * @param newReads The number of records just read.
     * @return The updated total count of records read.
     */
    public int updateReadCount(Integer newReads) {
        return consumedRecords.addAndGet(newReads);
    }

    /**
     * Returns the current total of records read.
     *
     * @return The number of records read so far.
     */
    public int currentReadCount() {
        return consumedRecords.get();
    }

    /**
     * Checks whether a positive read limit was configured for this context.
     *
     * @return True when a limit has been pushed down, false otherwise.
     */
    public boolean isLimitPushedDown() {
        return this.limit > 0;
    }

    /**
     * Checks whether reading {@code newReads} more records would reach or exceed the configured
     * limit.
     *
     * @param newReads The number of records about to be read.
     * @return True when the limit is set and would be reached, false otherwise.
     */
    public boolean willExceedLimit(int newReads) {
        if (this.limit <= 0) {
            // no limit configured, never exceeded
            return false;
        }
        return consumedRecords.get() + newReads >= this.limit;
    }

    /**
     * Returns the configured read limit.
     *
     * @return The limit; non-positive values mean no limit.
     */
    public int getLimit() {
        return limit;
    }
}
Loading
Loading