Skip to content
This repository has been archived by the owner on Jul 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #154 from Banno/metadata/provide-metadata
Browse files Browse the repository at this point in the history
Creates a base container providing request metadata
  • Loading branch information
vendamere authored Dec 9, 2022
2 parents 00b5b8d + a34641e commit 1d3c0bc
Show file tree
Hide file tree
Showing 9 changed files with 850 additions and 148 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ val azureCosmosV = "4.28.1"
val azureDocumentDBV = "2.6.4"
val catsV = "2.8.0"
val catsEffectV = "3.3.14"
val circeJackson210V = "0.14.2"
val circeJackson210V = "0.14.0"
val documentDBBulkExecV = "2.12.5"
val fs2V = "3.3.0"
val circeV = "0.14.3"
Expand Down
155 changes: 155 additions & 0 deletions core/src/main/scala/com/banno/cosmos4s/BaseCosmosContainer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2020 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.cosmos4s

import cats.data.OptionT
import cats.effect._
import cats.syntax.all._
import com.azure.cosmos.CosmosAsyncContainer
import com.azure.cosmos.models.{CosmosItemRequestOptions, PartitionKey, SqlParameter, SqlQuerySpec}
import com.banno.cosmos4s.types._
import com.fasterxml.jackson.databind.JsonNode
import fs2.Stream
import collection.JavaConverters._
import io.circe.jackson._
import io.circe._
import com.azure.cosmos.implementation.NotFoundException

object BaseCosmosContainer {

val convertJson: Option[JsonNode] => Json = _.fold(Json.Null)(jacksonToCirce)

def impl[F[_]: Async](
container: CosmosAsyncContainer
): BaseCosmosContainer[F] =
new CosmosContainer[F, String, String, Either[FeedResponse, *], Json, ItemResponse[*], Json] {

def query(
query: String,
parameters: Map[String, Any],
options: QueryOptions
): Stream[F, Either[FeedResponse, Json]] = {
val sqlParams = parameters
.map {
case (key, value) =>
new SqlParameter(key, value)
}
.toList
.asJava
val querySpec = new SqlQuerySpec(query, sqlParams)
ReactorCore
.fluxToStream(
Sync[F].delay(
container
.queryItems(querySpec, options.build(), classOf[JsonNode])
.byPage()
)
)
.flatMap { page =>
val elements = page.getElements()
val itemStream =
if (elements == null) Stream.empty
else
Stream
.iterable(elements.asScala)
.map(jacksonToCirce(_).asRight[FeedResponse])
Stream.emit(FeedResponse.fromCosmosResponse(page).asLeft[Json]) ++ itemStream
}
}

def delete(
partitionKey: String,
id: String,
options: CosmosItemRequestOptions
): F[ItemResponse[Unit]] =
ReactorCore
.monoToEffect(
Sync[F].delay(
container.deleteItem(id, new PartitionKey(partitionKey), options)
)
)
.map(ItemResponse.fromCosmosResponse(_ => ()))

def insert(
partitionKey: String,
value: Json,
options: CosmosItemRequestOptions
): F[Option[ItemResponse[Json]]] =
OptionT(
ReactorCore.monoToEffectOpt(
Sync[F].delay(
container.createItem(circeToJackson(value), new PartitionKey(partitionKey), options)
)
)
).map(ItemResponse.fromCosmosResponse(convertJson)).value

def lookup(
partitionKey: String,
id: String,
options: CosmosItemRequestOptions
): F[Option[ItemResponse[Json]]] =
OptionT(
ReactorCore
.monoToEffectOpt(
Sync[F].delay(
container.readItem(
id,
new PartitionKey(partitionKey),
options,
classOf[JsonNode]
)
)
)
.recoverWith { case _: NotFoundException => Sync[F].pure(None) }
)
.map(ItemResponse.fromCosmosResponse(convertJson))
.value

def upsert(value: Json, options: CosmosItemRequestOptions): F[Option[ItemResponse[Json]]] =
OptionT(
ReactorCore.monoToEffectOpt(
Sync[F].delay(
container.upsertItem(circeToJackson(value), options)
)
)
)
.map(ItemResponse.fromCosmosResponse(convertJson))
.value

def replace(
partitionKey: String,
id: String,
value: Json,
options: CosmosItemRequestOptions
): F[Option[ItemResponse[Json]]] =
OptionT(
ReactorCore.monoToEffectOpt(
Sync[F].delay(
container.replaceItem(
circeToJackson(value),
id,
new PartitionKey(partitionKey),
options
)
)
)
)
.map(ItemResponse.fromCosmosResponse(convertJson))
.value

}
}
Loading

0 comments on commit 1d3c0bc

Please sign in to comment.