diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
index e2812cfad3ab11..d80c2ebfcf83aa 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
@@ -1,8 +1,6 @@
org.apache.flink.api.common.ExecutionConfig.configure(org.apache.flink.configuration.ReadableConfig, java.lang.ClassLoader): Argument leaf type org.apache.flink.configuration.ReadableConfig does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getClosureCleanerLevel(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
-org.apache.flink.api.common.ExecutionConfig.getDefaultKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getGlobalJobParameters(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
-org.apache.flink.api.common.ExecutionConfig.getRegisteredTypesWithKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setClosureCleanerLevel(org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel): Argument leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setGlobalJobParameters(org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters): Argument leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache.parseCachedFilesFromString(java.util.List): Returned leaf type org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
index b4ae2a8a5f3da3..9308564fd16938 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
@@ -25,7 +25,7 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.connector.file.table.stream.TaskTracker;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
index fa51d4aa767482..b080b4d2d47f20 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
@@ -24,7 +24,7 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index f291b05bce9af5..e7c73804143c47 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -33,7 +33,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 5ba7b3496395ca..5c42ad8766f361 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -77,6 +77,12 @@ under the License.
+
+ com.esotericsoftware.kryo
+ kryo5
+
+
+
commons-collections
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index e09cd74a8a8c92..b19c6917398691 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -163,16 +163,29 @@ public class ExecutionConfig implements Serializable, Archiveable, SerializableSerializer>> registeredTypesWithKryoSerializers =
new LinkedHashMap<>();
+ private LinkedHashMap, SerializableKryo5Serializer>>
+ registeredTypesWithKryo5Serializers = new LinkedHashMap<>();
+
private LinkedHashMap, Class extends Serializer>>>
registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ registeredTypesWithKryo5SerializerClasses = new LinkedHashMap<>();
+
private LinkedHashMap, SerializableSerializer>> defaultKryoSerializers =
new LinkedHashMap<>();
+ private LinkedHashMap, SerializableKryo5Serializer>> defaultKryo5Serializers =
+ new LinkedHashMap<>();
+
private LinkedHashMap, Class extends Serializer>>> defaultKryoSerializerClasses =
new LinkedHashMap<>();
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ defaultKryo5SerializerClasses = new LinkedHashMap<>();
+
private LinkedHashSet> registeredKryoTypes = new LinkedHashSet<>();
+ private LinkedHashSet> registeredKryo5Types = new LinkedHashSet<>();
private LinkedHashSet> registeredPojoTypes = new LinkedHashSet<>();
@@ -773,6 +786,7 @@ public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
+ @Deprecated
public & Serializable> void addDefaultKryoSerializer(
Class> type, T serializer) {
if (type == null || serializer == null) {
@@ -788,6 +802,7 @@ public & Serializable> void addDefaultKryoSerializer(
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
+ @Deprecated
public void addDefaultKryoSerializer(
Class> type, Class extends Serializer>> serializerClass) {
if (type == null || serializerClass == null) {
@@ -796,6 +811,42 @@ public void addDefaultKryoSerializer(
defaultKryoSerializerClasses.put(type, serializerClass);
}
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ *
Note that the serializer instance must be serializable (as defined by
+ * java.io.Serializable), because it may be distributed to the worker nodes by java
+ * serialization.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializer The serializer to use.
+ */
+ @PublicEvolving
+ public & Serializable>
+ void addDefaultKryo5Serializer(Class> type, T serializer) {
+ if (type == null || serializer == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+
+ defaultKryo5Serializers.put(type, new SerializableKryo5Serializer<>(serializer));
+ }
+
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializerClass The class of the serializer to use.
+ */
+ @PublicEvolving
+ public void addDefaultKryo5Serializer(
+ Class> type,
+ Class extends com.esotericsoftware.kryo.kryo5.Serializer>> serializerClass) {
+ if (type == null || serializerClass == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+ defaultKryo5SerializerClasses.put(type, serializerClass);
+ }
+
/**
* Registers the given type with a Kryo Serializer.
*
@@ -806,6 +857,7 @@ public void addDefaultKryoSerializer(
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
+ @Deprecated
public & Serializable> void registerTypeWithKryoSerializer(
Class> type, T serializer) {
if (type == null || serializer == null) {
@@ -822,6 +874,7 @@ public & Serializable> void registerTypeWithKryoSeriali
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
+ @Deprecated
@SuppressWarnings("rawtypes")
public void registerTypeWithKryoSerializer(
Class> type, Class extends Serializer> serializerClass) {
@@ -835,6 +888,49 @@ public void registerTypeWithKryoSerializer(
registeredTypesWithKryoSerializerClasses.put(type, castedSerializerClass);
}
+ /**
+ * Registers the given type with a Kryo Serializer.
+ *
+ *
Note that the serializer instance must be serializable (as defined by
+ * java.io.Serializable), because it may be distributed to the worker nodes by java
+ * serialization.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializer The serializer to use.
+ */
+ @PublicEvolving
+ public & Serializable>
+ void registerTypeWithKryo5Serializer(Class> type, T serializer) {
+ if (type == null || serializer == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+
+ registeredTypesWithKryo5Serializers.put(
+ type, new SerializableKryo5Serializer<>(serializer));
+ }
+
+ /**
+ * Registers the given Serializer via its class as a serializer for the given type at the
+ * KryoSerializer
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializerClass The class of the serializer to use.
+ */
+ @PublicEvolving
+ @SuppressWarnings("rawtypes")
+ public void registerTypeWithKryo5Serializer(
+ Class> type,
+ Class extends com.esotericsoftware.kryo.kryo5.Serializer> serializerClass) {
+ if (type == null || serializerClass == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+
+ @SuppressWarnings("unchecked")
+ Class extends com.esotericsoftware.kryo.kryo5.Serializer>> castedSerializerClass =
+ (Class extends com.esotericsoftware.kryo.kryo5.Serializer>>) serializerClass;
+ registeredTypesWithKryo5SerializerClasses.put(type, castedSerializerClass);
+ }
+
/**
* Registers the given type with the serialization stack. If the type is eventually serialized
* as a POJO, then the type is registered with the POJO serializer. If the type ends up being
@@ -860,6 +956,7 @@ public void registerPojoType(Class> type) {
*
* @param type The class of the type to register.
*/
+ @Deprecated
public void registerKryoType(Class> type) {
if (type == null) {
throw new NullPointerException("Cannot register null type class.");
@@ -867,29 +964,67 @@ public void registerKryoType(Class> type) {
registeredKryoTypes.add(type);
}
+ @PublicEvolving
+ public void registerKryo5Type(Class> type) {
+ if (type == null) {
+ throw new NullPointerException("Cannot register null type class.");
+ }
+ registeredKryo5Types.add(type);
+ }
+
/** Returns the registered types with Kryo Serializers. */
+ @Deprecated
public LinkedHashMap, SerializableSerializer>>
getRegisteredTypesWithKryoSerializers() {
return registeredTypesWithKryoSerializers;
}
+ /** Returns the registered types with Kryo Serializers. */
+ @PublicEvolving
+ public LinkedHashMap, SerializableKryo5Serializer>>
+ getRegisteredTypesWithKryo5Serializers() {
+ return registeredTypesWithKryo5Serializers;
+ }
+
/** Returns the registered types with their Kryo Serializer classes. */
+ @Deprecated
public LinkedHashMap, Class extends Serializer>>>
getRegisteredTypesWithKryoSerializerClasses() {
return registeredTypesWithKryoSerializerClasses;
}
+ /** Returns the registered types with their Kryo Serializer classes. */
+ @PublicEvolving
+ public LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ getRegisteredTypesWithKryo5SerializerClasses() {
+ return registeredTypesWithKryo5SerializerClasses;
+ }
+
/** Returns the registered default Kryo Serializers. */
+ @Deprecated
public LinkedHashMap, SerializableSerializer>> getDefaultKryoSerializers() {
return defaultKryoSerializers;
}
+ /** Returns the registered default Kryo Serializers. */
+ @PublicEvolving
+ public LinkedHashMap, SerializableKryo5Serializer>> getDefaultKryo5Serializers() {
+ return defaultKryo5Serializers;
+ }
+
/** Returns the registered default Kryo Serializer classes. */
public LinkedHashMap, Class extends Serializer>>>
getDefaultKryoSerializerClasses() {
return defaultKryoSerializerClasses;
}
+ /** Returns the registered default Kryo Serializer classes. */
+ @PublicEvolving
+ public LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ getDefaultKryo5SerializerClasses() {
+ return defaultKryo5SerializerClasses;
+ }
+
/** Returns the registered Kryo types. */
public LinkedHashSet> getRegisteredKryoTypes() {
if (isForceKryoEnabled()) {
@@ -908,6 +1043,25 @@ public LinkedHashSet> getRegisteredKryoTypes() {
}
}
+ /** Returns the registered Kryo types. */
+ @PublicEvolving
+ public LinkedHashSet> getRegisteredKryo5Types() {
+ if (isForceKryoEnabled()) {
+ // if we force kryo, we must also return all the types that
+ // were previously only registered as POJO
+ LinkedHashSet> result = new LinkedHashSet<>();
+ result.addAll(registeredKryo5Types);
+ for (Class> t : registeredPojoTypes) {
+ if (!result.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ } else {
+ return registeredKryo5Types;
+ }
+ }
+
/** Returns the registered POJO types. */
public LinkedHashSet> getRegisteredPojoTypes() {
return registeredPojoTypes;
@@ -951,8 +1105,12 @@ public boolean equals(Object obj) {
&& Objects.equals(globalJobParameters, other.globalJobParameters)
&& registeredTypesWithKryoSerializerClasses.equals(
other.registeredTypesWithKryoSerializerClasses)
+ && registeredTypesWithKryo5SerializerClasses.equals(
+ other.registeredTypesWithKryo5SerializerClasses)
&& defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
+ && defaultKryo5SerializerClasses.equals(other.defaultKryo5SerializerClasses)
&& registeredKryoTypes.equals(other.registeredKryoTypes)
+ && registeredKryo5Types.equals(other.registeredKryo5Types)
&& registeredPojoTypes.equals(other.registeredPojoTypes);
} else {
@@ -969,6 +1127,7 @@ public int hashCode() {
registeredTypesWithKryoSerializerClasses,
defaultKryoSerializerClasses,
registeredKryoTypes,
+ registeredKryo5Types,
registeredPojoTypes);
}
@@ -993,6 +1152,8 @@ public String toString() {
+ defaultKryoSerializerClasses
+ ", registeredKryoTypes="
+ registeredKryoTypes
+ + ", registeredKryo5Types="
+ + registeredKryo5Types
+ ", registeredPojoTypes="
+ registeredPojoTypes
+ '}';
@@ -1025,6 +1186,23 @@ public T getSerializer() {
}
}
+ @PublicEvolving
+ public static class SerializableKryo5Serializer<
+ T extends com.esotericsoftware.kryo.kryo5.Serializer> & Serializable>
+ implements Serializable {
+ private static final long serialVersionUID = 4687893502781067189L;
+
+ private T serializer;
+
+ public SerializableKryo5Serializer(T serializer) {
+ this.serializer = serializer;
+ }
+
+ public T getSerializer() {
+ return serializer;
+ }
+ }
+
/**
* Abstract class for a custom user configuration object registered at the execution config.
*
@@ -1140,6 +1318,10 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS)
.map(s -> parseKryoSerializersWithExceptionHandling(classLoader, s))
.ifPresent(s -> this.defaultKryoSerializerClasses = s);
+ configuration
+ .getOptional(PipelineOptions.KRYO5_DEFAULT_SERIALIZERS)
+ .map(s -> parseKryo5SerializersWithExceptionHandling(classLoader, s))
+ .ifPresent(s -> this.defaultKryo5SerializerClasses = s);
configuration
.getOptional(PipelineOptions.POJO_REGISTERED_CLASSES)
@@ -1151,6 +1333,16 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
.ifPresent(c -> this.registeredKryoTypes = c);
+ configuration
+ .getOptional(PipelineOptions.KRYO5_REGISTERED_CLASSES)
+ .map(
+ c ->
+ loadClasses(
+ c,
+ classLoader,
+ "Could not load kryo 5 type to be registered."))
+ .ifPresent(c -> this.registeredKryo5Types = c);
+
configuration
.getOptional(JobManagerOptions.SCHEDULER)
.ifPresent(t -> this.configuration.set(JobManagerOptions.SCHEDULER, t));
@@ -1188,6 +1380,21 @@ private LinkedHashSet> loadClasses(
}
}
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ parseKryo5SerializersWithExceptionHandling(
+ ClassLoader classLoader, List kryoSerializers) {
+ try {
+ return parseKryo5Serializers(classLoader, kryoSerializers);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not configure kryo 5 serializers from %s. The expected format is:"
+ + "'class:,serializer:;...",
+ kryoSerializers),
+ e);
+ }
+ }
+
private LinkedHashMap, Class extends Serializer>>> parseKryoSerializers(
ClassLoader classLoader, List kryoSerializers) {
return kryoSerializers.stream()
@@ -1211,6 +1418,29 @@ private LinkedHashMap, Class extends Serializer>>> parseKryoSeriali
LinkedHashMap::new));
}
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ parseKryo5Serializers(ClassLoader classLoader, List kryoSerializers) {
+ return kryoSerializers.stream()
+ .map(ConfigurationUtils::parseMap)
+ .collect(
+ Collectors.toMap(
+ m ->
+ loadClass(
+ m.get("class"),
+ classLoader,
+ "Could not load class for kryo 5 serialization"),
+ m ->
+ loadClass(
+ m.get("serializer"),
+ classLoader,
+ "Could not load serializer's class"),
+ (m1, m2) -> {
+ throw new IllegalArgumentException(
+ "Duplicated serializer for class: " + m1);
+ },
+ LinkedHashMap::new));
+ }
+
@SuppressWarnings("unchecked")
private T loadClass(
String className, ClassLoader classLoader, String errorMessage) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 184bfc6a0baadb..b131ef89365ce3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -143,6 +143,23 @@ public abstract class TypeSerializer implements Serializable {
*/
public abstract T deserialize(DataInputView source) throws IOException;
+ /**
+ * De-serializes a record from the given source input view.
+ *
+ *
This method takes a keyedBackendVersion to determine if Kryo v2 vs Kryo v5 was used.
+ *
+ * @param source The input view from which to read the data.
+ * @param keyedBackendVersion The version specified in KeyedBackendSerializationProxy
+ * @return The deserialized element.
+ * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
+ * Typically raised by the input view, which may have an underlying I/O channel from which
+ * it reads.
+ */
+ public T deserializeWithKeyedBackendVersion(DataInputView source, int keyedBackendVersion)
+ throws IOException {
+ return deserialize(source);
+ }
+
/**
* De-serializes a record from the given source input view into the given reuse record instance
* if mutable.
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
index 7f1a2dcdbe2d7f..81495dc4bfda7e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
@@ -21,8 +21,8 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import java.util.LinkedHashMap;
@@ -69,6 +69,9 @@ public static AvroUtils getAvroUtils() {
public abstract void addAvroGenericDataArrayRegistration(
LinkedHashMap kryoRegistrations);
+ public abstract void addAvroGenericDataArrayRegistration5(
+ LinkedHashMap kryoRegistrations);
+
/**
* Creates an {@code AvroSerializer} if flink-avro is present, otherwise throws an exception.
*/
@@ -111,8 +114,25 @@ public void addAvroGenericDataArrayRegistration(
kryoRegistrations.put(
AVRO_GENERIC_DATA_ARRAY,
new KryoRegistration(
- Serializers.DummyAvroRegisteredClass.class,
- (Class) Serializers.DummyAvroKryoSerializerClass.class));
+ org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
+ .DummyAvroRegisteredClass.class,
+ (Class)
+ org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
+ .DummyAvroKryoSerializerClass.class));
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public void addAvroGenericDataArrayRegistration5(
+ LinkedHashMap kryoRegistrations) {
+ kryoRegistrations.put(
+ AVRO_GENERIC_DATA_ARRAY,
+ new Kryo5Registration(
+ org.apache.flink.api.java.typeutils.runtime.kryo5.Serializers
+ .DummyAvroRegisteredClass.class,
+ (Class)
+ org.apache.flink.api.java.typeutils.runtime.kryo5.Serializers
+ .DummyAvroKryoSerializerClass.class));
}
@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 9344f3037c83dc..8b8bda362c0cd3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -26,7 +26,6 @@
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +87,8 @@ public TypeSerializer createSerializer(ExecutionConfig config) {
+ " is treated as a generic type.");
}
- return new KryoSerializer(this.typeClass, config);
+ return new org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer(
+ this.typeClass, config);
}
@SuppressWarnings("unchecked")
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 021f9dfe333a14..41b1d44a6da40f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -28,7 +28,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.commons.lang3.StringUtils;
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Registration.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Registration.java
new file mode 100644
index 00000000000000..3201cdc19fa08c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Registration.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import com.esotericsoftware.kryo.kryo5.SerializerFactory.ReflectionSerializerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** A {@code KryoRegistration} resembles a registered class and its serializer in Kryo. */
+@Internal
+public class Kryo5Registration implements Serializable {
+
+ private static final long serialVersionUID = 5375110512910892655L;
+
+ /**
+ * IMPORTANT: the order of the enumerations must not change, since their ordinals are used for
+ * serialization.
+ */
+ @Internal
+ public enum SerializerDefinitionType {
+ UNSPECIFIED,
+ CLASS,
+ INSTANCE
+ }
+
+ /** The registered class. */
+ private final Class> registeredClass;
+
+ /**
+ * Class of the serializer to use for the registered class. Exists only if the serializer
+ * definition type is {@link SerializerDefinitionType#CLASS}.
+ */
+ @Nullable private final Class extends Serializer>> serializerClass;
+
+ /**
+ * A serializable instance of the serializer to use for the registered class. Exists only if the
+ * serializer definition type is {@link SerializerDefinitionType#INSTANCE}.
+ */
+ @Nullable
+ private final ExecutionConfig.SerializableKryo5Serializer extends Serializer>>
+ serializableSerializerInstance;
+
+ private final SerializerDefinitionType serializerDefinitionType;
+
+ public Kryo5Registration(Class> registeredClass) {
+ this.registeredClass = Preconditions.checkNotNull(registeredClass);
+
+ this.serializerClass = null;
+ this.serializableSerializerInstance = null;
+
+ this.serializerDefinitionType = SerializerDefinitionType.UNSPECIFIED;
+ }
+
+ public Kryo5Registration(
+ Class> registeredClass, Class extends Serializer>> serializerClass) {
+ this.registeredClass = Preconditions.checkNotNull(registeredClass);
+
+ this.serializerClass = Preconditions.checkNotNull(serializerClass);
+ this.serializableSerializerInstance = null;
+
+ this.serializerDefinitionType = SerializerDefinitionType.CLASS;
+ }
+
+ public Kryo5Registration(
+ Class> registeredClass,
+ ExecutionConfig.SerializableKryo5Serializer extends Serializer>>
+ serializableSerializerInstance) {
+ this.registeredClass = Preconditions.checkNotNull(registeredClass);
+
+ this.serializerClass = null;
+ this.serializableSerializerInstance =
+ Preconditions.checkNotNull(serializableSerializerInstance);
+
+ this.serializerDefinitionType = SerializerDefinitionType.INSTANCE;
+ }
+
+ public Class> getRegisteredClass() {
+ return registeredClass;
+ }
+
+ public SerializerDefinitionType getSerializerDefinitionType() {
+ return serializerDefinitionType;
+ }
+
+ @Nullable
+ public Class extends Serializer>> getSerializerClass() {
+ return serializerClass;
+ }
+
+ @Nullable
+ public ExecutionConfig.SerializableKryo5Serializer extends Serializer>>
+ getSerializableSerializerInstance() {
+ return serializableSerializerInstance;
+ }
+
+ public Serializer> getSerializer(Kryo kryo) {
+ switch (serializerDefinitionType) {
+ case UNSPECIFIED:
+ return null;
+ case CLASS:
+ return ReflectionSerializerFactory.newSerializer(
+ kryo, serializerClass, registeredClass);
+ case INSTANCE:
+ return serializableSerializerInstance.getSerializer();
+ default:
+ // this should not happen; adding as a guard for the future
+ throw new IllegalStateException(
+ "Unrecognized Kryo registration serializer definition type: "
+ + serializerDefinitionType);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj instanceof Kryo5Registration) {
+ Kryo5Registration other = (Kryo5Registration) obj;
+
+ // we cannot include the serializer instances here because they don't implement the
+ // equals method
+ return serializerDefinitionType == other.serializerDefinitionType
+ && registeredClass == other.registeredClass
+ && serializerClass == other.serializerClass;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = serializerDefinitionType.hashCode();
+ result = 31 * result + registeredClass.hashCode();
+
+ if (serializerClass != null) {
+ result = 31 * result + serializerClass.hashCode();
+ }
+
+ return result;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Utils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Utils.java
new file mode 100644
index 00000000000000..154d92dc3d50f5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Utils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** Convenience methods for Kryo */
+@Internal
+public class Kryo5Utils {
+
+ /**
+ * Tries to copy the given record from using the provided Kryo instance. If this fails, then the
+ * record from is copied by serializing it into a byte buffer and deserializing it from there.
+ *
+ * @param from Element to copy
+ * @param kryo Kryo instance to use
+ * @param serializer TypeSerializer which is used in case of a Kryo failure
+ * @param Type of the element to be copied
+ * @return Copied element
+ */
+ public static T copy(T from, Kryo kryo, TypeSerializer serializer) {
+ try {
+ return kryo.copy(from);
+ } catch (KryoException ke) {
+ // Kryo could not copy the object --> try to serialize/deserialize the object
+ try {
+ byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
+
+ return InstantiationUtil.deserializeFromByteArray(serializer, byteArray);
+ } catch (IOException ioe) {
+ throw new RuntimeException(
+ "Could not copy object by serializing/deserializing" + " it.", ioe);
+ }
+ }
+ }
+
+ /**
+ * Tries to copy the given record from using the provided Kryo instance. If this fails, then the
+ * record from is copied by serializing it into a byte buffer and deserializing it from there.
+ *
+ * @param from Element to copy
+ * @param reuse Reuse element for the deserialization
+ * @param kryo Kryo instance to use
+ * @param serializer TypeSerializer which is used in case of a Kryo failure
+ * @param Type of the element to be copied
+ * @return Copied element
+ */
+ public static T copy(T from, T reuse, Kryo kryo, TypeSerializer serializer) {
+ try {
+ return kryo.copy(from);
+ } catch (KryoException ke) {
+ // Kryo could not copy the object --> try to serialize/deserialize the object
+ try {
+ byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
+
+ return InstantiationUtil.deserializeFromByteArray(serializer, reuse, byteArray);
+ } catch (IOException ioe) {
+ throw new RuntimeException(
+ "Could not copy object by serializing/deserializing" + " it.", ioe);
+ }
+ }
+ }
+
+ /**
+ * Apply a list of {@link KryoRegistration} to a Kryo instance. The list of registrations is
+ * assumed to already be a final resolution of all possible registration overwrites.
+ *
+ *
The registrations are applied in the given order and always specify the registration id,
+ * using the given {@code firstRegistrationId} and incrementing it for each registration.
+ *
+ * @param kryo the Kryo instance to apply the registrations
+ * @param resolvedRegistrations the registrations, which should already be resolved of all
+ * possible registration overwrites
+ * @param firstRegistrationId the first registration id to use
+ */
+ public static void applyRegistrations(
+ Kryo kryo,
+ Collection resolvedRegistrations,
+ int firstRegistrationId) {
+
+ int currentRegistrationId = firstRegistrationId;
+ Serializer> serializer;
+ for (Kryo5Registration registration : resolvedRegistrations) {
+ serializer = registration.getSerializer(kryo);
+
+ if (serializer != null) {
+ kryo.register(registration.getRegisteredClass(), serializer, currentRegistrationId);
+ } else {
+ kryo.register(registration.getRegisteredClass(), currentRegistrationId);
+ }
+ // if Kryo already had a serializer for that type then it ignores the registration
+ if (kryo.getRegistration(currentRegistrationId) != null) {
+ currentRegistrationId++;
+ }
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput5.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput5.java
new file mode 100644
index 00000000000000..2639e7f8ed612d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput5.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.io.Input;
+import com.esotericsoftware.kryo.kryo5.io.KryoBufferUnderflowException;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+@Internal
+public class NoFetchingInput5 extends Input {
+ public NoFetchingInput5(InputStream inputStream) {
+ super(inputStream, 8);
+ }
+
+ @Override
+ public int read() throws KryoException {
+ require(1);
+ return buffer[position++] & 0xFF;
+ }
+
+ @Override
+ public boolean canReadInt() throws KryoException {
+ throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+ }
+
+ @Override
+ public boolean canReadLong() throws KryoException {
+ throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+ }
+
+ /**
+ * Require makes sure that at least required number of bytes are kept in the buffer. If not,
+ * then it will load exactly the difference between required and currently available number of
+ * bytes. Thus, it will only load the data which is required and never prefetch data.
+ *
+ * @param required the number of bytes being available in the buffer
+ * @return The number of bytes remaining in the buffer, which will be at least required
+ * bytes.
+ * @throws KryoException
+ */
+ @Override
+ protected int require(int required) throws KryoException {
+ // The main change between this and Kryo5 Input.require is this will never read more bytes
+ // than required.
+ // There are also formatting changes to be compliant with the Flink project styling rules.
+ int remaining = limit - position;
+ if (remaining >= required) {
+ return remaining;
+ }
+ if (required > capacity) {
+ throw new KryoException(
+ "Buffer too small: capacity: " + capacity + ", required: " + required);
+ }
+
+ int count;
+ // Try to fill the buffer.
+ if (remaining > 0) {
+ // Logical change 1 (from Kryo Input.require): "capacity - limit" -> "required - limit"
+ count = fill(buffer, limit, required - limit);
+ if (count == -1) {
+ throw new KryoBufferUnderflowException("Buffer underflow.");
+ }
+ remaining += count;
+ if (remaining >= required) {
+ limit += count;
+ return remaining;
+ }
+ }
+
+ // Was not enough, compact and try again.
+ System.arraycopy(buffer, position, buffer, 0, remaining);
+ total += position;
+ position = 0;
+
+ do {
+ // Logical change 2 (from Kryo Input.require): "capacity - remaining" -> "required -
+ // remaining"
+ count = fill(buffer, remaining, required - remaining);
+ if (count == -1) {
+ throw new KryoBufferUnderflowException("Buffer underflow.");
+ }
+ remaining += count;
+ } while (remaining < required);
+
+ limit = remaining;
+ return remaining;
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int count) throws KryoException {
+ if (bytes == null) {
+ throw new IllegalArgumentException("bytes cannot be null.");
+ }
+
+ try {
+ return inputStream.read(bytes, offset, count);
+ } catch (IOException ex) {
+ throw new KryoException(ex);
+ }
+ }
+
+ @Override
+ public void skip(int count) throws KryoException {
+ try {
+ inputStream.skip(count);
+ } catch (IOException ex) {
+ throw new KryoException(ex);
+ }
+ }
+
+ @Override
+ public void readBytes(byte[] bytes, int offset, int count) throws KryoException {
+ if (bytes == null) {
+ throw new IllegalArgumentException("bytes cannot be null.");
+ }
+
+ try {
+ int bytesRead = 0;
+ int c;
+
+ while (true) {
+ c = inputStream.read(bytes, offset + bytesRead, count - bytesRead);
+
+ if (c == -1) {
+ throw new KryoException(new EOFException("No more bytes left."));
+ }
+
+ bytesRead += c;
+
+ if (bytesRead == count) {
+ break;
+ }
+ }
+ } catch (IOException ex) {
+ throw new KryoException(ex);
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
index c5f6b5893f6336..29db20caee79ec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -27,8 +27,9 @@
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.util.DefaultInstantiatorStrategy;
import java.io.IOException;
@@ -65,7 +66,7 @@ public int hash(T record) {
public void setReference(T toCompare) {
checkKryoInitialized();
- reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer(type));
+ reference = Kryo5Utils.copy(toCompare, kryo, new ValueSerializer(type));
}
@Override
@@ -142,12 +143,11 @@ private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
- new Kryo.DefaultInstantiatorStrategy();
+ DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
- this.kryo.setAsmEnabled(true);
+ // this.kryo.setAsmEnabled(true);
this.kryo.register(type);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 9a4b1448250c1a..ba98e25912f0f9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -27,8 +27,9 @@
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.util.DefaultInstantiatorStrategy;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -58,7 +59,7 @@ public final class ValueSerializer extends TypeSerializer {
*
Currently, we only have one single registration for the value type. Nevertheless, we keep
* this information here for future compatibility.
*/
- private LinkedHashMap kryoRegistrations;
+ private LinkedHashMap kryoRegistrations;
private transient Kryo kryo;
@@ -92,14 +93,14 @@ public T createInstance() {
public T copy(T from) {
checkKryoInitialized();
- return KryoUtils.copy(from, kryo, this);
+ return Kryo5Utils.copy(from, kryo, this);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
- return KryoUtils.copy(from, reuse, kryo, this);
+ return Kryo5Utils.copy(from, reuse, kryo, this);
}
@Override
@@ -137,14 +138,13 @@ private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
- new Kryo.DefaultInstantiatorStrategy();
+ DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
- this.kryo.setAsmEnabled(true);
+ // this.kryo.setAsmEnabled(true);
- KryoUtils.applyRegistrations(
+ Kryo5Utils.applyRegistrations(
this.kryo, kryoRegistrations.values(), this.kryo.getNextRegistrationId());
}
}
@@ -220,11 +220,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}
}
- private static LinkedHashMap asKryoRegistrations(Class> type) {
+ private static LinkedHashMap asKryoRegistrations(Class> type) {
checkNotNull(type);
- LinkedHashMap registration = new LinkedHashMap<>(1);
- registration.put(type.getClass().getName(), new KryoRegistration(type));
+ LinkedHashMap registration = new LinkedHashMap<>(1);
+ registration.put(type.getClass().getName(), new Kryo5Registration(type));
return registration;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 594236f59adbd2..8b508995f12aa7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
@@ -191,7 +192,8 @@ public KryoSerializer(Class type, ExecutionConfig executionConfig) {
/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
- protected KryoSerializer(KryoSerializer toCopy) {
+ @Internal
+ public KryoSerializer(KryoSerializer toCopy) {
this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
@@ -231,7 +233,8 @@ protected KryoSerializer(KryoSerializer toCopy) {
// for KryoSerializerSnapshot
// ------------------------------------------------------------------------
- KryoSerializer(
+ @Internal
+ public KryoSerializer(
Class type,
LinkedHashMap, SerializableSerializer>> defaultSerializers,
LinkedHashMap, Class extends Serializer>>> defaultSerializerClasses,
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 9696b1ef7128d7..4dc9a4d0ea33ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -18,13 +18,16 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.LinkedOptionalMap;
import org.apache.flink.util.LinkedOptionalMap.MergeResult;
import com.esotericsoftware.kryo.Serializer;
@@ -33,6 +36,7 @@
import java.io.IOException;
import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.function.Function;
import static org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom;
@@ -93,14 +97,24 @@ public TypeSerializer restoreSerializer() {
@Override
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
- if (!(newSerializer instanceof KryoSerializer)) {
+ if (newSerializer instanceof KryoSerializer) {
+ KryoSerializer kryoSerializer = (KryoSerializer) newSerializer;
+ if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return resolveSchemaCompatibility(kryoSerializer);
+ } else if (newSerializer
+ instanceof org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer) {
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer kryoSerializer =
+ (org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer)
+ newSerializer;
+ if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return resolveKryo5SchemaCompatibility(kryoSerializer);
+ } else {
return TypeSerializerSchemaCompatibility.incompatible();
}
- KryoSerializer kryoSerializer = (KryoSerializer) newSerializer;
- if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- return resolveSchemaCompatibility(kryoSerializer);
}
private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
@@ -149,6 +163,90 @@ private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
reconfiguredRegistrations);
}
+ private TypeSerializerSchemaCompatibility resolveKryo5SchemaCompatibility(
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer kryo5Serializer) {
+ // Default Kryo Serializers
+ LinkedOptionalMap, ExecutionConfig.SerializableSerializer>>
+ defaultSnapshotKryo2Serializers = snapshotData.getDefaultKryoSerializers();
+
+ LinkedHashMap, ExecutionConfig.SerializableKryo5Serializer>>
+ defaultKryo5SerializersRaw = kryo5Serializer.getDefaultKryoSerializers();
+ LinkedHashMap, ExecutionConfig.SerializableSerializer>>
+ defaultKryo5SerializersEmpty =
+ buildLinkedHashMapWithNullValues(defaultKryo5SerializersRaw);
+ LinkedOptionalMap, ExecutionConfig.SerializableSerializer>>
+ defaultKryo5SerializersEmptyOptional =
+ optionalMapOf(defaultKryo5SerializersEmpty, Class::getName);
+
+ MergeResult, SerializableSerializer>> defaultKryo5SerializersMergeResult =
+ LinkedOptionalMap.mergeRightIntoLeft(
+ defaultSnapshotKryo2Serializers, defaultKryo5SerializersEmptyOptional);
+
+ if (defaultKryo5SerializersMergeResult.hasMissingKeys()) {
+ logMissingKeys(defaultKryo5SerializersMergeResult);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // Default Serializer Classes
+ LinkedOptionalMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ defaultSnapshotKryo2SerializerClasses =
+ snapshotData.getDefaultKryoSerializerClasses();
+
+ LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ kryo5SerializerClassesRaw = kryo5Serializer.getDefaultKryoSerializerClasses();
+ LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ kryo5SerializerClasses =
+ buildLinkedHashMapWithNullValues(kryo5SerializerClassesRaw);
+ LinkedOptionalMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ kryo5SerializerClassesOptional =
+ optionalMapOf(kryo5SerializerClasses, Class::getName);
+
+ MergeResult, Class extends com.esotericsoftware.kryo.Serializer>>>
+ kryoSerializersClassesMergeResult =
+ LinkedOptionalMap.mergeRightIntoLeft(
+ defaultSnapshotKryo2SerializerClasses,
+ kryo5SerializerClassesOptional);
+
+ if (kryoSerializersClassesMergeResult.hasMissingKeys()) {
+ logMissingKeys(kryoSerializersClassesMergeResult);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // Kryo Registrations
+ LinkedOptionalMap snapshotKryo2Registrations =
+ snapshotData.getKryoRegistrations();
+ LinkedHashMap kryo5RegistrationsRaw =
+ kryo5Serializer.getKryoRegistrations();
+ LinkedHashMap kryo5Registrations =
+ buildLinkedHashMapWithNullValues(kryo5RegistrationsRaw);
+ LinkedOptionalMap kryo5RegistrationsOptional =
+ optionalMapOf(kryo5Registrations, Function.identity());
+
+ MergeResult kryo5RegistrationsMergeResult =
+ LinkedOptionalMap.mergeRightIntoLeft(
+ snapshotKryo2Registrations, kryo5RegistrationsOptional);
+
+ if (kryo5RegistrationsMergeResult.hasMissingKeys()) {
+ logMissingKeys(kryo5RegistrationsMergeResult);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // there are no missing keys, now we have to decide whether we are compatible as-is or we
+ // require reconfiguration.
+ return resolveSchemaCompatibility(
+ defaultKryo5SerializersMergeResult,
+ kryoSerializersClassesMergeResult,
+ kryo5RegistrationsMergeResult);
+ }
+
+ private LinkedHashMap buildLinkedHashMapWithNullValues(Map source) {
+ LinkedHashMap result = new LinkedHashMap<>();
+ for (K key : source.keySet()) {
+ result.put(key, null);
+ }
+ return result;
+ }
+
private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
MergeResult, SerializableSerializer>> reconfiguredDefaultKryoSerializers,
MergeResult, Class extends Serializer>>>
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/ChillSerializerRegistrar.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/ChillSerializerRegistrar.java
new file mode 100644
index 00000000000000..842134c5d87bee
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/ChillSerializerRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+
+/** Interface for flink-core to interact with the FlinkChillPackageRegistrar in flink-java. */
+@PublicEvolving
+public interface ChillSerializerRegistrar {
+ /**
+ * Registers all serializers with the given {@link Kryo}. All serializers are registered with
+ * specific IDs as a continuous block.
+ *
+ * @param kryo Kryo to register serializers with
+ */
+ void registerSerializers(Kryo kryo);
+
+ /**
+ * Returns the registration ID that immediately follows the last registered serializer.
+ *
+ * @return registration ID that should be used for the next serializer registration
+ */
+ int getNextRegistrationId();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/JavaSerializer.java
new file mode 100644
index 00000000000000..a5d9deab534497
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/JavaSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import com.esotericsoftware.kryo.kryo5.io.Input;
+import com.esotericsoftware.kryo.kryo5.io.Output;
+import com.esotericsoftware.kryo.kryo5.util.ObjectMap;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link
+ * com.esotericsoftware.kryo.serializers.JavaSerializer}, that additionally makes sure the {@link
+ * ObjectInputStream} used for deserialization specifically uses Kryo's registered classloader.
+ *
+ *
Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer},
+ * in which the wrong classloader may be used for deserialization, leading to {@link
+ * ClassNotFoundException}s.
+ *
+ * @see FLINK-6025
+ * @see Known issue with Kryo's
+ * JavaSerializer
+ * @param The type to be serialized.
+ */
+@PublicEvolving
+public class JavaSerializer extends Serializer {
+
+ public JavaSerializer() {}
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public void write(Kryo kryo, Output output, T o) {
+ try {
+ ObjectMap graphContext = kryo.getGraphContext();
+ ObjectOutputStream objectStream = (ObjectOutputStream) graphContext.get(this);
+ if (objectStream == null) {
+ objectStream = new ObjectOutputStream(output);
+ graphContext.put(this, objectStream);
+ }
+ objectStream.writeObject(o);
+ objectStream.flush();
+ } catch (Exception ex) {
+ throw new KryoException("Error during Java serialization.", ex);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public T read(Kryo kryo, Input input, Class aClass) {
+ try {
+ ObjectMap graphContext = kryo.getGraphContext();
+ ObjectInputStream objectStream = (ObjectInputStream) graphContext.get(this);
+ if (objectStream == null) {
+ // make sure we use Kryo's classloader
+ objectStream =
+ new InstantiationUtil.ClassLoaderObjectInputStream(
+ input, kryo.getClassLoader());
+ graphContext.put(this, objectStream);
+ }
+ return (T) objectStream.readObject();
+ } catch (Exception ex) {
+ throw new KryoException("Error during Java deserialization.", ex);
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java
new file mode 100644
index 00000000000000..1c27c224d5722b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig.SerializableKryo5Serializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Utils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput5;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import com.esotericsoftware.kryo.kryo5.io.Input;
+import com.esotericsoftware.kryo.kryo5.io.KryoBufferUnderflowException;
+import com.esotericsoftware.kryo.kryo5.io.Output;
+import com.esotericsoftware.kryo.kryo5.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.util.DefaultInstantiatorStrategy;
+import org.apache.commons.lang3.exception.CloneFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A type serializer that serializes its type using the Kryo serialization framework
+ * (https://github.com/EsotericSoftware/kryo).
+ *
+ *
This serializer is intended as a fallback serializer for the cases that are not covered by the
+ * basic types, tuples, and POJOs.
+ *
+ *
The set of serializers registered with Kryo via {@link Kryo#register}, with their respective
+ * IDs, depends on whether flink-java or flink-scala are on the classpath. This is for
+ * backwards-compatibility reasons.
+ *
+ *
If neither are available (which should only apply to tests in flink-core), then:
+ *
+ *
+ *
0-9 are used for Java primitives
+ *
10+ are used for user-defined registration
+ *
+ *
+ *
If flink-scala is available, then:
+ *
+ *
+ *
0-9 are used for Java primitives
+ *
10-72 are used for Scala classes
+ *
73-84 are used for Java classes
+ *
85+ are used for user-defined registration
+ *
+ *
+ *
If *only* flink-java is available, then:
+ *
+ *
+ *
0-9 are used for Java primitives
+ *
10-72 are unused (to maintain compatibility)
+ *
73-84 are used for Java classes
+ *
85+ are used for user-defined registration
+ *
+ *
+ * @param The type to be serialized.
+ */
+@PublicEvolving
+public class KryoSerializer extends TypeSerializer {
+
+ private static final long serialVersionUID = 3L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);
+
+ /**
+ * Flag whether to check for concurrent thread access. Because this flag is static final, a
+ * value of 'false' allows the JIT compiler to eliminate the guarded code sections.
+ */
+ private static final boolean CONCURRENT_ACCESS_CHECK =
+ LOG.isDebugEnabled() || KryoSerializerDebugInitHelper.setToDebug;
+
+ static {
+ configureKryoLogging();
+ }
+
+ @Nullable
+ private static final ChillSerializerRegistrar flinkChillPackageRegistrar =
+ loadFlinkChillPackageRegistrar();
+
+ @Nullable
+ private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
+ try {
+ return (ChillSerializerRegistrar)
+ Class.forName(
+ "org.apache.flink.api.java.typeutils.runtime.kryo5.FlinkChillPackageRegistrar")
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ private final LinkedHashMap, ExecutionConfig.SerializableKryo5Serializer>>
+ defaultSerializers;
+ private final LinkedHashMap, Class extends Serializer>>> defaultSerializerClasses;
+
+ /**
+ * Map of class tag (using classname as tag) to their Kryo registration.
+ *
+ *
This map serves as a preview of the final registration result of the Kryo instance, taking
+ * into account registration overwrites.
+ */
+ private LinkedHashMap kryoRegistrations;
+
+ private final Class type;
+
+ // ------------------------------------------------------------------------
+ // The fields below are lazily initialized after duplication or deserialization.
+ private org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+ compatibilityV2Serializer;
+
+ private transient Kryo kryo;
+ private transient T copyInstance;
+
+ private transient DataOutputView previousOut;
+ private transient DataInputView previousIn;
+
+ private transient Input input;
+ private transient Output output;
+
+ // ------------------------------------------------------------------------
+ // legacy fields; these fields cannot yet be removed to retain backwards compatibility
+
+ private LinkedHashMap, SerializableKryo5Serializer>> registeredTypesWithSerializers;
+ private LinkedHashMap, Class extends Serializer>>>
+ registeredTypesWithSerializerClasses;
+ private LinkedHashSet> registeredTypes;
+
+ // for debugging purposes
+ private transient volatile Thread currentThread;
+
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("deprecation")
+ public KryoSerializer(Class type, ExecutionConfig executionConfig) {
+ this.type = checkNotNull(type);
+
+ this.defaultSerializers = executionConfig.getDefaultKryo5Serializers();
+ this.defaultSerializerClasses = executionConfig.getDefaultKryo5SerializerClasses();
+
+ this.kryoRegistrations =
+ buildKryoRegistrations(
+ this.type,
+ executionConfig.getRegisteredKryoTypes(),
+ executionConfig.getRegisteredTypesWithKryo5SerializerClasses(),
+ executionConfig.getRegisteredTypesWithKryo5Serializers());
+
+ this.compatibilityV2Serializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ type, executionConfig);
+ }
+
+ /**
+ * Copy-constructor that does not copy transient fields. They will be initialized once required.
+ */
+ protected KryoSerializer(KryoSerializer toCopy) {
+
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ if (toCopy.compatibilityV2Serializer != null) {
+ this.compatibilityV2Serializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ toCopy.compatibilityV2Serializer);
+ }
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry, SerializableKryo5Serializer>> entry :
+ toCopy.defaultSerializers.entrySet()) {
+
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry entry : toCopy.kryoRegistrations.entrySet()) {
+
+ Kryo5Registration kryoRegistration = entry.getValue();
+
+ if (kryoRegistration.getSerializerDefinitionType()
+ == Kryo5Registration.SerializerDefinitionType.INSTANCE) {
+
+ SerializableKryo5Serializer extends Serializer>> serializerInstance =
+ kryoRegistration.getSerializableSerializerInstance();
+
+ if (serializerInstance != null) {
+ kryoRegistration =
+ new Kryo5Registration(
+ kryoRegistration.getRegisteredClass(),
+ deepCopySerializer(serializerInstance));
+ }
+ }
+
+ this.kryoRegistrations.put(entry.getKey(), kryoRegistration);
+ }
+ }
+
+ // for KryoSerializerSnapshot
+ // ------------------------------------------------------------------------
+
+ KryoSerializer(
+ Class type,
+ LinkedHashMap, SerializableKryo5Serializer>> defaultSerializers,
+ LinkedHashMap, Class extends Serializer>>> defaultSerializerClasses,
+ LinkedHashMap kryoRegistrations,
+ LinkedHashMap, ExecutionConfig.SerializableSerializer>>
+ legacyDefaultSerializers,
+ LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ legacyDefaultSerializerClasses,
+ LinkedHashMap legacyKryoRegistrations) {
+
+ this.type = checkNotNull(type, "Type class cannot be null.");
+ this.defaultSerializerClasses =
+ checkNotNull(
+ defaultSerializerClasses, "Default serializer classes cannot be null.");
+ this.defaultSerializers =
+ checkNotNull(defaultSerializers, "Default serializers cannot be null.");
+ this.kryoRegistrations =
+ checkNotNull(kryoRegistrations, "Kryo registrations cannot be null.");
+
+ this.compatibilityV2Serializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ type,
+ legacyDefaultSerializers,
+ legacyDefaultSerializerClasses,
+ legacyKryoRegistrations);
+ }
+
+ @PublicEvolving
+ public Class getType() {
+ return type;
+ }
+
+ @PublicEvolving
+ public LinkedHashMap, SerializableKryo5Serializer>> getDefaultKryoSerializers() {
+ return defaultSerializers;
+ }
+
+ @PublicEvolving
+ public LinkedHashMap, Class extends Serializer>>>
+ getDefaultKryoSerializerClasses() {
+ return defaultSerializerClasses;
+ }
+
+ @Internal
+ public LinkedHashMap getKryoRegistrations() {
+ return kryoRegistrations;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public KryoSerializer duplicate() {
+ return new KryoSerializer<>(this);
+ }
+
+ @Override
+ public T createInstance() {
+ if (Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers())) {
+ return null;
+ } else {
+ checkKryoInitialized();
+ try {
+ return kryo.newInstance(type);
+ } catch (Throwable e) {
+ return null;
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T copy(T from) {
+ if (from == null) {
+ return null;
+ }
+
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+ try {
+ return kryo.copy(from);
+ } catch (KryoException ke) {
+ // kryo was unable to copy it, so we do it through serialization:
+ ByteArrayOutputStream baout = new ByteArrayOutputStream();
+ Output output = new Output(baout);
+
+ kryo.writeObject(output, from);
+
+ output.close();
+
+ ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+ Input input = new Input(bain);
+
+ return (T) kryo.readObject(input, from.getClass());
+ }
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+
+ if (target != previousOut) {
+ DataOutputViewStream outputStream = new DataOutputViewStream(target);
+ output = new Output(outputStream);
+ previousOut = target;
+ }
+
+ // Sanity check: Make sure that the output is cleared/has been flushed by the last call
+ // otherwise data might be written multiple times in case of a previous EOFException
+ if (output.position() != 0) {
+ throw new IllegalStateException(
+ "The Kryo Output still contains data from a previous "
+ + "serialize call. It has to be flushed or cleared at the end of the serialize call.");
+ }
+
+ try {
+ kryo.writeClassAndObject(output, record);
+ output.flush();
+ } catch (KryoException ke) {
+ // make sure that the Kryo output buffer is reset in case that we can recover from
+ // the exception (e.g. EOFException which denotes buffer full)
+ output.reset();
+
+ Throwable cause = ke.getCause();
+ if (cause instanceof EOFException) {
+ throw (EOFException) cause;
+ } else {
+ throw ke;
+ }
+ }
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+
+ java.util.ArrayList l = kryo.newInstance(java.util.ArrayList.class);
+ l.add(123);
+
+ if (source != previousIn) {
+ DataInputViewStream inputStream = new DataInputViewStream(source);
+ input = new NoFetchingInput5(inputStream);
+ previousIn = source;
+ }
+
+ try {
+ return (T) kryo.readClassAndObject(input);
+ } catch (KryoBufferUnderflowException ke) {
+ // 2023-04-26: Existing Flink code expects a java.io.EOFException in this scenario
+ throw new EOFException(ke.getMessage());
+ } catch (KryoException ke) {
+ Throwable cause = ke.getCause();
+
+ if (cause instanceof EOFException) {
+ throw (EOFException) cause;
+ } else {
+ throw ke;
+ }
+ }
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ @Override
+ public T deserializeWithKeyedBackendVersion(DataInputView source, int keyedBackendVersion)
+ throws IOException {
+ if (keyedBackendVersion >= 7) {
+ return deserialize(source);
+ } else if (compatibilityV2Serializer != null) {
+ return compatibilityV2Serializer.deserializeWithKeyedBackendVersion(
+ source, keyedBackendVersion);
+ } else {
+ throw new IOException(
+ String.format(
+ "Need v2 compatability serializer to deserialize version %d",
+ keyedBackendVersion));
+ }
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+ if (this.copyInstance == null) {
+ this.copyInstance = createInstance();
+ }
+
+ T tmp = deserialize(copyInstance, source);
+ serialize(tmp, target);
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int result = type.hashCode();
+ result = 31 * result + (kryoRegistrations.hashCode());
+ result = 31 * result + (defaultSerializers.hashCode());
+ result = 31 * result + (defaultSerializerClasses.hashCode());
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof KryoSerializer) {
+ KryoSerializer> other = (KryoSerializer>) obj;
+
+ return type == other.type
+ && Objects.equals(kryoRegistrations, other.kryoRegistrations)
+ && Objects.equals(defaultSerializerClasses, other.defaultSerializerClasses)
+ && Objects.equals(defaultSerializers, other.defaultSerializers);
+ } else {
+ return false;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the Chill Kryo Serializer which is implicitly added to the classpath via
+ * flink-runtime. Falls back to the default Kryo serializer if it can't be found.
+ *
+ * @return The Kryo serializer instance.
+ */
+ private Kryo getKryoInstance() {
+
+ try {
+ // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill
+ // library).
+ // This will be true if Flink's Scala API is used.
+ Class> chillInstantiatorClazz =
+ Class.forName("org.apache.flink.runtime.types.FlinkScalaKryo5Instantiator");
+ Object chillInstantiator = chillInstantiatorClazz.newInstance();
+
+ // obtain a Kryo instance through Twitter Chill
+ Method m = chillInstantiatorClazz.getMethod("newKryo");
+
+ return (Kryo) m.invoke(chillInstantiator);
+ } catch (ClassNotFoundException
+ | InstantiationException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Kryo serializer scala extensions are not available.", e);
+ } else {
+ LOG.info("Kryo serializer scala extensions are not available.");
+ }
+
+ DefaultInstantiatorStrategy initStrategy = new DefaultInstantiatorStrategy();
+ initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+
+ Kryo kryo = new Kryo();
+ kryo.setInstantiatorStrategy(initStrategy);
+
+ if (flinkChillPackageRegistrar != null) {
+ flinkChillPackageRegistrar.registerSerializers(kryo);
+ }
+
+ return kryo;
+ }
+ }
+
+ private void checkKryoInitialized() {
+ if (this.kryo == null) {
+ this.kryo = getKryoInstance();
+
+ // Enable reference tracking.
+ kryo.setReferences(true);
+
+ // Throwable and all subclasses should be serialized via java serialization
+ // Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
+ // This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for
+ // details.
+ kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
+
+ // Add default serializers first, so that the type registrations without a serializer
+ // are registered with a default serializer
+ for (Map.Entry, SerializableKryo5Serializer>> entry :
+ defaultSerializers.entrySet()) {
+ kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
+ }
+
+ for (Map.Entry, Class extends Serializer>>> entry :
+ defaultSerializerClasses.entrySet()) {
+ kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
+ }
+
+ Kryo5Utils.applyRegistrations(
+ this.kryo,
+ kryoRegistrations.values(),
+ flinkChillPackageRegistrar != null
+ ? flinkChillPackageRegistrar.getNextRegistrationId()
+ : kryo.getNextRegistrationId());
+
+ kryo.setRegistrationRequired(false);
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot snapshotConfiguration() {
+ return new KryoSerializerSnapshot<>(
+ type, defaultSerializers, defaultSerializerClasses, kryoRegistrations);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Utility method that takes lists of registered types and their serializers, and resolve them
+ * into a single list such that the result will resemble the final registration result in Kryo.
+ */
+ private static LinkedHashMap buildKryoRegistrations(
+ Class> serializedType,
+ LinkedHashSet> registeredTypes,
+ LinkedHashMap, Class extends Serializer>>>
+ registeredTypesWithSerializerClasses,
+ LinkedHashMap, SerializableKryo5Serializer>>
+ registeredTypesWithSerializers) {
+
+ final LinkedHashMap kryoRegistrations = new LinkedHashMap<>();
+
+ kryoRegistrations.put(serializedType.getName(), new Kryo5Registration(serializedType));
+
+ for (Class> registeredType : checkNotNull(registeredTypes)) {
+ kryoRegistrations.put(registeredType.getName(), new Kryo5Registration(registeredType));
+ }
+
+ for (Map.Entry, Class extends Serializer>>>
+ registeredTypeWithSerializerClassEntry :
+ checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {
+
+ kryoRegistrations.put(
+ registeredTypeWithSerializerClassEntry.getKey().getName(),
+ new Kryo5Registration(
+ registeredTypeWithSerializerClassEntry.getKey(),
+ registeredTypeWithSerializerClassEntry.getValue()));
+ }
+
+ for (Map.Entry, SerializableKryo5Serializer>> registeredTypeWithSerializerEntry :
+ checkNotNull(registeredTypesWithSerializers).entrySet()) {
+
+ kryoRegistrations.put(
+ registeredTypeWithSerializerEntry.getKey().getName(),
+ new Kryo5Registration(
+ registeredTypeWithSerializerEntry.getKey(),
+ registeredTypeWithSerializerEntry.getValue()));
+ }
+
+ // add Avro support if flink-avro is available; a dummy otherwise
+ AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration5(kryoRegistrations);
+
+ return kryoRegistrations;
+ }
+
+ static void configureKryoLogging() {
+ // Kryo uses only DEBUG and TRACE levels
+ // we only forward TRACE level, because even DEBUG levels results in
+ // a logging for each object, which is infeasible in Flink.
+ if (LOG.isTraceEnabled()) {
+ com.esotericsoftware.minlog.Log.setLogger(new MinlogForwarder(LOG));
+ com.esotericsoftware.minlog.Log.TRACE();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ // kryoRegistrations may be null if this Kryo serializer is deserialized from an old version
+ if (kryoRegistrations == null) {
+ this.kryoRegistrations =
+ buildKryoRegistrations(
+ type,
+ registeredTypes,
+ registeredTypesWithSerializerClasses,
+ registeredTypesWithSerializers);
+ }
+ }
+
+ private SerializableKryo5Serializer extends Serializer>> deepCopySerializer(
+ SerializableKryo5Serializer extends Serializer>> original) {
+ try {
+ return InstantiationUtil.clone(
+ original, Thread.currentThread().getContextClassLoader());
+ } catch (IOException | ClassNotFoundException ex) {
+ throw new CloneFailedException(
+ "Could not clone serializer instance of class " + original.getClass(), ex);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // For testing
+ // --------------------------------------------------------------------------------------------
+
+ private void enterExclusiveThread() {
+ // we use simple get, check, set here, rather than CAS
+ // we don't need lock-style correctness, this is only a sanity-check and we thus
+ // favor speed at the cost of some false negatives in this check
+ Thread previous = currentThread;
+ Thread thisThread = Thread.currentThread();
+
+ if (previous == null) {
+ currentThread = thisThread;
+ } else if (previous != thisThread) {
+ throw new IllegalStateException(
+ "Concurrent access to KryoSerializer. Thread 1: "
+ + thisThread.getName()
+ + " , Thread 2: "
+ + previous.getName());
+ }
+ }
+
+ private void exitExclusiveThread() {
+ currentThread = null;
+ }
+
+ @VisibleForTesting
+ public Kryo getKryo() {
+ checkKryoInitialized();
+ return this.kryo;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerDebugInitHelper.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerDebugInitHelper.java
new file mode 100644
index 00000000000000..7c87b9f7ab57cd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerDebugInitHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Simple helper class to initialize the concurrency checks for tests.
+ *
+ *
The flag is automatically set to true when assertions are activated (tests) and can be set to
+ * true manually in other tests as well;
+ */
+@Internal
+class KryoSerializerDebugInitHelper {
+
+ /**
+ * This captures the initial setting after initialization. It is used to validate in tests that
+ * we never change the default to true.
+ */
+ static final boolean INITIAL_SETTING;
+
+ /** The flag that is used to initialize the KryoSerializer's concurrency check flag. */
+ static boolean setToDebug = false;
+
+ static {
+ // capture the default setting, for tests
+ INITIAL_SETTING = setToDebug;
+
+ // if assertions are active, the check should be activated
+ //noinspection AssertWithSideEffects,ConstantConditions
+ assert setToDebug = true;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerSnapshot.java
new file mode 100644
index 00000000000000..8011673cc57980
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerSnapshot.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig.SerializableKryo5Serializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.LinkedOptionalMap.MergeResult;
+
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializerSnapshotData.createFrom;
+import static org.apache.flink.util.LinkedOptionalMap.mergeRightIntoLeft;
+import static org.apache.flink.util.LinkedOptionalMap.optionalMapOf;
+
+/** {@link TypeSerializerSnapshot} for {@link KryoSerializer}. */
+@PublicEvolving
+public class KryoSerializerSnapshot implements TypeSerializerSnapshot {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KryoSerializerSnapshot.class);
+
+ private static final int VERSION = 2;
+
+ private KryoSerializerSnapshotData snapshotData;
+
+ @SuppressWarnings("unused")
+ public KryoSerializerSnapshot() {}
+
+ KryoSerializerSnapshot(
+ Class typeClass,
+ LinkedHashMap, SerializableKryo5Serializer>> defaultKryoSerializers,
+ LinkedHashMap, Class extends Serializer>>> defaultKryoSerializerClasses,
+ LinkedHashMap kryoRegistrations) {
+
+ this.snapshotData =
+ createFrom(
+ typeClass,
+ defaultKryoSerializers,
+ defaultKryoSerializerClasses,
+ kryoRegistrations);
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public void writeSnapshot(DataOutputView out) throws IOException {
+ snapshotData.writeSnapshotData(out);
+ }
+
+ @Override
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+ throws IOException {
+ this.snapshotData = createFrom(in, userCodeClassLoader);
+ }
+
+ @Override
+ public TypeSerializer restoreSerializer() {
+ return new KryoSerializer<>(
+ snapshotData.getTypeClass(),
+ snapshotData.getDefaultKryoSerializers().unwrapOptionals(),
+ snapshotData.getDefaultKryoSerializerClasses().unwrapOptionals(),
+ snapshotData.getKryoRegistrations().unwrapOptionals(),
+ new LinkedHashMap<>(),
+ new LinkedHashMap<>(),
+ new LinkedHashMap<>());
+ }
+
+ @Override
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializer newSerializer) {
+ if (!(newSerializer instanceof KryoSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ KryoSerializer kryoSerializer = (KryoSerializer) newSerializer;
+ if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return resolveSchemaCompatibility(kryoSerializer);
+ }
+
+ private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ KryoSerializer newSerializer) {
+ // merge the default serializers
+ final MergeResult, SerializableKryo5Serializer>>
+ reconfiguredDefaultKryoSerializers =
+ mergeRightIntoLeft(
+ snapshotData.getDefaultKryoSerializers(),
+ optionalMapOf(
+ newSerializer.getDefaultKryoSerializers(), Class::getName));
+
+ if (reconfiguredDefaultKryoSerializers.hasMissingKeys()) {
+ logMissingKeys(reconfiguredDefaultKryoSerializers);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // merge default serializer classes
+ final MergeResult, Class extends Serializer>>>
+ reconfiguredDefaultKryoSerializerClasses =
+ mergeRightIntoLeft(
+ snapshotData.getDefaultKryoSerializerClasses(),
+ optionalMapOf(
+ newSerializer.getDefaultKryoSerializerClasses(),
+ Class::getName));
+
+ if (reconfiguredDefaultKryoSerializerClasses.hasMissingKeys()) {
+ logMissingKeys(reconfiguredDefaultKryoSerializerClasses);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // merge registration
+ final MergeResult reconfiguredRegistrations =
+ mergeRightIntoLeft(
+ snapshotData.getKryoRegistrations(),
+ optionalMapOf(newSerializer.getKryoRegistrations(), Function.identity()));
+
+ if (reconfiguredRegistrations.hasMissingKeys()) {
+ logMissingKeys(reconfiguredRegistrations);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // there are no missing keys, now we have to decide whether we are compatible as-is or we
+ // require reconfiguration.
+ return resolveSchemaCompatibility(
+ reconfiguredDefaultKryoSerializers,
+ reconfiguredDefaultKryoSerializerClasses,
+ reconfiguredRegistrations);
+ }
+
+ private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ MergeResult, SerializableKryo5Serializer>>
+ reconfiguredDefaultKryoSerializers,
+ MergeResult, Class extends Serializer>>>
+ reconfiguredDefaultKryoSerializerClasses,
+ MergeResult reconfiguredRegistrations) {
+
+ if (reconfiguredDefaultKryoSerializers.isOrderedSubset()
+ && reconfiguredDefaultKryoSerializerClasses.isOrderedSubset()
+ && reconfiguredRegistrations.isOrderedSubset()) {
+
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+
+ // reconfigure a new KryoSerializer
+ KryoSerializer reconfiguredSerializer =
+ new KryoSerializer<>(
+ snapshotData.getTypeClass(),
+ reconfiguredDefaultKryoSerializers.getMerged(),
+ reconfiguredDefaultKryoSerializerClasses.getMerged(),
+ reconfiguredRegistrations.getMerged(),
+ new LinkedHashMap<>(),
+ new LinkedHashMap<>(),
+ new LinkedHashMap<>());
+
+ return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+ reconfiguredSerializer);
+ }
+
+ private void logMissingKeys(MergeResult, ?> mergeResult) {
+ mergeResult
+ .missingKeys()
+ .forEach(
+ key ->
+ LOG.warn(
+ "The Kryo registration for a previously registered class {} does not have a "
+ + "proper serializer, because its previous serializer cannot be loaded or is no "
+ + "longer valid but a new serializer is not available",
+ key));
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerSnapshotData.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerSnapshotData.java
new file mode 100644
index 00000000000000..048917de9cf45e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializerSnapshotData.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig.SerializableKryo5Serializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.LinkedOptionalMap;
+import org.apache.flink.util.function.BiFunctionWithException;
+
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.LinkedHashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.util.LinkedOptionalMap.optionalMapOf;
+import static org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap;
+import static org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@PublicEvolving
+final class KryoSerializerSnapshotData {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KryoSerializerSnapshotData.class);
+
+ // --------------------------------------------------------------------------------------------
+ // Factories
+ // --------------------------------------------------------------------------------------------
+
+ static KryoSerializerSnapshotData createFrom(
+ Class typeClass,
+ LinkedHashMap, SerializableKryo5Serializer>> defaultKryoSerializers,
+ LinkedHashMap, Class extends Serializer>>> defaultKryoSerializerClasses,
+ LinkedHashMap kryoRegistrations) {
+
+ return new KryoSerializerSnapshotData<>(
+ typeClass,
+ optionalMapOf(defaultKryoSerializers, Class::getName),
+ optionalMapOf(defaultKryoSerializerClasses, Class::getName),
+ optionalMapOf(kryoRegistrations, Function.identity()));
+ }
+
+ static KryoSerializerSnapshotData createFrom(DataInputView in, ClassLoader cl)
+ throws IOException {
+ Class typeClass = readTypeClass(in, cl);
+ LinkedOptionalMap kryoRegistrations =
+ readKryoRegistrations(in, cl);
+ LinkedOptionalMap, SerializableKryo5Serializer>> defaultSerializer =
+ readDefaultKryoSerializers(in, cl);
+ LinkedOptionalMap, Class extends Serializer>>> defaultSerializerClasses =
+ readDefaultKryoSerializerClasses(in, cl);
+
+ return new KryoSerializerSnapshotData<>(
+ typeClass, defaultSerializer, defaultSerializerClasses, kryoRegistrations);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Fields
+ // --------------------------------------------------------------------------------------------
+
+ private final Class typeClass;
+ private final LinkedOptionalMap, SerializableKryo5Serializer>>
+ defaultKryoSerializers;
+ private final LinkedOptionalMap, Class extends Serializer>>>
+ defaultKryoSerializerClasses;
+ private final LinkedOptionalMap kryoRegistrations;
+
+ private KryoSerializerSnapshotData(
+ Class typeClass,
+ LinkedOptionalMap, SerializableKryo5Serializer>> defaultKryoSerializers,
+ LinkedOptionalMap, Class extends Serializer>>>
+ defaultKryoSerializerClasses,
+ LinkedOptionalMap kryoRegistrations) {
+
+ this.typeClass = typeClass;
+ this.defaultKryoSerializers = defaultKryoSerializers;
+ this.defaultKryoSerializerClasses = defaultKryoSerializerClasses;
+ this.kryoRegistrations = kryoRegistrations;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Getters
+ // --------------------------------------------------------------------------------------------
+
+ Class getTypeClass() {
+ return typeClass;
+ }
+
+ LinkedOptionalMap, SerializableKryo5Serializer>> getDefaultKryoSerializers() {
+ return defaultKryoSerializers;
+ }
+
+ LinkedOptionalMap, Class extends Serializer>>> getDefaultKryoSerializerClasses() {
+ return defaultKryoSerializerClasses;
+ }
+
+ LinkedOptionalMap getKryoRegistrations() {
+ return kryoRegistrations;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Write
+ // --------------------------------------------------------------------------------------------
+
+ void writeSnapshotData(DataOutputView out) throws IOException {
+ writeTypeClass(out);
+ writeKryoRegistrations(out, kryoRegistrations);
+ writeDefaultKryoSerializers(out, defaultKryoSerializers);
+ writeDefaultKryoSerializerClasses(out, defaultKryoSerializerClasses);
+ }
+
+ private void writeTypeClass(DataOutputView out) throws IOException {
+ out.writeUTF(typeClass.getName());
+ }
+
+ private static void writeKryoRegistrations(
+ DataOutputView out, LinkedOptionalMap kryoRegistrations)
+ throws IOException {
+
+ writeOptionalMap(
+ out,
+ kryoRegistrations,
+ DataOutput::writeUTF,
+ KryoRegistrationUtil::writeKryoRegistration);
+ }
+
+ private void writeDefaultKryoSerializers(
+ DataOutputView out,
+ LinkedOptionalMap, SerializableKryo5Serializer>> defaultKryoSerializers)
+ throws IOException {
+
+ writeOptionalMap(
+ out,
+ defaultKryoSerializers,
+ (stream, klass) -> stream.writeUTF(klass.getName()),
+ (stream, instance) -> {
+ try (final DataOutputViewStream outViewWrapper =
+ new DataOutputViewStream(stream)) {
+ InstantiationUtil.serializeObject(outViewWrapper, instance);
+ }
+ });
+ }
+
+ private static void writeDefaultKryoSerializerClasses(
+ DataOutputView out,
+ LinkedOptionalMap, Class extends Serializer>>>
+ defaultKryoSerializerClasses)
+ throws IOException {
+
+ writeOptionalMap(
+ out,
+ defaultKryoSerializerClasses,
+ (stream, klass) -> stream.writeUTF(klass.getName()),
+ (stream, klass) -> stream.writeUTF(klass.getName()));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Read
+ // --------------------------------------------------------------------------------------------
+
+ private static Class readTypeClass(DataInputView in, ClassLoader userCodeClassLoader)
+ throws IOException {
+ return InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
+ }
+
+ private static LinkedOptionalMap readKryoRegistrations(
+ DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+
+ return readOptionalMap(
+ in,
+ (stream, unused) -> stream.readUTF(),
+ (stream, unused) ->
+ KryoRegistrationUtil.tryReadKryoRegistration(stream, userCodeClassLoader));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static LinkedOptionalMap, SerializableKryo5Serializer>>
+ readDefaultKryoSerializers(DataInputView in, ClassLoader cl) throws IOException {
+ return readOptionalMap(
+ in, new ClassResolverByName(cl), new SerializeableSerializerResolver(cl));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static LinkedOptionalMap, Class extends Serializer>>>
+ readDefaultKryoSerializerClasses(DataInputView in, ClassLoader cl) throws IOException {
+
+ return readOptionalMap(
+ in, new ClassResolverByName(cl), new ClassResolverByName>(cl));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helpers
+ // --------------------------------------------------------------------------------------------
+
+ private static final class KryoRegistrationUtil {
+
+ static void writeKryoRegistration(DataOutputView out, Kryo5Registration kryoRegistration)
+ throws IOException {
+
+ checkNotNull(kryoRegistration);
+ out.writeUTF(kryoRegistration.getRegisteredClass().getName());
+
+ final Kryo5Registration.SerializerDefinitionType serializerDefinitionType =
+ kryoRegistration.getSerializerDefinitionType();
+
+ out.writeInt(serializerDefinitionType.ordinal());
+ switch (serializerDefinitionType) {
+ case UNSPECIFIED:
+ {
+ // nothing else to write
+ break;
+ }
+ case CLASS:
+ {
+ Class extends Serializer>> serializerClass =
+ kryoRegistration.getSerializerClass();
+ assert serializerClass != null;
+ out.writeUTF(serializerClass.getName());
+ break;
+ }
+ case INSTANCE:
+ {
+ try (final DataOutputViewStream outViewWrapper =
+ new DataOutputViewStream(out)) {
+ InstantiationUtil.serializeObject(
+ outViewWrapper,
+ kryoRegistration.getSerializableSerializerInstance());
+ }
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException(
+ "Unrecognized Kryo registration serializer definition type: "
+ + serializerDefinitionType);
+ }
+ }
+ }
+
+ static Kryo5Registration tryReadKryoRegistration(
+ DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+
+ String registeredClassname = in.readUTF();
+ Class> registeredClass;
+ try {
+ registeredClass = Class.forName(registeredClassname, true, userCodeClassLoader);
+ } catch (ClassNotFoundException e) {
+ LOG.warn(
+ "Cannot find registered class "
+ + registeredClassname
+ + " for Kryo serialization in classpath;"
+ + " using a dummy class as a placeholder.",
+ e);
+ return null;
+ }
+
+ final Kryo5Registration.SerializerDefinitionType serializerDefinitionType =
+ Kryo5Registration.SerializerDefinitionType.values()[in.readInt()];
+
+ switch (serializerDefinitionType) {
+ case UNSPECIFIED:
+ {
+ return new Kryo5Registration(registeredClass);
+ }
+ case CLASS:
+ {
+ return tryReadWithSerializerClass(
+ in, userCodeClassLoader, registeredClassname, registeredClass);
+ }
+ case INSTANCE:
+ {
+ return tryReadWithSerializerInstance(
+ in, userCodeClassLoader, registeredClassname, registeredClass);
+ }
+ default:
+ {
+ throw new IllegalStateException(
+ "Unrecognized Kryo registration serializer definition type: "
+ + serializerDefinitionType);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Kryo5Registration tryReadWithSerializerClass(
+ DataInputView in,
+ ClassLoader userCodeClassLoader,
+ String registeredClassname,
+ Class> registeredClass)
+ throws IOException {
+ String serializerClassname = in.readUTF();
+ Class serializerClass;
+ try {
+ serializerClass = Class.forName(serializerClassname, true, userCodeClassLoader);
+ return new Kryo5Registration(registeredClass, serializerClass);
+ } catch (ClassNotFoundException e) {
+ LOG.warn(
+ "Cannot find registered Kryo serializer class for class "
+ + registeredClassname
+ + " in classpath; using a dummy Kryo serializer that should be replaced as soon as"
+ + " a new Kryo serializer for the class is present",
+ e);
+ }
+ return null;
+ }
+
+ private static Kryo5Registration tryReadWithSerializerInstance(
+ DataInputView in,
+ ClassLoader userCodeClassLoader,
+ String registeredClassname,
+ Class> registeredClass)
+ throws IOException {
+ SerializableKryo5Serializer extends Serializer>> serializerInstance;
+
+ try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
+ serializerInstance =
+ InstantiationUtil.deserializeObject(inViewWrapper, userCodeClassLoader);
+ return new Kryo5Registration(registeredClass, serializerInstance);
+ } catch (ClassNotFoundException e) {
+ LOG.warn(
+ "Cannot find registered Kryo serializer class for class "
+ + registeredClassname
+ + " in classpath; using a dummy Kryo serializer that should be replaced as soon as"
+ + " a new Kryo serializer for the class is present",
+ e);
+ } catch (InvalidClassException e) {
+ LOG.warn(
+ "The registered Kryo serializer class for class "
+ + registeredClassname
+ + " has changed and is no longer valid; using a dummy Kryo serializer that should be replaced"
+ + " as soon as a new Kryo serializer for the class is present.",
+ e);
+ }
+ return null;
+ }
+ }
+
+ private static class ClassResolverByName
+ implements BiFunctionWithException, IOException> {
+ private final ClassLoader classLoader;
+
+ private ClassResolverByName(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class apply(DataInputView stream, String unused) throws IOException {
+ String className = stream.readUTF();
+ try {
+ return (Class) Class.forName(className, false, classLoader);
+ } catch (ClassNotFoundException e) {
+ LOG.warn(
+ "Cannot find registered class "
+ + className
+ + " for Kryo serialization in classpath.",
+ e);
+ return null;
+ }
+ }
+ }
+
+ private static final class SerializeableSerializerResolver
+ implements BiFunctionWithException<
+ DataInputView, String, SerializableKryo5Serializer>, IOException> {
+
+ private final ClassLoader classLoader;
+
+ private SerializeableSerializerResolver(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ public SerializableKryo5Serializer> apply(DataInputView stream, String className) {
+ try {
+ try (final DataInputViewStream inViewWrapper = new DataInputViewStream(stream)) {
+ return InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
+ }
+ } catch (Throwable e) {
+ LOG.warn(
+ "Cannot deserialize a previously serialized kryo serializer for the type "
+ + className,
+ e);
+ return null;
+ }
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/MinlogForwarder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/MinlogForwarder.java
new file mode 100644
index 00000000000000..29d7ff27950403
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/MinlogForwarder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.Internal;
+
+import com.esotericsoftware.minlog.Log;
+import com.esotericsoftware.minlog.Log.Logger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An implementation of the Minlog Logger that forwards to slf4j. */
+@Internal
+class MinlogForwarder extends Logger {
+
+ private final org.slf4j.Logger log;
+
+ MinlogForwarder(org.slf4j.Logger log) {
+ this.log = checkNotNull(log);
+ }
+
+ @Override
+ public void log(int level, String category, String message, Throwable ex) {
+ final String logString = "[KRYO " + category + "] " + message;
+ switch (level) {
+ case Log.LEVEL_ERROR:
+ log.error(logString, ex);
+ break;
+ case Log.LEVEL_WARN:
+ log.warn(logString, ex);
+ break;
+ case Log.LEVEL_INFO:
+ log.info(logString, ex);
+ break;
+ case Log.LEVEL_DEBUG:
+ log.debug(logString, ex);
+ break;
+ case Log.LEVEL_TRACE:
+ log.trace(logString, ex);
+ break;
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/Serializers.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/Serializers.java
new file mode 100644
index 00000000000000..b09273ee2ecc42
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/Serializers.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import com.esotericsoftware.kryo.kryo5.io.Input;
+import com.esotericsoftware.kryo.kryo5.io.Output;
+import com.esotericsoftware.kryo.kryo5.serializers.CollectionSerializer;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Class containing utilities for the serializers of the Flink Runtime.
+ *
+ *
Most of the serializers are automatically added to the system.
+ *
+ *
Note that users can also implement the {@link com.esotericsoftware.kryo.KryoSerializable}
+ * interface to provide custom serialization for their classes. Also, there is a Java Annotation for
+ * adding a default serializer (@DefaultSerializer) to classes.
+ */
+@Internal
+public class Serializers {
+
+ public static void recursivelyRegisterType(
+ TypeInformation> typeInfo, ExecutionConfig config, Set> alreadySeen) {
+ if (typeInfo instanceof GenericTypeInfo) {
+ GenericTypeInfo> genericTypeInfo = (GenericTypeInfo>) typeInfo;
+ Serializers.recursivelyRegisterType(
+ genericTypeInfo.getTypeClass(), config, alreadySeen);
+ } else if (typeInfo instanceof CompositeType) {
+ List> genericTypesInComposite = new ArrayList<>();
+ getContainedGenericTypes((CompositeType>) typeInfo, genericTypesInComposite);
+ for (GenericTypeInfo> gt : genericTypesInComposite) {
+ Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen);
+ }
+ } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+ ObjectArrayTypeInfo, ?> objectArrayTypeInfo = (ObjectArrayTypeInfo, ?>) typeInfo;
+ recursivelyRegisterType(objectArrayTypeInfo.getComponentInfo(), config, alreadySeen);
+ }
+ }
+
+ public static void recursivelyRegisterType(
+ Class> type, ExecutionConfig config, Set> alreadySeen) {
+ // don't register or remember primitives
+ if (type == null || type.isPrimitive() || type == Object.class) {
+ return;
+ }
+
+ // prevent infinite recursion for recursive types
+ if (!alreadySeen.add(type)) {
+ return;
+ }
+
+ if (type.isArray()) {
+ recursivelyRegisterType(type.getComponentType(), config, alreadySeen);
+ } else {
+ config.registerKryoType(type);
+ // add serializers for Avro type if necessary
+ AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type);
+
+ Field[] fields = type.getDeclaredFields();
+ for (Field field : fields) {
+ if (Modifier.isStatic(field.getModifiers())
+ || Modifier.isTransient(field.getModifiers())) {
+ continue;
+ }
+ Type fieldType = field.getGenericType();
+ recursivelyRegisterGenericType(fieldType, config, alreadySeen);
+ }
+ }
+ }
+
+ private static void recursivelyRegisterGenericType(
+ Type fieldType, ExecutionConfig config, Set> alreadySeen) {
+ if (fieldType instanceof ParameterizedType) {
+ // field has generics
+ ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType;
+
+ for (Type t : parameterizedFieldType.getActualTypeArguments()) {
+ if (TypeExtractionUtils.isClassType(t)) {
+ recursivelyRegisterType(
+ TypeExtractionUtils.typeToClass(t), config, alreadySeen);
+ }
+ }
+
+ recursivelyRegisterGenericType(
+ parameterizedFieldType.getRawType(), config, alreadySeen);
+ } else if (fieldType instanceof GenericArrayType) {
+ GenericArrayType genericArrayType = (GenericArrayType) fieldType;
+ recursivelyRegisterGenericType(
+ genericArrayType.getGenericComponentType(), config, alreadySeen);
+ } else if (fieldType instanceof Class) {
+ Class> clazz = (Class>) fieldType;
+ recursivelyRegisterType(clazz, config, alreadySeen);
+ }
+ }
+
+ /**
+ * Returns all GenericTypeInfos contained in a composite type.
+ *
+ * @param typeInfo {@link CompositeType}
+ */
+ private static void getContainedGenericTypes(
+ CompositeType> typeInfo, List> target) {
+ for (int i = 0; i < typeInfo.getArity(); i++) {
+ TypeInformation> type = typeInfo.getTypeAt(i);
+ if (type instanceof CompositeType) {
+ getContainedGenericTypes((CompositeType>) type, target);
+ } else if (type instanceof GenericTypeInfo) {
+ if (!target.contains(type)) {
+ target.add((GenericTypeInfo>) type);
+ }
+ }
+ }
+ }
+
+ /**
+ * This is used in case we don't have Avro on the classpath. Flink versions before 1.4 always
+ * registered special Serializers for Kryo but starting with Flink 1.4 we don't have Avro on the
+ * classpath by default anymore. We still have to retain the same registered Serializers for
+ * backwards compatibility of savepoints.
+ */
+ @Internal
+ public static class DummyAvroRegisteredClass {}
+
+ /**
+ * This is used in case we don't have Avro on the classpath. Flink versions before 1.4 always
+ * registered special Serializers for Kryo but starting with Flink 1.4 we don't have Avro on the
+ * classpath by default anymore. We still have to retain the same registered Serializers for
+ * backwards compatibility of savepoints.
+ */
+ @Internal
+ public static class DummyAvroKryoSerializerClass extends Serializer {
+ @Override
+ public void write(Kryo kryo, Output output, Object o) {
+ throw new UnsupportedOperationException("Could not find required Avro dependency.");
+ }
+
+ @Override
+ public T read(Kryo kryo, Input input, Class extends T> aClass) {
+ throw new UnsupportedOperationException("Could not find required Avro dependency.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Serializers
+ // --------------------------------------------------------------------------------------------
+
+ /** Special serializer for Java's {@link ArrayList} used for Avro's GenericData.Array. */
+ @SuppressWarnings("rawtypes")
+ @Internal
+ public static class SpecificInstanceCollectionSerializerForArrayList
+ extends SpecificInstanceCollectionSerializer {
+ private static final long serialVersionUID = 1L;
+
+ public SpecificInstanceCollectionSerializerForArrayList() {
+ super(ArrayList.class);
+ }
+ }
+
+ /**
+ * Special serializer for Java collections enforcing certain instance types. Avro is serializing
+ * collections with an "GenericData.Array" type. Kryo is not able to handle this type, so we use
+ * ArrayLists.
+ */
+ @SuppressWarnings("rawtypes")
+ @Internal
+ public static class SpecificInstanceCollectionSerializer
+ extends CollectionSerializer implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private Class type;
+
+ public SpecificInstanceCollectionSerializer(Class type) {
+ this.type = type;
+ }
+
+ @Override
+ protected T create(Kryo kryo, Input input, Class extends T> type, int size) {
+ return kryo.newInstance(this.type);
+ }
+
+ @Override
+ protected T createCopy(Kryo kryo, T original) {
+ return kryo.newInstance(this.type);
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 4c739e984268f3..d1e835b6b21ffc 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -214,6 +214,26 @@ public class PipelineOptions {
+ " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
.build());
+ public static final ConfigOption> KRYO5_DEFAULT_SERIALIZERS =
+ key("pipeline.default-kryo5-serializers")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
+ + " as Kryo 5 default serializers")
+ .linebreak()
+ .linebreak()
+ .text("Example:")
+ .linebreak()
+ .add(
+ TextElement.code(
+ "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
+ + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
+ .build());
+
public static final ConfigOption> KRYO_REGISTERED_CLASSES =
key("pipeline.registered-kryo-types")
.stringType()
@@ -228,6 +248,20 @@ public class PipelineOptions {
+ " sure that only tags are written.")
.build());
+ public static final ConfigOption> KRYO5_REGISTERED_CLASSES =
+ key("pipeline.registered-kryo5-types")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Semicolon separated list of types to be registered with the serialization stack. If the type"
+ + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
+ + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
+ + " sure that only tags are written.")
+ .build());
+
public static final ConfigOption> POJO_REGISTERED_CLASSES =
key("pipeline.registered-pojo-types")
.stringType()
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 04bccafc4e7985..512bce3370dbdb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -23,16 +23,12 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.SerializedValue;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -56,14 +52,19 @@ void testDoubleTypeRegistration() {
for (Class> tpe : types) {
config.registerKryoType(tpe);
+ config.registerKryo5Type(tpe);
}
int counter = 0;
-
for (Class> tpe : config.getRegisteredKryoTypes()) {
assertThat(tpe).isEqualTo(expectedTypes.get(counter++));
}
+ assertThat(expectedTypes.size()).isEqualTo(counter);
+ counter = 0;
+ for (Class> tpe : config.getRegisteredKryo5Types()) {
+ assertThat(tpe).isEqualTo(expectedTypes.get(counter++));
+ }
assertThat(expectedTypes.size()).isEqualTo(counter);
}
@@ -190,7 +191,7 @@ void testReadingDefaultConfig() {
void testLoadingRegisteredKryoTypesFromConfiguration() {
ExecutionConfig configFromSetters = new ExecutionConfig();
configFromSetters.registerKryoType(ExecutionConfigTest.class);
- configFromSetters.registerKryoType(TestSerializer1.class);
+ configFromSetters.registerKryoType(TestKryo2Serializer1.class);
ExecutionConfig configFromConfiguration = new ExecutionConfig();
@@ -198,7 +199,28 @@ void testLoadingRegisteredKryoTypesFromConfiguration() {
configuration.setString(
"pipeline.registered-kryo-types",
"org.apache.flink.api.common.ExecutionConfigTest;"
- + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1");
+ + "org.apache.flink.api.common.ExecutionConfigTest$TestKryo2Serializer1");
+
+ // mutate config according to configuration
+ configFromConfiguration.configure(
+ configuration, Thread.currentThread().getContextClassLoader());
+
+ assertThat(configFromConfiguration).isEqualTo(configFromSetters);
+ }
+
+ @Test
+ void testLoadingRegisteredKryo5TypesFromConfiguration() {
+ ExecutionConfig configFromSetters = new ExecutionConfig();
+ configFromSetters.registerKryo5Type(ExecutionConfigTest.class);
+ configFromSetters.registerKryo5Type(TestKryo5Serializer1.class);
+
+ ExecutionConfig configFromConfiguration = new ExecutionConfig();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(
+ "pipeline.registered-kryo5-types",
+ "org.apache.flink.api.common.ExecutionConfigTest;"
+ + "org.apache.flink.api.common.ExecutionConfigTest$TestKryo5Serializer1");
// mutate config according to configuration
configFromConfiguration.configure(
@@ -211,7 +233,7 @@ void testLoadingRegisteredKryoTypesFromConfiguration() {
void testLoadingRegisteredPojoTypesFromConfiguration() {
ExecutionConfig configFromSetters = new ExecutionConfig();
configFromSetters.registerPojoType(ExecutionConfigTest.class);
- configFromSetters.registerPojoType(TestSerializer1.class);
+ configFromSetters.registerPojoType(TestKryo2Serializer1.class);
ExecutionConfig configFromConfiguration = new ExecutionConfig();
@@ -219,7 +241,7 @@ void testLoadingRegisteredPojoTypesFromConfiguration() {
configuration.setString(
"pipeline.registered-pojo-types",
"org.apache.flink.api.common.ExecutionConfigTest;"
- + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1");
+ + "org.apache.flink.api.common.ExecutionConfigTest$TestKryo2Serializer1");
// mutate config according to configuration
configFromConfiguration.configure(
@@ -252,8 +274,9 @@ void testLoadingRestartStrategyFromConfiguration() {
void testLoadingDefaultKryoSerializersFromConfiguration() {
ExecutionConfig configFromSetters = new ExecutionConfig();
configFromSetters.addDefaultKryoSerializer(
- ExecutionConfigTest.class, TestSerializer1.class);
- configFromSetters.addDefaultKryoSerializer(TestSerializer1.class, TestSerializer2.class);
+ ExecutionConfigTest.class, TestKryo2Serializer1.class);
+ configFromSetters.addDefaultKryoSerializer(
+ TestKryo2Serializer1.class, TestKryo2Serializer2.class);
ExecutionConfig configFromConfiguration = new ExecutionConfig();
@@ -261,9 +284,34 @@ void testLoadingDefaultKryoSerializersFromConfiguration() {
configuration.setString(
"pipeline.default-kryo-serializers",
"class:org.apache.flink.api.common.ExecutionConfigTest,"
- + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;"
- + "class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1,"
- + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2");
+ + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestKryo2Serializer1;"
+ + "class:org.apache.flink.api.common.ExecutionConfigTest$TestKryo2Serializer1,"
+ + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestKryo2Serializer2");
+
+ // mutate config according to configuration
+ configFromConfiguration.configure(
+ configuration, Thread.currentThread().getContextClassLoader());
+
+ assertThat(configFromConfiguration).isEqualTo(configFromSetters);
+ }
+
+ @Test
+ void testLoadingDefaultKryo5SerializersFromConfiguration() {
+ ExecutionConfig configFromSetters = new ExecutionConfig();
+ configFromSetters.addDefaultKryo5Serializer(
+ ExecutionConfigTest.class, TestKryo5Serializer1.class);
+ configFromSetters.addDefaultKryo5Serializer(
+ TestKryo5Serializer1.class, TestKryo5Serializer2.class);
+
+ ExecutionConfig configFromConfiguration = new ExecutionConfig();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(
+ "pipeline.default-kryo5-serializers",
+ "class:org.apache.flink.api.common.ExecutionConfigTest,"
+ + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestKryo5Serializer1;"
+ + "class:org.apache.flink.api.common.ExecutionConfigTest$TestKryo5Serializer1,"
+ + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestKryo5Serializer2");
// mutate config according to configuration
configFromConfiguration.configure(
@@ -295,7 +343,7 @@ private void testLoadingSchedulerTypeFromConfiguration(
void testNotOverridingRegisteredKryoTypesWithDefaultsFromConfiguration() {
ExecutionConfig config = new ExecutionConfig();
config.registerKryoType(ExecutionConfigTest.class);
- config.registerKryoType(TestSerializer1.class);
+ config.registerKryoType(TestKryo2Serializer1.class);
Configuration configuration = new Configuration();
@@ -304,7 +352,7 @@ void testNotOverridingRegisteredKryoTypesWithDefaultsFromConfiguration() {
LinkedHashSet