Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35360] support Flink cdc pipeline Yarn application mode. #3599

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Mrart
Copy link
Contributor

@Mrart Mrart commented Sep 5, 2024

support Flink cdc pipeline Yarn application mode.

@Mrart Mrart marked this pull request as draft September 5, 2024 13:25
@Mrart Mrart changed the title [Flink-35360] support Flink cdc pipeline Yarn application mode. [FLINK-35360] support Flink cdc pipeline Yarn application mode. Sep 6, 2024
@Mrart Mrart marked this pull request as ready for review September 6, 2024 06:03
@MOBIN-F
Copy link
Contributor

MOBIN-F commented Sep 10, 2024

hi @Mrart ,I merged your PR and tested it, and found the following problem which confuses me. Can you help me check it out?
Commands:
/app/flink-cdc-3.3-SNAPSHOT/bin/flink-cdc.sh -t yarn-application --flink-home /app/flink-1.19.1/ /tmp/mysql-to-paimon.yml

2024-09-09 18:26:26,294 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_65]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_65]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_65]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_65]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_65]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_65]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_65]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_65]
        at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka4bfbd710-03a0-4175-8cbf-80e973b929b7.jar:1.19.1]
        at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57) [flink-rpc-akka4bfbd710-03a0-4175-8cbf-80e973b929b7.jar:1.19.1]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_65]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_65]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_65]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_65]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        ... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No ExecutorFactory found to execute the application.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        ... 12 more
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
        at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88) ~[flink-dist-1.19.1.jar:1.19.1]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2963) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2437) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2319) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2293) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.cdc.composer.flink.FlinkPipelineExecution.execute(FlinkPipelineExecution.java:43) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:91) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_65]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_65]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_65]
        at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_65]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-cdc-dist-3.3-SNAPSHOT.jar:3.3-SNAPSHOT]
        ... 12 more

Other non-flink-cdc tasks run normally

Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @Mrart's contribution, just left some trivial comments.

return Optional.ofNullable(distJars.get(0));
} catch (IOException e) {
LOG.error(
"Get flink-cdc-dist.jar from Flink cdc home lib is : {} failed",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Get flink-cdc-dist.jar from Flink cdc home lib is : {} failed",
"Failed to fetch Flink CDC dist jar from path {}",

path ->
path.getFileName()
.toString()
.matches("flink-cdc-dist-.*-.*\\.jar"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this regex match release jars like flink-cdc-dist-3.0.0.jar?

.toString()
.matches("flink-cdc-dist-.*-.*\\.jar"))
.forEach(path -> distJars.add(String.valueOf(path.toAbsolutePath())));
return Optional.ofNullable(distJars.get(0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

distJars.get(0) will never be null. If distJars is empty, an IndexOutOfBoundsException will be thrown instead of returning Optional.empty().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add some test cases to verify this change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants