diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fc62473..6eda382 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -3,7 +3,7 @@ on: [push, pull_request] env: CI: true CI_SNAPSHOT_RELEASE: +publishSigned - SCALA_VERSION: 2.12.11 + SCALA_VERSION: 2.12.12 jobs: validate: name: Scala ${{ matrix.scala }}, Java ${{ matrix.java }} @@ -12,7 +12,7 @@ jobs: fail-fast: false matrix: java: [adopt@1.8, adopt@1.11, adopt@1.14] - scala: [2.12.11, 2.13.1] + scala: [2.12.12, 2.13.3] env: SCALA_VERSION: ${{ matrix.scala }} steps: diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 1a6bb02..ceabb47 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -3,11 +3,11 @@ on: push: branches: - master - tags: + tags: - '*' env: CI: true - SCALA_VERSION: 2.12.11 + SCALA_VERSION: 2.12.12 jobs: release: name: Release @@ -17,7 +17,7 @@ jobs: with: fetch-depth: 0 - uses: olafurpg/setup-scala@v5 - - uses: olafurpg/setup-gpg@v2 + - uses: olafurpg/setup-gpg@v2 - name: Cache Coursier uses: actions/cache@v1 with: @@ -34,7 +34,7 @@ jobs: PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }} PGP_SECRET: ${{ secrets.PGP_SECRET }} SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} - name: Set up Ruby 2.6 uses: actions/setup-ruby@v1 with: diff --git a/.scalafmt.conf b/.scalafmt.conf index e6fc0ce..5194c09 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,18 +1,35 @@ -version = 2.5.3 - -style = default +version = "2.5.3" maxColumn = 100 -// Vertical alignment is pretty, but leads to bigger diffs -align.preset = none +continuationIndent.callSite = 2 + +newlines { + sometimesBeforeColonInMethodReturnType = false +} + +align { + preset = none + arrowEnumeratorGenerator = false + ifWhileOpenParen = false + multiline = false + openParenCallSite = false + openParenDefnSite = false + + tokens = ["%", "%%"] +} danglingParentheses.preset = false -rewrite.rules = [ - AvoidInfix - RedundantBraces - RedundantParens - AsciiSortImports - PreferCurlyFors -] \ No newline at end of file +docstrings = JavaDoc + +rewrite { + rules = [ + AvoidInfix + RedundantBraces + RedundantParens + AsciiSortImports + PreferCurlyFors + ] + redundantBraces.maxLines = 1 +} diff --git a/build.sbt b/build.sbt index 60dabce..ae2086e 100644 --- a/build.sbt +++ b/build.sbt @@ -1,15 +1,12 @@ val catsV = "2.1.1" -val catsEffectV = "2.1.3" - -val fs2V = "2.4.1" - -val circeV = "0.12.3" -val specs2V = "4.9.4" - +val catsEffectV = "2.1.4" +val fs2V = "2.4.4" +val circeV = "0.13.0" +val specs2V = "4.10.3" +// compiler plugins val kindProjectorV = "0.11.0" val betterMonadicForV = "0.3.1" -// Projects lazy val `cosmos4s` = project .in(file(".")) .disablePlugins(MimaPlugin) @@ -79,21 +76,21 @@ lazy val site = project // General Settings lazy val commonSettings = Seq( - scalaVersion := "2.13.1", - crossScalaVersions := Seq(scalaVersion.value, "2.12.11"), + scalaVersion := "2.13.3", + crossScalaVersions := Seq(scalaVersion.value, "2.12.12"), addCompilerPlugin( ("org.typelevel" %% "kind-projector" % kindProjectorV).cross(CrossVersion.full)), addCompilerPlugin("com.olegpy" %% "better-monadic-for" % betterMonadicForV), libraryDependencies ++= Seq( - "com.azure" % "azure-cosmos" % "4.0.1-beta.2", - "org.typelevel" %% "cats-core" % catsV, - "org.typelevel" %% "cats-effect" % catsEffectV, - "co.fs2" %% "fs2-reactive-streams" % fs2V, - "io.circe" %% "circe-core" % circeV, - "io.circe" %% "circe-parser" % circeV, - "io.circe" %% "circe-jackson210" % "0.13.0", - "org.specs2" %% "specs2-core" % specs2V % Test, - "org.specs2" %% "specs2-scalacheck" % specs2V % Test + "com.azure" % "azure-cosmos" % "4.3.2-beta.2", + "org.typelevel" %% "cats-core" % catsV, + "org.typelevel" %% "cats-effect" % catsEffectV, + "co.fs2" %% "fs2-reactive-streams" % fs2V, + "io.circe" %% "circe-core" % circeV, + "io.circe" %% "circe-parser" % circeV, + "io.circe" %% "circe-jackson210" % "0.13.0", + "org.specs2" %% "specs2-core" % specs2V % Test, + "org.specs2" %% "specs2-scalacheck" % specs2V % Test ) ) diff --git a/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala b/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala index cff5b29..8327cb0 100644 --- a/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala +++ b/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala @@ -16,27 +16,26 @@ package com.banno.cosmos4s -import _root_.io.circe._ import cats._ -import cats.implicits._ import cats.effect._ -import _root_.fs2._ - -import _root_.io.circe.jackson._ -import com.fasterxml.jackson.databind.JsonNode - +import cats.implicits._ import com.azure.cosmos._ import com.azure.cosmos.models._ +import com.banno.cosmos4s.types._ +import com.fasterxml.jackson.databind.JsonNode +import fs2.Stream +import io.circe._ +import io.circe.jackson._ trait IndexedCosmosContainer[F[_], K, I, V] { def query( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[F, V] + overrides: QueryOptions => QueryOptions = identity): Stream[F, V] def queryCustom[A: Decoder]( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[F, A] + overrides: QueryOptions => QueryOptions = identity): Stream[F, A] def lookup(partitionKey: K, id: I): F[Option[V]] def insert(partitionKey: K, value: V): F[Option[V]] def replace(partitionKey: K, id: I, value: V): F[Option[V]] @@ -61,26 +60,33 @@ object IndexedCosmosContainer { def impl[F[_]: ConcurrentEffect: ContextShift]( container: CosmosAsyncContainer, - createFeedOptions: Option[F[FeedOptions]] = None) - : IndexedCosmosContainer[F, String, String, Json] = + createFeedOptions: Option[F[QueryOptions]] = None): IndexedCosmosContainer[ + F, + String, + String, + Json] = new BaseImpl[F](container, createFeedOptions) private class BaseImpl[F[_]: ConcurrentEffect: ContextShift]( container: CosmosAsyncContainer, - createFeedOptions: Option[F[FeedOptions]] = None) + createFeedOptions: Option[F[QueryOptions]] = None) extends IndexedCosmosContainer[F, String, String, Json] { - def createFeedOptionsAlways = createFeedOptions.getOrElse(Sync[F].delay(new FeedOptions())) + private def createFeedOptionsAlways: F[QueryOptions] = + createFeedOptions.getOrElse(Sync[F].delay(QueryOptions.default)) + import scala.collection.JavaConverters._ + def query( partitionKey: String, query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[F, Json] = + overrides: QueryOptions => QueryOptions = identity): Stream[F, Json] = queryCustom[Json](partitionKey, query, overrides) + def queryCustom[A: Decoder]( partitionKey: String, query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[F, A] = + overrides: QueryOptions => QueryOptions = identity): Stream[F, A] = Stream .eval(createFeedOptionsAlways) .map(overrides) @@ -90,15 +96,16 @@ object IndexedCosmosContainer { container .queryItems( query, - options.setPartitionKey(new PartitionKey(partitionKey)), + options.build().setPartitionKey(new PartitionKey(partitionKey)), classOf[JsonNode]) .byPage() ) ) } .flatMap(page => Stream.fromIterator(page.getElements().iterator().asScala)) - .map(jacksonToCirce) + .map(jacksonToCirce(_)) .evalMap(_.as[A].liftTo[F]) + def lookup(partitionKey: String, id: String): F[Option[Json]] = cats.data .OptionT( @@ -112,8 +119,9 @@ object IndexedCosmosContainer { )) ) .subflatMap(response => Option(response.getItem())) - .map(jacksonToCirce) + .map(jacksonToCirce(_)) .value + def insert(partitionKey: String, value: Json): F[Option[Json]] = cats.data.OptionT .liftF(Sync[F].delay(new CosmosItemRequestOptions())) @@ -123,6 +131,7 @@ object IndexedCosmosContainer { .subflatMap(response => Option(response.getItem())) .map(jacksonToCirce) .value + def replace(partitionKey: String, id: String, value: Json): F[Option[Json]] = cats.data .OptionT( @@ -134,6 +143,7 @@ object IndexedCosmosContainer { .subflatMap(response => Option(response.getItem())) .map(jacksonToCirce) .value + def upsert(partitionKey: String, value: Json): F[Option[Json]] = cats.data .OptionT( @@ -162,12 +172,12 @@ object IndexedCosmosContainer { def query( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[G, V] = + overrides: QueryOptions => QueryOptions = identity): Stream[G, V] = base.query(partitionKey, query, overrides).translate(fk) def queryCustom[A: Decoder]( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[G, A] = + overrides: QueryOptions => QueryOptions = identity): Stream[G, A] = base.queryCustom[A](partitionKey, query, overrides).translate(fk) def lookup(partitionKey: K, id: I): G[Option[V]] = fk(base.lookup(partitionKey, id)) @@ -188,12 +198,12 @@ object IndexedCosmosContainer { def query( partitionKey: K2, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, V] = + overrides: QueryOptions => QueryOptions): Stream[F, V] = base.query(contra(partitionKey), query, overrides) def queryCustom[A: Decoder]( partitionKey: K2, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, A] = + overrides: QueryOptions => QueryOptions): Stream[F, A] = base.queryCustom(contra(partitionKey), query, overrides) def lookup(partitionKey: K2, id: I): F[Option[V]] = base.lookup(contra(partitionKey), id) @@ -211,12 +221,15 @@ object IndexedCosmosContainer { base: IndexedCosmosContainer[F, K, I, V], contra: I2 => I ) extends IndexedCosmosContainer[F, K, I2, V] { - def query(partitionKey: K, query: String, overrides: FeedOptions => FeedOptions): Stream[F, V] = + def query( + partitionKey: K, + query: String, + overrides: QueryOptions => QueryOptions): Stream[F, V] = base.query(partitionKey, query, overrides) def queryCustom[A: Decoder]( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, A] = + overrides: QueryOptions => QueryOptions): Stream[F, A] = base.queryCustom(partitionKey, query, overrides) def lookup(partitionKey: K, id: I2): F[Option[V]] = base.lookup(partitionKey, contra(id)) @@ -238,14 +251,14 @@ object IndexedCosmosContainer { def query( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, V2] = + overrides: QueryOptions => QueryOptions): Stream[F, V2] = base .query(partitionKey, query, overrides) .evalMap(f) def queryCustom[A: Decoder]( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, A] = + overrides: QueryOptions => QueryOptions): Stream[F, A] = base.queryCustom(partitionKey, query, overrides) def lookup(partitionKey: K, id: I): F[Option[V2]] = base.lookup(partitionKey, id).flatMap(_.traverse(f)) @@ -267,14 +280,14 @@ object IndexedCosmosContainer { def query( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, V2] = + overrides: QueryOptions => QueryOptions): Stream[F, V2] = base .query(partitionKey, query, overrides) .map(f) def queryCustom[A: Decoder]( partitionKey: K, query: String, - overrides: FeedOptions => FeedOptions): Stream[F, A] = + overrides: QueryOptions => QueryOptions): Stream[F, A] = base.queryCustom(partitionKey, query, overrides) def lookup(partitionKey: K, id: I): F[Option[V2]] = base.lookup(partitionKey, id).map(_.map(f)) diff --git a/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala b/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala index e1c414f..489ce0b 100644 --- a/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala +++ b/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala @@ -16,23 +16,21 @@ package com.banno.cosmos4s -import _root_.io.circe._ import cats._ -import cats.implicits._ import cats.effect._ -import _root_.fs2._ - -import _root_.io.circe.jackson._ -import com.fasterxml.jackson.databind.JsonNode - +import cats.implicits._ import com.azure.cosmos._ -import com.azure.cosmos.models._ +import com.banno.cosmos4s.types._ +import com.fasterxml.jackson.databind.JsonNode +import fs2.Stream +import io.circe._ +import io.circe.jackson._ trait RawCosmosContainer[F[_], V] { - def queryRaw(query: String, overrides: FeedOptions => FeedOptions = identity): Stream[F, V] + def queryRaw(query: String, overrides: QueryOptions => QueryOptions = identity): Stream[F, V] def queryCustomRaw[A: Decoder]( query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[F, A] + overrides: QueryOptions => QueryOptions = identity): Stream[F, A] def map[A](f: V => A): RawCosmosContainer[F, A] = new RawCosmosContainer.MapValueRawCosmosContainter(this, f) @@ -45,34 +43,41 @@ trait RawCosmosContainer[F[_], V] { object RawCosmosContainer { def impl[F[_]: ConcurrentEffect: ContextShift]( container: CosmosAsyncContainer, - createFeedOptions: Option[F[FeedOptions]] = None): RawCosmosContainer[F, Json] = - new BaseImpl[F](container, createFeedOptions) + createQueryOptions: Option[F[QueryOptions]] = None): RawCosmosContainer[F, Json] = + new BaseImpl[F](container, createQueryOptions) private class BaseImpl[F[_]: ConcurrentEffect: ContextShift]( container: CosmosAsyncContainer, - createFeedOptions: Option[F[FeedOptions]] = None) + createQueryOptions: Option[F[QueryOptions]] = None) extends RawCosmosContainer[F, Json] { - def createFeedOptionsAlways = createFeedOptions.getOrElse(Sync[F].delay(new FeedOptions())) + + def createQueryOptionsAlways: F[QueryOptions] = + createQueryOptions.getOrElse(Sync[F].delay(QueryOptions.default)) + import scala.collection.JavaConverters._ - def queryRaw(query: String, overrides: FeedOptions => FeedOptions = identity): Stream[F, Json] = + + def queryRaw( + query: String, + overrides: QueryOptions => QueryOptions = identity): Stream[F, Json] = queryCustomRaw[Json](query, overrides) + def queryCustomRaw[A: Decoder]( query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[F, A] = + overrides: QueryOptions => QueryOptions = identity): Stream[F, A] = Stream - .eval(createFeedOptionsAlways) + .eval(createQueryOptionsAlways) .map(overrides) .flatMap { options => ReactorCore.fluxToStream( Sync[F].delay( container - .queryItems(query, options, classOf[JsonNode]) + .queryItems(query, options.build(), classOf[JsonNode]) .byPage() ) ) } .flatMap(page => Stream.fromIterator(page.getElements().iterator().asScala)) - .map(jacksonToCirce) + .map(jacksonToCirce(_)) .evalMap(_.as[A].liftTo[F]) } @@ -80,11 +85,11 @@ object RawCosmosContainer { base: RawCosmosContainer[F, V], fk: F ~> G ) extends RawCosmosContainer[G, V] { - def queryRaw(query: String, overrides: FeedOptions => FeedOptions = identity): Stream[G, V] = + def queryRaw(query: String, overrides: QueryOptions => QueryOptions = identity): Stream[G, V] = base.queryRaw(query, overrides).translate(fk) def queryCustomRaw[A: Decoder]( query: String, - overrides: FeedOptions => FeedOptions = identity): Stream[G, A] = + overrides: QueryOptions => QueryOptions = identity): Stream[G, A] = base.queryCustomRaw(query, overrides).translate(fk) } @@ -92,12 +97,12 @@ object RawCosmosContainer { base: RawCosmosContainer[F, V], f: V => A ) extends RawCosmosContainer[F, A] { - def queryRaw(query: String, overrides: FeedOptions => FeedOptions = identity): Stream[F, A] = + def queryRaw(query: String, overrides: QueryOptions => QueryOptions = identity): Stream[F, A] = base.queryRaw(query, overrides).map(f) def queryCustomRaw[B: Decoder]( query: String, - overrides: FeedOptions => FeedOptions): Stream[F, B] = + overrides: QueryOptions => QueryOptions): Stream[F, B] = base.queryCustomRaw(query, overrides) } @@ -105,12 +110,12 @@ object RawCosmosContainer { base: RawCosmosContainer[F, V], f: V => F[A] ) extends RawCosmosContainer[F, A] { - def queryRaw(query: String, overrides: FeedOptions => FeedOptions = identity): Stream[F, A] = + def queryRaw(query: String, overrides: QueryOptions => QueryOptions = identity): Stream[F, A] = base.queryRaw(query, overrides).evalMap(f) def queryCustomRaw[B: Decoder]( query: String, - overrides: FeedOptions => FeedOptions): Stream[F, B] = + overrides: QueryOptions => QueryOptions): Stream[F, B] = base.queryCustomRaw(query, overrides) } diff --git a/core/src/main/scala/com/banno/cosmos4s/ReactorCore.scala b/core/src/main/scala/com/banno/cosmos4s/ReactorCore.scala index 724b2d3..d84626e 100644 --- a/core/src/main/scala/com/banno/cosmos4s/ReactorCore.scala +++ b/core/src/main/scala/com/banno/cosmos4s/ReactorCore.scala @@ -21,7 +21,7 @@ import cats.effect._ import cats.effect.implicits._ import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import fs2._ +import fs2.Stream import fs2.interop.reactivestreams._ object ReactorCore { @@ -39,6 +39,7 @@ object ReactorCore { Sync[F].raiseError[A](new Throwable("Mono to Effect Conversion failed to produce value"))) { Sync[F].pure }) + def fluxToStream[F[_]: ConcurrentEffect: ContextShift, A](m: F[Flux[A]]): fs2.Stream[F, A] = Stream .eval(m) diff --git a/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala b/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala new file mode 100644 index 0000000..f03af41 --- /dev/null +++ b/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2020 Jack Henry & Associates, Inc.® + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.banno.cosmos4s.types + +import com.azure.cosmos.models.CosmosQueryRequestOptions +import cats.implicits._ + +final class QueryOptions private ( + maxDegreeOfParallelism: Option[Int], + maxBufferedItemCount: Option[Int]) + extends Serializable { + + private[this] def copy( + maxDegreeOfParallelism: Option[Int] = maxDegreeOfParallelism, + maxBufferedItemCount: Option[Int] = maxBufferedItemCount): QueryOptions = + new QueryOptions(maxDegreeOfParallelism, maxBufferedItemCount) + + def withMaxDegreeOfParallelism(value: Option[Int]): QueryOptions = + this.copy(maxDegreeOfParallelism = value) + + def withMaxBufferedItemCount(value: Option[Int]): QueryOptions = + this.copy(maxBufferedItemCount = value) + + private val getMaxDegreeOfParallelism: Option[Int] = maxDegreeOfParallelism + private val getMaxBufferedItemCount: Option[Int] = maxBufferedItemCount + + override def toString: String = s"QueryOptions($maxDegreeOfParallelism, $maxBufferedItemCount)" + + override def equals(o: Any): Boolean = + o match { + case x: QueryOptions => + (this.maxDegreeOfParallelism == x.getMaxDegreeOfParallelism) && (this.maxBufferedItemCount == x.getMaxBufferedItemCount) + case _ => false + } + + override def hashCode: Int = + 37 * (37 * (17 + maxDegreeOfParallelism.##) + maxBufferedItemCount.##) + + private[cosmos4s] def build(): CosmosQueryRequestOptions = { + val cosmosQueryOptions = new CosmosQueryRequestOptions() + maxDegreeOfParallelism.foreach(cosmosQueryOptions.setMaxDegreeOfParallelism) + maxBufferedItemCount.foreach(cosmosQueryOptions.setMaxBufferedItemCount) + cosmosQueryOptions + } +} + +object QueryOptions { + val default: QueryOptions = new QueryOptions(None, None) +} diff --git a/project/build.properties b/project/build.properties index 654fe70..0837f7a 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.12 +sbt.version=1.3.13 diff --git a/project/plugins.sbt b/project/plugins.sbt index b5fe49e..c0bf547 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,10 +1,9 @@ // addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.12") -addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.3") +addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.3") addSbtPlugin("io.chrisdavenport" % "sbt-mima-version-check" % "0.1.2") -addSbtPlugin("io.chrisdavenport" % "sbt-no-publish" % "0.1.0") - -addSbtPlugin("com.47deg" % "sbt-microsites" % "1.1.5") -addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.1.5") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.0") -addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0") +addSbtPlugin("io.chrisdavenport" % "sbt-no-publish" % "0.1.0") +addSbtPlugin("com.47deg" % "sbt-microsites" % "1.2.1") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.2.5") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") +addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")