Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(akka): update config and refactor names #26602

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admin/app/AppLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class AppLoader extends FrontendApplicationLoader {

trait AdminServices extends I18nComponents {
def wsClient: WSClient
def akkaAsync: AkkaAsync
def pekkoAsync: PekkoAsync
def pekkoActorSystem: PekkoActorSystem
implicit val executionContext: ExecutionContext
lazy val capiHttpClient: HttpClient = wire[CapiHttpClient]
Expand Down
4 changes: 2 additions & 2 deletions admin/app/controllers/AdminControllers.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers
import com.softwaremill.macwire._
import common.AkkaAsync
import common.PekkoAsync
import controllers.admin._
import controllers.admin.commercial._
import controllers.cache.{ImageDecacheController, PageDecacheController}
Expand All @@ -12,7 +12,7 @@ import play.api.mvc.ControllerComponents
import services.{OphanApi, ParameterStoreService, RedirectService}

trait AdminControllers {
def akkaAsync: AkkaAsync
def pekkoAsync: PekkoAsync
def wsClient: WSClient
def ophanApi: OphanApi
implicit def appContext: ApplicationContext
Expand Down
12 changes: 6 additions & 6 deletions admin/app/controllers/FrontPressController.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package controllers

import common.{AkkaAsync, ImplicitControllerExecutionContext, GuLogging}
import common.{PekkoAsync, ImplicitControllerExecutionContext, GuLogging}
import jobs.{HighFrequency, LowFrequency, RefreshFrontsJob, StandardFrequency}
import model.ApplicationContext
import play.api.mvc._

class FrontPressController(akkaAsync: AkkaAsync, val controllerComponents: ControllerComponents)(implicit
class FrontPressController(pekkoAsync: PekkoAsync, val controllerComponents: ControllerComponents)(implicit
context: ApplicationContext,
) extends BaseController
with GuLogging
Expand All @@ -18,25 +18,25 @@ class FrontPressController(akkaAsync: AkkaAsync, val controllerComponents: Contr

def queueAllFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
RefreshFrontsJob.runAll(akkaAsync) match {
RefreshFrontsJob.runAll(pekkoAsync) match {
case Some(l) => Ok(s"Pushed ${l.length} fronts to the SQS queue")
case None => InternalServerError("Could not push to the SQS queue, is there an SNS topic set? (frontPressSns)")
}
}

def queueHighFrequencyFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
runJob(RefreshFrontsJob.runFrequency(akkaAsync)(HighFrequency), "high frequency")
runJob(RefreshFrontsJob.runFrequency(pekkoAsync)(HighFrequency), "high frequency")
}

def queueStandardFrequencyFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
runJob(RefreshFrontsJob.runFrequency(akkaAsync)(StandardFrequency), "standard frequency")
runJob(RefreshFrontsJob.runFrequency(pekkoAsync)(StandardFrequency), "standard frequency")
}

def queueLowFrequencyFrontsForPress(): Action[AnyContent] =
Action { implicit request =>
runJob(RefreshFrontsJob.runFrequency(akkaAsync)(LowFrequency), "low frequency")
runJob(RefreshFrontsJob.runFrequency(pekkoAsync)(LowFrequency), "low frequency")
}

private def runJob(didRun: Boolean, jobName: String): Result = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers.admin

import common.{AkkaAsync, GuLogging, ImplicitControllerExecutionContext}
import common.{PekkoAsync, GuLogging, ImplicitControllerExecutionContext}
import conf.switches.Switches.ContentPresser
import model.ApplicationContext
import play.api.libs.ws.WSClient
Expand All @@ -11,7 +11,7 @@ import scala.concurrent.Future

class InteractiveLibrarianController(
wsClient: WSClient,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
val controllerComponents: ControllerComponents,
)(implicit context: ApplicationContext)
extends BaseController
Expand Down
8 changes: 4 additions & 4 deletions admin/app/controllers/admin/R2PressController.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers.admin

import common.{AkkaAsync, GuLogging, ImplicitControllerExecutionContext}
import common.{PekkoAsync, GuLogging, ImplicitControllerExecutionContext}
import model.{ApplicationContext, R2PressMessage}
import play.api.mvc._
import services.{R2PagePressNotifier, R2PressedPageTakedownNotifier, RedirectService}
Expand All @@ -10,7 +10,7 @@ import java.net.URL
import scala.util.Try

class R2PressController(
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
val controllerComponents: ControllerComponents,
)(implicit context: ApplicationContext)
extends BaseController
Expand Down Expand Up @@ -90,15 +90,15 @@ class R2PressController(

private def normaliseAndEnqueueTakedown(url: String): String = {
getVariations(url) match {
case Some(urls) => urls.map(u => R2PressedPageTakedownNotifier.enqueue(akkaAsync)(u)).mkString("\n")
case Some(urls) => urls.map(u => R2PressedPageTakedownNotifier.enqueue(pekkoAsync)(u)).mkString("\n")
case None => s"$url not recognised as a valid url."
}
}

def normaliseAndEnqueuePress(message: R2PressMessage): String = {
val tryUrl = RedirectService.normaliseURL(message.url)
tryUrl match {
case Some(url) => R2PagePressNotifier.enqueue(akkaAsync)(message.copy(url = url))
case Some(url) => R2PagePressNotifier.enqueue(pekkoAsync)(message.copy(url = url))
case None => s"${message.url} not recognised as a valid url."
}
}
Expand Down
4 changes: 2 additions & 2 deletions admin/app/controllers/admin/SwitchboardController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import tools.Store

import scala.concurrent.Future

class SwitchboardController(akkaAsync: AkkaAsync, val controllerComponents: ControllerComponents)(implicit
class SwitchboardController(pekkoAsync: PekkoAsync, val controllerComponents: ControllerComponents)(implicit
context: ApplicationContext,
) extends BaseController
with GuLogging
Expand Down Expand Up @@ -80,7 +80,7 @@ class SwitchboardController(akkaAsync: AkkaAsync, val controllerComponents: Cont
log.info("switches successfully updated")

val changes = updates filterNot { current contains _ }
SwitchNotification.onSwitchChanges(akkaAsync)(requester, Configuration.environment.stage, changes)
SwitchNotification.onSwitchChanges(pekkoAsync)(requester, Configuration.environment.stage, changes)
changes foreach { change =>
log.info(s"Switch change by $requester: $change")
}
Expand Down
6 changes: 3 additions & 3 deletions admin/app/dfp/DfpAdUnitCacheJob.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package dfp

import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import conf.Configuration
import tools.Store

import scala.concurrent.{ExecutionContext, Future}

class DfpAdUnitCacher(val rootAdUnit: Any, val filename: String, dfpApi: DfpApi) extends GuLogging {

def run(akkaAsync: AkkaAsync)(implicit executionContext: ExecutionContext): Future[Unit] =
def run(pekkoAsync: PekkoAsync)(implicit executionContext: ExecutionContext): Future[Unit] =
Future {
akkaAsync {
pekkoAsync {
val adUnits = dfpApi.readActiveAdUnits(rootAdUnit.toString)
if (adUnits.nonEmpty) {
val rows = adUnits.map(adUnit => s"${adUnit.id},${adUnit.path.mkString(",")}")
Expand Down
10 changes: 5 additions & 5 deletions admin/app/dfp/DfpDataCacheLifecycle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DfpDataCacheLifecycle(
dfpMobileAppAdUnitCacheJob: DfpMobileAppAdUnitCacheJob,
dfpFacebookIaAdUnitCacheJob: DfpFacebookIaAdUnitCacheJob,
dfpTemplateCreativeCacheJob: DfpTemplateCreativeCacheJob,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
)(implicit ec: ExecutionContext)
extends LifecycleComponent {

Expand Down Expand Up @@ -75,17 +75,17 @@ class DfpDataCacheLifecycle(
new Job[Unit] {
val name: String = "DFP-Ad-Units-Update"
val interval: Int = 60
def run(): Future[Unit] = dfpAdUnitCacheJob.run(akkaAsync)
def run(): Future[Unit] = dfpAdUnitCacheJob.run(pekkoAsync)
},
new Job[Unit] {
val name: String = "DFP-Mobile-Apps-Ad-Units-Update"
val interval: Int = 60
def run(): Future[Unit] = dfpMobileAppAdUnitCacheJob.run(akkaAsync)
def run(): Future[Unit] = dfpMobileAppAdUnitCacheJob.run(pekkoAsync)
},
new Job[Unit] {
val name: String = "DFP-Facebook-IA-Ad-Units-Update"
val interval: Int = 60
def run(): Future[Unit] = dfpFacebookIaAdUnitCacheJob.run(akkaAsync)
def run(): Future[Unit] = dfpFacebookIaAdUnitCacheJob.run(pekkoAsync)
},
new Job[Seq[GuCreativeTemplate]] {
val name: String = "DFP-Creative-Templates-Update"
Expand Down Expand Up @@ -114,7 +114,7 @@ class DfpDataCacheLifecycle(
}
}

akkaAsync.after1s {
pekkoAsync.after1s {
dfpDataCacheJob.refreshAllDfpData()
creativeTemplateAgent.refresh()
dfpTemplateCreativeCacheJob.run()
Expand Down
4 changes: 2 additions & 2 deletions admin/app/jobs/CommercialDfpReporting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.google.api.ads.admanager.axis.v202308.Column.{AD_SERVER_IMPRESSIONS,
import com.google.api.ads.admanager.axis.v202308.DateRangeType.CUSTOM_DATE
import com.google.api.ads.admanager.axis.v202308.Dimension.{CUSTOM_CRITERIA, DATE}
import com.google.api.ads.admanager.axis.v202308._
import common.{AkkaAsync, Box, JobScheduler, GuLogging}
import common.{PekkoAsync, Box, JobScheduler, GuLogging}
import dfp.DfpApi
import play.api.inject.ApplicationLifecycle

Expand Down Expand Up @@ -86,7 +86,7 @@ object CommercialDfpReporting extends GuLogging {
class CommercialDfpReportingLifecycle(
appLifecycle: ApplicationLifecycle,
jobs: JobScheduler,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
dfpApi: DfpApi,
)(implicit ec: ExecutionContext)
extends LifecycleComponent
Expand Down
12 changes: 7 additions & 5 deletions admin/app/jobs/RefreshFrontsJob.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package jobs

import com.gu.facia.api.models.{CommercialPriority, EditorialPriority, EmailPriority, TrainingPriority}
import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import conf.Configuration
import services.{ConfigAgent, FrontPressNotification}

Expand Down Expand Up @@ -36,12 +36,14 @@ object RefreshFrontsJob extends GuLogging {
}
}

def runFrequency(akkaAsync: AkkaAsync)(frontType: FrontType)(implicit executionContext: ExecutionContext): Boolean = {
def runFrequency(
pekkoAsync: PekkoAsync,
)(frontType: FrontType)(implicit executionContext: ExecutionContext): Boolean = {
if (Configuration.aws.frontPressSns.exists(_.nonEmpty)) {
log.info(s"Putting press jobs on Facia Cron $frontType")
for (update <- getAllCronUpdates.filter(_.frontType == frontType)) {
log.info(s"Pressing $update")
FrontPressNotification.sendWithoutSubject(akkaAsync)(update.path)
FrontPressNotification.sendWithoutSubject(pekkoAsync)(update.path)
}
true
} else {
Expand All @@ -52,13 +54,13 @@ object RefreshFrontsJob extends GuLogging {

//This is used by a route in admin to push ALL paths to the facia-press SQS queue.
//The facia-press boxes will start to pick these off one by one, so there is no direct overloading of these boxes
def runAll(akkaAsync: AkkaAsync)(implicit executionContext: ExecutionContext): Option[Seq[Unit]] = {
def runAll(pekkoAsync: PekkoAsync)(implicit executionContext: ExecutionContext): Option[Seq[Unit]] = {
Configuration.aws.frontPressSns.map(Function.const {
log.info("Putting press jobs on Facia Cron (MANUAL REQUEST)")
for (update <- getAllCronUpdates)
yield {
log.info(s"Pressing $update")
FrontPressNotification.sendWithoutSubject(akkaAsync)(update.path)
FrontPressNotification.sendWithoutSubject(pekkoAsync)(update.path)
}
})
}
Expand Down
11 changes: 6 additions & 5 deletions admin/app/model/AdminLifecycle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scala.concurrent.{ExecutionContext, Future}
class AdminLifecycle(
appLifecycle: ApplicationLifecycle,
jobs: JobScheduler,
akkaAsync: AkkaAsync,
pekkoAsync: PekkoAsync,
emailService: EmailService,
fastlyCloudwatchLoadJob: FastlyCloudwatchLoadJob,
r2PagePressJob: R2PagePressJob,
Expand Down Expand Up @@ -68,17 +68,18 @@ class AdminLifecycle(
}

jobs.scheduleEveryNMinutes("FrontPressJobHighFrequency", adminPressJobHighPushRateInMinutes) {
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(akkaAsync)(HighFrequency)
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(pekkoAsync)(HighFrequency)
Future.successful(())
}

jobs.scheduleEveryNMinutes("FrontPressJobStandardFrequency", adminPressJobStandardPushRateInMinutes) {
if (FrontPressJobSwitchStandardFrequency.isSwitchedOn) RefreshFrontsJob.runFrequency(akkaAsync)(StandardFrequency)
if (FrontPressJobSwitchStandardFrequency.isSwitchedOn)
RefreshFrontsJob.runFrequency(pekkoAsync)(StandardFrequency)
Future.successful(())
}

jobs.scheduleEveryNMinutes("FrontPressJobLowFrequency", adminPressJobLowPushRateInMinutes) {
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(akkaAsync)(LowFrequency)
if (FrontPressJobSwitch.isSwitchedOn) RefreshFrontsJob.runFrequency(pekkoAsync)(LowFrequency)
Future.successful(())
}
//every 2, 17, 32, 47 minutes past the hour, on the 9th second past the minute (e.g 13:02:09, 13:17:09)
Expand Down Expand Up @@ -122,7 +123,7 @@ class AdminLifecycle(
descheduleJobs()
scheduleJobs()

akkaAsync.after1s {
pekkoAsync.after1s {
rebuildIndexJob.run()
AssetMetricsCache.run()
LoadBalancer.refresh()
Expand Down
10 changes: 5 additions & 5 deletions admin/app/services/EmailService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.TimeoutException
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.simpleemail._
import com.amazonaws.services.simpleemail.model.{Destination => EmailDestination, _}
import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import conf.Configuration.aws.mandatoryCredentials

import scala.jdk.CollectionConverters._
Expand All @@ -14,15 +14,15 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

class EmailService(akkaAsync: AkkaAsync) extends GuLogging {
class EmailService(pekkoAsync: PekkoAsync) extends GuLogging {

private lazy val client: AmazonSimpleEmailServiceAsync = AmazonSimpleEmailServiceAsyncClient
.asyncBuilder()
.withCredentials(mandatoryCredentials)
.withRegion(conf.Configuration.aws.region)
.build()

val sendAsync = client.sendAsyncEmail(akkaAsync) _
val sendAsync = client.sendAsyncEmail(pekkoAsync) _

def shutdown(): Unit = client.shutdown()

Expand Down Expand Up @@ -75,10 +75,10 @@ class EmailService(akkaAsync: AkkaAsync) extends GuLogging {

private implicit class RichEmailClient(client: AmazonSimpleEmailServiceAsync) {

def sendAsyncEmail(akkaAsync: AkkaAsync)(request: SendEmailRequest): Future[SendEmailResult] = {
def sendAsyncEmail(pekkoAsync: PekkoAsync)(request: SendEmailRequest): Future[SendEmailResult] = {
val promise = Promise[SendEmailResult]()

akkaAsync.after(1.minute) {
pekkoAsync.after(1.minute) {
promise.tryFailure(new TimeoutException(s"Timed out"))
}

Expand Down
6 changes: 3 additions & 3 deletions admin/app/services/R2PagePressNotifier.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package services

import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}
import implicits.R2PressNotification.pressMessageFormatter
import model.R2PressMessage
import play.api.libs.json.Json
Expand All @@ -9,9 +9,9 @@ import scala.concurrent.ExecutionContext

object R2PagePressNotifier extends GuLogging {

def enqueue(akkaAsync: AkkaAsync)(message: R2PressMessage)(implicit executionContext: ExecutionContext): String = {
def enqueue(pekkoAsync: PekkoAsync)(message: R2PressMessage)(implicit executionContext: ExecutionContext): String = {
try {
R2PressNotification.sendWithoutSubject(akkaAsync)(Json.toJson[R2PressMessage](message).toString())
R2PressNotification.sendWithoutSubject(pekkoAsync)(Json.toJson[R2PressMessage](message).toString())
val msg = s"Queued for pressing: ${message.url}."
log.info(msg)
msg
Expand Down
6 changes: 3 additions & 3 deletions admin/app/services/R2PressedPageTakedownNotifier.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package services

import common.{AkkaAsync, GuLogging}
import common.{PekkoAsync, GuLogging}

import scala.concurrent.ExecutionContext

object R2PressedPageTakedownNotifier extends GuLogging {

def enqueue(akkaAsync: AkkaAsync)(path: String)(implicit executionContext: ExecutionContext): String = {
def enqueue(pekkoAsync: PekkoAsync)(path: String)(implicit executionContext: ExecutionContext): String = {
try {
R2PressedPageTakedownNotification.sendWithoutSubject(akkaAsync)(path)
R2PressedPageTakedownNotification.sendWithoutSubject(pekkoAsync)(path)
val msg = s"Queued for takedown: $path"
log.info(msg)
msg
Expand Down
5 changes: 3 additions & 2 deletions admin/conf/application.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

akka {
loggers = ["akka.event.Logging$DefaultLogger", "akka.event.slf4j.Slf4jLogger"]

pekko {
loggers = ["org.apache.pekko.event.Logging$DefaultLogger", "org.apache.pekko.event.slf4j.Slf4jLogger"]
loglevel = WARNING
actor {
default-dispatcher = {
Expand Down
Loading