Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w… #22660

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .idea/icon.png
Binary file not shown.
3 changes: 1 addition & 2 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ For Google Protobuf you need the following Maven dependency:

Please adjust the versions of both libraries as needed.

### Issue with using Kryo's `JavaSerializer`
### Issue with using `JavaSerializer` from Kryo 2.x

NOTE: This issues applies to Kryo 2.x and not Kryo 5+.

If you register Kryo's `JavaSerializer` for your custom type, you may
encounter `ClassNotFoundException`s even though your custom type class is
Expand Down
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.default-kryo5-serializers</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo 5 default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.force-avro</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -116,6 +122,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-kryo5-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-pojo-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
org.apache.flink.api.common.ExecutionConfig.configure(org.apache.flink.configuration.ReadableConfig, java.lang.ClassLoader): Argument leaf type org.apache.flink.configuration.ReadableConfig does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getClosureCleanerLevel(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getDefaultKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getGlobalJobParameters(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getRegisteredTypesWithKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setClosureCleanerLevel(org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel): Argument leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setGlobalJobParameters(org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters): Argument leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache.parseCachedFilesFromString(java.util.List): Returned leaf type org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier$Context does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier$SupplierFromSerializableTimestampAssigner does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,6 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:193)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:115)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:115)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactCoordinator.java:115)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactCoordinator.java:106)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (CompactCoordinator.java:106)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactCoordinator.java:126)
Expand All @@ -785,7 +784,6 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.end
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactOperator.java:160)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:108)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:108)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactOperator.java:108)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactOperator.java:102)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in (CompactOperator.java:138)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()> in (CompactOperator.java:139)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.connector.file.table.stream.TaskTracker;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
6 changes: 6 additions & 0 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ under the License.
<!-- managed version -->
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo5</artifactId>
<!-- managed version -->
</dependency>

<!-- The common collections are needed for some hash tables used in the collection execution -->
<dependency>
<groupId>commons-collections</groupId>
Expand Down
Loading