Skip to content

Commit

Permalink
[FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…
Browse files Browse the repository at this point in the history
…ith backward compatibility for existing savepoints and checkpoints.
  • Loading branch information
Kurt Ostfeld committed Jun 14, 2023
1 parent 626d70d commit f6d020a
Show file tree
Hide file tree
Showing 120 changed files with 8,851 additions and 276 deletions.
Binary file removed .idea/icon.png
Binary file not shown.
23 changes: 2 additions & 21 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ For Google Protobuf you need the following Maven dependency:

Please adjust the versions of both libraries as needed.

### Issue with using Kryo's `JavaSerializer`
### Issue with using `JavaSerializer` from Kryo 2.x

NOTE: This issues applies to Kryo 2.x and not Kryo 5+.

If you register Kryo's `JavaSerializer` for your custom type, you may
encounter `ClassNotFoundException`s even though your custom type class is
Expand Down
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.default-kryo5-serializers</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo 5 default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.force-avro</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -116,6 +122,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-kryo5-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-pojo-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
org.apache.flink.api.common.ExecutionConfig.configure(org.apache.flink.configuration.ReadableConfig, java.lang.ClassLoader): Argument leaf type org.apache.flink.configuration.ReadableConfig does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getClosureCleanerLevel(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getDefaultKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getGlobalJobParameters(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getRegisteredTypesWithKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setClosureCleanerLevel(org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel): Argument leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setGlobalJobParameters(org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters): Argument leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache.parseCachedFilesFromString(java.util.List): Returned leaf type org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier$Context does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier$SupplierFromSerializableTimestampAssigner does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,6 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:193)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:115)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:115)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactCoordinator.java:115)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactCoordinator.java:106)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (CompactCoordinator.java:106)
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactCoordinator.java:126)
Expand All @@ -911,7 +910,6 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.end
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactOperator.java:160)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:108)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:108)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactOperator.java:108)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactOperator.java:102)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in (CompactOperator.java:138)
Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()> in (CompactOperator.java:139)
Expand Down Expand Up @@ -1081,7 +1079,6 @@ Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.creat
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createFetcher(org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.api.operators.StreamingRuntimeContext, org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode, org.apache.flink.metrics.MetricGroup, boolean)> has parameter of type <org.apache.flink.streaming.api.operators.StreamingRuntimeContext> in (FlinkKafkaConsumerBase.java:0)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createFetcher(org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, java.util.Map, org.apache.flink.util.SerializedValue, org.apache.flink.streaming.api.operators.StreamingRuntimeContext, org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode, org.apache.flink.metrics.MetricGroup, boolean)> has parameter of type <org.apache.flink.util.SerializedValue> in (FlinkKafkaConsumerBase.java:0)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createStateSerializer(org.apache.flink.api.common.ExecutionConfig)> calls constructor <org.apache.flink.api.java.typeutils.runtime.TupleSerializer.<init>(java.lang.Class, [Lorg.apache.flink.api.common.typeutils.TypeSerializer;)> in (FlinkKafkaConsumerBase.java:1225)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createStateSerializer(org.apache.flink.api.common.ExecutionConfig)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (FlinkKafkaConsumerBase.java:1217)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createStateSerializer(org.apache.flink.api.common.ExecutionConfig)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (FlinkKafkaConsumerBase.java:1217)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createStateSerializer(org.apache.flink.api.common.ExecutionConfig)> has return type <org.apache.flink.api.java.typeutils.runtime.TupleSerializer> in (FlinkKafkaConsumerBase.java:0)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.createStateSerializer(org.apache.flink.api.common.ExecutionConfig)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FlinkKafkaConsumerBase.java:0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.connector.file.table.stream.TaskTracker;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
Expand Down
6 changes: 6 additions & 0 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ under the License.
<!-- managed version -->
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo5</artifactId>
<!-- managed version -->
</dependency>

<!-- The common collections are needed for some hash tables used in the collection execution -->
<dependency>
<groupId>commons-collections</groupId>
Expand Down
Loading

0 comments on commit f6d020a

Please sign in to comment.