Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Avoid deprecated APIs removed in Flink 2.0 Preview #4567

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
80524c8
[flink][hotfix] Wait for consumer reset before job close
yunfengzhou-hub Nov 21, 2024
a4d79e3
[flink][cdc] Update flink dependency to 1.20
yunfengzhou-hub Nov 21, 2024
2c6ea46
[flink] Adopt open(OpenContext) in RichFunction
yunfengzhou-hub Nov 21, 2024
0a0f42c
[flink] Adopt getTaskInfo() when acquiring parallelism info
yunfengzhou-hub Nov 18, 2024
1778354
[flink] Avoid deprecated usages about Configuration
yunfengzhou-hub Nov 21, 2024
346911d
[flink] Avoid deprecated DataStreamUtils
yunfengzhou-hub Nov 23, 2024
e1f2c68
[flink] Remove deprecated TestEnvironment
yunfengzhou-hub Nov 23, 2024
5a6db98
[flink] Replace deprecated flink Time with java Duration
yunfengzhou-hub Nov 23, 2024
60b51e6
[flink] Avoid deprecated SingleThreadMultiplexSourceReaderBase constr…
yunfengzhou-hub Nov 18, 2024
23e63b6
[flink] Avoid deprecated FileSystem#getKind
yunfengzhou-hub Nov 18, 2024
e402f16
[flink] Avoid deprecated SetupableStreamOperator
yunfengzhou-hub Nov 18, 2024
51052d1
[flink] Avoid deprecated usage on TableSchema, DataType and Descripto…
yunfengzhou-hub Nov 18, 2024
821894a
[flink] Avoid deprecated usage of DiscardingSink
yunfengzhou-hub Nov 18, 2024
9cea553
[flink] Replace legacy SinkFunction with v2 Sink
yunfengzhou-hub Nov 18, 2024
2f8c667
[flink] Replace legacy SourceFunction with v2 Source
yunfengzhou-hub Nov 18, 2024
0259a7e
[flink] Avoid relying on format of table description
yunfengzhou-hub Nov 23, 2024
35b057d
[flink] Avoid deprecated sql syntax
yunfengzhou-hub Nov 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Result run() {

String sinkPathConfig =
BenchmarkGlobalConfiguration.loadConfiguration()
.getString(BenchmarkOptions.SINK_PATH);
.get(BenchmarkOptions.SINK_PATH);
if (sinkPathConfig == null) {
throw new IllegalArgumentException(
BenchmarkOptions.SINK_PATH.key() + " must be set");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import org.apache.flink.annotation.PublicEvolving;

/** The {@link JobInfo} represents the meta information of current job. */
@PublicEvolving
public interface JobInfo {

/**
* Get the ID of the job.
*
* @return the ID of the job
*/
JobID getJobId();

/**
* Get the name of the job.
*
* @return the name of the job
*/
String getJobName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import org.apache.flink.annotation.PublicEvolving;

/**
* The interface indicates that it supports multiple attempts executing at the same time.
*
* <p>Currently, the interface is used for speculative execution. If a sink implementation (SinkV2,
* OutputFormat or SinkFunction) inherits this interface, the sink operator would be considered to
* support speculative execution.
*/
@PublicEvolving
public interface SupportsConcurrentExecutionAttempts {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;

/** The default implementation of {@link OpenContext}. */
@PublicEvolving
public class DefaultOpenContext implements OpenContext {

public static final OpenContext INSTANCE = new DefaultOpenContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.table.catalog;
package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;

/**
* Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink
* 1.20).
* The {@link OpenContext} interface provides necessary information required by the {@link
* RichFunction} when it is opened. The {@link OpenContext} is currently empty because it can be
* used to add more methods without affecting the signature of {@code RichFunction#open}.
*/
public interface CatalogMaterializedTable extends CatalogBaseTable {
/** Dummy LogicalRefreshMode placeholder. */
enum LogicalRefreshMode {}

/** Dummy RefreshMode placeholder. */
enum RefreshMode {}

/** Dummy RefreshStatus placeholder. */
enum RefreshStatus {}
}
@PublicEvolving
public interface OpenContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

/**
* An base interface for all rich user-defined functions. This class defines methods for the life
* cycle of the functions, as well as methods to access the context in which the functions are
* executed.
*/
@Public
public interface RichFunction extends Function {

/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
* part of an iteration, this method will be invoked at the beginning of each iteration
* superstep.
*
* <p>The configuration object passed to the function can be used for configuration and
* initialization. The configuration contains all parameters that were configured on the
* function in the program composition.
*
* <pre>{@code
* public class MyFilter extends RichFilterFunction<String> {
*
* private String searchString;
*
* public void open(Configuration parameters) {
* this.searchString = parameters.getString("foo");
* }
*
* public boolean filter(String value) {
* return value.equals(searchString);
* }
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* @param parameters The configuration containing the parameters attached to the contract.
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
* @see org.apache.flink.configuration.Configuration
* @deprecated This method is deprecated since Flink 1.19. The users are recommended to
* implement {@code open(OpenContext openContext)} and implement {@code open(Configuration
* parameters)} with an empty body instead. 1. If you implement {@code open(OpenContext
* openContext)}, the {@code open(OpenContext openContext)} will be invoked and the {@code
* open(Configuration parameters)} won't be invoked. 2. If you don't implement {@code
* open(OpenContext openContext)}, the {@code open(Configuration parameters)} will be
* invoked in the default implementation of the {@code open(OpenContext openContext)}.
* @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231">
* FLIP-344: Remove parameter in RichFunction#open </a>
*/
@Deprecated
void open(Configuration parameters) throws Exception;

/**
* Initialization method for the function. It is called before the actual working methods (like
* <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that are
* part of an iteration, this method will be invoked at the beginning of each iteration
* superstep.
*
* <p>The openContext object passed to the function can be used for configuration and
* initialization. The openContext contains some necessary information that were configured on
* the function in the program composition.
*
* <pre>{@code
* public class MyFilter extends RichFilterFunction<String> {
*
* private String searchString;
*
* public void open(OpenContext openContext) {
* // initialize the value of searchString
* }
*
* public boolean filter(String value) {
* return value.equals(searchString);
* }
* }
* }</pre>
*
* <p>By default, this method does nothing.
*
* <p>1. If you implement {@code open(OpenContext openContext)}, the {@code open(OpenContext
* openContext)} will be invoked and the {@code open(Configuration parameters)} won't be
* invoked. 2. If you don't implement {@code open(OpenContext openContext)}, the {@code
* open(Configuration parameters)} will be invoked in the default implementation of the {@code
* open(OpenContext openContext)}.
*
* @param openContext The context containing information about the context in which the function
* is opened.
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
@PublicEvolving
default void open(OpenContext openContext) throws Exception {
open(new Configuration());
}

/**
* Tear-down method for the user code. It is called after the last call to the main working
* methods (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this
* method will be invoked after each iteration superstep.
*
* <p>This method can be used for clean up work.
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.
* When the runtime catches an exception, it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
void close() throws Exception;

// ------------------------------------------------------------------------
// Runtime context
// ------------------------------------------------------------------------

/**
* Gets the context that contains information about the UDF's runtime, such as the parallelism
* of the function, the subtask index of the function, or the name of the task that executes the
* function.
*
* <p>The RuntimeContext also gives access to the {@link
* org.apache.flink.api.common.accumulators.Accumulator}s and the {@link
* org.apache.flink.api.common.cache.DistributedCache}.
*
* @return The UDF's runtime context.
*/
RuntimeContext getRuntimeContext();

/**
* Gets a specialized version of the {@link RuntimeContext}, which has additional information
* about the iteration in which the function is executed. This IterationRuntimeContext is only
* available if the function is part of an iteration. Otherwise, this method throws an
* exception.
*
* @return The IterationRuntimeContext.
* @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an
* iteration.
*/
IterationRuntimeContext getIterationRuntimeContext();

/**
* Sets the function's runtime context. Called by the framework when creating a parallel
* instance of the function.
*
* @param t The runtime context.
*/
void setRuntimeContext(RuntimeContext t);
}
Loading
Loading