From 1e67fc53ad1b880b8fdd98182636b878d41008de Mon Sep 17 00:00:00 2001 From: Mikko Siukola Date: Tue, 14 May 2024 13:21:11 +0300 Subject: [PATCH 1/2] OY-4836 Use separate thread pools instead of global for hakemukset and onr --- .../integration/hakemus/HakemusService.scala | 15 ++++++-- .../henkilo/oppijaNumeroRekisteri.scala | 34 ++++++++++++++----- .../rest/OppijaResourceSpec.scala | 10 +++++- .../rest/VirtaSuoritusResourceSpec.scala | 9 ++++- 4 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala b/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala index ad5324a70..eeabf0b59 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala @@ -30,7 +30,12 @@ import fi.vm.sade.hakurekisteri.integration.kouta.{ KoutaInternalHakukohde } import fi.vm.sade.hakurekisteri.integration.organisaatio.{Organisaatio, OrganisaatioActorRef} -import fi.vm.sade.hakurekisteri.integration.{OphUrlProperties, ServiceConfig, VirkailijaRestClient} +import fi.vm.sade.hakurekisteri.integration.{ + ExecutorUtil, + OphUrlProperties, + ServiceConfig, + VirkailijaRestClient +} import fi.vm.sade.hakurekisteri.rest.support.{HakurekisteriJsonSupport, Query} import fi.vm.sade.properties.OphProperties import org.joda.time.{DateTimeZone, LocalDate} @@ -42,8 +47,7 @@ import java.text.SimpleDateFormat import java.util.Date import java.util.concurrent.TimeUnit import scala.compat.Platform -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} @@ -192,6 +196,11 @@ class HakemusService( )(implicit val system: ActorSystem) extends IHakemusService { + implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( + config.integrations.asyncOperationThreadPoolSize, + getClass.getSimpleName + ) + case class SearchParams( aoOids: Seq[String] = null, asId: String = null, diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala b/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala index 1940b1099..5297ef0f6 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala @@ -3,7 +3,7 @@ package fi.vm.sade.hakurekisteri.integration.henkilo import akka.actor.ActorSystem import akka.event.Logging import fi.vm.sade.hakurekisteri.Config -import fi.vm.sade.hakurekisteri.integration.VirkailijaRestClient +import fi.vm.sade.hakurekisteri.integration.{ExecutorUtil, VirkailijaRestClient} import fi.vm.sade.hakurekisteri.integration.hakemus.HakemusHenkilotiedot import fi.vm.sade.hakurekisteri.integration.mocks.HenkiloMock import org.apache.commons.httpclient.HttpStatus @@ -12,8 +12,7 @@ import org.json4s.{DefaultFormats, _} import support.PersonAliasesProvider import scala.collection.Iterator -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} /** @@ -35,12 +34,7 @@ case class LinkedHenkiloOids( trait IOppijaNumeroRekisteri { def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] - - def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { - fetchLinkedHenkiloOidsMap(henkiloOids) - .map(_.oidToLinkedOids) - .map(PersonOidsWithAliases(henkiloOids, _)) - } + def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] def getByHetu(hetu: String): Future[Henkilo] def fetchHenkilotInBatches(henkiloOids: Set[String]): Future[Map[String, Henkilo]] @@ -64,6 +58,11 @@ class OppijaNumeroRekisteri(client: VirkailijaRestClient, val system: ActorSyste extends IOppijaNumeroRekisteri { private val logger = Logging.getLogger(system, this) + implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( + config.integrations.asyncOperationThreadPoolSize, + getClass.getSimpleName + ) + def fetchInBatches(henkiloOids: Set[String], batchSize: Int) = { val started = System.currentTimeMillis() val batches = henkiloOids.grouped(batchSize).zipWithIndex.toList @@ -86,6 +85,12 @@ class OppijaNumeroRekisteri(client: VirkailijaRestClient, val system: ActorSyste } } + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + override def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] = { if (henkiloOids.isEmpty) { @@ -183,6 +188,17 @@ object MockOppijaNumeroRekisteri extends IOppijaNumeroRekisteri { val henkiloOid = "1.2.246.562.24.58099330694" val linkedTestPersonOids = Seq(henkiloOid, masterOid) + implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( + 1, + getClass.getSimpleName + ) + + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] = { Future.successful({ val oidToLinkedOids = henkiloOids.map { queriedOid => diff --git a/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala b/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala index e9aaba8d0..be3c5792a 100644 --- a/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala +++ b/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala @@ -16,7 +16,8 @@ import fi.vm.sade.hakurekisteri.integration.henkilo.{ Henkilo, IOppijaNumeroRekisteri, LinkedHenkiloOids, - MockPersonAliasesProvider + MockPersonAliasesProvider, + PersonOidsWithAliases } import fi.vm.sade.hakurekisteri.integration.tarjonta._ import fi.vm.sade.hakurekisteri.integration.valintarekisteri.{ @@ -91,6 +92,13 @@ class OppijaResourceSpec ) ) } + + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + override def getByHetu(hetu: String): Future[Henkilo] = { throw new UnsupportedOperationException("Not implemented") } diff --git a/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala b/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala index 21eb58f17..9559cb79e 100644 --- a/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala +++ b/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala @@ -10,7 +10,8 @@ import fi.vm.sade.hakurekisteri.integration.hakemus.{ import fi.vm.sade.hakurekisteri.integration.henkilo.{ Henkilo, IOppijaNumeroRekisteri, - LinkedHenkiloOids + LinkedHenkiloOids, + PersonOidsWithAliases } import fi.vm.sade.hakurekisteri.integration.virta.{ VirtaClient, @@ -143,6 +144,12 @@ class VirtaSuoritusResourceSpec extends ScalatraFunSuite with DispatchSupport wi } } + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + override def getByOids(oids: Set[String]): Future[Map[String, Henkilo]] = Future.successful( Map( ( From d4355031ac997211ce8d50964f9777650b2c8a4c Mon Sep 17 00:00:00 2001 From: Mikko Siukola Date: Tue, 14 May 2024 17:09:06 +0300 Subject: [PATCH 2/2] OY-4836 Backtrack hours instead of days, tweaks to YtlFetchActor --- .../suoritusrekisteri.properties.template | 2 +- .../hakurekisteri/integration/ytl/YtlFetchActor.scala | 8 ++++---- src/main/scala/support/Integrations.scala | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/resources/oph-configuration/suoritusrekisteri.properties.template b/src/main/resources/oph-configuration/suoritusrekisteri.properties.template index b4c9aef31..e366a7802 100644 --- a/src/main/resources/oph-configuration/suoritusrekisteri.properties.template +++ b/src/main/resources/oph-configuration/suoritusrekisteri.properties.template @@ -126,7 +126,7 @@ suoritusrekisteri.koski.update.cronJob={{ suoritusrekisteri_koski_update_cronjob suoritusrekisteri.koski.update.kkHaut={{ suoritusrekisteri_koski_update_kkHaut | default('false') }} suoritusrekisteri.koski.update.toisenAsteenHaut={{ suoritusrekisteri_koski_update_toisenAsteenHaut | default('false') }} suoritusrekisteri.koski.update.jatkuvatHaut={{ suoritusrekisteri_koski_update_jatkuvatHaut | default('false') }} -suoritusrekisteri.modifiedhakemukset.backtrack.days={{ suoritusrekisteri_modifiedhakemukset_backtrack_days | default('2')}} +suoritusrekisteri.modifiedhakemukset.backtrack.hours={{ suoritusrekisteri_modifiedhakemukset_backtrack_hours | default('2')}} suoritusrekisteri.oppijanumerorekisteri-service.max-connections={{ suoritusrekisteri_oppijanumerorekisteriservice_max_connections | default('50')}} suoritusrekisteri.oppijanumerorekisteri-service.max-connection-queue-ms={{ suoritusrekisteri_oppijanumerorekisteriservice_max_connection_queue_ms | default('60000')}} suoritusrekisteri.oppijanumerorekisteri-service.max.oppijat.batch.size={{ suoritusrekisteri_oppijanumerorekisteriservice_max_oppijat_batch_size | default('5000')}} diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala b/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala index 597c574d9..0fb5ae155 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala @@ -1,6 +1,6 @@ package fi.vm.sade.hakurekisteri.integration.ytl -import akka.actor.{Actor, ActorLogging, ActorRef} +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem} import akka.pattern.pipe import fi.vm.sade.hakurekisteri.Config import fi.vm.sade.hakurekisteri.integration.ExecutorUtil @@ -52,7 +52,7 @@ class YtlFetchActor( val minIntervalBetween = 1000 * 60 * 60 * 22 //At least 22 hours between nightly syncs implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( - config.integrations.asyncOperationThreadPoolSize, + 20, getClass.getSimpleName ) @@ -99,7 +99,7 @@ class YtlFetchActor( log.error(t, s"($tunniste) Manual sync for haku ${s.hakuOid} failed...") } log.info(s"Ytl-sync käynnistetty haulle ${s.hakuOid} tunnisteella $tunniste") - resultF pipeTo sender + sender ! tunniste case s: YtlSyncSingle => if (s.needsToBeActiveKkHakuOid.forall(oid => activeKKHakuOids.get().contains(oid))) { val tunniste = s.tunniste @@ -110,7 +110,7 @@ class YtlFetchActor( case Failure(t) => log.error(t, s"($tunniste) Manual sync for person ${s.personOid} failed...") } - log.info(s"Ytl-sync käynnistetty haulle ${s.personOid} tunnisteella $tunniste") + log.info(s"Ytl-sync käynnistetty oidille ${s.personOid} tunnisteella $tunniste") resultF pipeTo sender } else { val infoStr = s"Not ytl-syncing $s because the haku is not an active kk-haku" diff --git a/src/main/scala/support/Integrations.scala b/src/main/scala/support/Integrations.scala index 51f20d965..70741f4dd 100644 --- a/src/main/scala/support/Integrations.scala +++ b/src/main/scala/support/Integrations.scala @@ -550,11 +550,11 @@ class BaseIntegrations(rekisterit: Registers, system: ActorSystem, config: Confi hakemusService.addTrigger(arvosanaTrigger) hakemusService.addTrigger(ytlTrigger) - val daysToBacktrack: Int = - OphUrlProperties.getProperty("suoritusrekisteri.modifiedhakemukset.backtrack.days").toInt + val hoursToBacktrack: Int = + OphUrlProperties.getProperty("suoritusrekisteri.modifiedhakemukset.backtrack.hours").toInt implicit val scheduler = system.scheduler hakemusService.processModifiedHakemukset(modifiedAfter = - new Date(Platform.currentTime - TimeUnit.DAYS.toMillis(daysToBacktrack)) + new Date(Platform.currentTime - TimeUnit.HOURS.toMillis(hoursToBacktrack)) ) val quartzScheduler = StdSchedulerFactory.getDefaultScheduler()