Skip to content

Commit

Permalink
akka-loader: telegram alerting
Browse files Browse the repository at this point in the history
  • Loading branch information
Blackmorse committed Jan 30, 2024
1 parent 70ca704 commit 8d6e3fa
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 9 deletions.
5 changes: 5 additions & 0 deletions akka-loader/src/main/resources/application-example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ tokens {
tokenSecret =
}

telegram {
botToken = ""
chatId =
}

database_name = "hattrick"
hattid_web_url = ""

Expand Down
6 changes: 4 additions & 2 deletions akka-loader/src/main/scala/executors/CupExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import chpp.OauthTokens
import chpp.commonmodels.MatchType
import chpp.worlddetails.models.{League, WorldDetails}
import clickhouse.PlayerStatsClickhouseClient
import telegram.LoaderTelegramClient

import scala.concurrent.Future

class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]],
playerStatsClickhouseClient: PlayerStatsClickhouseClient,
worldDetails: WorldDetails
worldDetails: WorldDetails,
telegramClient: LoaderTelegramClient
) (implicit oauthTokens: OauthTokens)
extends TaskExecutorActor(graph, worldDetails, (m => m): Future[Done] => Future[Done]) {
extends TaskExecutorActor(graph, worldDetails, (m => m): Future[Done] => Future[Done], telegramClient) {
override def postProcessLoadedResults(league: League, matValue: Done): Future[_] = {
playerStatsClickhouseClient.join(league, MatchType.CUP_MATCH)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.crobox.clickhouse.internal.QuerySettings
import com.crobox.clickhouse.stream.{ClickhouseSink, Insert}
import com.typesafe.config.Config
import loadergraph.{CupMatchesFlow, LeagueMatchesFlow}
import telegram.LoaderTelegramClient

import javax.inject.{Inject, Singleton}

Expand All @@ -21,7 +22,8 @@ class ExecutorActorFactory @Inject()
val clickhouseClient: ClickhouseClient,
val config: Config,
val hattidClient: PlayerStatsClickhouseClient,
val alltidClient: AlltidClient) {
val alltidClient: AlltidClient,
val telegramClient: LoaderTelegramClient) {
import actorSystem.dispatcher

private implicit val querySettings: QuerySettings = QuerySettings(authentication = Some((
Expand All @@ -33,13 +35,13 @@ class ExecutorActorFactory @Inject()
def createLeagueExecutorActor(worldDetails: WorldDetails, lastMatchesWindow: Int): ActorRef = {
val countryMap = getCountryMap(worldDetails)
val graph = LeagueMatchesFlow.apply(config, countryMap, lastMatchesWindow).toMat(chSink)(Keep.both)
actorSystem.actorOf(Props(new LeagueExecutorActor(graph, chSink, hattidClient, worldDetails, config, alltidClient)))
actorSystem.actorOf(Props(new LeagueExecutorActor(graph, chSink, hattidClient, worldDetails, config, alltidClient, telegramClient)))
}

def createCupExecutorActor(worldDetails: WorldDetails, lastMatchesWindow: Int): ActorRef = {
val countryMap = getCountryMap(worldDetails)
val graph = CupMatchesFlow(config, countryMap, lastMatchesWindow).toMat(chSink)(Keep.right)
actorSystem.actorOf(Props(new CupExecutorActor(graph, hattidClient, worldDetails)))
actorSystem.actorOf(Props(new CupExecutorActor(graph, hattidClient, worldDetails, telegramClient)))
}

private def getCountryMap(worldDetails: WorldDetails): Map[Int, Int] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.crobox.clickhouse.stream.Insert
import com.typesafe.config.Config
import models.stream.StreamTeam
import promotions.PromotionsCalculator
import telegram.LoaderTelegramClient

import scala.concurrent.Future

Expand All @@ -25,8 +26,9 @@ class LeagueExecutorActor
playerStatsClickhouseClient: PlayerStatsClickhouseClient,
worldDetails: WorldDetails,
config: Config,
alltidClient: AlltidClient)(implicit oauthTokens: OauthTokens)
extends TaskExecutorActor[LeagueMat, (List[StreamTeam], Done)](graph, worldDetails, lm => lm._1.zip(lm._2)) {
alltidClient: AlltidClient,
telegramClient: LoaderTelegramClient)(implicit oauthTokens: OauthTokens)
extends TaskExecutorActor[LeagueMat, (List[StreamTeam], Done)](graph, worldDetails, lm => lm._1.zip(lm._2), telegramClient) {

import context.{dispatcher, system}

Expand Down
6 changes: 5 additions & 1 deletion akka-loader/src/main/scala/executors/TaskExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import chpp.OauthTokens
import chpp.worlddetails.models.{League, WorldDetails}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
import telegram.LoaderTelegramClient
import utils.WorldDetailsSingleRequest

import java.util.Date
Expand All @@ -27,7 +28,8 @@ object TaskExecutorActor {

abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
worldDetails: WorldDetails,
matToFuture: GraphMat => Future[MatValue])
matToFuture: GraphMat => Future[MatValue],
telegramClient: LoaderTelegramClient)
(implicit oauthTokens: OauthTokens)
extends Actor {
private val logger = Logger(LoggerFactory.getLogger(this.getClass))
Expand Down Expand Up @@ -74,6 +76,7 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
logger.error(s"Failed to upload ${task.leagueId}", exception)
self ! ScheduleTask(task.leagueId,
new Date(System.currentTimeMillis() + 30 * 60 * 1000))
telegramClient.sendException("Loader failed at streaming stage", exception)
self ! TaskFinished
case Success(matValue) =>
val updatedLeagueFuture = WorldDetailsSingleRequest.request(leagueId = Some(league.leagueId)).map(_.leagueList.head);
Expand All @@ -83,6 +86,7 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
result.onComplete {
case Failure(exception) =>
logger.error(exception.getMessage, exception)
telegramClient.sendException("Loader failed at post processing stage", exception)
self ! TaskFinished
case Success(_) =>
logger.info(s"(${updatedLeague.leagueId}, ${updatedLeague.leagueName}) successfully loaded")
Expand Down
8 changes: 7 additions & 1 deletion akka-loader/src/main/scala/guice/LoaderModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import chpp.OauthTokens
import com.crobox.clickhouse.ClickhouseClient
import com.google.inject.AbstractModule
import com.typesafe.config.Config
import hattid.telegram.TelegramClient
import hattid.telegram.TelegramClient.TelegramCreds

import javax.inject.{Inject, Singleton}

class LoaderModule(config: Config, actorSystem: ActorSystem) extends AbstractModule {
override def configure(): Unit = {
Expand All @@ -19,5 +23,7 @@ class LoaderModule(config: Config, actorSystem: ActorSystem) extends AbstractMod
bind(classOf[Config]).toInstance(config)
bind(classOf[ClickhouseClient]).toInstance(new ClickhouseClient(Some(config)))
bind(classOf[OauthTokens]).toInstance(oauthTokens)

bind(classOf[LoaderTelegramClient]).toInstance(new LoaderTelegramClient(config.ha))
}
}
}
29 changes: 29 additions & 0 deletions akka-loader/src/main/scala/telegram/LoaderTelegramClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package telegram

import akka.actor.ActorSystem
import com.typesafe.config.Config
import com.typesafe.scalalogging.Logger
import hattid.telegram.TelegramClient
import hattid.telegram.TelegramClient.TelegramCreds
import org.slf4j.LoggerFactory

import javax.inject.{Inject, Singleton}

@Singleton
class LoaderTelegramClient @Inject()(config: Config,
implicit val actorSystem: ActorSystem) {
private val logger = Logger(LoggerFactory.getLogger(this.getClass))
def sendException(message: String, e: Throwable): Unit = {
val enabled = config.hasPath("telegram.chatId") && config.hasPath("telegram.botToken")
if (enabled) {
implicit val creds: TelegramCreds = TelegramCreds(config.getString("telegram.chatId"), config.getString("telegram.botToken"))
val stacktrace = e.getStackTrace.mkString("\n")
val text = s"$message: ${e.getMessage} \n\n $stacktrace"
try {
TelegramClient.sendMessage(text.take(4095))
} catch {
case e: Exception => logger.warn("Telegram reporting is not working", e)
}
}
}
}

0 comments on commit 8d6e3fa

Please sign in to comment.