Add migration for remote storage events (#3132)
* Add migration for remote storage events

Co-authored-by: Simon Dumas <[email protected]>
imsdu and Simon Dumas authored Mar 1, 2022
1 parent 8dec70d commit a37c53e
Showing 4 changed files with 231 additions and 1 deletion.
@@ -6,14 +6,15 @@ import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ActorSystem => ActorSystemClassic}
import akka.cluster.Cluster
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route, RouteResult}
import cats.effect.ExitCode
import ch.epfl.bluebrain.nexus.delta.config.{AppConfig, BuildInfo}
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMonitoring
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
import ch.epfl.bluebrain.nexus.delta.sdk.error.PluginError
import ch.epfl.bluebrain.nexus.delta.sdk.http.StrictEntity
import ch.epfl.bluebrain.nexus.delta.sdk.migration.Migration
import ch.epfl.bluebrain.nexus.delta.sdk.migration.{Migration, RemoteStorageMigration}
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.{Plugin, PluginDef}
import ch.epfl.bluebrain.nexus.delta.service.plugin.PluginsLoader.PluginLoaderConfig
@@ -30,6 +31,7 @@ import org.slf4j.{Logger, LoggerFactory}
import pureconfig.error.ConfigReaderFailures

import scala.concurrent.duration.DurationInt
import scala.util.Try

object Main extends BIOApp {

@@ -146,6 +148,22 @@ object Main extends BIOApp {
      locator.get[Migration].run.runSyncUnsafe()
    }

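    // When MIGRATION_REMOTE_STORAGE is set, its value is parsed as the new base URI of the
    // remote storage service (an absolute URI such as "http://localhost:8080/v1"; this value
    // is illustrative) and the migration is run synchronously at startup.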
    if (sys.env.contains("MIGRATION_REMOTE_STORAGE")) {
      Task
        .fromEither {
          for {
            str     <- sys.env
                         .get("MIGRATION_REMOTE_STORAGE")
                         .toRight(new IllegalArgumentException("'MIGRATION_REMOTE_STORAGE' must be defined"))
            baseUri <- Try(Uri(str)).toEither.flatMap(BaseUri(_))
          } yield baseUri
        }
        .flatMap { baseUri =>
          locator.get[RemoteStorageMigration].run(baseUri)
        }
        .runSyncUnsafe()
    }

sys.env.get("DELETE_PERSISTENCE_IDS").foreach { persistenceIds =>
DeletePersistenceIds.delete(persistenceIds.split(",").toSeq)
}
@@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{FileEventExchange, F
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.Storages.{StoragesAggregate, StoragesCache}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.contexts.{storages => storageCtxId, storagesMetadata => storageMetaCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.migration.RemoteStorageMigrationImpl
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{StorageEvent, StorageStatsCollection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes.StoragesRoutes
@@ -25,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.crypto.Crypto
import ch.epfl.bluebrain.nexus.delta.sdk.eventlog.EventLogUtils.databaseEventLog
import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientConfig, HttpClientWorthRetry}
import ch.epfl.bluebrain.nexus.delta.sdk.migration.RemoteStorageMigration
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.model.identities.ServiceAccount
import ch.epfl.bluebrain.nexus.delta.sdk.model.projects.ApiMappings
@@ -289,4 +291,10 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
  many[EventExchange].ref[StorageEventExchange].ref[FileEventExchange]
  many[EventExchange].named("resources").ref[StorageEventExchange].ref[FileEventExchange]
  many[EntityType].addSet(Set(EntityType(Storages.moduleType), EntityType(Files.moduleType)))

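  // The migration binding is only added when the MIGRATION_REMOTE_STORAGE flag is present,
  // so regular deployments never wire (or run) it.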
  if (sys.env.contains("MIGRATION_REMOTE_STORAGE")) {
    make[RemoteStorageMigration].fromEffect((as: ActorSystem[Nothing], databaseConfig: DatabaseConfig) =>
      RemoteStorageMigrationImpl(as, databaseConfig.cassandra)
    )
  }
}
@@ -0,0 +1,194 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.migration

import akka.actor.typed.ActorSystem
import akka.persistence.cassandra.reconciler.Reconciliation
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.Storages
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.migration.RemoteStorageMigrationImpl.ToMigrateEvent
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageEvent.{StorageCreated, StorageUpdated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.RemoteDiskStorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageEvent, StorageValue}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.crypto.EncryptionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.migration.RemoteStorageMigration
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.identities.Identity
import ch.epfl.bluebrain.nexus.delta.sdk.model.identities.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.config.CassandraConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.utils.CassandraUtils
import com.datastax.oss.driver.api.core.cql.PreparedStatement
import com.typesafe.scalalogging.Logger
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
import io.circe.parser.parse
import io.circe.syntax.EncoderOps
import io.circe.{Codec, Decoder, Encoder, Error, Json, Printer}
import monix.bio.Task
import pureconfig.ConfigSource
import software.amazon.awssdk.regions.Region

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Path
import java.util.UUID
import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.util.Try

@SuppressWarnings(Array("TryGet"))
@nowarn("cat=unused")
final class RemoteStorageMigrationImpl private (
    session: CassandraSession,
    config: CassandraConfig,
    as: ActorSystem[Nothing]
) extends RemoteStorageMigration {

  private val reconciliation = new Reconciliation(as)

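  // Loads the application's encryption configuration: storage secrets are encrypted at rest,
  // so rewritten events must be decrypted and re-encrypted with the same keys.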
  private val crypto =
    ConfigSource
      .fromConfig(as.settings.config)
      .at("app")
      .at("encryption")
      .loadOrThrow[EncryptionConfig]
      .crypto

  implicit private val configuration: Configuration =
    Configuration.default.withDiscriminator(keywords.tpe)

  implicit private val subjectCodec: Codec.AsObject[Subject]   = deriveConfiguredCodec[Subject]
  implicit private val identityCodec: Codec.AsObject[Identity] = deriveConfiguredCodec[Identity]
  implicit private val pathEncoder: Encoder[Path]              = Encoder.encodeString.contramap(_.toString)
  implicit private val pathDecoder: Decoder[Path]              = Decoder.decodeString.emapTry(str => Try(Path.of(str)))
  implicit private val regionEncoder: Encoder[Region]          = Encoder.encodeString.contramap(_.toString)
  implicit private val regionDecoder: Decoder[Region]          = Decoder.decodeString.map(Region.of)

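  // Secret fields are decrypted when decoding stored events and re-encrypted when the
  // rewritten event is serialized back, so the persisted payload stays encrypted.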
  implicit val jsonSecretEncryptEncoder: Encoder[Secret[Json]] =
    Encoder.encodeJson.contramap(Storage.encryptSourceUnsafe(_, crypto))

  implicit val stringSecretEncryptEncoder: Encoder[Secret[String]] = Encoder.encodeString.contramap {
    case Secret(value) => crypto.encrypt(value).get
  }

  implicit val jsonSecretDecryptDecoder: Decoder[Secret[Json]] =
    Decoder.decodeJson.emap(Storage.decryptSource(_, crypto).toEither.leftMap(_.getMessage))

  implicit val stringSecretEncryptDecoder: Decoder[Secret[String]] =
    Decoder.decodeString.map(str => Secret(crypto.decrypt(str).get))

  implicit private val storageValueCodec: Codec.AsObject[StorageValue] =
    deriveConfiguredCodec[StorageValue]
  implicit private val storageEventCodec: Codec.AsObject[StorageEvent] =
    deriveConfiguredCodec[StorageEvent]

  private val logger: Logger = Logger[RemoteStorageMigrationImpl]

  private val selectStorageEvents =
    s"SELECT persistence_id, event, sequence_nr, timestamp FROM ${config.keyspace}.tag_views where tag_name = ? limit 1000000 allow filtering;"

  private val updateMessage: String =
    s"UPDATE ${config.keyspace}.messages set event = ? where persistence_id = ? and partition_nr = 0 and sequence_nr = ? and timestamp = ?"

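  // Deleting the tag progress lets the reconciliation step in run rebuild the tag views
  // for the affected persistence ids.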
  private val deleteTagProgress: String = s"DELETE from ${config.keyspace}.tag_write_progress WHERE persistence_id = ?"

  private val printer: Printer = Printer.noSpaces.copy(dropNullValues = true)

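  // Migration steps: load every storage event from tag_views, rewrite the endpoint of
  // remote disk storages in the messages table, drop the tag progress of the modified
  // persistence ids, then rebuild their tag views.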
  def run(newBaseUri: BaseUri): Task[Unit] =
    for {
      _                          <- Task.delay(logger.info("Starting remote storages migration"))
      updateStatement            <- Task.deferFuture(session.prepare(updateMessage))
      deleteTagProgressStatement <- Task.deferFuture(session.prepare(deleteTagProgress))
      // It is ok to get them all as we don't have that many storages
      allEsEvents                <-
        Task
          .deferFuture(
            session.selectAll(selectStorageEvents, Storages.moduleType)
          )
          .map {
            _.map { row =>
              ToMigrateEvent(
                row.getString("persistence_id"),
                row.getByteBuffer("event").array(),
                row.getLong("sequence_nr"),
                row.getUuid("timestamp")
              )
            }
          }
      _                          <- Task.delay(logger.info(s"Migration of ${allEsEvents.size} events"))
      _                          <- Task.delay(logger.info("Updating events"))
      modifiedPIds               <- allEsEvents.traverseFilter(process(newBaseUri, _, updateStatement))
      _                          <- Task.delay(logger.info("Deleting tag progress"))
      _                          <- modifiedPIds.traverse { pid =>
                                      Task.deferFuture(session.executeWrite(deleteTagProgressStatement.bind(pid)))
                                    }
      _                          <- Task.sleep(10.seconds)
      _                          <- Task.delay(logger.info("Rebuild tags"))
      _                          <- modifiedPIds.traverse { pid =>
                                      Task.deferFuture(reconciliation.rebuildTagViewForPersistenceIds(pid))
                                    }
      _                          <- Task.delay(logger.info(s"${modifiedPIds.size} events have been modified."))
      _                          <- Task.delay(logger.info("Migrating remote storages is now completed."))
    } yield ()

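  // Rewrites a single event in the messages table, returning its persistence id when the
  // event was actually modified and None when no update was needed.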
  def process(
      newBaseUri: BaseUri,
      toMigrateEvent: ToMigrateEvent,
      updateStatement: PreparedStatement
  ): Task[Option[String]] = {
    Task.delay(logger.info(s"Migrating event for ${toMigrateEvent.persistenceId}")) >>
      Task.fromEither(migrateEvent(newBaseUri, toMigrateEvent.data)).flatMap { newEvent =>
        newEvent.fold(Task.none[String]) { event =>
          Task
            .deferFuture(
              session.executeWrite(
                updateStatement.bind(
                  ByteBuffer.wrap(printer.print(event.asJson).getBytes(StandardCharsets.UTF_8)),
                  toMigrateEvent.persistenceId,
                  toMigrateEvent.sequenceNr,
                  toMigrateEvent.timestamp
                )
              )
            )
            .as(Some(toMigrateEvent.persistenceId))
        }
      }
  }

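  // Decodes the raw payload and, for created/updated remote disk storages, points the
  // endpoint at the new base URI; any other event yields None and is left untouched.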
  def migrateEvent(newBaseUri: BaseUri, payload: Array[Byte]): Either[Error, Option[StorageEvent]] =
    parse(new String(payload, StandardCharsets.UTF_8)).flatMap {
      _.as[StorageEvent].map {
        case sc: StorageCreated =>
          sc.value match {
            case rd: RemoteDiskStorageValue =>
              Some(sc.copy(value = rd.copy(endpoint = newBaseUri)))
            case _                          => None
          }
        case su: StorageUpdated =>
          su.value match {
            case rd: RemoteDiskStorageValue =>
              Some(su.copy(value = rd.copy(endpoint = newBaseUri)))
            case _                          => None
          }
        case _                  => None
      }
    }

}

object RemoteStorageMigrationImpl {

  final case class ToMigrateEvent(persistenceId: String, data: Array[Byte], sequenceNr: java.lang.Long, timestamp: UUID)

  def apply(as: ActorSystem[Nothing], config: CassandraConfig): Task[RemoteStorageMigration] =
    CassandraUtils.session(as).map { session =>
      new RemoteStorageMigrationImpl(
        session,
        config,
        as
      )
    }

}
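For illustration, a sketch of the rewrite migrateEvent performs on a stored payload, assuming a hypothetical new base URI of http://new-storage.internal/v1. Only the @type discriminator and the endpoint field come from the code above; the remaining fields are elided and the overall shape is assumed:

// before
{"@type": "StorageCreated", "value": {"@type": "RemoteDiskStorageValue", "endpoint": "http://old-storage.internal/v1", ...}, ...}
// after
{"@type": "StorageCreated", "value": {"@type": "RemoteDiskStorageValue", "endpoint": "http://new-storage.internal/v1", ...}, ...}

Events that do not create or update a remote disk storage decode to None and are skipped.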
@@ -0,0 +1,10 @@
package ch.epfl.bluebrain.nexus.delta.sdk.migration

import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import monix.bio.Task

trait RemoteStorageMigration {

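  // Rewrites all persisted storage events so that remote disk storages point to the given base URI.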
  def run(newBaseUri: BaseUri): Task[Unit]

}
