Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from Akka to Pekko #26563

Merged
merged 8 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions admin/app/AppLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import conf.switches.SwitchboardLifecycle
import conf.CachedHealthCheckLifeCycle
import controllers.{AdminControllers, HealthCheck}
import _root_.dfp.DfpDataCacheLifecycle
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import concurrent.BlockingOperations
import contentapi.{CapiHttpClient, ContentApiClient, HttpClient}
import http.{AdminFilters, AdminHttpErrorHandler, CommonGzipFilter}
Expand Down Expand Up @@ -36,7 +36,7 @@ class AppLoader extends FrontendApplicationLoader {
trait AdminServices extends I18nComponents {
def wsClient: WSClient
def akkaAsync: AkkaAsync
def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
implicit val executionContext: ExecutionContext
lazy val capiHttpClient: HttpClient = wire[CapiHttpClient]
lazy val contentApiClient = wire[ContentApiClient]
Expand Down Expand Up @@ -101,7 +101,7 @@ trait AppComponents extends FrontendComponents with AdminControllers with AdminS
DfpApiMetrics.DfpApiErrors,
)

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem

override lazy val httpErrorHandler: HttpErrorHandler = wire[AdminHttpErrorHandler]
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonGzipFilter].filters ++ wire[AdminFilters].filters
Expand Down
2 changes: 1 addition & 1 deletion admin/test/dfp/DfpApiValidationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import concurrent.BlockingOperations
import common.dfp.{GuAdUnit, GuLineItem, GuTargeting, Sponsorship}
import com.google.api.ads.admanager.axis.v202308._
import org.joda.time.DateTime
import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down
2 changes: 1 addition & 1 deletion admin/test/services/ParameterStoreServiceTest.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
DanielCliftonGuardian marked this conversation as resolved.
Show resolved Hide resolved
import concurrent.BlockingOperations
import org.scalatest.concurrent.ScalaFutures
import org.mockito.Mockito._
Expand Down
4 changes: 2 additions & 2 deletions applications/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
import common.dfp.DfpAgentLifecycle
Expand Down Expand Up @@ -97,5 +97,5 @@ trait AppComponents extends FrontendComponents with ApplicationsControllers with
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
5 changes: 2 additions & 3 deletions archive/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import http.{CommonFilters, CorsHttpErrorHandler}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
Expand Down Expand Up @@ -49,6 +49,5 @@ trait AppComponents extends FrontendComponents {
override lazy val httpErrorHandler: HttpErrorHandler = wire[CorsHttpErrorHandler]
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
4 changes: 2 additions & 2 deletions article/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _root_.commercial.targeting.TargetingLifecycle
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
import common.Assets.DiscussionExternalAssetsLifecycle
Expand Down Expand Up @@ -98,5 +98,5 @@ trait AppComponents extends FrontendComponents with ArticleControllers with Topi
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.LifecycleComponent
import common.AutoRefresh
import model.{TagDefinition, TagIndexListings}
Expand All @@ -12,7 +12,7 @@ import scala.language.postfixOps
class NewspaperBooksAndSectionsAutoRefresh(
newspaperBookSectionTagAgent: NewspaperBookSectionTagAgent,
newspaperBookTagAgent: NewspaperBookTagAgent,
)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
)(implicit pekkoActorSystem: PekkoActorSystem, executionContext: ExecutionContext)
extends LifecycleComponent {
override def start(): Unit = {
newspaperBookTagAgent.start()
Expand Down
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ val common = library("common")
identityModel,
capiAws,
okhttp,
pekkoActor,
pekkoStream,
) ++ jackson,
TestAssets / mappings ~= filterAssets,
)
Expand Down
6 changes: 3 additions & 3 deletions commercial/app/AppLoader.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.{FrontendApplicationLoader, FrontendBuildInfo, FrontendComponents}
import com.softwaremill.macwire._
import commercial.CommercialLifecycle
Expand Down Expand Up @@ -35,7 +35,7 @@ class AppLoader extends FrontendApplicationLoader {

trait CommercialServices {
def wsClient: WSClient
def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
implicit val executionContext: ExecutionContext

lazy val magentoService = wire[MagentoService]
Expand Down Expand Up @@ -79,5 +79,5 @@ trait AppComponents extends FrontendComponents with CommercialControllers with C
override lazy val httpFilters: Seq[EssentialFilter] = wire[CommonFilters].filters
override lazy val httpRequestHandler: HttpRequestHandler = wire[DevParametersHttpRequestHandler]

def actorSystem: ActorSystem
def pekkoActorSystem: PekkoActorSystem
}
16 changes: 8 additions & 8 deletions commercial/app/model/merchandise/books/BookFinder.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package commercial.model.merchandise.books

import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import akka.util.Timeout
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.pattern.CircuitBreaker
import org.apache.pekko.util.Timeout
import commercial.model.feeds.{FeedParseException, FeedReadException, FeedReader, FeedRequest}
import commercial.model.merchandise.Book
import common.{Box, GuLogging}
Expand All @@ -15,10 +15,10 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class BookFinder(actorSystem: ActorSystem, magentoService: MagentoService) extends GuLogging {
class BookFinder(pekkoActorSystem: PekkoActorSystem, magentoService: MagentoService) extends GuLogging {

private implicit val bookActorExecutionContext: ExecutionContext =
actorSystem.dispatchers.lookup("akka.actor.book-lookup")
pekkoActorSystem.dispatchers.lookup("akka.actor.book-lookup")
private implicit val bookActorTimeout: Timeout = 0.2.seconds
private implicit val magentoServiceImplicit = magentoService

Expand Down Expand Up @@ -48,7 +48,7 @@ object BookAgent extends GuLogging {
}
}

class MagentoService(actorSystem: ActorSystem, wsClient: WSClient) extends GuLogging {
class MagentoService(pekkoActorSystem: PekkoActorSystem, wsClient: WSClient) extends GuLogging {

private case class MagentoProperties(oauth: WSSignatureCalculator, urlPrefix: String)

Expand All @@ -72,10 +72,10 @@ class MagentoService(actorSystem: ActorSystem, wsClient: WSClient) extends GuLog
}

private implicit val bookLookupExecutionContext: ExecutionContext =
actorSystem.dispatchers.lookup("akka.actor.book-lookup")
pekkoActorSystem.dispatchers.lookup("akka.actor.book-lookup")

private final val circuitBreaker = new CircuitBreaker(
scheduler = actorSystem.scheduler,
scheduler = pekkoActorSystem.scheduler,
maxFailures = 5,
callTimeout = 3.seconds,
resetTimeout = 5.minutes,
Expand Down
8 changes: 6 additions & 2 deletions common/app/app/FrontendApplicationLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import play.api.routing.Router
import play.filters.csrf.CSRFComponents
import controllers.AssetsComponents
import play.api.{Application, ApplicationLoader, BuiltInComponents, LoggerConfigurator, OptionalDevContext}
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}

trait FrontendApplicationLoader extends ApplicationLoader {

Expand All @@ -36,10 +37,13 @@ trait FrontendComponents

lazy val prefix = "/"

implicit lazy val as = actorSystem
implicit val pekkoActorSystem: PekkoActorSystem = PekkoActorSystem.create()
applicationLifecycle.addStopHook(() => {
pekkoActorSystem.terminate()
})

lazy val jobScheduler = new JobScheduler(appContext)
lazy val akkaAsync = new AkkaAsync(environment, actorSystem)
lazy val akkaAsync = new AkkaAsync(environment, pekkoActorSystem)
lazy val appMetrics = ApplicationMetrics()
lazy val guardianConf = new GuardianConfiguration
lazy val mode = environment.mode
Expand Down
6 changes: 3 additions & 3 deletions common/app/common/AutoRefresh.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package common

import scala.concurrent.duration.FiniteDuration
import akka.actor.{ActorSystem, Cancellable}
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem, Cancellable}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -35,9 +35,9 @@ abstract class AutoRefresh[A](initialDelay: FiniteDuration, interval: FiniteDura
}
}

final def start()(implicit actorSystem: ActorSystem, executionContext: ExecutionContext): Unit = {
final def start()(implicit pekkoActorSystem: PekkoActorSystem, executionContext: ExecutionContext): Unit = {
log.info(s"Starting refresh cycle after $initialDelay repeatedly over $interval delay")
val cancellable = actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, interval) { new Task() }
val cancellable = pekkoActorSystem.scheduler.scheduleWithFixedDelay(initialDelay, interval) { new Task() }
subscription = Some(cancellable)
}

Expand Down
12 changes: 6 additions & 6 deletions common/app/common/Logback/KinesisAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package common.Logback

import java.util.concurrent.ThreadPoolExecutor
import akka.actor.ActorSystem
import akka.dispatch.MessageDispatcher
import akka.pattern.CircuitBreaker
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.dispatch.MessageDispatcher
import org.apache.pekko.pattern.CircuitBreaker
import ch.qos.logback.classic.spi.ILoggingEvent
import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.AWSCredentialsProvider
Expand All @@ -16,16 +16,16 @@ import scala.concurrent.Future
import scala.annotation.nowarn

// LogbackOperationsPool must be wired as a singleton
class LogbackOperationsPool(val actorSystem: ActorSystem) {
val logbackOperations: MessageDispatcher = actorSystem.dispatchers.lookup("akka.logback-operations")
class LogbackOperationsPool(val pekkoActorSystem: PekkoActorSystem) {
val logbackOperations: MessageDispatcher = pekkoActorSystem.dispatchers.lookup("akka.logback-operations")
}

// The KinesisAppender[ILoggingEvent] blocks logging operations on putMessage. This overrides the KinesisAppender api, executing putMessage in an
// independent threadpool
class SafeBlockingKinesisAppender(logbackOperations: LogbackOperationsPool) extends KinesisAppender[ILoggingEvent] {

private val breaker = new CircuitBreaker(
logbackOperations.actorSystem.scheduler,
logbackOperations.pekkoActorSystem.scheduler,
maxFailures = 1,
callTimeout = 1.seconds,
resetTimeout = 10.seconds,
Expand Down
8 changes: 4 additions & 4 deletions common/app/common/akka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import scala.concurrent.duration._
import play.api.{Environment => PlayEnv, Mode}

import scala.concurrent.ExecutionContext
import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}

class AkkaAsync(env: PlayEnv, actorSystem: ActorSystem) {
implicit val ec: ExecutionContext = actorSystem.dispatcher
class AkkaAsync(env: PlayEnv, pekkoActorSystem: PekkoActorSystem) {
implicit val ec: ExecutionContext = pekkoActorSystem.dispatcher

// "apply" isn't expressive and doesn't explain what it does.
// If you were considering using that function, use after1s instead as it doesn't leave any ambiguity.
Expand All @@ -20,6 +20,6 @@ class AkkaAsync(env: PlayEnv, actorSystem: ActorSystem) {
// want to check in
def after(delay: FiniteDuration)(body: => Unit): Unit =
if (env.mode != Mode.Test) {
actorSystem.scheduler.scheduleOnce(delay)(body)
pekkoActorSystem.scheduler.scheduleOnce(delay)(body)
}
}
2 changes: 1 addition & 1 deletion common/app/common/package.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package common

import java.util.concurrent.TimeoutException
import akka.pattern.CircuitBreakerOpenException
import com.gu.contentapi.client.model.ContentApiError
import com.gu.contentapi.client.model.v1.ErrorResponse
import conf.switches.Switch
Expand All @@ -17,6 +16,7 @@ import play.twirl.api.Html
import model.ApplicationContext
import http.ResultWithPreconnectPreload
import http.HttpPreconnections
import org.apache.pekko.pattern.CircuitBreakerOpenException
import renderers.{DCRLocalConnectException, DCRTimeoutException}

object `package`
Expand Down
8 changes: 4 additions & 4 deletions common/app/concurrent/BlockingOperations.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package concurrent

import akka.actor.ActorSystem
import akka.dispatch.MessageDispatcher
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import org.apache.pekko.dispatch.MessageDispatcher

import scala.concurrent.Future

class BlockingOperations(actorSystem: ActorSystem) {
private val blockingOperations: MessageDispatcher = actorSystem.dispatchers.lookup("akka.blocking-operations")
class BlockingOperations(pekkoActorSystem: PekkoActorSystem) {
private val blockingOperations: MessageDispatcher = pekkoActorSystem.dispatchers.lookup("akka.blocking-operations")

def executeBlocking[T](block: => T): Future[T] = {
Future(block)(blockingOperations)
Expand Down
4 changes: 2 additions & 2 deletions common/app/concurrent/CircuitBreakerRegistry.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package concurrent

import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.pattern.CircuitBreaker
import common.GuLogging

import scala.concurrent.ExecutionContext.Implicits.global
Expand Down
2 changes: 1 addition & 1 deletion common/app/contentapi/ContentApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package contentapi

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import com.github.nscala_time.time.Implicits._
import com.gu.contentapi.client.model._
import com.gu.contentapi.client.model.v1.{Edition => _, _}
Expand Down
4 changes: 2 additions & 2 deletions common/app/renderers/DotcomRenderingService.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package renderers

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import com.gu.contentapi.client.model.v1.{Block, Blocks, Content}
import common.{DCRMetrics, GuLogging}
import concurrent.CircuitBreakerRegistry
Expand Down Expand Up @@ -49,7 +49,7 @@ class DotcomRenderingService extends GuLogging with ResultWithPreconnectPreload

private[this] val circuitBreaker = CircuitBreakerRegistry.withConfig(
name = "dotcom-rendering-client",
system = ActorSystem("dotcom-rendering-client-circuit-breaker"),
system = PekkoActorSystem("dotcom-rendering-client-circuit-breaker"),
maxFailures = Configuration.rendering.circuitBreakerMaxFailures,
callTimeout = Configuration.rendering.timeout.plus(200.millis),
resetTimeout = Configuration.rendering.timeout * 4,
Expand Down
2 changes: 1 addition & 1 deletion common/app/services/ConfigAgentTrait.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package services

import akka.util.Timeout
import app.LifecycleComponent
import com.gu.facia.api.models.{Front, _}
import com.gu.facia.client.ApiClient
Expand All @@ -11,6 +10,7 @@ import conf.Configuration
import fronts.FrontsApi
import model.pressed.CollectionConfig
import model.{ApplicationContext, FrontProperties, SeoDataJson}
import org.apache.pekko.util.Timeout
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.Json

Expand Down
4 changes: 2 additions & 2 deletions common/app/services/indexAutoRefreshes.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import akka.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem => PekkoActorSystem}
import app.LifecycleComponent
import common.AutoRefresh
import model.TagIndexListings
Expand All @@ -9,7 +9,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.language.postfixOps

class IndexListingsLifecycle(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
class IndexListingsLifecycle(implicit pekkoActorSystem: PekkoActorSystem, executionContext: ExecutionContext)
extends LifecycleComponent {
override def start(): Unit = {
KeywordSectionIndexAutoRefresh.start()
Expand Down
2 changes: 1 addition & 1 deletion common/test/concurrent/BlockingOperationsTest.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package concurrent

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down
2 changes: 1 addition & 1 deletion common/test/package.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package test

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem

import java.io.File
import java.net.URL
Expand Down
Loading