diff --git a/build.sbt b/build.sbt index 5395db1..2a8df2b 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ val azureCosmosV = "4.28.1" val azureDocumentDBV = "2.6.4" val catsV = "2.8.0" val catsEffectV = "3.3.14" -val circeJackson210V = "0.14.2" +val circeJackson210V = "0.14.0" val documentDBBulkExecV = "2.12.5" val fs2V = "3.3.0" val circeV = "0.14.3" diff --git a/core/src/main/scala/com/banno/cosmos4s/BaseCosmosContainer.scala b/core/src/main/scala/com/banno/cosmos4s/BaseCosmosContainer.scala new file mode 100644 index 0000000..8e2a58d --- /dev/null +++ b/core/src/main/scala/com/banno/cosmos4s/BaseCosmosContainer.scala @@ -0,0 +1,155 @@ +/* + * Copyright 2020 Jack Henry & Associates, Inc.® + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.banno.cosmos4s + +import cats.data.OptionT +import cats.effect._ +import cats.syntax.all._ +import com.azure.cosmos.CosmosAsyncContainer +import com.azure.cosmos.models.{CosmosItemRequestOptions, PartitionKey, SqlParameter, SqlQuerySpec} +import com.banno.cosmos4s.types._ +import com.fasterxml.jackson.databind.JsonNode +import fs2.Stream +import collection.JavaConverters._ +import io.circe.jackson._ +import io.circe._ +import com.azure.cosmos.implementation.NotFoundException + +object BaseCosmosContainer { + + val convertJson: Option[JsonNode] => Json = _.fold(Json.Null)(jacksonToCirce) + + def impl[F[_]: Async]( + container: CosmosAsyncContainer + ): BaseCosmosContainer[F] = + new CosmosContainer[F, String, String, Either[FeedResponse, *], Json, ItemResponse[*], Json] { + + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions + ): Stream[F, Either[FeedResponse, Json]] = { + val sqlParams = parameters + .map { + case (key, value) => + new SqlParameter(key, value) + } + .toList + .asJava + val querySpec = new SqlQuerySpec(query, sqlParams) + ReactorCore + .fluxToStream( + Sync[F].delay( + container + .queryItems(querySpec, options.build(), classOf[JsonNode]) + .byPage() + ) + ) + .flatMap { page => + val elements = page.getElements() + val itemStream = + if (elements == null) Stream.empty + else + Stream + .iterable(elements.asScala) + .map(jacksonToCirce(_).asRight[FeedResponse]) + Stream.emit(FeedResponse.fromCosmosResponse(page).asLeft[Json]) ++ itemStream + } + } + + def delete( + partitionKey: String, + id: String, + options: CosmosItemRequestOptions + ): F[ItemResponse[Unit]] = + ReactorCore + .monoToEffect( + Sync[F].delay( + container.deleteItem(id, new PartitionKey(partitionKey), options) + ) + ) + .map(ItemResponse.fromCosmosResponse(_ => ())) + + def insert( + partitionKey: String, + value: Json, + options: CosmosItemRequestOptions + ): F[Option[ItemResponse[Json]]] = + OptionT( + ReactorCore.monoToEffectOpt( + Sync[F].delay( + container.createItem(circeToJackson(value), new PartitionKey(partitionKey), options) + ) + ) + ).map(ItemResponse.fromCosmosResponse(convertJson)).value + + def lookup( + partitionKey: String, + id: String, + options: CosmosItemRequestOptions + ): F[Option[ItemResponse[Json]]] = + OptionT( + ReactorCore + .monoToEffectOpt( + Sync[F].delay( + container.readItem( + id, + new PartitionKey(partitionKey), + options, + classOf[JsonNode] + ) + ) + ) + .recoverWith { case _: NotFoundException => Sync[F].pure(None) } + ) + .map(ItemResponse.fromCosmosResponse(convertJson)) + .value + + def upsert(value: Json, options: CosmosItemRequestOptions): F[Option[ItemResponse[Json]]] = + OptionT( + ReactorCore.monoToEffectOpt( + Sync[F].delay( + container.upsertItem(circeToJackson(value), options) + ) + ) + ) + .map(ItemResponse.fromCosmosResponse(convertJson)) + .value + + def replace( + partitionKey: String, + id: String, + value: Json, + options: CosmosItemRequestOptions + ): F[Option[ItemResponse[Json]]] = + OptionT( + ReactorCore.monoToEffectOpt( + Sync[F].delay( + container.replaceItem( + circeToJackson(value), + id, + new PartitionKey(partitionKey), + options + ) + ) + ) + ) + .map(ItemResponse.fromCosmosResponse(convertJson)) + .value + + } +} diff --git a/core/src/main/scala/com/banno/cosmos4s/CosmosContainer.scala b/core/src/main/scala/com/banno/cosmos4s/CosmosContainer.scala new file mode 100644 index 0000000..c57fb52 --- /dev/null +++ b/core/src/main/scala/com/banno/cosmos4s/CosmosContainer.scala @@ -0,0 +1,433 @@ +/* + * Copyright 2020 Jack Henry & Associates, Inc.® + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.banno.cosmos4s + +import cats.{~>, Functor} +import cats.implicits._ +import com.azure.cosmos.models.CosmosItemRequestOptions +import com.banno.cosmos4s.types._ +import fs2.Stream +import cats.Monad +import cats.Traverse +import cats.Applicative + +trait CosmosContainer[F[_], Key, Id, StreamResult[_], QueryValue, SingleResult[_], SingleValue] { + def query( + query: String, + parameters: Map[String, Any] = Map.empty, + options: QueryOptions = QueryOptions.default + ): Stream[F, StreamResult[QueryValue]] + def lookup( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions = new CosmosItemRequestOptions() + ): F[Option[SingleResult[SingleValue]]] + def insert( + partitionKey: Key, + value: SingleValue, + options: CosmosItemRequestOptions = new CosmosItemRequestOptions() + ): F[Option[SingleResult[SingleValue]]] + def replace( + partitionKey: Key, + id: Id, + value: SingleValue, + options: CosmosItemRequestOptions = new CosmosItemRequestOptions() + ): F[Option[SingleResult[SingleValue]]] + def upsert( + value: SingleValue, + options: CosmosItemRequestOptions = new CosmosItemRequestOptions() + ): F[Option[SingleResult[SingleValue]]] + def delete( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions = new CosmosItemRequestOptions() + ): F[SingleResult[Unit]] + + def imapK[G[_]]( + fk: F ~> G + ): CosmosContainer[G, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue] = + new CosmosContainer.IMapKCosmosContainer(this, fk) + def contramapPartitionKey[A]( + f: A => Key + ): CosmosContainer[F, A, Id, StreamResult, QueryValue, SingleResult, SingleValue] = + new CosmosContainer.ContramapPartitionKey(this, f) + def contramapId[A]( + f: A => Id + ): CosmosContainer[F, Key, A, StreamResult, QueryValue, SingleResult, SingleValue] = + new CosmosContainer.ContramapId(this, f) + + def semiInvariantFlatMapSingleResult[A](f: SingleValue => F[A], g: A => SingleValue)(implicit + ev1: Monad[F], + ev2: Traverse[SingleResult] + ): CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, A] = + new CosmosContainer.SemiInvariantFlatMapSingleResult(this, f, g) + + def semiInvariantFlatMapStreamResult[A](f: QueryValue => F[A])(implicit + ev1: Applicative[F], + ev2: Traverse[StreamResult] + ): CosmosContainer[F, Key, Id, StreamResult, A, SingleResult, SingleValue] = + new CosmosContainer.SemiInvariantFlatMapStreamResult(this, f) + + def semiInvariantFlatMap[A](f: SingleValue => F[A], g: A => SingleValue, h: QueryValue => F[A])( + implicit + ev1: Monad[F], + ev2: Traverse[SingleResult], + ev3: Traverse[StreamResult] + ): CosmosContainer[F, Key, Id, StreamResult, A, SingleResult, A] = + new CosmosContainer.SemiInvariantFlatMapStreamResult( + new CosmosContainer.SemiInvariantFlatMapSingleResult(this, f, g), + h + ) + + def dropSingleResultContext[A](f: SingleResult[SingleValue] => A, g: A => SingleValue)(implicit + ev: Functor[F] + ): CosmosContainer[F, Key, Id, StreamResult, QueryValue, cats.Id, A] = + new CosmosContainer.SingleResultMap(this, f, g) + + def dropStreamResultContext[A]( + f: StreamResult[QueryValue] => A + ): CosmosContainer[F, Key, Id, cats.Id, A, SingleResult, SingleValue] = + new CosmosContainer.StreamResultMap(this, f) + + def dropResultContext[A, B]( + f: SingleResult[SingleValue] => A, + g: A => SingleValue, + h: StreamResult[QueryValue] => B + )(implicit ev: Functor[F]): CosmosContainer[F, Key, Id, cats.Id, B, cats.Id, A] = + new CosmosContainer.SingleResultMap(new CosmosContainer.StreamResultMap(this, h), f, g) +} + +object CosmosContainer { + + private class IMapKCosmosContainer[F[_], G[_], Key, Id, StreamResult[_], QueryValue, SingleResult[ + _ + ], SingleValue]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + fk: F ~> G + ) extends CosmosContainer[G, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions = QueryOptions.default + ): Stream[G, StreamResult[QueryValue]] = + base.query(query, parameters, options).translate(fk) + def lookup( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): G[Option[SingleResult[SingleValue]]] = + fk(base.lookup(partitionKey, id, options)) + def insert( + partitionKey: Key, + value: SingleValue, + options: CosmosItemRequestOptions + ): G[Option[SingleResult[SingleValue]]] = + fk(base.insert(partitionKey, value, options)) + def replace( + partitionKey: Key, + id: Id, + value: SingleValue, + options: CosmosItemRequestOptions + ): G[Option[SingleResult[SingleValue]]] = + fk(base.replace(partitionKey, id, value, options)) + def upsert( + value: SingleValue, + options: CosmosItemRequestOptions + ): G[Option[SingleResult[SingleValue]]] = + fk(base.upsert(value, options)) + def delete( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): G[SingleResult[Unit]] = + fk(base.delete(partitionKey, id, options)) + } + + private class ContramapPartitionKey[F[_], Key, Key2, Id, StreamResult[ + _ + ], QueryValue, SingleResult[_], SingleValue]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + contra: Key2 => Key + ) extends CosmosContainer[F, Key2, Id, StreamResult, QueryValue, SingleResult, SingleValue] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions = QueryOptions.default + ): Stream[F, StreamResult[QueryValue]] = + base.query(query, parameters, options) + def lookup( + partitionKey: Key2, + id: Id, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.lookup(contra(partitionKey), id, options) + def insert( + partitionKey: Key2, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.insert(contra(partitionKey), value, options) + def replace( + partitionKey: Key2, + id: Id, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.replace(contra(partitionKey), id, value, options) + def upsert( + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.upsert(value, options) + def delete( + partitionKey: Key2, + id: Id, + options: CosmosItemRequestOptions + ): F[SingleResult[Unit]] = + base.delete(contra(partitionKey), id, options) + } + + private class ContramapId[F[_], Key, Id, Id2, StreamResult[_], QueryValue, SingleResult[ + _ + ], SingleValue]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + contra: Id2 => Id + ) extends CosmosContainer[F, Key, Id2, StreamResult, QueryValue, SingleResult, SingleValue] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions + ): Stream[F, StreamResult[QueryValue]] = + base.query(query, parameters, options) + def lookup( + partitionKey: Key, + id: Id2, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.lookup(partitionKey, contra(id), options) + def insert( + partitionKey: Key, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.insert(partitionKey, value, options) + def replace( + partitionKey: Key, + id: Id2, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.replace(partitionKey, contra(id), value, options) + def upsert( + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.upsert(value, options) + def delete( + partitionKey: Key, + id: Id2, + options: CosmosItemRequestOptions + ): F[SingleResult[Unit]] = + base.delete(partitionKey, contra(id), options) + } + + private class SemiInvariantFlatMapSingleResult[F[_]: Monad, Key, Id, StreamResult[ + _ + ], QueryValue, SingleResult[ + _ + ]: Traverse, SingleValue, A]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + f: SingleValue => F[A], + g: A => SingleValue + ) extends CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, A] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions + ): Stream[F, StreamResult[QueryValue]] = + base.query(query, parameters, options) + def lookup( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[A]]] = + base.lookup(partitionKey, id, options).flatMap(_.traverse(_.traverse(f))) + def insert( + partitionKey: Key, + value: A, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[A]]] = + base.insert(partitionKey, g(value), options).flatMap(_.traverse(_.traverse(f))) + def replace( + partitionKey: Key, + id: Id, + value: A, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[A]]] = + base.replace(partitionKey, id, g(value), options).flatMap(_.traverse(_.traverse(f))) + + def upsert( + value: A, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[A]]] = + base.upsert(g(value), options).flatMap(_.traverse(_.traverse(f))) + def delete( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[SingleResult[Unit]] = + base.delete(partitionKey, id, options) + } + + private class SemiInvariantFlatMapStreamResult[F[_]: Applicative, Key, Id, StreamResult[ + _ + ]: Traverse, QueryValue, SingleResult[ + _ + ], SingleValue, A]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + f: QueryValue => F[A] + ) extends CosmosContainer[F, Key, Id, StreamResult, A, SingleResult, SingleValue] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions + ): Stream[F, StreamResult[A]] = + base.query(query, parameters, options).evalMap(_.traverse(f)) + def lookup( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.lookup(partitionKey, id, options) + def insert( + partitionKey: Key, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.insert(partitionKey, value, options) + def replace( + partitionKey: Key, + id: Id, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.replace(partitionKey, id, value, options) + + def upsert( + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.upsert(value, options) + def delete( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[SingleResult[Unit]] = + base.delete(partitionKey, id, options) + } + + private class SingleResultMap[F[_]: Functor, Key, Id, StreamResult[_], QueryValue, SingleResult[ + _ + ], SingleValue, A]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + f: SingleResult[SingleValue] => A, + g: A => SingleValue + ) extends CosmosContainer[F, Key, Id, StreamResult, QueryValue, cats.Id, A] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions + ): Stream[F, StreamResult[QueryValue]] = + base.query(query, parameters, options) + def lookup( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[Option[A]] = + base.lookup(partitionKey, id, options).map(_.map(f)) + def insert( + partitionKey: Key, + value: A, + options: CosmosItemRequestOptions + ): F[Option[A]] = + base.insert(partitionKey, g(value), options).map(_.map(f)) + def replace( + partitionKey: Key, + id: Id, + value: A, + options: CosmosItemRequestOptions + ): F[Option[A]] = + base.replace(partitionKey, id, g(value), options).map(_.map(f)) + + def upsert( + value: A, + options: CosmosItemRequestOptions + ): F[Option[A]] = + base.upsert(g(value), options).map(_.map(f)) + def delete( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[Unit] = + base.delete(partitionKey, id, options).as(()) + } + + private class StreamResultMap[F[_], Key, Id, StreamResult[_], QueryValue, SingleResult[ + _ + ], SingleValue, A]( + base: CosmosContainer[F, Key, Id, StreamResult, QueryValue, SingleResult, SingleValue], + f: StreamResult[QueryValue] => A + ) extends CosmosContainer[F, Key, Id, cats.Id, A, SingleResult, SingleValue] { + def query( + query: String, + parameters: Map[String, Any], + options: QueryOptions + ): Stream[F, A] = + base.query(query, parameters, options).map(f) + def lookup( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.lookup(partitionKey, id, options) + def insert( + partitionKey: Key, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.insert(partitionKey, value, options) + def replace( + partitionKey: Key, + id: Id, + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.replace(partitionKey, id, value, options) + + def upsert( + value: SingleValue, + options: CosmosItemRequestOptions + ): F[Option[SingleResult[SingleValue]]] = + base.upsert(value, options) + def delete( + partitionKey: Key, + id: Id, + options: CosmosItemRequestOptions + ): F[SingleResult[Unit]] = + base.delete(partitionKey, id, options) + } +} diff --git a/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala b/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala index c27e1f6..a803730 100644 --- a/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala +++ b/core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala @@ -19,13 +19,10 @@ package com.banno.cosmos4s import cats._ import cats.effect._ import cats.syntax.all._ -import com.azure.cosmos._ -import com.azure.cosmos.implementation.NotFoundException +import com.azure.cosmos.CosmosDiagnostics import com.azure.cosmos.models._ import com.banno.cosmos4s.types._ -import com.fasterxml.jackson.databind.JsonNode -import fs2.{Chunk, Stream} -import io.circe.jackson._ +import fs2.Stream import io.circe._ trait IndexedCosmosContainer[F[_], K, I, V] { @@ -59,7 +56,7 @@ trait IndexedCosmosContainer[F[_], K, I, V] { def lookup(partitionKey: K, id: I): F[Option[V]] def insert(partitionKey: K, value: V): F[Option[V]] def replace(partitionKey: K, id: I, value: V): F[Option[V]] - def upsert(partitionKey: K, value: V): F[Option[V]] + def upsert(value: V): F[Option[V]] def delete(partitionKey: K, id: I): F[Unit] def imapK[G[_]](fk: F ~> G, gk: G ~> F): IndexedCosmosContainer[G, K, I, V] = @@ -81,13 +78,13 @@ trait IndexedCosmosContainer[F[_], K, I, V] { object IndexedCosmosContainer { def impl[F[_]: Async]( - container: CosmosAsyncContainer, + container: BaseCosmosContainer[F], createFeedOptions: Option[F[QueryOptions]] = None ): IndexedCosmosContainer[F, String, String, Json] = new BaseImpl[F](container, createFeedOptions) private class BaseImpl[F[_]: Async]( - container: CosmosAsyncContainer, + container: BaseCosmosContainer[F], createFeedOptions: Option[F[QueryOptions]] = None ) extends IndexedCosmosContainer[F, String, String, Json] { @@ -126,8 +123,6 @@ object IndexedCosmosContainer { handleDiagnostics ) - import collection.JavaConverters._ - def queryCustom[A: Decoder]( partitionKey: String, query: String, @@ -153,108 +148,43 @@ object IndexedCosmosContainer { .eval(createFeedOptionsAlways) .map(overrides) .flatMap { options => - val sqlParams = parameters - .map { - case (key, value) => - new SqlParameter(key, value) - } - .toList - .asJava - val querySpec = new SqlQuerySpec(query, sqlParams) - ReactorCore.fluxToStream( - Sync[F].delay( - container - .queryItems( - querySpec, - options.build().setPartitionKey(new PartitionKey(partitionKey)), - classOf[JsonNode] - ) - .byPage() - ) + container.query( + query, + parameters, + options.withPartitionKey(new PartitionKey(partitionKey).some) ) } - .flatMap { page => - val elements = page.getElements() - val stream = - if (elements == null) Stream.empty - else - Chunk - .iterable(elements.asScala) - .traverse(jacksonToCirce(_).as[A]) - .fold(Stream.raiseError[F], Stream.chunk) - Stream.exec(handleDiagnostics(page.getCosmosDiagnostics())) ++ stream + .handleDiagnostics(handleDiagnostics) + .evalMapChunk { json => + MonadThrow[F].fromEither(json.as[A]) } def lookup(partitionKey: String, id: String): F[Option[Json]] = - cats.data - .OptionT( - ReactorCore - .monoToEffectOpt( - Sync[F].delay( - container.readItem( - id, - new PartitionKey(partitionKey), - new CosmosItemRequestOptions(), - classOf[JsonNode] - ) - ) - ) - .recoverWith { case _: NotFoundException => Sync[F].pure(None) } + container + .lookup( + partitionKey, + id, + new CosmosItemRequestOptions() ) - .subflatMap(response => Option(response.getItem())) - .map(jacksonToCirce(_)) - .value + .map(_.map(_.item)) def insert(partitionKey: String, value: Json): F[Option[Json]] = - cats.data.OptionT - .liftF(Sync[F].delay(new CosmosItemRequestOptions())) - .flatMap(options => - cats.data.OptionT( - ReactorCore.monoToEffectOpt( - Sync[F].delay( - container.createItem(circeToJackson(value), new PartitionKey(partitionKey), options) - ) - ) - ) - ) - .subflatMap(response => Option(response.getItem())) - .map(jacksonToCirce) - .value + container + .insert(partitionKey, value, new CosmosItemRequestOptions()) + .map(_.map(_.item)) def replace(partitionKey: String, id: String, value: Json): F[Option[Json]] = - cats.data - .OptionT( - ReactorCore.monoToEffectOpt( - Sync[F].delay( - container.replaceItem(circeToJackson(value), id, new PartitionKey(partitionKey)) - ) - ) - ) - .subflatMap(response => Option(response.getItem())) - .map(jacksonToCirce) - .value + container + .replace(partitionKey, id, value, new CosmosItemRequestOptions()) + .map(_.map(_.item)) - def upsert(partitionKey: String, value: Json): F[Option[Json]] = - cats.data - .OptionT( - ReactorCore.monoToEffectOpt( - Sync[F].delay( - container.upsertItem(circeToJackson(value)) - ) - ) - ) - .subflatMap(response => Option(response.getItem())) - .map(jacksonToCirce) - .value + def upsert(value: Json): F[Option[Json]] = + container + .upsert(value, new CosmosItemRequestOptions()) + .map(_.map(_.item)) def delete(partitionKey: String, id: String): F[Unit] = - ReactorCore - .monoToEffect( - Sync[F].delay( - container.deleteItem(id, new PartitionKey(partitionKey)) - ) - ) - .void + container.delete(partitionKey, id, new CosmosItemRequestOptions()).as(()) } private class IMapKIndexedCosmosContainer[F[_], G[_], K, I, V]( @@ -314,8 +244,8 @@ object IndexedCosmosContainer { fk(base.insert(partitionKey, value)) def replace(partitionKey: K, id: I, value: V): G[Option[V]] = fk(base.replace(partitionKey, id, value)) - def upsert(partitionKey: K, value: V): G[Option[V]] = - fk(base.upsert(partitionKey, value)) + def upsert(value: V): G[Option[V]] = + fk(base.upsert(value)) def delete(partitionKey: K, id: I): G[Unit] = fk(base.delete(partitionKey, id)) } @@ -372,8 +302,8 @@ object IndexedCosmosContainer { base.insert(contra(partitionKey), value) def replace(partitionKey: K2, id: I, value: V): F[Option[V]] = base.replace(contra(partitionKey), id, value) - def upsert(partitionKey: K2, value: V): F[Option[V]] = - base.upsert(contra(partitionKey), value) + def upsert(value: V): F[Option[V]] = + base.upsert(value) def delete(partitionKey: K2, id: I): F[Unit] = base.delete(contra(partitionKey), id) } @@ -424,8 +354,8 @@ object IndexedCosmosContainer { base.insert(partitionKey, value) def replace(partitionKey: K, id: I2, value: V): F[Option[V]] = base.replace(partitionKey, contra(id), value) - def upsert(partitionKey: K, value: V): F[Option[V]] = - base.upsert(partitionKey, value) + def upsert(value: V): F[Option[V]] = + base.upsert(value) def delete(partitionKey: K, id: I2): F[Unit] = base.delete(partitionKey, contra(id)) } @@ -481,8 +411,8 @@ object IndexedCosmosContainer { base.insert(partitionKey, g(value)).flatMap(_.traverse(f)) def replace(partitionKey: K, id: I, value: V2): F[Option[V2]] = base.replace(partitionKey, id, g(value)).flatMap(_.traverse(f)) - def upsert(partitionKey: K, value: V2): F[Option[V2]] = - base.upsert(partitionKey, g(value)).flatMap(_.traverse(f)) + def upsert(value: V2): F[Option[V2]] = + base.upsert(g(value)).flatMap(_.traverse(f)) def delete(partitionKey: K, id: I): F[Unit] = base.delete(partitionKey, id) } @@ -532,8 +462,8 @@ object IndexedCosmosContainer { base.insert(partitionKey, g(value)).map(_.map(f)) def replace(partitionKey: K, id: I, value: V2): F[Option[V2]] = base.replace(partitionKey, id, g(value)).map(_.map(f)) - def upsert(partitionKey: K, value: V2): F[Option[V2]] = - base.upsert(partitionKey, g(value)).map(_.map(f)) + def upsert(value: V2): F[Option[V2]] = + base.upsert(g(value)).map(_.map(f)) def delete(partitionKey: K, id: I): F[Unit] = base.delete(partitionKey, id) } diff --git a/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala b/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala index efcb600..32de9d8 100644 --- a/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala +++ b/core/src/main/scala/com/banno/cosmos4s/RawCosmosContainer.scala @@ -18,12 +18,8 @@ package com.banno.cosmos4s import cats._ import cats.effect._ -import com.azure.cosmos._ -import com.azure.cosmos.models._ import com.banno.cosmos4s.types._ -import com.fasterxml.jackson.databind.JsonNode -import fs2.{Chunk, Stream} -import io.circe.jackson._ +import fs2.Stream import io.circe._ trait RawCosmosContainer[F[_], V] { @@ -48,13 +44,13 @@ trait RawCosmosContainer[F[_], V] { object RawCosmosContainer { def impl[F[_]: Async]( - container: CosmosAsyncContainer, + container: BaseCosmosContainer[F], createQueryOptions: Option[F[QueryOptions]] = None ): RawCosmosContainer[F, Json] = new BaseImpl[F](container, createQueryOptions) private class BaseImpl[F[_]: Async]( - container: CosmosAsyncContainer, + container: BaseCosmosContainer[F], createQueryOptions: Option[F[QueryOptions]] = None ) extends RawCosmosContainer[F, Json] { @@ -68,8 +64,6 @@ object RawCosmosContainer { ): Stream[F, Json] = queryCustomRaw[Json](query, parameters, overrides) - import collection.JavaConverters._ - def queryCustomRaw[A: Decoder]( query: String, parameters: Map[String, Any], @@ -79,31 +73,10 @@ object RawCosmosContainer { .eval(createQueryOptionsAlways) .map(overrides) .flatMap { options => - val sqlParams = parameters - .map { - case (key, value) => - new SqlParameter(key, value) - } - .toList - .asJava - val querySpec = new SqlQuerySpec(query, sqlParams) - ReactorCore.fluxToStream( - Sync[F].delay( - container - .queryItems(querySpec, options.build(), classOf[JsonNode]) - .byPage() - ) - ) - } - .flatMap { page => - val elements = page.getElements() - if (elements == null) Stream.empty - else - Chunk - .iterable(elements.asScala) - .traverse(jacksonToCirce(_).as[A]) - .fold(Stream.raiseError[F] _, Stream.chunk(_)) + container.query(query, parameters, options) } + .collect { case Right(json) => json } + .evalMapChunk(json => MonadThrow[F].fromEither(json.as[A])) } private class MapKRawCosmosContainer[F[_], G[_], V]( diff --git a/core/src/main/scala/com/banno/cosmos4s/package.scala b/core/src/main/scala/com/banno/cosmos4s/package.scala new file mode 100644 index 0000000..17a27c8 --- /dev/null +++ b/core/src/main/scala/com/banno/cosmos4s/package.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Jack Henry & Associates, Inc.® + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.banno + +import cats.Applicative +import cats.effect.Async +import cats.syntax.all._ +import com.banno.cosmos4s.types._ +import com.azure.cosmos.CosmosDiagnostics +import io.circe._ +import fs2._ + +package object cosmos4s { + type BaseCosmosContainer[F[_]] = + CosmosContainer[F, String, String, Either[FeedResponse, *], Json, ItemResponse[*], Json] + + implicit class BaseContainerOps[F[_]](private val base: BaseCosmosContainer[F]) extends AnyVal { + def toIndexedContainer(createFeedOptions: Option[F[QueryOptions]] = None)(implicit + ev: Async[F] + ): IndexedCosmosContainer[F, String, String, Json] = + IndexedCosmosContainer.impl(base, createFeedOptions) + def toRawContainer(createFeedOptions: Option[F[QueryOptions]] = None)(implicit + ev: Async[F] + ): RawCosmosContainer[F, Json] = + RawCosmosContainer.impl(base, createFeedOptions) + } + + implicit class ResultStream[F[_], A](private val stream: Stream[F, Either[FeedResponse, A]]) + extends AnyVal { + def handleResultMeta(f: FeedResponse => F[Unit])(implicit ev: Applicative[F]): Stream[F, A] = + stream + .evalMapChunk { + case Left(response) => f(response).as[Option[A]](None) + case Right(a) => a.some.pure + } + .collect { case Some(a) => a } + + def handleDiagnostics(f: CosmosDiagnostics => F[Unit])(implicit + ev: Applicative[F] + ): Stream[F, A] = + stream.handleResultMeta(response => f(response.cosmosDiagnostics)) + } +} diff --git a/core/src/main/scala/com/banno/cosmos4s/types/FeedResponse.scala b/core/src/main/scala/com/banno/cosmos4s/types/FeedResponse.scala new file mode 100644 index 0000000..063b9df --- /dev/null +++ b/core/src/main/scala/com/banno/cosmos4s/types/FeedResponse.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2020 Jack Henry & Associates, Inc.® + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.banno.cosmos4s.types + +import collection.JavaConverters._ +import com.azure.cosmos.CosmosDiagnostics + +object FeedResponse { + + def fromCosmosResponse(response: com.azure.cosmos.models.FeedResponse[_]): FeedResponse = + new FeedResponse( + activityId = response.getActivityId(), + collectionQuota = response.getCollectionQuota(), + collectionSizeQuota = response.getCollectionSizeQuota(), + collectionSizeUsage = response.getCollectionSizeUsage(), + collectionUsage = response.getCollectionUsage(), + continuationToken = response.getContinuationToken(), + cosmosDiagnostics = response.getCosmosDiagnostics(), + currentResourceQuotaUsage = response.getCurrentResourceQuotaUsage(), + databaseQuota = response.getDatabaseQuota(), + databaseUsage = response.getDatabaseUsage(), + maxResourceQuota = response.getMaxResourceQuota(), + permissionQuota = response.getPermissionQuota(), + permissionUsage = response.getPermissionUsage(), + requestCharge = response.getRequestCharge(), + responseHeaders = response.getResponseHeaders().asScala.toMap, + sessionToken = response.getSessionToken(), + storedProceduresQuota = response.getStoredProceduresQuota(), + storedProceduresUsage = response.getStoredProceduresUsage(), + triggersQuota = response.getTriggersQuota(), + triggersUsage = response.getTriggersUsage(), + userDefinedFunctionsQuota = response.getUserDefinedFunctionsQuota(), + userDefinedFunctionsUsage = response.getUserDefinedFunctionsUsage(), + userQuota = response.getUserQuota(), + userUsage = response.getUserUsage() + ) +} + +case class FeedResponse( + activityId: String, + collectionQuota: Long, + collectionSizeQuota: Long, + collectionSizeUsage: Long, + collectionUsage: Long, + continuationToken: String, + cosmosDiagnostics: CosmosDiagnostics, + currentResourceQuotaUsage: String, + databaseQuota: Long, + databaseUsage: Long, + maxResourceQuota: String, + permissionQuota: Long, + permissionUsage: Long, + requestCharge: Double, + responseHeaders: Map[String, String], + sessionToken: String, + storedProceduresQuota: Long, + storedProceduresUsage: Long, + triggersQuota: Long, + triggersUsage: Long, + userDefinedFunctionsQuota: Long, + userDefinedFunctionsUsage: Long, + userQuota: Long, + userUsage: Long +) diff --git a/core/src/main/scala/com/banno/cosmos4s/types/ItemResponse.scala b/core/src/main/scala/com/banno/cosmos4s/types/ItemResponse.scala new file mode 100644 index 0000000..56faf2f --- /dev/null +++ b/core/src/main/scala/com/banno/cosmos4s/types/ItemResponse.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2020 Jack Henry & Associates, Inc.® + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.banno.cosmos4s.types + +import cats.Traverse +import cats.syntax.all._ +import com.azure.cosmos.CosmosDiagnostics +import com.azure.cosmos.models.CosmosItemResponse +import collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration +import java.util.concurrent.TimeUnit +import cats.Applicative +import cats.Eval + +object ItemResponse { + + implicit val traverse: Traverse[ItemResponse] = new Traverse[ItemResponse] { + def traverse[G[_]: Applicative, A, B](fa: ItemResponse[A])(f: A => G[B]): G[ItemResponse[B]] = + f(fa.item).map(b => fa.copy(item = b)) + def foldLeft[A, B](fa: ItemResponse[A], b: B)(f: (B, A) => B): B = f(b, fa.item) + def foldRight[A, B](fa: ItemResponse[A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = + f(fa.item, lb) + } + + def fromCosmosResponse[A, B](transform: Option[A] => B)( + response: CosmosItemResponse[A] + ): ItemResponse[B] = + ItemResponse( + activityId = response.getActivityId(), + currentResourceQuotaUsage = response.getCurrentResourceQuotaUsage(), + diagnostics = response.getDiagnostics(), + duration = FiniteDuration(response.getDuration().toNanos, TimeUnit.NANOSECONDS), + eTag = response.getETag(), + item = transform(Option(response.getItem)), + maxResourceQuota = response.getMaxResourceQuota(), + requestCharge = response.getRequestCharge(), + responseHeaders = response.getResponseHeaders().asScala.toMap, + sessionToken = response.getSessionToken(), + statusCode = response.getStatusCode() + ) + +} + +case class ItemResponse[A]( + activityId: String, + currentResourceQuotaUsage: String, + diagnostics: CosmosDiagnostics, + duration: FiniteDuration, + eTag: String, + item: A, + maxResourceQuota: String, + requestCharge: Double, + responseHeaders: Map[String, String], + sessionToken: String, + statusCode: Int +) diff --git a/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala b/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala index c49bde8..0d2cf75 100644 --- a/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala +++ b/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala @@ -16,18 +16,20 @@ package com.banno.cosmos4s.types -import com.azure.cosmos.models.CosmosQueryRequestOptions +import com.azure.cosmos.models.{CosmosQueryRequestOptions, PartitionKey} final class QueryOptions private ( maxDegreeOfParallelism: Option[Int], - maxBufferedItemCount: Option[Int] + maxBufferedItemCount: Option[Int], + partitionKey: Option[PartitionKey] ) extends Serializable { private[this] def copy( maxDegreeOfParallelism: Option[Int] = maxDegreeOfParallelism, - maxBufferedItemCount: Option[Int] = maxBufferedItemCount + maxBufferedItemCount: Option[Int] = maxBufferedItemCount, + partitionKey: Option[PartitionKey] = partitionKey ): QueryOptions = - new QueryOptions(maxDegreeOfParallelism, maxBufferedItemCount) + new QueryOptions(maxDegreeOfParallelism, maxBufferedItemCount, partitionKey) def withMaxDegreeOfParallelism(value: Option[Int]): QueryOptions = this.copy(maxDegreeOfParallelism = value) @@ -35,6 +37,9 @@ final class QueryOptions private ( def withMaxBufferedItemCount(value: Option[Int]): QueryOptions = this.copy(maxBufferedItemCount = value) + def withPartitionKey(value: Option[PartitionKey]): QueryOptions = + this.copy(partitionKey = value) + private val getMaxDegreeOfParallelism: Option[Int] = maxDegreeOfParallelism private val getMaxBufferedItemCount: Option[Int] = maxBufferedItemCount @@ -54,10 +59,11 @@ final class QueryOptions private ( val cosmosQueryOptions = new CosmosQueryRequestOptions() maxDegreeOfParallelism.foreach(cosmosQueryOptions.setMaxDegreeOfParallelism) maxBufferedItemCount.foreach(cosmosQueryOptions.setMaxBufferedItemCount) + partitionKey.foreach(cosmosQueryOptions.setPartitionKey) cosmosQueryOptions } } object QueryOptions { - val default: QueryOptions = new QueryOptions(None, None) + val default: QueryOptions = new QueryOptions(None, None, None) }