diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 7ef70b85606..46890c5ba1f 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -2,7 +2,7 @@ ##################################### # Pekko Actor Reference Config File # -########################3############ +##################################### # This is the reference config file that contains all the default settings. # Make your edits/overrides in your application.conf. diff --git a/persistence/src/main/resources/reference.conf b/persistence/src/main/resources/reference.conf index 435341de456..2c07f51d81f 100644 --- a/persistence/src/main/resources/reference.conf +++ b/persistence/src/main/resources/reference.conf @@ -1,8 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 -########################################################### +############################################################ # Pekko Persistence Extension Reference Configuration File # -########################################################### +############################################################ # This is the reference config file that contains all the default settings. # Make your edits in your application.conf in order to override these settings. @@ -42,6 +42,11 @@ pekko.persistence { plugin = "" # List of snapshot stores to start automatically. Use "" for the default snapshot store. auto-start-snapshot-stores = [] + # When migrating from using Akka Persistence to using Pekko Persistence, + # you may need to have the serializer handle Akka or Pekko created snapshots. + # Supported values are "pekko", "akka" and "no-migration". + # See https://cwiki.apache.org/confluence/display/PEKKO/Pekko+Akka+Compatibility + auto-migrate-manifest = "pekko" } # used as default-snapshot store if no plugin configured # (see `pekko.persistence.snapshot-store`) diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala index d9926f2a35a..15691025f20 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala @@ -29,15 +29,39 @@ import pekko.util.ByteString.UTF_8 @SerialVersionUID(1L) final case class Snapshot(data: Any) +private[serialization] sealed trait SnapshotAutoMigration + +private[serialization] object SnapshotAutoMigration { + val ConfigName = "pekko.persistence.snapshot-store.auto-migrate-manifest" + + // Ignore the snapshot migration strategy - means that Pekko will not be able to work with snapshots saved by Akka + object NoMigration extends SnapshotAutoMigration + // When saving snapshots, migrate any manifests with `akka` to `org.apache.pekko` + object Pekko extends SnapshotAutoMigration + // When saving snapshots, migrate any manifests with `org.apache.pekko` to `akka` + object Akka extends SnapshotAutoMigration + + def fromString(s: String): SnapshotAutoMigration = s match { + case "no-migration" => NoMigration + case "pekko" => Pekko + case "akka" => Akka + case _ => throw new IllegalArgumentException(s"Unknown snapshot migration strategy: $s") + } +} + /** * [[Snapshot]] serializer. */ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer { + import SnapshotAutoMigration._ override val includeManifest: Boolean = false private lazy val serialization = SerializationExtension(system) + private lazy val migrationStrategy = SnapshotAutoMigration.fromString( + system.settings.config.getString(ConfigName)) + /** * Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching * `org.apache.pekko.serialization.Serializer`. @@ -58,7 +82,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer val out = new ByteArrayOutputStream writeInt(out, snapshotSerializer.identifier) - val ms = Serializers.manifestFor(snapshotSerializer, snapshot) + val ms = migrateManifestIfNecessary(Serializers.manifestFor(snapshotSerializer, snapshot)) if (ms.nonEmpty) out.write(ms.getBytes(UTF_8)) out.toByteArray @@ -77,11 +101,44 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer else { val manifestBytes = new Array[Byte](remaining) in.read(manifestBytes) - new String(manifestBytes, UTF_8) + migrateManifestToPekkoIfNecessary(new String(manifestBytes, UTF_8)) } (serializerId, manifest) } + // when writing the data, we want to allow the serialized data to + // support Akka and Pekko serializers as required by configuration + private def migrateManifestIfNecessary(manifest: String): String = { + migrationStrategy match { + case NoMigration => manifest + case Pekko => + if (manifest.startsWith("akka")) { + manifest.replaceFirst("akka", "org.apache.pekko") + } else { + manifest + } + case Akka => + if (manifest.startsWith("org.apache.pekko")) { + manifest.replaceFirst("org.apache.pekko", "akka") + } else { + manifest + } + } + } + + // when reading the data, we want to force use of the Pekko serializer + private def migrateManifestToPekkoIfNecessary(manifest: String): String = { + migrationStrategy match { + case NoMigration => manifest + case _ => + if (manifest.startsWith("akka")) { + manifest.replaceFirst("akka", "org.apache.pekko") + } else { + manifest + } + } + } + private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = { def serialize() = { val snapshotSerializer = serialization.findSerializerFor(snapshot) @@ -112,14 +169,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer val (serializerId, manifest) = headerFromBinary(headerBytes) - // suggested in https://github.com/scullxbones/pekko-persistence-mongo/pull/14#issuecomment-1847223850 serialization .deserialize(snapshotBytes, serializerId, manifest) - .recoverWith { - case _: NotSerializableException if manifest.startsWith("akka") => - serialization - .deserialize(snapshotBytes, serializerId, manifest.replaceFirst("akka", "org.apache.pekko")) - } .get } diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerMigrationAkkaSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerMigrationAkkaSpec.scala new file mode 100644 index 00000000000..952df9ed9c0 --- /dev/null +++ b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerMigrationAkkaSpec.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.serialization + +import com.typesafe.config.ConfigFactory +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot +import pekko.serialization.SerializationExtension +import pekko.testkit.PekkoSpec + +import java.io.NotSerializableException +import java.util.Base64 + +class SnapshotSerializerMigrationAkkaSpec extends PekkoSpec( + s"${SnapshotAutoMigration.ConfigName}=akka" + ) { + + import SnapshotSerializerTestData._ + + "Snapshot serializer with migration to Akka" should { + "deserialize akka snapshots" in { + val serialization = SerializationExtension(system) + val bytes = Base64.getDecoder.decode(akkaSnapshotData) + val result = serialization.deserialize(bytes, classOf[Snapshot]).get + val deserialized = result.data + deserialized shouldBe a[PersistentFSMSnapshot[_]] + val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]] + persistentFSMSnapshot shouldEqual fsmSnapshot + } + "deserialize pekko snapshots" in { + val serialization = SerializationExtension(system) + val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get + val result = serialization.deserialize(bytes, classOf[Snapshot]).get + val deserialized = result.data + deserialized shouldBe a[PersistentFSMSnapshot[_]] + val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]] + persistentFSMSnapshot shouldEqual fsmSnapshot + } + "serialize snapshot with Akka class name" in { + val serialization = SerializationExtension(system) + val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get + val cfg = ConfigFactory.parseString(s"${SnapshotAutoMigration.ConfigName}=no-migration") + .withFallback(system.settings.config) + val pekkoOnlySystem = ActorSystem("pekko-only-serialization", cfg) + try { + val pekkoOnlySerialization = SerializationExtension(pekkoOnlySystem) + intercept[NotSerializableException] { + pekkoOnlySerialization.deserialize(bytes, classOf[Snapshot]).get + } + } finally { + pekkoOnlySystem.terminate() + } + } + } +} diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerNoMigrationSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerNoMigrationSpec.scala new file mode 100644 index 00000000000..bab9ede5504 --- /dev/null +++ b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerNoMigrationSpec.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.serialization + +import org.apache.pekko +import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot +import pekko.serialization.SerializationExtension +import pekko.testkit.PekkoSpec + +import java.io.NotSerializableException +import java.util.Base64 + +class SnapshotSerializerNoMigrationSpec extends PekkoSpec( + s"${SnapshotAutoMigration.ConfigName}=no-migration" + ) { + + import SnapshotSerializerTestData._ + + "Snapshot serializer with no migration" should { + "deserialize pekko snapshots" in { + val serialization = SerializationExtension(system) + val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get + val result = serialization.deserialize(bytes, classOf[Snapshot]).get + val deserialized = result.data + deserialized shouldBe a[PersistentFSMSnapshot[_]] + val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]] + persistentFSMSnapshot shouldEqual fsmSnapshot + } + "fail to deserialize akka snapshots" in { + val serialization = SerializationExtension(system) + val bytes = Base64.getDecoder.decode(akkaSnapshotData) + intercept[NotSerializableException] { + serialization.deserialize(bytes, classOf[Snapshot]).get + } + } + } +} diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala index 6291aaa61c1..5a383cee499 100644 --- a/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala +++ b/persistence/src/test/scala/org/apache/pekko/persistence/serialization/SnapshotSerializerSpec.scala @@ -18,28 +18,53 @@ package org.apache.pekko.persistence.serialization import org.apache.pekko -import pekko.actor.ActorSystem import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot import pekko.serialization.SerializationExtension import pekko.testkit.PekkoSpec import java.util.Base64 +private[serialization] object SnapshotSerializerTestData { + val fsmSnapshot = PersistentFSMSnapshot[String]("test-identifier", "test-data", None) + // https://github.com/apache/pekko/pull/837#issuecomment-1847320309 + val akkaSnapshotData = + "PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" +} + class SnapshotSerializerSpec extends PekkoSpec { + import SnapshotSerializerTestData._ + "Snapshot serializer" should { "deserialize akka snapshots" in { - val system = ActorSystem() val serialization = SerializationExtension(system) - // https://github.com/apache/pekko/pull/837#issuecomment-1847320309 - val data = - "PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" - val bytes = Base64.getDecoder.decode(data) + val bytes = Base64.getDecoder.decode(akkaSnapshotData) + val result = serialization.deserialize(bytes, classOf[Snapshot]).get + val deserialized = result.data + deserialized shouldBe a[PersistentFSMSnapshot[_]] + val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]] + persistentFSMSnapshot shouldEqual fsmSnapshot + } + "deserialize pekko snapshots" in { + val serialization = SerializationExtension(system) + val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get + val result = serialization.deserialize(bytes, classOf[Snapshot]).get + val deserialized = result.data + deserialized shouldBe a[PersistentFSMSnapshot[_]] + val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]] + persistentFSMSnapshot shouldEqual fsmSnapshot + } + "deserialize pre-saved pekko snapshots" in { + val serialization = SerializationExtension(system) + // this is Pekko encoded snapshot based on https://github.com/apache/pekko/pull/837#issuecomment-1847320309 + val pekkoSnapshotData = + "SAAAAAcAAABvcmcuYXBhY2hlLnBla2tvLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh" + val bytes = Base64.getDecoder.decode(pekkoSnapshotData) val result = serialization.deserialize(bytes, classOf[Snapshot]).get val deserialized = result.data deserialized shouldBe a[PersistentFSMSnapshot[_]] val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]] - persistentFSMSnapshot shouldEqual PersistentFSMSnapshot[String]("test-identifier", "test-data", None) + persistentFSMSnapshot shouldEqual fsmSnapshot } } }