Skip to content

Commit

Permalink
use a single thread pool for the http clients
Browse files Browse the repository at this point in the history
  • Loading branch information
nylonee committed Nov 13, 2023
1 parent 9e4aa95 commit ec9014a
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 66 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ intervals, and cannot be customized

### Ombi/Overseer limitation

While Ombi and Overseer have built-in functionality, there are two problems with this:
While Ombi and Overseer have built-in functionality, there are a few problems with this:

* They are customizable down to 5 minute intervals, so doesn't allow the "real-time" sync that Watchlistarr does
* They rely on Plex tokens, which expire and break the sync if you're not regularly logging into Ombi/Overseer
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/Server.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@

import cats.effect._
import configuration.{Configuration, SystemPropertyReader}
import configuration.{ConfigurationUtils, SystemPropertyReader}
import utils.HttpClient

object Server extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val configReader = SystemPropertyReader
val httpClient = new HttpClient()
val config = new Configuration(configReader, httpClient)(runtime)
val config = ConfigurationUtils.create(configReader, httpClient)

def periodicTask: IO[Unit] =
WatchlistSync.run(config) >>
IO.sleep(config.refreshInterval) >>
config.flatMap(c => WatchlistSync.run(c, httpClient)) >>
config.flatMap(c => IO.sleep(c.refreshInterval)) >>
periodicTask

periodicTask.foreverM.as(ExitCode.Success)
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/WatchlistSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ object WatchlistSync {

private val logger = LoggerFactory.getLogger(getClass)

def run(config: Configuration): IO[Unit] = {
def run(config: Configuration, client: HttpClient): IO[Unit] = {

logger.debug("Starting watchlist sync")

for {
watchlistDatas <- config.plexWatchlistUrls.map(fetchWatchlist(config.client)).sequence
watchlistDatas <- config.plexWatchlistUrls.map(fetchWatchlist(client)).sequence
watchlistData = watchlistDatas.fold(Watchlist(Set.empty))(mergeWatchLists)
movies <- fetchMovies(config.client)(config.radarrApiKey, config.radarrBaseUrl, config.radarrBypassIgnored)
series <- fetchSeries(config.client)(config.sonarrApiKey, config.sonarrBaseUrl, config.sonarrBypassIgnored)
movies <- fetchMovies(client)(config.radarrApiKey, config.radarrBaseUrl, config.radarrBypassIgnored)
series <- fetchSeries(client)(config.sonarrApiKey, config.sonarrBaseUrl, config.sonarrBypassIgnored)
allIds = merge(movies, series)
_ <- missingIds(config.client)(config)(allIds, watchlistData.items)
_ <- missingIds(client)(config)(allIds, watchlistData.items)
} yield ()
}

Expand Down
109 changes: 69 additions & 40 deletions src/main/scala/configuration/Configuration.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package configuration

import cats.effect.IO
import cats.effect.unsafe.IORuntime
import io.circe.generic.auto._
import model.{QualityProfile, RootFolder}
import org.http4s.Uri
Expand All @@ -10,21 +9,51 @@ import utils.{ArrUtils, HttpClient}

import scala.concurrent.duration._

class Configuration(configReader: ConfigurationReader, val client: HttpClient)(implicit runtime: IORuntime) {
case class Configuration(
refreshInterval: FiniteDuration,
sonarrBaseUrl: Uri,
sonarrApiKey: String,
sonarrQualityProfileId: Int,
sonarrRootFolder: String,
sonarrBypassIgnored: Boolean,
radarrBaseUrl: Uri,
radarrApiKey: String,
radarrQualityProfileId: Int,
radarrRootFolder: String,
radarrBypassIgnored: Boolean,
plexWatchlistUrls: List[Uri]
)

object ConfigurationUtils {

private val logger = LoggerFactory.getLogger(getClass)

val refreshInterval: FiniteDuration = configReader.getConfigOption(Keys.intervalSeconds).flatMap(_.toIntOption).getOrElse(60).seconds

val (sonarrBaseUrl, sonarrApiKey, sonarrQualityProfileId, sonarrRootFolder) = getSonarrConfig.unsafeRunSync()
val sonarrBypassIgnored: Boolean = configReader.getConfigOption(Keys.sonarrBypassIgnored).exists(_.toBoolean)

val (radarrBaseUrl, radarrApiKey, radarrQualityProfileId, radarrRootFolder) = getRadarrConfig.unsafeRunSync()
val radarrBypassIgnored: Boolean = configReader.getConfigOption(Keys.radarrBypassIgnored).exists(_.toBoolean)

val plexWatchlistUrls: List[Uri] = getPlexWatchlistUrls
def create(configReader: ConfigurationReader, client: HttpClient): IO[Configuration] =
for {
sonarrConfig <- getSonarrConfig(configReader, client)
refreshInterval = configReader.getConfigOption(Keys.intervalSeconds).flatMap(_.toIntOption).getOrElse(60).seconds
(sonarrBaseUrl, sonarrApiKey, sonarrQualityProfileId, sonarrRootFolder) = sonarrConfig
sonarrBypassIgnored = configReader.getConfigOption(Keys.sonarrBypassIgnored).exists(_.toBoolean)
radarrConfig <- getRadarrConfig(configReader, client)
(radarrBaseUrl, radarrApiKey, radarrQualityProfileId, radarrRootFolder) = radarrConfig
radarrBypassIgnored = configReader.getConfigOption(Keys.radarrBypassIgnored).exists(_.toBoolean)
plexWatchlistUrls = getPlexWatchlistUrls(configReader)
} yield Configuration(
refreshInterval,
sonarrBaseUrl,
sonarrApiKey,
sonarrQualityProfileId,
sonarrRootFolder,
sonarrBypassIgnored,
radarrBaseUrl,
radarrApiKey,
radarrQualityProfileId,
radarrRootFolder,
radarrBypassIgnored,
plexWatchlistUrls
)

private def getSonarrConfig: IO[(Uri, String, Int, String)] = {
private def getSonarrConfig(configReader: ConfigurationReader, client: HttpClient): IO[(Uri, String, Int, String)] = {
val url = configReader.getConfigOption(Keys.sonarrBaseUrl).flatMap(Uri.fromString(_).toOption).getOrElse {
val default = "http://localhost:8989"
logger.warn(s"Unable to fetch sonarr baseUrl, using default $default")
Expand All @@ -39,7 +68,7 @@ class Configuration(configReader: ConfigurationReader, val client: HttpClient)(i
selectRootFolder(allRootFolders, configReader.getConfigOption(Keys.sonarrRootFolder))
case Left(err) =>
throwError(s"Unable to connect to Sonarr at $url, with error $err")
}.flatMap ( rootFolder =>
}.flatMap(rootFolder =>
ArrUtils.getToArr(client)(url, apiKey, "qualityprofile").map {
case Right(res) =>
val allQualityProfiles = res.as[List[QualityProfile]].getOrElse(List.empty)
Expand All @@ -51,7 +80,7 @@ class Configuration(configReader: ConfigurationReader, val client: HttpClient)(i
)
}

private def getRadarrConfig: IO[(Uri, String, Int, String)] = {
private def getRadarrConfig(configReader: ConfigurationReader, client: HttpClient): IO[(Uri, String, Int, String)] = {
val url = configReader.getConfigOption(Keys.radarrBaseUrl).flatMap(Uri.fromString(_).toOption).getOrElse {
val default = "http://localhost:7878"
logger.warn(s"Unable to fetch radarr baseUrl, using default $default")
Expand All @@ -67,33 +96,17 @@ class Configuration(configReader: ConfigurationReader, val client: HttpClient)(i
case Left(err) =>
throwError(s"Unable to connect to Radarr at $url, with error $err")
}.flatMap(rootFolder =>
ArrUtils.getToArr(client)(url, apiKey, "qualityprofile").map {
case Right(res) =>
val allQualityProfiles = res.as[List[QualityProfile]].getOrElse(List.empty)
val chosenQualityProfile = configReader.getConfigOption(Keys.radarrQualityProfile)
(url, apiKey, getQualityProfileId(allQualityProfiles, chosenQualityProfile), rootFolder)
case Left(err) =>
throwError(s"Unable to connect to Radarr at $url, with error $err")
}
ArrUtils.getToArr(client)(url, apiKey, "qualityprofile").map {
case Right(res) =>
val allQualityProfiles = res.as[List[QualityProfile]].getOrElse(List.empty)
val chosenQualityProfile = configReader.getConfigOption(Keys.radarrQualityProfile)
(url, apiKey, getQualityProfileId(allQualityProfiles, chosenQualityProfile), rootFolder)
case Left(err) =>
throwError(s"Unable to connect to Radarr at $url, with error $err")
}
)
}

private def selectRootFolder(allRootFolders: List[RootFolder], maybeEnvVariable: Option[String]): String =
(allRootFolders, maybeEnvVariable) match {
case (Nil, _) =>
throwError("Could not find any root folders, check your Sonarr/Radarr settings")
case (_, Some(path)) =>
allRootFolders.filter(_.accessible).find(r => normalizePath(r.path) == normalizePath(path)).map(_.path).getOrElse(
throwError(s"Unable to find root folder $path. Possible values are ${allRootFolders.filter(_.accessible).map(_.path)}")
)
case (_, None) =>
allRootFolders.find(_.accessible).map(_.path).getOrElse(
throwError("Found root folders, but they are not accessible by Sonarr/Radarr")
)
}

private def normalizePath(path: String): String = if (path.endsWith("/") && path.length > 1) path.dropRight(1) else path

private def getQualityProfileId(allProfiles: List[QualityProfile], maybeEnvVariable: Option[String]): Int =
(allProfiles, maybeEnvVariable) match {
case (Nil, _) =>
Expand All @@ -110,7 +123,23 @@ class Configuration(configReader: ConfigurationReader, val client: HttpClient)(i
)
}

private def getPlexWatchlistUrls: List[Uri] =
private def selectRootFolder(allRootFolders: List[RootFolder], maybeEnvVariable: Option[String]): String =
(allRootFolders, maybeEnvVariable) match {
case (Nil, _) =>
throwError("Could not find any root folders, check your Sonarr/Radarr settings")
case (_, Some(path)) =>
allRootFolders.filter(_.accessible).find(r => normalizePath(r.path) == normalizePath(path)).map(_.path).getOrElse(
throwError(s"Unable to find root folder $path. Possible values are ${allRootFolders.filter(_.accessible).map(_.path)}")
)
case (_, None) =>
allRootFolders.find(_.accessible).map(_.path).getOrElse(
throwError("Found root folders, but they are not accessible by Sonarr/Radarr")
)
}

private def normalizePath(path: String): String = if (path.endsWith("/") && path.length > 1) path.dropRight(1) else path

private def getPlexWatchlistUrls(configReader: ConfigurationReader): List[Uri] =
Set(
configReader.getConfigOption(Keys.plexWatchlist1),
configReader.getConfigOption(Keys.plexWatchlist2)
Expand Down Expand Up @@ -145,4 +174,4 @@ class Configuration(configReader: ConfigurationReader, val client: HttpClient)(i
logger.error(message)
throw new IllegalArgumentException(message)
}
}
}
6 changes: 4 additions & 2 deletions src/main/scala/utils/HttpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import org.http4s.{Header, Method, Request, Uri}
import org.http4s.ember.client.EmberClientBuilder
import org.typelevel.ci.CIString
import org.http4s.circe._

import scala.concurrent.duration.DurationInt
import org.slf4j.LoggerFactory

class HttpClient {
private val logger = LoggerFactory.getLogger(getClass)

private val clientResource = EmberClientBuilder
.default[IO]
.build
Expand All @@ -19,6 +20,7 @@ class HttpClient {
val requestWithApiKey = apiKey.fold(baseRequest)(key => baseRequest.withHeaders(Header.Raw(CIString("X-Api-Key"), key)))
val requestWithPayload = payload.fold(requestWithApiKey)(p => requestWithApiKey.withEntity(p))

logger.warn(s"Current thread: ${Thread.currentThread()}")
clientResource.use(_.expect[Json](requestWithPayload).attempt)
}
}
28 changes: 14 additions & 14 deletions src/test/scala/configuration/ConfigurationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader()
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.radarrApiKey shouldBe "radarr-api-key"
config.sonarrApiKey shouldBe "sonarr-api-key"
Expand All @@ -32,31 +32,31 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(sonarrApiKey = None)
val mockHttpClient = createMockHttpClient()

an[IllegalArgumentException] should be thrownBy new Configuration(mockConfigReader, mockHttpClient)
an[IllegalArgumentException] should be thrownBy ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
}

it should "fail if missing radarr API key" in {

val mockConfigReader = createMockConfigReader(radarrApiKey = None)
val mockHttpClient = createMockHttpClient()

an[IllegalArgumentException] should be thrownBy new Configuration(mockConfigReader, mockHttpClient)
an[IllegalArgumentException] should be thrownBy ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
}

it should "fail if missing plex watchlist 1 and 2" in {

val mockConfigReader = createMockConfigReader(plexWatchlist1 = None)
val mockHttpClient = createMockHttpClient()

an[IllegalArgumentException] should be thrownBy new Configuration(mockConfigReader, mockHttpClient)
an[IllegalArgumentException] should be thrownBy ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
}

it should "pass if missing plex watchlist 1 but there's a plex watchlist 2" in {

val mockConfigReader = createMockConfigReader(plexWatchlist1 = None, plexWatchlist2 = Some(s"https://rss.plex.tv/2"))
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.plexWatchlistUrls shouldBe inAnyOrder(List(Uri.unsafeFromString("https://rss.plex.tv/2")))
}
Expand All @@ -66,7 +66,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(plexWatchlist1 = Some(s"https://rss.plex.tv/1"), plexWatchlist2 = Some(s"https://rss.plex.tv/2"))
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.plexWatchlistUrls shouldBe inAnyOrder(List(
Uri.unsafeFromString("https://rss.plex.tv/1"),
Expand All @@ -79,7 +79,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader()
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.sonarrRootFolder shouldBe "/data2"
}
Expand All @@ -89,7 +89,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(sonarrRootFolder = Some("/data3"))
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.sonarrRootFolder shouldBe "/data3"
}
Expand All @@ -99,7 +99,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(sonarrRootFolder = Some("/data3/"))
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.sonarrRootFolder shouldBe "/data3"
}
Expand All @@ -109,15 +109,15 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(sonarrRootFolder = Some("/unknown"))
val mockHttpClient = createMockHttpClient()

an[IllegalArgumentException] should be thrownBy new Configuration(mockConfigReader, mockHttpClient)
an[IllegalArgumentException] should be thrownBy ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
}

it should "fetch the first accessible root folder of radarr if none is provided" in {

val mockConfigReader = createMockConfigReader()
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.radarrRootFolder shouldBe "/data2"
}
Expand All @@ -128,7 +128,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(radarrRootFolder = Some("/data3"))
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.radarrRootFolder shouldBe "/data3"
}
Expand All @@ -138,7 +138,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(radarrRootFolder = Some("/data3/"))
val mockHttpClient = createMockHttpClient()

val config = new Configuration(mockConfigReader, mockHttpClient)
val config = ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
noException should be thrownBy config
config.radarrRootFolder shouldBe "/data3"
}
Expand All @@ -148,7 +148,7 @@ class ConfigurationSpec extends AnyFlatSpec with Matchers with MockFactory {
val mockConfigReader = createMockConfigReader(radarrRootFolder = Some("/unknown"))
val mockHttpClient = createMockHttpClient()

an[IllegalArgumentException] should be thrownBy new Configuration(mockConfigReader, mockHttpClient)
an[IllegalArgumentException] should be thrownBy ConfigurationUtils.create(mockConfigReader, mockHttpClient).unsafeRunSync()
}

private def createMockConfigReader(
Expand Down

0 comments on commit ec9014a

Please sign in to comment.