& Serializable> void addDefaultKryoSerializer(
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
+ @Deprecated
public void addDefaultKryoSerializer(
- Class> type, Class extends Serializer>> serializerClass) {
+ Class> type,
+ Class extends com.esotericsoftware.kryo.Serializer>> serializerClass) {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
defaultKryoSerializerClasses.put(type, serializerClass);
}
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ * Note that the serializer instance must be serializable (as defined by
+ * java.io.Serializable), because it may be distributed to the worker nodes by java
+ * serialization.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializer The serializer to use.
+ */
+ @PublicEvolving
+ public & Serializable>
+ void addDefaultKryo5Serializer(Class> type, T serializer) {
+ if (type == null || serializer == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+
+ defaultKryo5Serializers.put(type, new SerializableKryo5Serializer<>(serializer));
+ }
+
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializerClass The class of the serializer to use.
+ */
+ @PublicEvolving
+ public void addDefaultKryo5Serializer(
+ Class> type,
+ Class extends com.esotericsoftware.kryo.kryo5.Serializer>> serializerClass) {
+ if (type == null || serializerClass == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+ defaultKryo5SerializerClasses.put(type, serializerClass);
+ }
+
/**
* Registers the given type with a Kryo Serializer.
*
@@ -822,8 +872,9 @@ public void addDefaultKryoSerializer(
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
- public & Serializable> void registerTypeWithKryoSerializer(
- Class> type, T serializer) {
+ @Deprecated
+ public & Serializable>
+ void registerTypeWithKryoSerializer(Class> type, T serializer) {
if (type == null || serializer == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
@@ -838,19 +889,63 @@ public & Serializable> void registerTypeWithKryoSeriali
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
+ @Deprecated
@SuppressWarnings("rawtypes")
public void registerTypeWithKryoSerializer(
- Class> type, Class extends Serializer> serializerClass) {
+ Class> type, Class extends com.esotericsoftware.kryo.Serializer> serializerClass) {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
@SuppressWarnings("unchecked")
- Class extends Serializer>> castedSerializerClass =
- (Class extends Serializer>>) serializerClass;
+ Class extends com.esotericsoftware.kryo.Serializer>> castedSerializerClass =
+ (Class extends com.esotericsoftware.kryo.Serializer>>) serializerClass;
registeredTypesWithKryoSerializerClasses.put(type, castedSerializerClass);
}
+ /**
+ * Registers the given type with a Kryo Serializer.
+ *
+ * Note that the serializer instance must be serializable (as defined by
+ * java.io.Serializable), because it may be distributed to the worker nodes by java
+ * serialization.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializer The serializer to use.
+ */
+ @PublicEvolving
+ public & Serializable>
+ void registerTypeWithKryo5Serializer(Class> type, T serializer) {
+ if (type == null || serializer == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+
+ registeredTypesWithKryo5Serializers.put(
+ type, new SerializableKryo5Serializer<>(serializer));
+ }
+
+ /**
+ * Registers the given Serializer via its class as a serializer for the given type at the
+ * KryoSerializer
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializerClass The class of the serializer to use.
+ */
+ @PublicEvolving
+ @SuppressWarnings("rawtypes")
+ public void registerTypeWithKryo5Serializer(
+ Class> type,
+ Class extends com.esotericsoftware.kryo.kryo5.Serializer> serializerClass) {
+ if (type == null || serializerClass == null) {
+ throw new NullPointerException("Cannot register null class or serializer.");
+ }
+
+ @SuppressWarnings("unchecked")
+ Class extends com.esotericsoftware.kryo.kryo5.Serializer>> castedSerializerClass =
+ (Class extends com.esotericsoftware.kryo.kryo5.Serializer>>) serializerClass;
+ registeredTypesWithKryo5SerializerClasses.put(type, castedSerializerClass);
+ }
+
/**
* Registers the given type with the serialization stack. If the type is eventually serialized
* as a POJO, then the type is registered with the POJO serializer. If the type ends up being
@@ -876,6 +971,7 @@ public void registerPojoType(Class> type) {
*
* @param type The class of the type to register.
*/
+ @Deprecated
public void registerKryoType(Class> type) {
if (type == null) {
throw new NullPointerException("Cannot register null type class.");
@@ -883,29 +979,67 @@ public void registerKryoType(Class> type) {
registeredKryoTypes.add(type);
}
+ @PublicEvolving
+ public void registerKryo5Type(Class> type) {
+ if (type == null) {
+ throw new NullPointerException("Cannot register null type class.");
+ }
+ registeredKryo5Types.add(type);
+ }
+
/** Returns the registered types with Kryo Serializers. */
+ @Deprecated
public LinkedHashMap, SerializableSerializer>>
getRegisteredTypesWithKryoSerializers() {
return registeredTypesWithKryoSerializers;
}
+ /** Returns the registered types with Kryo Serializers. */
+ @PublicEvolving
+ public LinkedHashMap, SerializableKryo5Serializer>>
+ getRegisteredTypesWithKryo5Serializers() {
+ return registeredTypesWithKryo5Serializers;
+ }
+
/** Returns the registered types with their Kryo Serializer classes. */
- public LinkedHashMap, Class extends Serializer>>>
+ @Deprecated
+ public LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
getRegisteredTypesWithKryoSerializerClasses() {
return registeredTypesWithKryoSerializerClasses;
}
+ /** Returns the registered types with their Kryo Serializer classes. */
+ @PublicEvolving
+ public LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ getRegisteredTypesWithKryo5SerializerClasses() {
+ return registeredTypesWithKryo5SerializerClasses;
+ }
+
/** Returns the registered default Kryo Serializers. */
+ @Deprecated
public LinkedHashMap, SerializableSerializer>> getDefaultKryoSerializers() {
return defaultKryoSerializers;
}
+ /** Returns the registered default Kryo Serializers. */
+ @PublicEvolving
+ public LinkedHashMap, SerializableKryo5Serializer>> getDefaultKryo5Serializers() {
+ return defaultKryo5Serializers;
+ }
+
/** Returns the registered default Kryo Serializer classes. */
- public LinkedHashMap, Class extends Serializer>>>
+ public LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
getDefaultKryoSerializerClasses() {
return defaultKryoSerializerClasses;
}
+ /** Returns the registered default Kryo Serializer classes. */
+ @PublicEvolving
+ public LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ getDefaultKryo5SerializerClasses() {
+ return defaultKryo5SerializerClasses;
+ }
+
/** Returns the registered Kryo types. */
public LinkedHashSet> getRegisteredKryoTypes() {
if (isForceKryoEnabled()) {
@@ -924,6 +1058,25 @@ public LinkedHashSet> getRegisteredKryoTypes() {
}
}
+ /** Returns the registered Kryo types. */
+ @PublicEvolving
+ public LinkedHashSet> getRegisteredKryo5Types() {
+ if (isForceKryoEnabled()) {
+ // if we force kryo, we must also return all the types that
+ // were previously only registered as POJO
+ LinkedHashSet> result = new LinkedHashSet<>();
+ result.addAll(registeredKryo5Types);
+ for (Class> t : registeredPojoTypes) {
+ if (!result.contains(t)) {
+ result.add(t);
+ }
+ }
+ return result;
+ } else {
+ return registeredKryo5Types;
+ }
+ }
+
/** Returns the registered POJO types. */
public LinkedHashSet> getRegisteredPojoTypes() {
return registeredPojoTypes;
@@ -988,8 +1141,12 @@ public boolean equals(Object obj) {
&& Objects.equals(globalJobParameters, other.globalJobParameters)
&& registeredTypesWithKryoSerializerClasses.equals(
other.registeredTypesWithKryoSerializerClasses)
+ && registeredTypesWithKryo5SerializerClasses.equals(
+ other.registeredTypesWithKryo5SerializerClasses)
&& defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
+ && defaultKryo5SerializerClasses.equals(other.defaultKryo5SerializerClasses)
&& registeredKryoTypes.equals(other.registeredKryoTypes)
+ && registeredKryo5Types.equals(other.registeredKryo5Types)
&& registeredPojoTypes.equals(other.registeredPojoTypes);
} else {
@@ -1006,6 +1163,7 @@ public int hashCode() {
registeredTypesWithKryoSerializerClasses,
defaultKryoSerializerClasses,
registeredKryoTypes,
+ registeredKryo5Types,
registeredPojoTypes);
}
@@ -1030,6 +1188,8 @@ public String toString() {
+ defaultKryoSerializerClasses
+ ", registeredKryoTypes="
+ registeredKryoTypes
+ + ", registeredKryo5Types="
+ + registeredKryo5Types
+ ", registeredPojoTypes="
+ registeredPojoTypes
+ '}';
@@ -1053,7 +1213,9 @@ public ArchivedExecutionConfig archive() {
// ------------------------------ Utilities ----------------------------------
- public static class SerializableSerializer & Serializable>
+ @Deprecated
+ public static class SerializableSerializer<
+ T extends com.esotericsoftware.kryo.Serializer> & Serializable>
implements Serializable {
private static final long serialVersionUID = 4687893502781067189L;
@@ -1068,6 +1230,23 @@ public T getSerializer() {
}
}
+ @PublicEvolving
+ public static class SerializableKryo5Serializer<
+ T extends com.esotericsoftware.kryo.kryo5.Serializer> & Serializable>
+ implements Serializable {
+ private static final long serialVersionUID = 4687893502781067189L;
+
+ private T serializer;
+
+ public SerializableKryo5Serializer(T serializer) {
+ this.serializer = serializer;
+ }
+
+ public T getSerializer() {
+ return serializer;
+ }
+ }
+
/**
* Abstract class for a custom user configuration object registered at the execution config.
*
@@ -1183,6 +1362,10 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS)
.map(s -> parseKryoSerializersWithExceptionHandling(classLoader, s))
.ifPresent(s -> this.defaultKryoSerializerClasses = s);
+ configuration
+ .getOptional(PipelineOptions.KRYO5_DEFAULT_SERIALIZERS)
+ .map(s -> parseKryo5SerializersWithExceptionHandling(classLoader, s))
+ .ifPresent(s -> this.defaultKryo5SerializerClasses = s);
configuration
.getOptional(PipelineOptions.POJO_REGISTERED_CLASSES)
@@ -1194,6 +1377,16 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
.ifPresent(c -> this.registeredKryoTypes = c);
+ configuration
+ .getOptional(PipelineOptions.KRYO5_REGISTERED_CLASSES)
+ .map(
+ c ->
+ loadClasses(
+ c,
+ classLoader,
+ "Could not load kryo 5 type to be registered."))
+ .ifPresent(c -> this.registeredKryo5Types = c);
+
configuration
.getOptional(JobManagerOptions.SCHEDULER)
.ifPresent(t -> this.configuration.set(JobManagerOptions.SCHEDULER, t));
@@ -1216,7 +1409,7 @@ private LinkedHashSet> loadClasses(
.collect(Collectors.toCollection(LinkedHashSet::new));
}
- private LinkedHashMap, Class extends Serializer>>>
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
parseKryoSerializersWithExceptionHandling(
ClassLoader classLoader, List kryoSerializers) {
try {
@@ -1231,8 +1424,23 @@ private LinkedHashSet> loadClasses(
}
}
- private LinkedHashMap, Class extends Serializer>>> parseKryoSerializers(
- ClassLoader classLoader, List kryoSerializers) {
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ parseKryo5SerializersWithExceptionHandling(
+ ClassLoader classLoader, List kryoSerializers) {
+ try {
+ return parseKryo5Serializers(classLoader, kryoSerializers);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not configure kryo 5 serializers from %s. The expected format is:"
+ + "'class:,serializer:;...",
+ kryoSerializers),
+ e);
+ }
+ }
+
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ parseKryoSerializers(ClassLoader classLoader, List kryoSerializers) {
return kryoSerializers.stream()
.map(ConfigurationUtils::parseMap)
.collect(
@@ -1254,6 +1462,29 @@ private LinkedHashMap, Class extends Serializer>>> parseKryoSeriali
LinkedHashMap::new));
}
+ private LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ parseKryo5Serializers(ClassLoader classLoader, List kryoSerializers) {
+ return kryoSerializers.stream()
+ .map(ConfigurationUtils::parseMap)
+ .collect(
+ Collectors.toMap(
+ m ->
+ loadClass(
+ m.get("class"),
+ classLoader,
+ "Could not load class for kryo 5 serialization"),
+ m ->
+ loadClass(
+ m.get("serializer"),
+ classLoader,
+ "Could not load serializer's class"),
+ (m1, m2) -> {
+ throw new IllegalArgumentException(
+ "Duplicated serializer for class: " + m1);
+ },
+ LinkedHashMap::new));
+ }
+
@SuppressWarnings("unchecked")
private T loadClass(
String className, ClassLoader classLoader, String errorMessage) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 184bfc6a0baad..b2458deb9d819 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,7 +18,9 @@
package org.apache.flink.api.common.typeutils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoVersion;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -132,6 +134,23 @@ public abstract class TypeSerializer implements Serializable {
*/
public abstract void serialize(T record, DataOutputView target) throws IOException;
+ /**
+ * De-serializes a record from the given source input view.
+ *
+ * This method takes a kryoVersion hint to suggest which version of Kryo to use.
+ *
+ * @param record The record to serialize.
+ * @param target The output view to write the serialized data to.
+ * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
+ * raised by the output view, which may have an underlying I/O channel to which it
+ * delegates.
+ */
+ @Internal
+ public void serializeWithKryoVersionHint(
+ T record, DataOutputView target, KryoVersion kryoVersion) throws IOException {
+ serialize(record, target);
+ }
+
/**
* De-serializes a record from the given source input view.
*
@@ -143,6 +162,23 @@ public abstract class TypeSerializer implements Serializable {
*/
public abstract T deserialize(DataInputView source) throws IOException;
+ /**
+ * De-serializes a record from the given source input view.
+ *
+ * This method takes a kryoVersion hint to suggest which version of Kryo to use.
+ *
+ * @param source The input view from which to read the data.
+ * @param kryoVersion The version of Kryo used or unknown for the default version.
+ * @return The deserialized element.
+ * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
+ * Typically raised by the input view, which may have an underlying I/O channel from which
+ * it reads.
+ */
+ public T deserializeWithKryoVersionHint(DataInputView source, KryoVersion kryoVersion)
+ throws IOException {
+ return deserialize(source);
+ }
+
/**
* De-serializes a record from the given source input view into the given reuse record instance
* if mutable.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 3ee54671583ca..50d6462117eb2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoVersion;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -116,6 +117,12 @@ public int getLength() {
@Override
public void serialize(List list, DataOutputView target) throws IOException {
+ serializeWithKryoVersionHint(list, target, KryoVersion.DEFAULT);
+ }
+
+ @Override
+ public void serializeWithKryoVersionHint(
+ List list, DataOutputView target, KryoVersion kryoVersion) throws IOException {
final int size = list.size();
target.writeInt(size);
@@ -123,18 +130,24 @@ public void serialize(List list, DataOutputView target) throws IOException {
// the given list supports RandomAccess.
// The Iterator should be stack allocated on new JVMs (due to escape analysis)
for (T element : list) {
- elementSerializer.serialize(element, target);
+ elementSerializer.serializeWithKryoVersionHint(element, target, kryoVersion);
}
}
@Override
public List deserialize(DataInputView source) throws IOException {
+ return deserializeWithKryoVersionHint(source, KryoVersion.DEFAULT);
+ }
+
+ @Override
+ public List deserializeWithKryoVersionHint(DataInputView source, KryoVersion kryoVersion)
+ throws IOException {
final int size = source.readInt();
// create new list with (size + 1) capacity to prevent expensive growth when a single
// element is added
final List list = new ArrayList<>(size + 1);
for (int i = 0; i < size; i++) {
- list.add(elementSerializer.deserialize(source));
+ list.add(elementSerializer.deserializeWithKryoVersionHint(source, kryoVersion));
}
return list;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index fb2713251de82..75410fde5bee8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoVersion;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.CollectionUtil;
@@ -129,31 +130,46 @@ public int getLength() {
@Override
public void serialize(Map map, DataOutputView target) throws IOException {
+ serializeWithKryoVersionHint(map, target, KryoVersion.DEFAULT);
+ }
+
+ @Override
+ public void serializeWithKryoVersionHint(
+ Map map, DataOutputView target, KryoVersion kryoVersion) throws IOException {
final int size = map.size();
target.writeInt(size);
for (Map.Entry entry : map.entrySet()) {
- keySerializer.serialize(entry.getKey(), target);
+ keySerializer.serializeWithKryoVersionHint(entry.getKey(), target, kryoVersion);
if (entry.getValue() == null) {
target.writeBoolean(true);
} else {
target.writeBoolean(false);
- valueSerializer.serialize(entry.getValue(), target);
+ valueSerializer.serializeWithKryoVersionHint(entry.getValue(), target, kryoVersion);
}
}
}
@Override
public Map deserialize(DataInputView source) throws IOException {
+ return deserializeWithKryoVersionHint(source, KryoVersion.DEFAULT);
+ }
+
+ @Override
+ public Map deserializeWithKryoVersionHint(DataInputView source, KryoVersion kryoVersion)
+ throws IOException {
final int size = source.readInt();
final Map map = CollectionUtil.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; ++i) {
- K key = keySerializer.deserialize(source);
+ K key = keySerializer.deserializeWithKryoVersionHint(source, kryoVersion);
boolean isNull = source.readBoolean();
- V value = isNull ? null : valueSerializer.deserialize(source);
+ V value =
+ isNull
+ ? null
+ : valueSerializer.deserializeWithKryoVersionHint(source, kryoVersion);
map.put(key, value);
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
index 7f1a2dcdbe2d7..81495dc4bfda7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
@@ -21,8 +21,8 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import java.util.LinkedHashMap;
@@ -69,6 +69,9 @@ public static AvroUtils getAvroUtils() {
public abstract void addAvroGenericDataArrayRegistration(
LinkedHashMap kryoRegistrations);
+ public abstract void addAvroGenericDataArrayRegistration5(
+ LinkedHashMap kryoRegistrations);
+
/**
* Creates an {@code AvroSerializer} if flink-avro is present, otherwise throws an exception.
*/
@@ -111,8 +114,25 @@ public void addAvroGenericDataArrayRegistration(
kryoRegistrations.put(
AVRO_GENERIC_DATA_ARRAY,
new KryoRegistration(
- Serializers.DummyAvroRegisteredClass.class,
- (Class) Serializers.DummyAvroKryoSerializerClass.class));
+ org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
+ .DummyAvroRegisteredClass.class,
+ (Class)
+ org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
+ .DummyAvroKryoSerializerClass.class));
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public void addAvroGenericDataArrayRegistration5(
+ LinkedHashMap kryoRegistrations) {
+ kryoRegistrations.put(
+ AVRO_GENERIC_DATA_ARRAY,
+ new Kryo5Registration(
+ org.apache.flink.api.java.typeutils.runtime.kryo5.Serializers
+ .DummyAvroRegisteredClass.class,
+ (Class)
+ org.apache.flink.api.java.typeutils.runtime.kryo5.Serializers
+ .DummyAvroKryoSerializerClass.class));
}
@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 9344f3037c83d..8b8bda362c0cd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -26,7 +26,6 @@
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +87,8 @@ public TypeSerializer createSerializer(ExecutionConfig config) {
+ " is treated as a generic type.");
}
- return new KryoSerializer(this.typeClass, config);
+ return new org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer(
+ this.typeClass, config);
}
@SuppressWarnings("unchecked")
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 021f9dfe333a1..41b1d44a6da40 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -28,7 +28,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.commons.lang3.StringUtils;
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Registration.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Registration.java
new file mode 100644
index 0000000000000..3201cdc19fa08
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Registration.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import com.esotericsoftware.kryo.kryo5.SerializerFactory.ReflectionSerializerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** A {@code KryoRegistration} resembles a registered class and its serializer in Kryo. */
+@Internal
+public class Kryo5Registration implements Serializable {
+
+ private static final long serialVersionUID = 5375110512910892655L;
+
+ /**
+ * IMPORTANT: the order of the enumerations must not change, since their ordinals are used for
+ * serialization.
+ */
+ @Internal
+ public enum SerializerDefinitionType {
+ UNSPECIFIED,
+ CLASS,
+ INSTANCE
+ }
+
+ /** The registered class. */
+ private final Class> registeredClass;
+
+ /**
+ * Class of the serializer to use for the registered class. Exists only if the serializer
+ * definition type is {@link SerializerDefinitionType#CLASS}.
+ */
+ @Nullable private final Class extends Serializer>> serializerClass;
+
+ /**
+ * A serializable instance of the serializer to use for the registered class. Exists only if the
+ * serializer definition type is {@link SerializerDefinitionType#INSTANCE}.
+ */
+ @Nullable
+ private final ExecutionConfig.SerializableKryo5Serializer extends Serializer>>
+ serializableSerializerInstance;
+
+ private final SerializerDefinitionType serializerDefinitionType;
+
+ public Kryo5Registration(Class> registeredClass) {
+ this.registeredClass = Preconditions.checkNotNull(registeredClass);
+
+ this.serializerClass = null;
+ this.serializableSerializerInstance = null;
+
+ this.serializerDefinitionType = SerializerDefinitionType.UNSPECIFIED;
+ }
+
+ public Kryo5Registration(
+ Class> registeredClass, Class extends Serializer>> serializerClass) {
+ this.registeredClass = Preconditions.checkNotNull(registeredClass);
+
+ this.serializerClass = Preconditions.checkNotNull(serializerClass);
+ this.serializableSerializerInstance = null;
+
+ this.serializerDefinitionType = SerializerDefinitionType.CLASS;
+ }
+
+ public Kryo5Registration(
+ Class> registeredClass,
+ ExecutionConfig.SerializableKryo5Serializer extends Serializer>>
+ serializableSerializerInstance) {
+ this.registeredClass = Preconditions.checkNotNull(registeredClass);
+
+ this.serializerClass = null;
+ this.serializableSerializerInstance =
+ Preconditions.checkNotNull(serializableSerializerInstance);
+
+ this.serializerDefinitionType = SerializerDefinitionType.INSTANCE;
+ }
+
+ public Class> getRegisteredClass() {
+ return registeredClass;
+ }
+
+ public SerializerDefinitionType getSerializerDefinitionType() {
+ return serializerDefinitionType;
+ }
+
+ @Nullable
+ public Class extends Serializer>> getSerializerClass() {
+ return serializerClass;
+ }
+
+ @Nullable
+ public ExecutionConfig.SerializableKryo5Serializer extends Serializer>>
+ getSerializableSerializerInstance() {
+ return serializableSerializerInstance;
+ }
+
+ public Serializer> getSerializer(Kryo kryo) {
+ switch (serializerDefinitionType) {
+ case UNSPECIFIED:
+ return null;
+ case CLASS:
+ return ReflectionSerializerFactory.newSerializer(
+ kryo, serializerClass, registeredClass);
+ case INSTANCE:
+ return serializableSerializerInstance.getSerializer();
+ default:
+ // this should not happen; adding as a guard for the future
+ throw new IllegalStateException(
+ "Unrecognized Kryo registration serializer definition type: "
+ + serializerDefinitionType);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj instanceof Kryo5Registration) {
+ Kryo5Registration other = (Kryo5Registration) obj;
+
+ // we cannot include the serializer instances here because they don't implement the
+ // equals method
+ return serializerDefinitionType == other.serializerDefinitionType
+ && registeredClass == other.registeredClass
+ && serializerClass == other.serializerClass;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = serializerDefinitionType.hashCode();
+ result = 31 * result + registeredClass.hashCode();
+
+ if (serializerClass != null) {
+ result = 31 * result + serializerClass.hashCode();
+ }
+
+ return result;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Utils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Utils.java
new file mode 100644
index 0000000000000..154d92dc3d50f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Kryo5Utils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** Convenience methods for Kryo */
+@Internal
+public class Kryo5Utils {
+
+ /**
+ * Tries to copy the given record from using the provided Kryo instance. If this fails, then the
+ * record from is copied by serializing it into a byte buffer and deserializing it from there.
+ *
+ * @param from Element to copy
+ * @param kryo Kryo instance to use
+ * @param serializer TypeSerializer which is used in case of a Kryo failure
+ * @param Type of the element to be copied
+ * @return Copied element
+ */
+ public static T copy(T from, Kryo kryo, TypeSerializer serializer) {
+ try {
+ return kryo.copy(from);
+ } catch (KryoException ke) {
+ // Kryo could not copy the object --> try to serialize/deserialize the object
+ try {
+ byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
+
+ return InstantiationUtil.deserializeFromByteArray(serializer, byteArray);
+ } catch (IOException ioe) {
+ throw new RuntimeException(
+ "Could not copy object by serializing/deserializing" + " it.", ioe);
+ }
+ }
+ }
+
+ /**
+ * Tries to copy the given record from using the provided Kryo instance. If this fails, then the
+ * record from is copied by serializing it into a byte buffer and deserializing it from there.
+ *
+ * @param from Element to copy
+ * @param reuse Reuse element for the deserialization
+ * @param kryo Kryo instance to use
+ * @param serializer TypeSerializer which is used in case of a Kryo failure
+ * @param Type of the element to be copied
+ * @return Copied element
+ */
+ public static T copy(T from, T reuse, Kryo kryo, TypeSerializer serializer) {
+ try {
+ return kryo.copy(from);
+ } catch (KryoException ke) {
+ // Kryo could not copy the object --> try to serialize/deserialize the object
+ try {
+ byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
+
+ return InstantiationUtil.deserializeFromByteArray(serializer, reuse, byteArray);
+ } catch (IOException ioe) {
+ throw new RuntimeException(
+ "Could not copy object by serializing/deserializing" + " it.", ioe);
+ }
+ }
+ }
+
+ /**
+ * Apply a list of {@link KryoRegistration} to a Kryo instance. The list of registrations is
+ * assumed to already be a final resolution of all possible registration overwrites.
+ *
+ * The registrations are applied in the given order and always specify the registration id,
+ * using the given {@code firstRegistrationId} and incrementing it for each registration.
+ *
+ * @param kryo the Kryo instance to apply the registrations
+ * @param resolvedRegistrations the registrations, which should already be resolved of all
+ * possible registration overwrites
+ * @param firstRegistrationId the first registration id to use
+ */
+ public static void applyRegistrations(
+ Kryo kryo,
+ Collection resolvedRegistrations,
+ int firstRegistrationId) {
+
+ int currentRegistrationId = firstRegistrationId;
+ Serializer> serializer;
+ for (Kryo5Registration registration : resolvedRegistrations) {
+ serializer = registration.getSerializer(kryo);
+
+ if (serializer != null) {
+ kryo.register(registration.getRegisteredClass(), serializer, currentRegistrationId);
+ } else {
+ kryo.register(registration.getRegisteredClass(), currentRegistrationId);
+ }
+ // if Kryo already had a serializer for that type then it ignores the registration
+ if (kryo.getRegistration(currentRegistrationId) != null) {
+ currentRegistrationId++;
+ }
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput5.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput5.java
new file mode 100644
index 0000000000000..2639e7f8ed612
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput5.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.io.Input;
+import com.esotericsoftware.kryo.kryo5.io.KryoBufferUnderflowException;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+@Internal
+public class NoFetchingInput5 extends Input {
+ public NoFetchingInput5(InputStream inputStream) {
+ super(inputStream, 8);
+ }
+
+ @Override
+ public int read() throws KryoException {
+ require(1);
+ return buffer[position++] & 0xFF;
+ }
+
+ @Override
+ public boolean canReadInt() throws KryoException {
+ throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+ }
+
+ @Override
+ public boolean canReadLong() throws KryoException {
+ throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+ }
+
+ /**
+ * Require makes sure that at least required number of bytes are kept in the buffer. If not,
+ * then it will load exactly the difference between required and currently available number of
+ * bytes. Thus, it will only load the data which is required and never prefetch data.
+ *
+ * @param required the number of bytes being available in the buffer
+ * @return The number of bytes remaining in the buffer, which will be at least required
+ *
bytes.
+ * @throws KryoException
+ */
+ @Override
+ protected int require(int required) throws KryoException {
+ // The main change between this and Kryo5 Input.require is this will never read more bytes
+ // than required.
+ // There are also formatting changes to be compliant with the Flink project styling rules.
+ int remaining = limit - position;
+ if (remaining >= required) {
+ return remaining;
+ }
+ if (required > capacity) {
+ throw new KryoException(
+ "Buffer too small: capacity: " + capacity + ", required: " + required);
+ }
+
+ int count;
+ // Try to fill the buffer.
+ if (remaining > 0) {
+ // Logical change 1 (from Kryo Input.require): "capacity - limit" -> "required - limit"
+ count = fill(buffer, limit, required - limit);
+ if (count == -1) {
+ throw new KryoBufferUnderflowException("Buffer underflow.");
+ }
+ remaining += count;
+ if (remaining >= required) {
+ limit += count;
+ return remaining;
+ }
+ }
+
+ // Was not enough, compact and try again.
+ System.arraycopy(buffer, position, buffer, 0, remaining);
+ total += position;
+ position = 0;
+
+ do {
+ // Logical change 2 (from Kryo Input.require): "capacity - remaining" -> "required -
+ // remaining"
+ count = fill(buffer, remaining, required - remaining);
+ if (count == -1) {
+ throw new KryoBufferUnderflowException("Buffer underflow.");
+ }
+ remaining += count;
+ } while (remaining < required);
+
+ limit = remaining;
+ return remaining;
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int count) throws KryoException {
+ if (bytes == null) {
+ throw new IllegalArgumentException("bytes cannot be null.");
+ }
+
+ try {
+ return inputStream.read(bytes, offset, count);
+ } catch (IOException ex) {
+ throw new KryoException(ex);
+ }
+ }
+
+ @Override
+ public void skip(int count) throws KryoException {
+ try {
+ inputStream.skip(count);
+ } catch (IOException ex) {
+ throw new KryoException(ex);
+ }
+ }
+
+ @Override
+ public void readBytes(byte[] bytes, int offset, int count) throws KryoException {
+ if (bytes == null) {
+ throw new IllegalArgumentException("bytes cannot be null.");
+ }
+
+ try {
+ int bytesRead = 0;
+ int c;
+
+ while (true) {
+ c = inputStream.read(bytes, offset + bytesRead, count - bytesRead);
+
+ if (c == -1) {
+ throw new KryoException(new EOFException("No more bytes left."));
+ }
+
+ bytesRead += c;
+
+ if (bytesRead == count) {
+ break;
+ }
+ }
+ } catch (IOException ex) {
+ throw new KryoException(ex);
+ }
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
index c5f6b5893f633..29db20caee79e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -27,8 +27,9 @@
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.util.DefaultInstantiatorStrategy;
import java.io.IOException;
@@ -65,7 +66,7 @@ public int hash(T record) {
public void setReference(T toCompare) {
checkKryoInitialized();
- reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer(type));
+ reference = Kryo5Utils.copy(toCompare, kryo, new ValueSerializer(type));
}
@Override
@@ -142,12 +143,11 @@ private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
- new Kryo.DefaultInstantiatorStrategy();
+ DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
- this.kryo.setAsmEnabled(true);
+ // this.kryo.setAsmEnabled(true);
this.kryo.register(type);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 666fbd66986fd..89e95ad1bbcbb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -28,8 +28,9 @@
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.InstantiationUtil;
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.util.DefaultInstantiatorStrategy;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -59,7 +60,7 @@ public final class ValueSerializer extends TypeSerializer {
* Currently, we only have one single registration for the value type. Nevertheless, we keep
* this information here for future compatibility.
*/
- private LinkedHashMap kryoRegistrations;
+ private LinkedHashMap kryoRegistrations;
private transient Kryo kryo;
@@ -93,14 +94,14 @@ public T createInstance() {
public T copy(T from) {
checkKryoInitialized();
- return KryoUtils.copy(from, kryo, this);
+ return Kryo5Utils.copy(from, kryo, this);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
- return KryoUtils.copy(from, reuse, kryo, this);
+ return Kryo5Utils.copy(from, reuse, kryo, this);
}
@Override
@@ -138,14 +139,11 @@ private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
- new Kryo.DefaultInstantiatorStrategy();
- instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
+ DefaultInstantiatorStrategy initStrategy = new DefaultInstantiatorStrategy();
+ initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+ kryo.setInstantiatorStrategy(initStrategy);
- this.kryo.setAsmEnabled(true);
-
- KryoUtils.applyRegistrations(
+ Kryo5Utils.applyRegistrations(
this.kryo, kryoRegistrations.values(), this.kryo.getNextRegistrationId());
}
}
@@ -221,12 +219,12 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}
}
- private static LinkedHashMap asKryoRegistrations(Class> type) {
+ private static LinkedHashMap asKryoRegistrations(Class> type) {
checkNotNull(type);
- LinkedHashMap registration =
+ LinkedHashMap registration =
CollectionUtil.newLinkedHashMapWithExpectedSize(1);
- registration.put(type.getClass().getName(), new KryoRegistration(type));
+ registration.put(type.getClass().getName(), new Kryo5Registration(type));
return registration;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 3403f5e746977..9b4d673633d3b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
@@ -29,6 +30,7 @@
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoVersion;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.CollectionUtil;
@@ -149,6 +151,10 @@ private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
private final Class type;
+ // This is used for generating new serialization snapshots.
+ private org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer
+ forwardCompatibilitySerializer;
+
// ------------------------------------------------------------------------
// The fields below are lazily initialized after duplication or deserialization.
@@ -187,19 +193,61 @@ public KryoSerializer(Class type, ExecutionConfig executionConfig) {
executionConfig.getRegisteredKryoTypes(),
executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
executionConfig.getRegisteredTypesWithKryoSerializers());
+
+ this.forwardCompatibilitySerializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer<>(
+ type, executionConfig, this);
+ }
+
+ public KryoSerializer(
+ Class type,
+ ExecutionConfig executionConfig,
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer
+ forwardCompatibilitySerializer) {
+ this.type = checkNotNull(type);
+
+ this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
+ this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
+
+ this.kryoRegistrations =
+ buildKryoRegistrations(
+ this.type,
+ executionConfig.getRegisteredKryoTypes(),
+ executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
+ executionConfig.getRegisteredTypesWithKryoSerializers());
+
+ this.forwardCompatibilitySerializer = checkNotNull(forwardCompatibilitySerializer);
}
/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
- protected KryoSerializer(KryoSerializer toCopy) {
+ @Internal
+ public KryoSerializer(KryoSerializer toCopy) {
+ this(toCopy, null);
+
+ if (toCopy.forwardCompatibilitySerializer != null) {
+ this.forwardCompatibilitySerializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer<>(
+ toCopy.forwardCompatibilitySerializer, this);
+ }
+ }
+ /**
+ * Copy-constructor that does not copy transient fields. They will be initialized once required.
+ */
+ @Internal
+ public KryoSerializer(
+ KryoSerializer toCopy,
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer
+ forwardCompatibilitySerializer) {
this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
this.defaultSerializers =
CollectionUtil.newLinkedHashMapWithExpectedSize(toCopy.defaultSerializers.size());
this.kryoRegistrations =
CollectionUtil.newLinkedHashMapWithExpectedSize(toCopy.kryoRegistrations.size());
+ this.forwardCompatibilitySerializer = forwardCompatibilitySerializer;
// deep copy the serializer instances in defaultSerializers
for (Map.Entry, ExecutionConfig.SerializableSerializer>> entry :
@@ -234,11 +282,14 @@ protected KryoSerializer(KryoSerializer toCopy) {
// for KryoSerializerSnapshot
// ------------------------------------------------------------------------
- KryoSerializer(
+ @Internal
+ public KryoSerializer(
Class type,
LinkedHashMap, SerializableSerializer>> defaultSerializers,
LinkedHashMap, Class extends Serializer>>> defaultSerializerClasses,
- LinkedHashMap kryoRegistrations) {
+ LinkedHashMap kryoRegistrations,
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer
+ forwardCompatibilitySerializer) {
this.type = checkNotNull(type, "Type class cannot be null.");
this.defaultSerializerClasses =
@@ -248,21 +299,28 @@ protected KryoSerializer(KryoSerializer toCopy) {
checkNotNull(defaultSerializers, "Default serializers cannot be null.");
this.kryoRegistrations =
checkNotNull(kryoRegistrations, "Kryo registrations cannot be null.");
+
+ this.forwardCompatibilitySerializer = forwardCompatibilitySerializer;
}
- Class getType() {
+ @Internal
+ public Class getType() {
return type;
}
- LinkedHashMap, SerializableSerializer>> getDefaultKryoSerializers() {
+ @Internal
+ public LinkedHashMap, SerializableSerializer>> getDefaultKryoSerializers() {
return defaultSerializers;
}
- LinkedHashMap, Class extends Serializer>>> getDefaultKryoSerializerClasses() {
+ @Internal
+ public LinkedHashMap, Class extends Serializer>>>
+ getDefaultKryoSerializerClasses() {
return defaultSerializerClasses;
}
- LinkedHashMap getKryoRegistrations() {
+ @Internal
+ public LinkedHashMap getKryoRegistrations() {
return kryoRegistrations;
}
@@ -383,6 +441,27 @@ public void serialize(T record, DataOutputView target) throws IOException {
}
}
+ @Override
+ public void serializeWithKryoVersionHint(
+ T record, DataOutputView target, KryoVersion kryoVersion) throws IOException {
+ switch (kryoVersion) {
+ case DEFAULT:
+ case VERSION_2_X:
+ serialize(record, target);
+ break;
+ case VERSION_5_X:
+ if (forwardCompatibilitySerializer == null) {
+ throw new IOException("Required v5 compatability serializer missing");
+ } else {
+ forwardCompatibilitySerializer.serializeWithKryoVersionHint(
+ record, target, kryoVersion);
+ }
+ break;
+ default:
+ throw new IOException(String.format("Unexpected Kryo version: %s", kryoVersion));
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
@@ -417,6 +496,25 @@ public T deserialize(DataInputView source) throws IOException {
}
}
+ @Override
+ public T deserializeWithKryoVersionHint(DataInputView source, KryoVersion kryoVersion)
+ throws IOException {
+ switch (kryoVersion) {
+ case DEFAULT:
+ case VERSION_2_X:
+ return deserialize(source);
+ case VERSION_5_X:
+ if (forwardCompatibilitySerializer == null) {
+ throw new IOException("Required v5 compatability serializer missing");
+ } else {
+ return forwardCompatibilitySerializer.deserializeWithKryoVersionHint(
+ source, kryoVersion);
+ }
+ default:
+ throw new IOException(String.format("Unexpected Kryo version: %s", kryoVersion));
+ }
+ }
+
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
@@ -560,8 +658,12 @@ private void checkKryoInitialized() {
@Override
public TypeSerializerSnapshot snapshotConfiguration() {
- return new KryoSerializerSnapshot<>(
- type, defaultSerializers, defaultSerializerClasses, kryoRegistrations);
+ if (forwardCompatibilitySerializer == null) {
+ return new KryoSerializerSnapshot<>(
+ type, defaultSerializers, defaultSerializerClasses, kryoRegistrations);
+ } else {
+ return forwardCompatibilitySerializer.snapshotConfiguration();
+ }
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 9696b1ef7128d..56f1f4b28af55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -18,13 +18,17 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoUpgradeConfiguration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.LinkedOptionalMap;
import org.apache.flink.util.LinkedOptionalMap.MergeResult;
import com.esotericsoftware.kryo.Serializer;
@@ -33,7 +37,9 @@
import java.io.IOException;
import java.util.LinkedHashMap;
+import java.util.Set;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom;
import static org.apache.flink.util.LinkedOptionalMap.mergeRightIntoLeft;
@@ -51,6 +57,7 @@ public class KryoSerializerSnapshot implements TypeSerializerSnapshot {
@SuppressWarnings("unused")
public KryoSerializerSnapshot() {}
+ @SuppressWarnings("deprecation")
KryoSerializerSnapshot(
Class typeClass,
LinkedHashMap, SerializableSerializer>> defaultKryoSerializers,
@@ -82,27 +89,60 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCode
}
@Override
+ @SuppressWarnings("deprecation")
public TypeSerializer restoreSerializer() {
- return new KryoSerializer<>(
+ LinkedHashMap, ExecutionConfig.SerializableSerializer>> defaultSerializersV2 =
+ snapshotData.getDefaultKryoSerializers().unwrapOptionals();
+ LinkedHashMap, Class extends Serializer>>> defaultSerializerClassesV2 =
+ snapshotData.getDefaultKryoSerializerClasses().unwrapOptionals();
+ LinkedHashMap registrationsV2 =
+ snapshotData.getKryoRegistrations().unwrapOptionals();
+
+ KryoUpgradeConfiguration upgradeConfiguration = KryoUpgradeConfiguration.getActive();
+ LinkedHashMap, ExecutionConfig.SerializableKryo5Serializer>>
+ defaultSerializersV5 =
+ upgradeConfiguration.upgradeDefaultSerializers(defaultSerializersV2);
+ LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ defaultSerializerClassesV5 =
+ upgradeConfiguration.upgradeDefaultSerializerClasses(
+ defaultSerializerClassesV2);
+ LinkedHashMap registrationsV5 =
+ upgradeConfiguration.upgradeRegistrations(registrationsV2);
+
+ return org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer.createReturnV2(
snapshotData.getTypeClass(),
- snapshotData.getDefaultKryoSerializers().unwrapOptionals(),
- snapshotData.getDefaultKryoSerializerClasses().unwrapOptionals(),
- snapshotData.getKryoRegistrations().unwrapOptionals());
+ defaultSerializersV2,
+ defaultSerializerClassesV2,
+ registrationsV2,
+ defaultSerializersV5,
+ defaultSerializerClassesV5,
+ registrationsV5);
}
@Override
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
- if (!(newSerializer instanceof KryoSerializer)) {
+ if (newSerializer instanceof KryoSerializer) {
+ KryoSerializer kryoSerializer = (KryoSerializer) newSerializer;
+ if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return resolveSchemaCompatibility(kryoSerializer);
+ } else if (newSerializer
+ instanceof org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer) {
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer kryoSerializer =
+ (org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer)
+ newSerializer;
+ if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return resolveForwardSchemaCompatibility(kryoSerializer);
+ } else {
return TypeSerializerSchemaCompatibility.incompatible();
}
- KryoSerializer kryoSerializer = (KryoSerializer) newSerializer;
- if (kryoSerializer.getType() != snapshotData.getTypeClass()) {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- return resolveSchemaCompatibility(kryoSerializer);
}
+ @SuppressWarnings("deprecation")
private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
KryoSerializer newSerializer) {
// merge the default serializers
@@ -141,20 +181,6 @@ private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
return TypeSerializerSchemaCompatibility.incompatible();
}
- // there are no missing keys, now we have to decide whether we are compatible as-is or we
- // require reconfiguration.
- return resolveSchemaCompatibility(
- reconfiguredDefaultKryoSerializers,
- reconfiguredDefaultKryoSerializerClasses,
- reconfiguredRegistrations);
- }
-
- private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
- MergeResult, SerializableSerializer>> reconfiguredDefaultKryoSerializers,
- MergeResult, Class extends Serializer>>>
- reconfiguredDefaultKryoSerializerClasses,
- MergeResult reconfiguredRegistrations) {
-
if (reconfiguredDefaultKryoSerializers.isOrderedSubset()
&& reconfiguredDefaultKryoSerializerClasses.isOrderedSubset()
&& reconfiguredRegistrations.isOrderedSubset()) {
@@ -168,7 +194,84 @@ private TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
snapshotData.getTypeClass(),
reconfiguredDefaultKryoSerializers.getMerged(),
reconfiguredDefaultKryoSerializerClasses.getMerged(),
- reconfiguredRegistrations.getMerged());
+ reconfiguredRegistrations.getMerged(),
+ null);
+
+ return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+ reconfiguredSerializer);
+ }
+
+ @SuppressWarnings("deprecation")
+ private TypeSerializerSchemaCompatibility resolveForwardSchemaCompatibility(
+ org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer kryo5Serializer) {
+ // Default Kryo Serializers
+ LinkedOptionalMap, ExecutionConfig.SerializableSerializer>>
+ defaultSnapshotKryo2Serializers = snapshotData.getDefaultKryoSerializers();
+
+ LinkedHashMap, ExecutionConfig.SerializableKryo5Serializer>>
+ defaultKryo5SerializersRaw = kryo5Serializer.getDefaultKryoSerializers();
+ Set defaultKryo5SerializersClassNames =
+ defaultKryo5SerializersRaw.keySet().stream()
+ .map(Class::getName)
+ .collect(Collectors.toSet());
+
+ MergeResult, SerializableSerializer>> defaultKryo5SerializersMergeResult =
+ LinkedOptionalMap.mergeValuesWithPrefixKeys(
+ defaultSnapshotKryo2Serializers, defaultKryo5SerializersClassNames);
+
+ if (defaultKryo5SerializersMergeResult.hasMissingKeys()) {
+ logMissingKeys(defaultKryo5SerializersMergeResult);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // Default Serializer Classes
+ LinkedOptionalMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ defaultSnapshotKryo2SerializerClasses =
+ snapshotData.getDefaultKryoSerializerClasses();
+
+ LinkedHashMap, Class extends com.esotericsoftware.kryo.kryo5.Serializer>>>
+ kryo5SerializerClassesRaw = kryo5Serializer.getDefaultKryoSerializerClasses();
+ Set kryo5SerializerClassesClassNames =
+ kryo5SerializerClassesRaw.keySet().stream()
+ .map(Class::getName)
+ .collect(Collectors.toSet());
+
+ MergeResult, Class extends com.esotericsoftware.kryo.Serializer>>>
+ kryoSerializersClassesMergeResult =
+ LinkedOptionalMap.mergeValuesWithPrefixKeys(
+ defaultSnapshotKryo2SerializerClasses,
+ kryo5SerializerClassesClassNames);
+
+ if (kryoSerializersClassesMergeResult.hasMissingKeys()) {
+ logMissingKeys(kryoSerializersClassesMergeResult);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // Kryo Registrations
+ LinkedOptionalMap snapshotKryo2Registrations =
+ snapshotData.getKryoRegistrations();
+
+ LinkedHashMap kryo5RegistrationsRaw =
+ kryo5Serializer.getKryoRegistrations();
+ Set kryo5RegistrationKeys = kryo5RegistrationsRaw.keySet();
+
+ MergeResult kryo5RegistrationsMergeResult =
+ LinkedOptionalMap.mergeValuesWithPrefixKeys(
+ snapshotKryo2Registrations, kryo5RegistrationKeys);
+
+ if (kryo5RegistrationsMergeResult.hasMissingKeys()) {
+ logMissingKeys(kryo5RegistrationsMergeResult);
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ // reconfigure a new KryoSerializer
+ KryoSerializer reconfiguredSerializer =
+ new KryoSerializer<>(
+ snapshotData.getTypeClass(),
+ defaultKryo5SerializersMergeResult.getMerged(),
+ kryoSerializersClassesMergeResult.getMerged(),
+ kryo5RegistrationsMergeResult.getMerged(),
+ kryo5Serializer);
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
reconfiguredSerializer);
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/ChillSerializerRegistrar.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/ChillSerializerRegistrar.java
new file mode 100644
index 0000000000000..842134c5d87be
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/ChillSerializerRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+
+/** Interface for flink-core to interact with the FlinkChillPackageRegistrar in flink-java. */
+@PublicEvolving
+public interface ChillSerializerRegistrar {
+ /**
+ * Registers all serializers with the given {@link Kryo}. All serializers are registered with
+ * specific IDs as a continuous block.
+ *
+ * @param kryo Kryo to register serializers with
+ */
+ void registerSerializers(Kryo kryo);
+
+ /**
+ * Returns the registration ID that immediately follows the last registered serializer.
+ *
+ * @return registration ID that should be used for the next serializer registration
+ */
+ int getNextRegistrationId();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java
new file mode 100644
index 0000000000000..4b17fe7688a41
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java
@@ -0,0 +1,851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo5;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig.SerializableKryo5Serializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.AvroUtils;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Registration;
+import org.apache.flink.api.java.typeutils.runtime.Kryo5Utils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput5;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.kryo5.Kryo;
+import com.esotericsoftware.kryo.kryo5.KryoException;
+import com.esotericsoftware.kryo.kryo5.Serializer;
+import com.esotericsoftware.kryo.kryo5.io.Input;
+import com.esotericsoftware.kryo.kryo5.io.KryoBufferUnderflowException;
+import com.esotericsoftware.kryo.kryo5.io.Output;
+import com.esotericsoftware.kryo.kryo5.objenesis.strategy.StdInstantiatorStrategy;
+import com.esotericsoftware.kryo.kryo5.util.DefaultInstantiatorStrategy;
+import org.apache.commons.lang3.exception.CloneFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A type serializer that serializes its type using the Kryo serialization framework
+ * (https://github.com/EsotericSoftware/kryo).
+ *
+ * This serializer is intended as a fallback serializer for the cases that are not covered by the
+ * basic types, tuples, and POJOs.
+ *
+ *
The set of serializers registered with Kryo via {@link Kryo#register}, with their respective
+ * IDs, depends on whether flink-java or flink-scala are on the classpath. This is for
+ * backwards-compatibility reasons.
+ *
+ *
If neither are available (which should only apply to tests in flink-core), then:
+ *
+ *
+ * - 0-9 are used for Java primitives
+ *
- 10+ are used for user-defined registration
+ *
+ *
+ * If flink-scala is available, then:
+ *
+ *
+ * - 0-9 are used for Java primitives
+ *
- 10-72 are used for Scala classes
+ *
- 73-84 are used for Java classes
+ *
- 85+ are used for user-defined registration
+ *
+ *
+ * If *only* flink-java is available, then:
+ *
+ *
+ * - 0-9 are used for Java primitives
+ *
- 10-72 are unused (to maintain compatibility)
+ *
- 73-84 are used for Java classes
+ *
- 85+ are used for user-defined registration
+ *
+ *
+ * @param The type to be serialized.
+ */
+@PublicEvolving
+public class KryoSerializer extends TypeSerializer {
+
+ private static final long serialVersionUID = 3L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);
+
+ /**
+ * Flag whether to check for concurrent thread access. Because this flag is static final, a
+ * value of 'false' allows the JIT compiler to eliminate the guarded code sections.
+ */
+ private static final boolean CONCURRENT_ACCESS_CHECK =
+ LOG.isDebugEnabled() || KryoSerializerDebugInitHelper.setToDebug;
+
+ static {
+ configureKryoLogging();
+ }
+
+ @Nullable
+ private static final ChillSerializerRegistrar flinkChillPackageRegistrar =
+ loadFlinkChillPackageRegistrar();
+
+ @Nullable
+ private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
+ try {
+ return (ChillSerializerRegistrar)
+ Class.forName(
+ "org.apache.flink.api.java.typeutils.runtime.kryo5.FlinkChillPackageRegistrar")
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ private final LinkedHashMap, ExecutionConfig.SerializableKryo5Serializer>>
+ defaultSerializers;
+ private final LinkedHashMap, Class extends Serializer>>> defaultSerializerClasses;
+
+ /**
+ * Map of class tag (using classname as tag) to their Kryo registration.
+ *
+ * This map serves as a preview of the final registration result of the Kryo instance, taking
+ * into account registration overwrites.
+ */
+ private LinkedHashMap kryoRegistrations;
+
+ private final Class type;
+
+ // This is used for reading legacy serialization data.
+ private org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+ backwardCompatibilitySerializer;
+
+ // ------------------------------------------------------------------------
+ // The fields below are lazily initialized after duplication or deserialization.
+ private transient Kryo kryo;
+ private transient T copyInstance;
+
+ private transient DataOutputView previousOut;
+ private transient DataInputView previousIn;
+
+ private transient Input input;
+ private transient Output output;
+
+ // ------------------------------------------------------------------------
+ // legacy fields; these fields cannot yet be removed to retain backwards compatibility
+
+ private LinkedHashMap, SerializableKryo5Serializer>> registeredTypesWithSerializers;
+ private LinkedHashMap, Class extends Serializer>>>
+ registeredTypesWithSerializerClasses;
+ private LinkedHashSet> registeredTypes;
+
+ // for debugging purposes
+ private transient volatile Thread currentThread;
+
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("deprecation")
+ public KryoSerializer(Class type, ExecutionConfig executionConfig) {
+ this.type = checkNotNull(type);
+
+ this.defaultSerializers = executionConfig.getDefaultKryo5Serializers();
+ this.defaultSerializerClasses = executionConfig.getDefaultKryo5SerializerClasses();
+
+ this.kryoRegistrations =
+ buildKryoRegistrations(
+ this.type,
+ executionConfig.getRegisteredKryo5Types(),
+ executionConfig.getRegisteredTypesWithKryo5SerializerClasses(),
+ executionConfig.getRegisteredTypesWithKryo5Serializers());
+
+ this.backwardCompatibilitySerializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ type, executionConfig, this);
+ }
+
+ @Internal
+ public KryoSerializer(
+ Class type,
+ ExecutionConfig executionConfig,
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+ backwardCompatibilitySerializer) {
+ this.type = checkNotNull(type);
+
+ this.defaultSerializers = executionConfig.getDefaultKryo5Serializers();
+ this.defaultSerializerClasses = executionConfig.getDefaultKryo5SerializerClasses();
+
+ this.kryoRegistrations =
+ buildKryoRegistrations(
+ this.type,
+ executionConfig.getRegisteredKryo5Types(),
+ executionConfig.getRegisteredTypesWithKryo5SerializerClasses(),
+ executionConfig.getRegisteredTypesWithKryo5Serializers());
+
+ this.backwardCompatibilitySerializer = checkNotNull(backwardCompatibilitySerializer);
+ }
+
+ /**
+ * Copy-constructor that does not copy transient fields. They will be initialized once required.
+ */
+ @Internal
+ public KryoSerializer(KryoSerializer toCopy) {
+ this(toCopy, null);
+
+ if (toCopy.backwardCompatibilitySerializer != null) {
+ this.backwardCompatibilitySerializer =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ toCopy.backwardCompatibilitySerializer, this);
+ }
+ }
+
+ /**
+ * Copy-constructor that does not copy transient fields. They will be initialized once required.
+ */
+ @Internal
+ public KryoSerializer(
+ KryoSerializer toCopy,
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+ backwardCompatibilitySerializer) {
+
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+ this.backwardCompatibilitySerializer = backwardCompatibilitySerializer;
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry, SerializableKryo5Serializer>> entry :
+ toCopy.defaultSerializers.entrySet()) {
+
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry entry : toCopy.kryoRegistrations.entrySet()) {
+
+ Kryo5Registration kryoRegistration = entry.getValue();
+
+ if (kryoRegistration.getSerializerDefinitionType()
+ == Kryo5Registration.SerializerDefinitionType.INSTANCE) {
+
+ SerializableKryo5Serializer extends Serializer>> serializerInstance =
+ kryoRegistration.getSerializableSerializerInstance();
+
+ if (serializerInstance != null) {
+ kryoRegistration =
+ new Kryo5Registration(
+ kryoRegistration.getRegisteredClass(),
+ deepCopySerializer(serializerInstance));
+ }
+ }
+
+ this.kryoRegistrations.put(entry.getKey(), kryoRegistration);
+ }
+ }
+
+ // for KryoSerializerSnapshot
+ // ------------------------------------------------------------------------
+
+ KryoSerializer(
+ Class type,
+ LinkedHashMap, SerializableKryo5Serializer>> defaultSerializers,
+ LinkedHashMap, Class extends Serializer>>> defaultSerializerClasses,
+ LinkedHashMap kryoRegistrations,
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+ backwardCompatibilitySerializer) {
+
+ this.type = checkNotNull(type, "Type class cannot be null.");
+ this.defaultSerializerClasses =
+ checkNotNull(
+ defaultSerializerClasses, "Default serializer classes cannot be null.");
+ this.defaultSerializers =
+ checkNotNull(defaultSerializers, "Default serializers cannot be null.");
+ this.kryoRegistrations =
+ checkNotNull(kryoRegistrations, "Kryo registrations cannot be null.");
+
+ this.backwardCompatibilitySerializer = backwardCompatibilitySerializer;
+ }
+
+ @Internal
+ public static KryoSerializer create(
+ Class type,
+ LinkedHashMap, ExecutionConfig.SerializableSerializer>> defaultSerializersV2,
+ LinkedHashMap, Class extends com.esotericsoftware.kryo.Serializer>>>
+ defaultSerializerClassesV2,
+ LinkedHashMap registrationsV2,
+ LinkedHashMap, SerializableKryo5Serializer>> defaultSerializersV5,
+ LinkedHashMap, Class extends Serializer>>> defaultSerializerClassesV5,
+ LinkedHashMap registrationsV5) {
+ KryoSerializer v5 =
+ new KryoSerializer<>(
+ type,
+ defaultSerializersV5,
+ defaultSerializerClassesV5,
+ registrationsV5,
+ null);
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer v2 =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ type,
+ defaultSerializersV2,
+ defaultSerializerClassesV2,
+ registrationsV2,
+ v5);
+ v5.backwardCompatibilitySerializer = v2;
+ return v5;
+ }
+
+ @Internal
+ public static
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer createReturnV2(
+ Class type,
+ LinkedHashMap, ExecutionConfig.SerializableSerializer>>
+ defaultSerializersV2,
+ LinkedHashMap<
+ Class>,
+ Class extends com.esotericsoftware.kryo.Serializer>>>
+ defaultSerializerClassesV2,
+ LinkedHashMap registrationsV2,
+ LinkedHashMap, SerializableKryo5Serializer>> defaultSerializersV5,
+ LinkedHashMap, Class extends Serializer>>>
+ defaultSerializerClassesV5,
+ LinkedHashMap registrationsV5) {
+ KryoSerializer v5 =
+ new KryoSerializer<>(
+ type,
+ defaultSerializersV5,
+ defaultSerializerClassesV5,
+ registrationsV5,
+ null);
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer v2 =
+ new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
+ type,
+ defaultSerializersV2,
+ defaultSerializerClassesV2,
+ registrationsV2,
+ v5);
+ v5.backwardCompatibilitySerializer = v2;
+ return v5.backwardCompatibilitySerializer;
+ }
+
+ @PublicEvolving
+ public Class getType() {
+ return type;
+ }
+
+ @PublicEvolving
+ public LinkedHashMap, SerializableKryo5Serializer>> getDefaultKryoSerializers() {
+ return defaultSerializers;
+ }
+
+ @PublicEvolving
+ public LinkedHashMap, Class extends Serializer>>>
+ getDefaultKryoSerializerClasses() {
+ return defaultSerializerClasses;
+ }
+
+ @Internal
+ public LinkedHashMap getKryoRegistrations() {
+ return kryoRegistrations;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public KryoSerializer duplicate() {
+ return new KryoSerializer<>(this);
+ }
+
+ @Override
+ public T createInstance() {
+ if (Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers())) {
+ return null;
+ } else {
+ checkKryoInitialized();
+ try {
+ return kryo.newInstance(type);
+ } catch (Throwable e) {
+ return null;
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T copy(T from) {
+ if (from == null) {
+ return null;
+ }
+
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+ try {
+ return kryo.copy(from);
+ } catch (KryoException ke) {
+ // kryo was unable to copy it, so we do it through serialization:
+ ByteArrayOutputStream baout = new ByteArrayOutputStream();
+ Output output = new Output(baout);
+
+ kryo.writeObject(output, from);
+
+ output.close();
+
+ ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+ Input input = new Input(bain);
+
+ return (T) kryo.readObject(input, from.getClass());
+ }
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+
+ if (target != previousOut) {
+ DataOutputViewStream outputStream = new DataOutputViewStream(target);
+ output = new Output(outputStream);
+ previousOut = target;
+ }
+
+ // Sanity check: Make sure that the output is cleared/has been flushed by the last call
+ // otherwise data might be written multiple times in case of a previous EOFException
+ if (output.position() != 0) {
+ throw new IllegalStateException(
+ "The Kryo Output still contains data from a previous "
+ + "serialize call. It has to be flushed or cleared at the end of the serialize call.");
+ }
+
+ try {
+ kryo.writeClassAndObject(output, record);
+ output.flush();
+ } catch (KryoException ke) {
+ // make sure that the Kryo output buffer is reset in case that we can recover from
+ // the exception (e.g. EOFException which denotes buffer full)
+ output.reset();
+
+ Throwable cause = ke.getCause();
+ if (cause instanceof EOFException) {
+ throw (EOFException) cause;
+ } else {
+ throw ke;
+ }
+ }
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ @Override
+ public void serializeWithKryoVersionHint(
+ T record, DataOutputView target, KryoVersion kryoVersion) throws IOException {
+ switch (kryoVersion) {
+ case DEFAULT:
+ case VERSION_5_X:
+ serialize(record, target);
+ break;
+ case VERSION_2_X:
+ if (backwardCompatibilitySerializer == null) {
+ throw new IOException("Required v2 compatability serializer missing");
+ } else {
+ backwardCompatibilitySerializer.serializeWithKryoVersionHint(
+ record, target, kryoVersion);
+ }
+ break;
+ default:
+ throw new IOException(String.format("Unexpected Kryo version: %s", kryoVersion));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+
+ if (source != previousIn) {
+ DataInputViewStream inputStream = new DataInputViewStream(source);
+ input = new NoFetchingInput5(inputStream);
+ previousIn = source;
+ }
+
+ try {
+ return (T) kryo.readClassAndObject(input);
+ } catch (KryoBufferUnderflowException ke) {
+ // 2023-04-26: Existing Flink code expects a java.io.EOFException in this scenario
+ throw new EOFException(ke.getMessage());
+ } catch (KryoException ke) {
+ Throwable cause = ke.getCause();
+
+ if (cause instanceof EOFException) {
+ throw (EOFException) cause;
+ } else {
+ throw ke;
+ }
+ }
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ @Override
+ public T deserializeWithKryoVersionHint(DataInputView source, KryoVersion kryoVersion)
+ throws IOException {
+ switch (kryoVersion) {
+ case DEFAULT:
+ case VERSION_5_X:
+ return deserialize(source);
+ case VERSION_2_X:
+ if (backwardCompatibilitySerializer == null) {
+ throw new IOException("Required v2 compatability serializer missing");
+ } else {
+ return backwardCompatibilitySerializer.deserializeWithKryoVersionHint(
+ source, kryoVersion);
+ }
+ default:
+ throw new IOException(String.format("Unexpected Kryo version: %s", kryoVersion));
+ }
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ if (CONCURRENT_ACCESS_CHECK) {
+ enterExclusiveThread();
+ }
+
+ try {
+ checkKryoInitialized();
+ if (this.copyInstance == null) {
+ this.copyInstance = createInstance();
+ }
+
+ T tmp = deserialize(copyInstance, source);
+ serialize(tmp, target);
+ } finally {
+ if (CONCURRENT_ACCESS_CHECK) {
+ exitExclusiveThread();
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ int result = type.hashCode();
+ result = 31 * result + (kryoRegistrations.hashCode());
+ result = 31 * result + (defaultSerializers.hashCode());
+ result = 31 * result + (defaultSerializerClasses.hashCode());
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof KryoSerializer) {
+ KryoSerializer> other = (KryoSerializer>) obj;
+
+ return type == other.type
+ && Objects.equals(kryoRegistrations, other.kryoRegistrations)
+ && Objects.equals(defaultSerializerClasses, other.defaultSerializerClasses)
+ && Objects.equals(defaultSerializers, other.defaultSerializers);
+ } else {
+ return false;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the Chill Kryo Serializer which is implicitly added to the classpath via
+ * flink-runtime. Falls back to the default Kryo serializer if it can't be found.
+ *
+ * @return The Kryo serializer instance.
+ */
+ private Kryo getKryoInstance() {
+
+ try {
+ // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill
+ // library).
+ // This will be true if Flink's Scala API is used.
+ Class> chillInstantiatorClazz =
+ Class.forName("org.apache.flink.runtime.types.FlinkScalaKryo5Instantiator");
+ Object chillInstantiator = chillInstantiatorClazz.newInstance();
+
+ // obtain a Kryo instance through Twitter Chill
+ Method m = chillInstantiatorClazz.getMethod("newKryo");
+
+ return (Kryo) m.invoke(chillInstantiator);
+ } catch (ClassNotFoundException
+ | InstantiationException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Kryo serializer scala extensions are not available.", e);
+ } else {
+ LOG.info("Kryo serializer scala extensions are not available.");
+ }
+
+ DefaultInstantiatorStrategy initStrategy = new DefaultInstantiatorStrategy();
+ initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+
+ Kryo kryo = new Kryo();
+ kryo.setInstantiatorStrategy(initStrategy);
+
+ if (flinkChillPackageRegistrar != null) {
+ flinkChillPackageRegistrar.registerSerializers(kryo);
+ }
+
+ return kryo;
+ }
+ }
+
+ private void checkKryoInitialized() {
+ if (this.kryo == null) {
+ this.kryo = getKryoInstance();
+
+ // Enable reference tracking.
+ kryo.setReferences(true);
+
+ // Throwable and all subclasses should be serialized via java serialization
+ // Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
+ // This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for
+ // details.
+ // There was a problem with Kryo 2.x JavaSerializer that is fixed in Kryo 5.x
+ kryo.addDefaultSerializer(
+ Throwable.class,
+ new com.esotericsoftware.kryo.kryo5.serializers.JavaSerializer());
+
+ // Add default serializers first, so that the type registrations without a serializer
+ // are registered with a default serializer
+ for (Map.Entry, SerializableKryo5Serializer>> entry :
+ defaultSerializers.entrySet()) {
+ kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
+ }
+
+ for (Map.Entry, Class extends Serializer>>> entry :
+ defaultSerializerClasses.entrySet()) {
+ kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
+ }
+
+ Kryo5Utils.applyRegistrations(
+ this.kryo,
+ kryoRegistrations.values(),
+ flinkChillPackageRegistrar != null
+ ? flinkChillPackageRegistrar.getNextRegistrationId()
+ : kryo.getNextRegistrationId());
+
+ kryo.setRegistrationRequired(false);
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot snapshotConfiguration() {
+ return new KryoSerializerSnapshot<>(
+ type, defaultSerializers, defaultSerializerClasses, kryoRegistrations);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Utility method that takes lists of registered types and their serializers, and resolve them
+ * into a single list such that the result will resemble the final registration result in Kryo.
+ */
+ private static LinkedHashMap buildKryoRegistrations(
+ Class> serializedType,
+ LinkedHashSet> registeredTypes,
+ LinkedHashMap, Class extends Serializer>>>
+ registeredTypesWithSerializerClasses,
+ LinkedHashMap, SerializableKryo5Serializer>>
+ registeredTypesWithSerializers) {
+
+ final LinkedHashMap kryoRegistrations = new LinkedHashMap<>();
+
+ kryoRegistrations.put(serializedType.getName(), new Kryo5Registration(serializedType));
+
+ for (Class> registeredType : checkNotNull(registeredTypes)) {
+ kryoRegistrations.put(registeredType.getName(), new Kryo5Registration(registeredType));
+ }
+
+ for (Map.Entry