Skip to content

Commit

Permalink
Add new migration strategy for Pekko Persistence snapshots (#1423)
Browse files Browse the repository at this point in the history
* Add new migration strategy for Pekko Persistence snapshots

Update reference.conf

Update SnapshotSerializerSpec.scala

new impl

* add more tests

* add akka mode test

* rename file

* Update SnapshotSerializerMigrationAkkaSpec.scala

* extend akka test

* Update reference.conf

* Update SnapshotSerializer.scala

* rename config

* fix name of tests
  • Loading branch information
pjfanning committed Aug 21, 2024
1 parent 9b8ddc8 commit 0fa7083
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 18 deletions.
2 changes: 1 addition & 1 deletion actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#####################################
# Pekko Actor Reference Config File #
########################3############
#####################################

# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
Expand Down
9 changes: 7 additions & 2 deletions persistence/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# SPDX-License-Identifier: Apache-2.0

###########################################################
############################################################
# Pekko Persistence Extension Reference Configuration File #
###########################################################
############################################################

# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.
Expand Down Expand Up @@ -42,6 +42,11 @@ pekko.persistence {
plugin = ""
# List of snapshot stores to start automatically. Use "" for the default snapshot store.
auto-start-snapshot-stores = []
# When migrating from using Akka Persistence to using Pekko Persistence,
# you may need to have the serializer handle Akka or Pekko created snapshots.
# Supported values are "pekko", "akka" and "no-migration".
# See https://cwiki.apache.org/confluence/display/PEKKO/Pekko+Akka+Compatibility
auto-migrate-manifest = "pekko"
}
# used as default-snapshot store if no plugin configured
# (see `pekko.persistence.snapshot-store`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,39 @@ import pekko.util.ByteString.UTF_8
@SerialVersionUID(1L)
final case class Snapshot(data: Any)

private[serialization] sealed trait SnapshotAutoMigration

private[serialization] object SnapshotAutoMigration {
val ConfigName = "pekko.persistence.snapshot-store.auto-migrate-manifest"

// Ignore the snapshot migration strategy - means that Pekko will not be able to work with snapshots saved by Akka
object NoMigration extends SnapshotAutoMigration
// When saving snapshots, migrate any manifests with `akka` to `org.apache.pekko`
object Pekko extends SnapshotAutoMigration
// When saving snapshots, migrate any manifests with `org.apache.pekko` to `akka`
object Akka extends SnapshotAutoMigration

def fromString(s: String): SnapshotAutoMigration = s match {
case "no-migration" => NoMigration
case "pekko" => Pekko
case "akka" => Akka
case _ => throw new IllegalArgumentException(s"Unknown snapshot migration strategy: $s")
}
}

/**
* [[Snapshot]] serializer.
*/
class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import SnapshotAutoMigration._

override val includeManifest: Boolean = false

private lazy val serialization = SerializationExtension(system)

private lazy val migrationStrategy = SnapshotAutoMigration.fromString(
system.settings.config.getString(ConfigName))

/**
* Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching
* `org.apache.pekko.serialization.Serializer`.
Expand All @@ -58,7 +82,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val out = new ByteArrayOutputStream
writeInt(out, snapshotSerializer.identifier)

val ms = Serializers.manifestFor(snapshotSerializer, snapshot)
val ms = migrateManifestIfNecessary(Serializers.manifestFor(snapshotSerializer, snapshot))
if (ms.nonEmpty) out.write(ms.getBytes(UTF_8))

out.toByteArray
Expand All @@ -77,11 +101,44 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
else {
val manifestBytes = new Array[Byte](remaining)
in.read(manifestBytes)
new String(manifestBytes, UTF_8)
migrateManifestToPekkoIfNecessary(new String(manifestBytes, UTF_8))
}
(serializerId, manifest)
}

// when writing the data, we want to allow the serialized data to
// support Akka and Pekko serializers as required by configuration
private def migrateManifestIfNecessary(manifest: String): String = {
migrationStrategy match {
case NoMigration => manifest
case Pekko =>
if (manifest.startsWith("akka")) {
manifest.replaceFirst("akka", "org.apache.pekko")
} else {
manifest
}
case Akka =>
if (manifest.startsWith("org.apache.pekko")) {
manifest.replaceFirst("org.apache.pekko", "akka")
} else {
manifest
}
}
}

// when reading the data, we want to force use of the Pekko serializer
private def migrateManifestToPekkoIfNecessary(manifest: String): String = {
migrationStrategy match {
case NoMigration => manifest
case _ =>
if (manifest.startsWith("akka")) {
manifest.replaceFirst("akka", "org.apache.pekko")
} else {
manifest
}
}
}

private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
def serialize() = {
val snapshotSerializer = serialization.findSerializerFor(snapshot)
Expand Down Expand Up @@ -112,14 +169,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer

val (serializerId, manifest) = headerFromBinary(headerBytes)

// suggested in https://github.com/scullxbones/pekko-persistence-mongo/pull/14#issuecomment-1847223850
serialization
.deserialize(snapshotBytes, serializerId, manifest)
.recoverWith {
case _: NotSerializableException if manifest.startsWith("akka") =>
serialization
.deserialize(snapshotBytes, serializerId, manifest.replaceFirst("akka", "org.apache.pekko"))
}
.get
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.serialization

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec

import java.io.NotSerializableException
import java.util.Base64

class SnapshotSerializerMigrationAkkaSpec extends PekkoSpec(
s"${SnapshotAutoMigration.ConfigName}=akka"
) {

import SnapshotSerializerTestData._

"Snapshot serializer with migration to Akka" should {
"deserialize akka snapshots" in {
val serialization = SerializationExtension(system)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"serialize snapshot with Akka class name" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val cfg = ConfigFactory.parseString(s"${SnapshotAutoMigration.ConfigName}=no-migration")
.withFallback(system.settings.config)
val pekkoOnlySystem = ActorSystem("pekko-only-serialization", cfg)
try {
val pekkoOnlySerialization = SerializationExtension(pekkoOnlySystem)
intercept[NotSerializableException] {
pekkoOnlySerialization.deserialize(bytes, classOf[Snapshot]).get
}
} finally {
pekkoOnlySystem.terminate()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.serialization

import org.apache.pekko
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec

import java.io.NotSerializableException
import java.util.Base64

class SnapshotSerializerNoMigrationSpec extends PekkoSpec(
s"${SnapshotAutoMigration.ConfigName}=no-migration"
) {

import SnapshotSerializerTestData._

"Snapshot serializer with no migration" should {
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"fail to deserialize akka snapshots" in {
val serialization = SerializationExtension(system)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
intercept[NotSerializableException] {
serialization.deserialize(bytes, classOf[Snapshot]).get
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,53 @@
package org.apache.pekko.persistence.serialization

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.fsm.PersistentFSM.PersistentFSMSnapshot
import pekko.serialization.SerializationExtension
import pekko.testkit.PekkoSpec

import java.util.Base64

private[serialization] object SnapshotSerializerTestData {
val fsmSnapshot = PersistentFSMSnapshot[String]("test-identifier", "test-data", None)
// https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val akkaSnapshotData =
"PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
}

class SnapshotSerializerSpec extends PekkoSpec {

import SnapshotSerializerTestData._

"Snapshot serializer" should {
"deserialize akka snapshots" in {
val system = ActorSystem()
val serialization = SerializationExtension(system)
// https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val data =
"PAAAAAcAAABha2thLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
val bytes = Base64.getDecoder.decode(data)
val bytes = Base64.getDecoder.decode(akkaSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pekko snapshots" in {
val serialization = SerializationExtension(system)
val bytes = serialization.serialize(Snapshot(fsmSnapshot)).get
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual fsmSnapshot
}
"deserialize pre-saved pekko snapshots" in {
val serialization = SerializationExtension(system)
// this is Pekko encoded snapshot based on https://github.com/apache/pekko/pull/837#issuecomment-1847320309
val pekkoSnapshotData =
"SAAAAAcAAABvcmcuYXBhY2hlLnBla2tvLnBlcnNpc3RlbmNlLmZzbS5QZXJzaXN0ZW50RlNNJFBlcnNpc3RlbnRGU01TbmFwc2hvdAoPdGVzdC1pZGVudGlmaWVyEg0IFBIJdGVzdC1kYXRh"
val bytes = Base64.getDecoder.decode(pekkoSnapshotData)
val result = serialization.deserialize(bytes, classOf[Snapshot]).get
val deserialized = result.data
deserialized shouldBe a[PersistentFSMSnapshot[_]]
val persistentFSMSnapshot = deserialized.asInstanceOf[PersistentFSMSnapshot[_]]
persistentFSMSnapshot shouldEqual PersistentFSMSnapshot[String]("test-identifier", "test-data", None)
persistentFSMSnapshot shouldEqual fsmSnapshot
}
}
}

0 comments on commit 0fa7083

Please sign in to comment.