Skip to content

Commit

Permalink
refactor(akka): update config and refactor names (#26602)
Browse files Browse the repository at this point in the history
* update akka config

* add logger path

---------

Co-authored-by: Charlotte <[email protected]>
  • Loading branch information
DanielCliftonGuardian and cemms1 authored Sep 29, 2023
1 parent 710a9f3 commit 5c0b831
Show file tree
Hide file tree
Showing 58 changed files with 155 additions and 148 deletions.
2 changes: 1 addition & 1 deletion admin/app/AppLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class AppLoader extends FrontendApplicationLoader {

trait AdminServices extends I18nComponents {
def wsClient: WSClient
def akkaAsync: AkkaAsync
def pekkoAsync: PekkoAsync
def pekkoActorSystem: PekkoActorSystem
implicit val executionContext: ExecutionContext
lazy val capiHttpClient: HttpClient = wire[CapiHttpClient]
Expand Down
4 changes: 2 additions & 2 deletions admin/app/controllers/AdminControllers.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers
import com.softwaremill.macwire._
import common.AkkaAsync
import common.PekkoAsync
import controllers.admin._
import controllers.admin.commercial._
import controllers.cache.{ImageDecacheController, PageDecacheController}
Expand All @@ -12,7 +12,7 @@ import play.api.mvc.ControllerComponents
import services.{OphanApi, ParameterStoreService, RedirectService}

trait AdminControllers {
def akkaAsync: AkkaAsync
def pekkoAsync: PekkoAsync
def wsClient: WSClient
def ophanApi: OphanApi
implicit def appContext: ApplicationContext
Expand Down
12 changes: 6 additions & 6 deletions admin/app/controllers/FrontPressController.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package controllers

import common.{AkkaAsync, ImplicitControllerExecutionContext, GuLogging}
import common.{PekkoAsync, ImplicitControllerExecutionContext, GuLogging}
import jobs.{HighFrequency, LowFrequency, RefreshFrontsJob, StandardFrequency}
import model.ApplicationContext
import play.api.mvc._

class FrontPressController(akkaAsync: AkkaAsync, val controllerComponents: ControllerComponents)(implicit
class FrontPressController(pekkoAsync: PekkoAsync, val controllerComponents: ControllerComponents)(implicit
context: ApplicationContext,
) extends BaseController
with GuLogging
Expand All @@ -18,25 +18,25 @@ class FrontPressController(akkaAsync: AkkaAsync, val controllerComponents: Contr

def queueAllFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
RefreshFrontsJob.runAll(akkaAsync) match {
RefreshFrontsJob.runAll(pekkoAsync) match {
case Some(l) => Ok(s"Pushed ${l.length} fronts to the SQS queue")
case None => InternalServerError("Could not push to the SQS queue, is there an SNS topic set? (frontPressSns)")
}
}

def queueHighFrequencyFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
runJob(RefreshFrontsJob.runFrequency(akkaAsync)(HighFrequency), "high frequency")
runJob(RefreshFrontsJob.runFrequency(pekkoAsync)(HighFrequency), "high frequency")
}

def queueStandardFrequencyFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
runJob(RefreshFrontsJob.runFrequency(akkaAsync)(StandardFrequency), "standard frequency")
runJob(RefreshFrontsJob.runFrequency(pekkoAsync)(StandardFrequency), "standard frequency")
}

def queueLowFrequencyFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
runJob(RefreshFrontsJob.runFrequency(akkaAsync)(LowFrequency), "low frequency")
runJob(RefreshFrontsJob.runFrequency(pekkoAsync)(LowFrequency), "low frequency")
}

private def runJob(didRun: Boolean, jobName: String): Result = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers.admin

import common.{AkkaAsync, GuLogging, ImplicitControllerExecutionContext}
import common.{PekkoAsync, GuLogging, ImplicitControllerExecutionContext}
import conf.switches.Switches.ContentPresser
import model.ApplicationContext
import play.api.libs.ws.WSClient
Expand All @@ -11,7 +11,7 @@ import scala.concurrent.Future

class InteractiveLibrarianController(
wsClient: WSClient,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
val controllerComponents: ControllerComponents,
)(implicit context: ApplicationContext)
extends BaseController
Expand Down
8 changes: 4 additions & 4 deletions admin/app/controllers/admin/R2PressController.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers.admin

import common.{AkkaAsync, GuLogging, ImplicitControllerExecutionContext}
import common.{PekkoAsync, GuLogging, ImplicitControllerExecutionContext}
import model.{ApplicationContext, R2PressMessage}
import play.api.mvc._
import services.{R2PagePressNotifier, R2PressedPageTakedownNotifier, RedirectService}
Expand All @@ -10,7 +10,7 @@ import java.net.URL
import scala.util.Try

class R2PressController(
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
val controllerComponents: ControllerComponents,
)(implicit context: ApplicationContext)
extends BaseController
Expand Down Expand Up @@ -90,15 +90,15 @@ class R2PressController(

private def normaliseAndEnqueueTakedown(url: String): String = {
getVariations(url) match {
case Some(urls) => urls.map(u => R2PressedPageTakedownNotifier.enqueue(akkaAsync)(u)).mkString("\n")
case Some(urls) => urls.map(u => R2PressedPageTakedownNotifier.enqueue(pekkoAsync)(u)).mkString("\n")
case None => s"$url not recognised as a valid url."
}
}

def normaliseAndEnqueuePress(message: R2PressMessage): String = {
val tryUrl = RedirectService.normaliseURL(message.url)
tryUrl match {
case Some(url) => R2PagePressNotifier.enqueue(akkaAsync)(message.copy(url = url))
case Some(url) => R2PagePressNotifier.enqueue(pekkoAsync)(message.copy(url = url))
case None => s"${message.url} not recognised as a valid url."
}
}
Expand Down
4 changes: 2 additions & 2 deletions admin/app/controllers/admin/SwitchboardController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import tools.Store

import scala.concurrent.Future

class SwitchboardController(akkaAsync: AkkaAsync, val controllerComponents: ControllerComponents)(implicit
class SwitchboardController(pekkoAsync: PekkoAsync, val controllerComponents: ControllerComponents)(implicit
context: ApplicationContext,
) extends BaseController
with GuLogging
Expand Down Expand Up @@ -80,7 +80,7 @@ class SwitchboardController(akkaAsync: AkkaAsync, val controllerComponents: Cont
log.info("switches successfully updated")

val changes = updates filterNot { current contains _ }
SwitchNotification.onSwitchChanges(akkaAsync)(requester, Configuration.environment.stage, changes)
SwitchNotification.onSwitchChanges(pekkoAsync)(requester, Configuration.environment.stage, changes)
changes foreach { change =>
log.info(s"Switch change by $requester: $change")
}
Expand Down
6 changes: 3 additions & 3 deletions admin/app/dfp/DfpAdUnitCacheJob.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package dfp

import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import conf.Configuration
import tools.Store

import scala.concurrent.{ExecutionContext, Future}

class DfpAdUnitCacher(val rootAdUnit: Any, val filename: String, dfpApi: DfpApi) extends GuLogging {

def run(akkaAsync: AkkaAsync)(implicit executionContext: ExecutionContext): Future[Unit] =
def run(pekkoAsync: PekkoAsync)(implicit executionContext: ExecutionContext): Future[Unit] =
Future {
akkaAsync {
pekkoAsync {
val adUnits = dfpApi.readActiveAdUnits(rootAdUnit.toString)
if (adUnits.nonEmpty) {
val rows = adUnits.map(adUnit => s"${adUnit.id},${adUnit.path.mkString(",")}")
Expand Down
10 changes: 5 additions & 5 deletions admin/app/dfp/DfpDataCacheLifecycle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DfpDataCacheLifecycle(
dfpMobileAppAdUnitCacheJob: DfpMobileAppAdUnitCacheJob,
dfpFacebookIaAdUnitCacheJob: DfpFacebookIaAdUnitCacheJob,
dfpTemplateCreativeCacheJob: DfpTemplateCreativeCacheJob,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
)(implicit ec: ExecutionContext)
extends LifecycleComponent {

Expand Down Expand Up @@ -75,17 +75,17 @@ class DfpDataCacheLifecycle(
new Job[Unit] {
val name: String = "DFP-Ad-Units-Update"
val interval: Int = 60
def run(): Future[Unit] = dfpAdUnitCacheJob.run(akkaAsync)
def run(): Future[Unit] = dfpAdUnitCacheJob.run(pekkoAsync)
},
new Job[Unit] {
val name: String = "DFP-Mobile-Apps-Ad-Units-Update"
val interval: Int = 60
def run(): Future[Unit] = dfpMobileAppAdUnitCacheJob.run(akkaAsync)
def run(): Future[Unit] = dfpMobileAppAdUnitCacheJob.run(pekkoAsync)
},
new Job[Unit] {
val name: String = "DFP-Facebook-IA-Ad-Units-Update"
val interval: Int = 60
def run(): Future[Unit] = dfpFacebookIaAdUnitCacheJob.run(akkaAsync)
def run(): Future[Unit] = dfpFacebookIaAdUnitCacheJob.run(pekkoAsync)
},
new Job[Seq[GuCreativeTemplate]] {
val name: String = "DFP-Creative-Templates-Update"
Expand Down Expand Up @@ -114,7 +114,7 @@ class DfpDataCacheLifecycle(
}
}

akkaAsync.after1s {
pekkoAsync.after1s {
dfpDataCacheJob.refreshAllDfpData()
creativeTemplateAgent.refresh()
dfpTemplateCreativeCacheJob.run()
Expand Down
4 changes: 2 additions & 2 deletions admin/app/jobs/CommercialDfpReporting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.google.api.ads.admanager.axis.v202308.Column.{AD_SERVER_IMPRESSIONS,
import com.google.api.ads.admanager.axis.v202308.DateRangeType.CUSTOM_DATE
import com.google.api.ads.admanager.axis.v202308.Dimension.{CUSTOM_CRITERIA, DATE}
import com.google.api.ads.admanager.axis.v202308._
import common.{AkkaAsync, Box, JobScheduler, GuLogging}
import common.{PekkoAsync, Box, JobScheduler, GuLogging}
import dfp.DfpApi
import play.api.inject.ApplicationLifecycle

Expand Down Expand Up @@ -86,7 +86,7 @@ object CommercialDfpReporting extends GuLogging {
class CommercialDfpReportingLifecycle(
appLifecycle: ApplicationLifecycle,
jobs: JobScheduler,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
dfpApi: DfpApi,
)(implicit ec: ExecutionContext)
extends LifecycleComponent
Expand Down
12 changes: 7 additions & 5 deletions admin/app/jobs/RefreshFrontsJob.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package jobs

import com.gu.facia.api.models.{CommercialPriority, EditorialPriority, EmailPriority, TrainingPriority}
import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import conf.Configuration
import services.{ConfigAgent, FrontPressNotification}

Expand Down Expand Up @@ -36,12 +36,14 @@ object RefreshFrontsJob extends GuLogging {
}
}

def runFrequency(akkaAsync: AkkaAsync)(frontType: FrontType)(implicit executionContext: ExecutionContext): Boolean = {
def runFrequency(
pekkoAsync: PekkoAsync,
)(frontType: FrontType)(implicit executionContext: ExecutionContext): Boolean = {
if (Configuration.aws.frontPressSns.exists(_.nonEmpty)) {
log.info(s"Putting press jobs on Facia Cron $frontType")
for (update <- getAllCronUpdates.filter(_.frontType == frontType)) {
log.info(s"Pressing $update")
FrontPressNotification.sendWithoutSubject(akkaAsync)(update.path)
FrontPressNotification.sendWithoutSubject(pekkoAsync)(update.path)
}
true
} else {
Expand All @@ -52,13 +54,13 @@ object RefreshFrontsJob extends GuLogging {

//This is used by a route in admin to push ALL paths to the facia-press SQS queue.
//The facia-press boxes will start to pick these off one by one, so there is no direct overloading of these boxes
def runAll(akkaAsync: AkkaAsync)(implicit executionContext: ExecutionContext): Option[Seq[Unit]] = {
def runAll(pekkoAsync: PekkoAsync)(implicit executionContext: ExecutionContext): Option[Seq[Unit]] = {
Configuration.aws.frontPressSns.map(Function.const {
log.info("Putting press jobs on Facia Cron (MANUAL REQUEST)")
for (update <- getAllCronUpdates)
yield {
log.info(s"Pressing $update")
FrontPressNotification.sendWithoutSubject(akkaAsync)(update.path)
FrontPressNotification.sendWithoutSubject(pekkoAsync)(update.path)
}
})
}
Expand Down
11 changes: 6 additions & 5 deletions admin/app/model/AdminLifecycle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scala.concurrent.{ExecutionContext, Future}
class AdminLifecycle(
appLifecycle: ApplicationLifecycle,
jobs: JobScheduler,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
emailService: EmailService,
fastlyCloudwatchLoadJob: FastlyCloudwatchLoadJob,
r2PagePressJob: R2PagePressJob,
Expand Down Expand Up @@ -68,17 +68,18 @@ class AdminLifecycle(
}

jobs.scheduleEveryNMinutes("FrontPressJobHighFrequency", adminPressJobHighPushRateInMinutes) {
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(akkaAsync)(HighFrequency)
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(pekkoAsync)(HighFrequency)
Future.successful(())
}

jobs.scheduleEveryNMinutes("FrontPressJobStandardFrequency", adminPressJobStandardPushRateInMinutes) {
if (FrontPressJobSwitchStandardFrequency.isSwitchedOn) RefreshFrontsJob.runFrequency(akkaAsync)(StandardFrequency)
if (FrontPressJobSwitchStandardFrequency.isSwitchedOn)
RefreshFrontsJob.runFrequency(pekkoAsync)(StandardFrequency)
Future.successful(())
}

jobs.scheduleEveryNMinutes("FrontPressJobLowFrequency", adminPressJobLowPushRateInMinutes) {
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(akkaAsync)(LowFrequency)
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(pekkoAsync)(LowFrequency)
Future.successful(())
}
//every 2, 17, 32, 47 minutes past the hour, on the 9th second past the minute (e.g 13:02:09, 13:17:09)
Expand Down Expand Up @@ -122,7 +123,7 @@ class AdminLifecycle(
descheduleJobs()
scheduleJobs()

akkaAsync.after1s {
pekkoAsync.after1s {
rebuildIndexJob.run()
AssetMetricsCache.run()
LoadBalancer.refresh()
Expand Down
10 changes: 5 additions & 5 deletions admin/app/services/EmailService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.TimeoutException
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.simpleemail._
import com.amazonaws.services.simpleemail.model.{Destination => EmailDestination, _}
import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import conf.Configuration.aws.mandatoryCredentials

import scala.jdk.CollectionConverters._
Expand All @@ -14,15 +14,15 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

class EmailService(akkaAsync: AkkaAsync) extends GuLogging {
class EmailService(pekkoAsync: PekkoAsync) extends GuLogging {

private lazy val client: AmazonSimpleEmailServiceAsync = AmazonSimpleEmailServiceAsyncClient
.asyncBuilder()
.withCredentials(mandatoryCredentials)
.withRegion(conf.Configuration.aws.region)
.build()

val sendAsync = client.sendAsyncEmail(akkaAsync) _
val sendAsync = client.sendAsyncEmail(pekkoAsync) _

def shutdown(): Unit = client.shutdown()

Expand Down Expand Up @@ -75,10 +75,10 @@ class EmailService(akkaAsync: AkkaAsync) extends GuLogging {

private implicit class RichEmailClient(client: AmazonSimpleEmailServiceAsync) {

def sendAsyncEmail(akkaAsync: AkkaAsync)(request: SendEmailRequest): Future[SendEmailResult] = {
def sendAsyncEmail(pekkoAsync: PekkoAsync)(request: SendEmailRequest): Future[SendEmailResult] = {
val promise = Promise[SendEmailResult]()

akkaAsync.after(1.minute) {
pekkoAsync.after(1.minute) {
promise.tryFailure(new TimeoutException(s"Timed out"))
}

Expand Down
6 changes: 3 additions & 3 deletions admin/app/services/R2PagePressNotifier.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import implicits.R2PressNotification.pressMessageFormatter
import model.R2PressMessage
import play.api.libs.json.Json
Expand All @@ -9,9 +9,9 @@ import scala.concurrent.ExecutionContext

object R2PagePressNotifier extends GuLogging {

def enqueue(akkaAsync: AkkaAsync)(message: R2PressMessage)(implicit executionContext: ExecutionContext): String = {
def enqueue(pekkoAsync: PekkoAsync)(message: R2PressMessage)(implicit executionContext: ExecutionContext): String = {
try {
R2PressNotification.sendWithoutSubject(akkaAsync)(Json.toJson[R2PressMessage](message).toString())
R2PressNotification.sendWithoutSubject(pekkoAsync)(Json.toJson[R2PressMessage](message).toString())
val msg = s"Queued for pressing: ${message.url}."
log.info(msg)
msg
Expand Down
6 changes: 3 additions & 3 deletions admin/app/services/R2PressedPageTakedownNotifier.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package services

import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}

import scala.concurrent.ExecutionContext

object R2PressedPageTakedownNotifier extends GuLogging {

def enqueue(akkaAsync: AkkaAsync)(path: String)(implicit executionContext: ExecutionContext): String = {
def enqueue(pekkoAsync: PekkoAsync)(path: String)(implicit executionContext: ExecutionContext): String = {
try {
R2PressedPageTakedownNotification.sendWithoutSubject(akkaAsync)(path)
R2PressedPageTakedownNotification.sendWithoutSubject(pekkoAsync)(path)
val msg = s"Queued for takedown: $path"
log.info(msg)
msg
Expand Down
5 changes: 3 additions & 2 deletions admin/conf/application.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

akka {
loggers = ["akka.event.Logging$DefaultLogger", "akka.event.slf4j.Slf4jLogger"]

pekko {
loggers = ["org.apache.pekko.event.Logging$DefaultLogger", "org.apache.pekko.event.slf4j.Slf4jLogger"]
loglevel = WARNING
actor {
default-dispatcher = {
Expand Down
Loading

0 comments on commit 5c0b831

Please sign in to comment.