Skip to content

Commit

Permalink
[FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…
Browse files Browse the repository at this point in the history
…ith backward compatibility for existing savepoints and checkpoints.
  • Loading branch information
Kurt Ostfeld committed May 29, 2023
1 parent 678370b commit 037ec6a
Show file tree
Hide file tree
Showing 101 changed files with 7,897 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ For Google Protobuf you need the following Maven dependency:

Please adjust the versions of both libraries as needed.

### Issue with using Kryo's `JavaSerializer`
### Issue with using `JavaSerializer` from Kryo 2.x

NOTE: This issues applies to Kryo 2.x and not Kryo 5+.

If you register Kryo's `JavaSerializer` for your custom type, you may
encounter `ClassNotFoundException`s even though your custom type class is
Expand Down
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.default-kryo5-serializers</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo 5 default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.force-avro</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -116,6 +122,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-kryo5-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-pojo-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
8 changes: 4 additions & 4 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down Expand Up @@ -1573,7 +1573,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down Expand Up @@ -1675,7 +1675,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down Expand Up @@ -2028,7 +2028,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down
1 change: 1 addition & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2729,6 +2729,7 @@ components:
type: string
enum:
- CHECKPOINT
- UNALIGNED_CHECKPOINT
- SAVEPOINT
- SYNC_SAVEPOINT
RestoreMode:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
org.apache.flink.api.common.ExecutionConfig.configure(org.apache.flink.configuration.ReadableConfig, java.lang.ClassLoader): Argument leaf type org.apache.flink.configuration.ReadableConfig does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getClosureCleanerLevel(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getDefaultKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getGlobalJobParameters(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getRegisteredTypesWithKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setClosureCleanerLevel(org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel): Argument leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setGlobalJobParameters(org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters): Argument leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache.parseCachedFilesFromString(java.util.List): Returned leaf type org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.connector.file.table.stream.TaskTracker;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
Expand Down
6 changes: 6 additions & 0 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ under the License.
<!-- managed version -->
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo5</artifactId>
<!-- managed version -->
</dependency>

<!-- The common collections are needed for some hash tables used in the collection execution -->
<dependency>
<groupId>commons-collections</groupId>
Expand Down
Loading

0 comments on commit 037ec6a

Please sign in to comment.