Adds implementation for a Split Assigner and a Split Reader #44
Conversation
Resolved (outdated): ...igquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java
Resolved (outdated): .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Resolved (outdated): ...bigquery/src/main/java/com/google/cloud/flink/bigquery/common/config/CredentialsOptions.java
Resolved (outdated): ...uery/src/main/java/com/google/cloud/flink/bigquery/common/config/BigQueryConnectOptions.java
    private static Credentials createCredentialsFromFile(String file) {
        try {
            return GoogleCredentials.fromStream(new FileInputStream(file));
PATH_TRAVERSAL_IN: This API (java/io/FileInputStream.(Ljava/lang/String;)V) reads a file whose location might be specified by user input
You can reply with the following commands. For example, reply with `@sonatype-lift ignoreall` to leave out all findings.

- `@sonatype-lift ignore`: leave out the above finding from this PR
- `@sonatype-lift ignoreall`: leave out all the existing findings from this PR
- `@sonatype-lift exclude <file|issue|path|tool>`: exclude the specified file, issue, path, or tool from Lift findings by updating your config.toml file
Note: When talking to LiftBot, you need to refresh the page to see its response.
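One common way to address a `PATH_TRAVERSAL_IN` finding like the one above is to canonicalize the user-supplied path and validate it against an allow-listed base directory before opening it. The sketch below is illustrative only and not part of this PR: the `ALLOWED_BASE` location and the `readCredentialsFile` helper name are assumptions, and a real deployment would make the base directory configurable.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CredentialsFileCheck {
    // Illustrative allow-listed base directory; the real location is deployment-specific.
    private static final String ALLOWED_BASE = "/etc/credentials";

    static InputStream readCredentialsFile(String file) throws IOException {
        // Canonicalize first so "..", ".", and symlinks are resolved before comparison.
        String canonical = new File(file).getCanonicalPath();
        String base = new File(ALLOWED_BASE).getCanonicalPath();
        if (!canonical.startsWith(base + File.separator)) {
            throw new IOException("Credentials file outside allowed directory: " + canonical);
        }
        return new FileInputStream(canonical);
    }

    public static void main(String[] args) {
        try {
            readCredentialsFile("/tmp/../etc/passwd");
            System.out.println("opened");
        } catch (IOException e) {
            // The traversal attempt is rejected before the file is opened.
            System.out.println("rejected");
        }
    }
}
```

The key point is that the check happens on the canonical path, not the raw input, so `../` sequences cannot escape the allow-listed directory.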
Resolved (outdated): ...tor-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/utils/SchemaTransform.java
/** An options object that covers the possible {@link Credentials} configurations. */
@AutoValue
@PublicEvolving
public abstract class CredentialsOptions implements Serializable {
AutoValueCouldNotWrite: Could not write generated class com.google.cloud.flink.bigquery.common.config.AutoValue_CredentialsOptions: javax.annotation.processing.FilerException: Attempt to recreate a file for type com.google.cloud.flink.bigquery.common.config.AutoValue_CredentialsOptions
2 similar findings have been found in this PR:

| File Path | Line Number |
|---|---|
| flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java | 41 |
| flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/config/BigQueryConnectOptions.java | 34 |
Visit the Lift Web Console to find more details in your report.
🛠 Lift Auto-fix: Some of the Lift findings in this PR can be automatically fixed. You can download and apply these changes in your local project directory of your branch to review the suggestions before committing.

    # Download the patch
    curl https://lift.sonatype.com/api/patch/github.com/GoogleCloudDataproc/flink-bigquery-connector/44.diff -o lift-autofixes.diff
    # Apply the patch with git
    git apply lift-autofixes.diff
    # Review the changes
    git diff

Want it all in a single command? Open a terminal in your project's directory and copy and paste the following command:

    curl https://lift.sonatype.com/api/patch/github.com/GoogleCloudDataproc/flink-bigquery-connector/44.diff | git apply

Once you're satisfied, commit and push your changes in your project.
/gcbrun
/gcbrun
Please convert the asserts in the unit tests to use Google Truth assertions (they are more readable and less error-prone). Other than that it looks OK.
/gcbrun
Resolved (outdated): ...igquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java
Resolved (outdated): ...src/main/java/com/google/cloud/flink/bigquery/source/reader/BigQuerySourceReaderContext.java
Resolved (outdated): ...src/main/java/com/google/cloud/flink/bigquery/source/reader/BigQuerySourceReaderContext.java
Resolved (outdated): .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Resolved (outdated): .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Resolved: .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
/gcbrun
/** A simple split assigner based on the BigQuery {@link ReadSession} streams. */
@Internal
public class BigQuerySourceSplitAssigner {
Yes, PR #45 should have references to it (BigQuerySourceEnumerator).
…On Mon, Oct 9, 2023 at 8:14 PM Jayadeep Jayaraman wrote:

> In flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java (#44 (comment)):
>
> +/** A simple split assigner based on the BigQuery {@link ReadSession} streams. */
> +@Internal
> +public class BigQuerySourceSplitAssigner {
>
> I don't see a BigQuerySource API using this class in this PR. Is it available in some other PR?
import org.apache.flink.annotation.Internal;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
Yes, for DP 2.1 we plan on supporting the 1.15.4 Flink version, which uses guava30, and for DP 2.2 we can upgrade it to guava31 when it is available.
My understanding was that we have an ongoing effort to make this code work on the latest Dataproc image (which supports Flink 1.15 if I'm not mistaken). I was holding off on upgrading to 1.17.1 until we had more info on the runtime dependencies fixed by the Dataproc image. Do we want to move all the deps to the latest and greatest?
…On Mon, Oct 9, 2023 at 8:19 PM Jayadeep Jayaraman wrote:

> In flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java (#44 (comment)):
>
> +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
>
> Flink 1.17 shaded project has moved to guava31 (https://github.com/apache/flink-shaded/tree/release-17.0). Can we also upgrade our dependencies to this?
                        projectId,
                        dataset,
                        table);
                return this.readOptions
That's correct, this is the case where a SQL query has been provided. But no, the temporary results of a query execution (when not using legacy SQL) end up in a BigQuery table, see https://cloud.google.com/bigquery/docs/writing-results#temporary_and_permanent_tables
…On Mon, Oct 9, 2023 at 8:29 PM Jayadeep Jayaraman wrote:

> In flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java (#44 (comment)):
>
> +                if (result.getStatus().equals(QueryResultInfo.Status.FAILED)) {
> +                    throw new IllegalStateException(
> +                            "The BigQuery query execution failed with errors: "
> +                                    + result.getErrorMessages()
> +                                            .orElse(Lists.newArrayList()));
> +                }
> +                String projectId = result.getDestinationProject().get();
> +                String dataset = result.getDestinationDataset().get();
> +                String table = result.getDestinationTable().get();
> +                LOG.info(
> +                        "After BigQuery query execution, switching connect options"
> +                                + " to read from table {}.{}.{}",
> +                        projectId,
> +                        dataset,
> +                        table);
> +                return this.readOptions
>
> Is this for situations when the user passes an entire SQL query and the connector does not use the Storage API? If that is the case, then as per my understanding the intermediate data is materialized in a GCS bucket, but here it looks like we are reading from a temporary table.
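The flow being discussed (fail fast when the query job reports errors, otherwise repoint the read at the temporary destination table BigQuery materialized) can be modeled in isolation. The sketch below uses simplified stand-ins (`QueryResult`, `TableRef`) for the connector's `QueryResultInfo` and read options; the names and shapes are illustrative assumptions, not the PR's actual types.

```java
import java.util.List;
import java.util.Optional;

public class QueryResultSwitch {
    enum Status { SUCCEEDED, FAILED }

    // Simplified stand-in for the connector's QueryResultInfo.
    record QueryResult(Status status, Optional<List<String>> errors,
                       String project, String dataset, String table) {}

    record TableRef(String project, String dataset, String table) {}

    static TableRef destinationFor(QueryResult result) {
        if (result.status() == Status.FAILED) {
            // Mirror the quoted code: surface query errors instead of reading stale data.
            throw new IllegalStateException(
                    "The BigQuery query execution failed with errors: "
                            + result.errors().orElse(List.of()));
        }
        // On success, point subsequent reads at the temporary destination table.
        return new TableRef(result.project(), result.dataset(), result.table());
    }

    public static void main(String[] args) {
        QueryResult ok = new QueryResult(Status.SUCCEEDED, Optional.empty(),
                "my-project", "_tmp_dataset", "anon_table");
        System.out.println(destinationFor(ok));
    }
}
```

This is why no GCS staging is needed here: the query path simply swaps in a different table reference and reuses the same Storage API read path.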
In this case then, should I keep the dependencies as they are?
…On Mon, Oct 9, 2023 at 8:34 PM Jayadeep Jayaraman wrote:

> In flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java (#44 (comment)):
>
> +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
>
> Yes, for DP 2.1 we plan on supporting the 1.15.4 Flink version which uses guava30, and for DP 2.2 we can upgrade it to guava31 when it is available.
Yes
I see you use the Flink built-in shaded Guava. Does the package change between Flink versions? As we shade the connector, can we use our own Guava, especially since versions prior to 32.0.1 are affected by security issues? See https://mvnrepository.com/artifact/com.google.guava/guava
    public abstract String getRowRestriction();

    @Nullable
Please replace the @Nullable fields with Optional, as it better indicates that the field may have no value.
Done
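The @Nullable-to-Optional suggestion above can be illustrated with a small accessor pattern. This is a generic sketch, not the connector's actual class; the `ReadConfig` name and `rowRestriction` field are made up for illustration.

```java
import java.util.Optional;

public class ReadConfig {
    // The nullable value stays private; callers only ever see an Optional.
    private final String rowRestriction; // may be null

    public ReadConfig(String rowRestriction) {
        this.rowRestriction = rowRestriction;
    }

    // Instead of a @Nullable getter, the API makes absence explicit in the type.
    public Optional<String> getRowRestriction() {
        return Optional.ofNullable(rowRestriction);
    }

    public static void main(String[] args) {
        ReadConfig none = new ReadConfig(null);
        // Callers are forced to handle absence; no accidental NullPointerException.
        System.out.println(none.getRowRestriction().orElse("<no restriction>"));
    }
}
```

The trade-off is that `Optional` fields are not serializable, so a common compromise (used here) is to keep the nullable backing field and return `Optional` only from the getter.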
Resolved: .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Resolved (outdated): .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Resolved: .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Resolved: .../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
…n config builder)
/gcbrun
It does change between Flink versions; AFAIK the last released one is 31.1, shaded with version 17.0: https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-guava/31.1-jre-17.0. Also, there is a checkstyle rule that disallows the usage of Guava-related replacements; we went through similar problems with the request to use the Google Truth specific libraries. I can open another issue to cover this particular version migration, since it is not a trivial change. @davidrabinowitz please advise.
Does it mean we will need to create different connectors for each Flink version? |
I guess there is a reason why the runtime wants to take control of the Guava-related versions (alongside other libraries); Flink may have run into problems when connectors were free to pull in arbitrary dependencies. They go a long way in the project pom config trying to quickly surface errors and emit messages to avoid the usage of certain packages in the dependency tree.

We can try, in the future, to shade all our dependencies into the distribution jarfile; see #47, where this distro jar project is added. It will imply validating afterwards against a few versions of the runtime; that work is not trivial, since we don't know how a specific version of our dependencies may meddle with the runtime, or even with other connectors that run under the same assumptions imposed by the runtime.

We can also try to get rid of the code's explicit dependencies on Guava-related packages and rely on native Java capabilities to implement the needed functionality. That would avoid the need for vendored references in the actual code, but we would still need to see how this code's dependencies on Guava (to name one) will work with the versions imposed by the runtime.
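For reference, shading the connector's own Guava, as discussed, is typically done with the maven-shade-plugin's relocation feature. The fragment below is a hedged sketch, not taken from this repo: the relocation prefix is an assumption, and the real project would need to tune artifact sets and validate against each target Flink runtime.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Move our Guava out of the way of Flink's runtime copy. -->
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>com.google.cloud.flink.bigquery.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Relocation rewrites the bytecode references inside the shaded jar, so the connector's Guava can coexist with whatever guava30/guava31 the Flink runtime ships.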
The assigner is in charge of creating the units of work for the Source enumerator (in an upcoming branch), and the enumerator will use the assigner's output to deliver those units of work (the splits) to a split reader, which is in charge of getting the actual data from the source.
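The assigner/enumerator/reader division described above can be sketched with a minimal queue-backed assigner. This is an illustrative model, not the PR's BigQuerySourceSplitAssigner; the `Split` type and the method names are assumptions, loosely patterned on how FLIP-27-style sources hand splits around (including returning unfinished splits on reader failure).

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;

public class SimpleSplitAssigner {
    // One split per BigQuery read-session stream (simplified).
    record Split(String streamName) {}

    private final ArrayDeque<Split> pending = new ArrayDeque<>();

    SimpleSplitAssigner(List<String> streamNames) {
        // Each read-session stream becomes one unit of work (a split).
        streamNames.forEach(name -> pending.add(new Split(name)));
    }

    // The enumerator asks for the next split to hand to a reader.
    Optional<Split> getNext() {
        return Optional.ofNullable(pending.poll());
    }

    // Splits a failed reader could not finish go back into the queue.
    void addSplitsBack(List<Split> splits) {
        pending.addAll(splits);
    }

    boolean noMoreSplits() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        SimpleSplitAssigner assigner = new SimpleSplitAssigner(List.of("stream-0", "stream-1"));
        System.out.println(assigner.getNext().get().streamName()); // stream-0
        System.out.println(assigner.noMoreSplits()); // false
    }
}
```

The enumerator never reads data itself; it only moves `Split` values from the assigner to whichever reader requests work, which is what keeps split discovery and data reading independently scalable.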