diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/JavaSerializer.java deleted file mode 100644 index a5d9deab534497..00000000000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/JavaSerializer.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime.kryo5; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.util.InstantiationUtil; - -import com.esotericsoftware.kryo.kryo5.Kryo; -import com.esotericsoftware.kryo.kryo5.KryoException; -import com.esotericsoftware.kryo.kryo5.Serializer; -import com.esotericsoftware.kryo.kryo5.io.Input; -import com.esotericsoftware.kryo.kryo5.io.Output; -import com.esotericsoftware.kryo.kryo5.util.ObjectMap; - -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -/** - * This is a reimplementation of Kryo's {@link - * com.esotericsoftware.kryo.serializers.JavaSerializer}, that additionally makes sure the {@link - * ObjectInputStream} used for deserialization specifically uses Kryo's registered classloader. - * - *

Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, - * in which the wrong classloader may be used for deserialization, leading to {@link - * ClassNotFoundException}s. - * - * @see FLINK-6025 - * @see Known issue with Kryo's - * JavaSerializer - * @param The type to be serialized. - */ -@PublicEvolving -public class JavaSerializer extends Serializer { - - public JavaSerializer() {} - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public void write(Kryo kryo, Output output, T o) { - try { - ObjectMap graphContext = kryo.getGraphContext(); - ObjectOutputStream objectStream = (ObjectOutputStream) graphContext.get(this); - if (objectStream == null) { - objectStream = new ObjectOutputStream(output); - graphContext.put(this, objectStream); - } - objectStream.writeObject(o); - objectStream.flush(); - } catch (Exception ex) { - throw new KryoException("Error during Java serialization.", ex); - } - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public T read(Kryo kryo, Input input, Class aClass) { - try { - ObjectMap graphContext = kryo.getGraphContext(); - ObjectInputStream objectStream = (ObjectInputStream) graphContext.get(this); - if (objectStream == null) { - // make sure we use Kryo's classloader - objectStream = - new InstantiationUtil.ClassLoaderObjectInputStream( - input, kryo.getClassLoader()); - graphContext.put(this, objectStream); - } - return (T) objectStream.readObject(); - } catch (Exception ex) { - throw new KryoException("Error during Java deserialization.", ex); - } - } -} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java index 1c27c224d5722b..d0d73f2dc815ca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo5/KryoSerializer.java @@ -581,7 +581,10 @@ private void checkKryoInitialized() { // Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's. // This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for // details. - kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); + // There was a problem with Kryo 2.x JavaSerializer that is fixed in Kryo 5.x + kryo.addDefaultSerializer( + Throwable.class, + new com.esotericsoftware.kryo.kryo5.serializers.JavaSerializer()); // Add default serializers first, so that the type registrations without a serializer // are registered with a default serializer diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 818851f40f4d9b..7bcff387e3205b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -45,7 +45,6 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; -import org.apache.flink.api.java.typeutils.runtime.kryo5.JavaSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.StateBackendOptions; @@ -5535,7 +5534,7 @@ public Object read( * state backend correctly uses a specified Kryo serializer. */ public static class ExceptionThrowingTestKryo5Serializer - extends org.apache.flink.api.java.typeutils.runtime.kryo5.JavaSerializer { + extends com.esotericsoftware.kryo.kryo5.serializers.JavaSerializer { @Override public void write( com.esotericsoftware.kryo.kryo5.Kryo kryo, @@ -5589,7 +5588,7 @@ public Object read( * restored with a {@code Serializer} that was later registered. */ public static class CustomKryo5TestSerializer - extends org.apache.flink.api.java.typeutils.runtime.kryo5.JavaSerializer { + extends com.esotericsoftware.kryo.kryo5.serializers.JavaSerializer { @Override public void write( com.esotericsoftware.kryo.kryo5.Kryo kryo,