Skip to content
This repository has been archived by the owner on Jul 29, 2024. It is now read-only.

Commit

Permalink
Adds query parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
vendamere committed Aug 23, 2022
1 parent f13dd71 commit bbc5ccf
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 40 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ libraryDependencies ++= Seq(

| cosmos4s | Scala 2.12 | Scala 2.13 | Scala 3.0 | Cats | FS2/CE |
| :------: | :--------: | :--------: | :-------: | :---: | :----: |
| `1.0.x` |||| `2.x` | `3.x` |
| `0.3.x` |||| `2.x` | `3.x` |
| `0.2.x` |||| `2.x` | `3.x` |
| `0.1.x` |||| `2.x` | `2.x` |
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val catsV = "2.7.0"
val catsEffectV = "3.3.11"
val fs2V = "3.2.7"
val circeV = "0.14.1"
val catsV = "2.8.0"
val catsEffectV = "3.3.14"
val fs2V = "3.2.12"
val circeV = "0.14.2"
val munitV = "0.7.29"
val munitCatsEffectV = "1.0.7"
val kindProjectorV = "0.13.2"
Expand Down Expand Up @@ -80,7 +80,7 @@ lazy val commonSettings = Seq(
headerLicense := Some(HeaderLicense.ALv2("2020", "Jack Henry & Associates, Inc.®")),
crossScalaVersions := Seq(scalaVersion.value, "2.13.8", "2.12.15"),
libraryDependencies ++= Seq(
"com.azure" % "azure-cosmos" % "4.28.1",
"com.azure" % "azure-cosmos" % "4.34.0",
"com.microsoft.azure" % "azure-documentdb" % "2.6.4",
"com.microsoft.azure" % "documentdb-bulkexecutor" % "2.12.5",
"org.typelevel" %% "cats-core" % catsV,
Expand Down
131 changes: 108 additions & 23 deletions core/src/main/scala/com/banno/cosmos4s/IndexedCosmosContainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,26 @@ trait IndexedCosmosContainer[F[_], K, I, V] {
def query(
partitionKey: K,
query: String,
parameters: Map[String, Any] = Map.empty,
overrides: QueryOptions => QueryOptions = identity
): Stream[F, V]
def queryWithDiagnostics(
partitionKey: K,
query: String,
parameters: Map[String, Any] = Map.empty,
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, V]
def queryCustom[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any] = Map.empty,
overrides: QueryOptions => QueryOptions = identity
): Stream[F, A]
def queryCustomWithDiagnostics[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any] = Map.empty,
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, A]
Expand Down Expand Up @@ -96,42 +100,72 @@ object IndexedCosmosContainer {
def query(
partitionKey: String,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions = identity
): Stream[F, Json] =
queryCustomWithDiagnostics[Json](partitionKey, query, overrides, defaultDiagnoticsHandler)
queryCustomWithDiagnostics[Json](
partitionKey,
query,
parameters,
overrides,
defaultDiagnoticsHandler
)

def queryWithDiagnostics(
partitionKey: String,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, Json] =
queryCustomWithDiagnostics[Json](partitionKey, query, overrides, handleDiagnostics)
queryCustomWithDiagnostics[Json](
partitionKey,
query,
parameters,
overrides,
handleDiagnostics
)

import collection.JavaConverters._

def queryCustom[A: Decoder](
partitionKey: String,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions = identity
): Stream[F, A] =
queryCustomWithDiagnostics[A](partitionKey, query, overrides, defaultDiagnoticsHandler)
queryCustomWithDiagnostics[A](
partitionKey,
query,
parameters,
overrides,
defaultDiagnoticsHandler
)

def queryCustomWithDiagnostics[A: Decoder](
partitionKey: String,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, A] =
Stream
.eval(createFeedOptionsAlways)
.map(overrides)
.flatMap { options =>
val sqlParams = parameters
.map {
case (key, value) =>
new SqlParameter(key, value)
}
.toList
.asJava
val querySpec = new SqlQuerySpec(query, sqlParams)
ReactorCore.fluxToStream(
Sync[F].delay(
container
.queryItems(
query,
querySpec,
options.build().setPartitionKey(new PartitionKey(partitionKey)),
classOf[JsonNode]
)
Expand Down Expand Up @@ -231,34 +265,45 @@ object IndexedCosmosContainer {
def query(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions = identity
): Stream[G, V] =
base.query(partitionKey, query, overrides).translate(fk)
base.query(partitionKey, query, parameters, overrides).translate(fk)
def queryWithDiagnostics(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => G[Unit]
): Stream[G, V] =
base
.queryWithDiagnostics(partitionKey, query, overrides, d => gk(handleDiagnostics(d)))
.queryWithDiagnostics(
partitionKey,
query,
parameters,
overrides,
d => gk(handleDiagnostics(d))
)
.translate(fk)
def queryCustom[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions = identity
): Stream[G, A] =
base.queryCustom[A](partitionKey, query, overrides).translate(fk)
base.queryCustom[A](partitionKey, query, parameters, overrides).translate(fk)
def queryCustomWithDiagnostics[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => G[Unit]
): Stream[G, A] =
base
.queryCustomWithDiagnostics[A](
partitionKey,
query,
parameters,
overrides,
d => gk(handleDiagnostics(d))
)
Expand All @@ -282,29 +327,45 @@ object IndexedCosmosContainer {
def query(
partitionKey: K2,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, V] =
base.query(contra(partitionKey), query, overrides)
base.query(contra(partitionKey), query, parameters, overrides)
def queryWithDiagnostics(
partitionKey: K2,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, V] =
base.queryWithDiagnostics(contra(partitionKey), query, overrides, handleDiagnostics)
base.queryWithDiagnostics(
contra(partitionKey),
query,
parameters,
overrides,
handleDiagnostics
)
def queryCustom[A: Decoder](
partitionKey: K2,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, A] =
base.queryCustom(contra(partitionKey), query, overrides)
base.queryCustom(contra(partitionKey), query, parameters, overrides)
def queryCustomWithDiagnostics[A: Decoder](
partitionKey: K2,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, A] =
base.queryCustomWithDiagnostics[A](contra(partitionKey), query, overrides, handleDiagnostics)
base.queryCustomWithDiagnostics[A](
contra(partitionKey),
query,
parameters,
overrides,
handleDiagnostics
)
def lookup(partitionKey: K2, id: I): F[Option[V]] =
base.lookup(contra(partitionKey), id)
def insert(partitionKey: K2, value: V): F[Option[V]] =
Expand All @@ -324,29 +385,39 @@ object IndexedCosmosContainer {
def query(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, V] =
base.query(partitionKey, query, overrides)
base.query(partitionKey, query, parameters, overrides)
def queryWithDiagnostics(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, V] =
base.queryWithDiagnostics(partitionKey, query, overrides, handleDiagnostics)
base.queryWithDiagnostics(partitionKey, query, parameters, overrides, handleDiagnostics)
def queryCustom[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, A] =
base.queryCustom(partitionKey, query, overrides)
base.queryCustom(partitionKey, query, parameters, overrides)
def queryCustomWithDiagnostics[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, A] =
base.queryCustomWithDiagnostics[A](partitionKey, query, overrides, handleDiagnostics)
base.queryCustomWithDiagnostics[A](
partitionKey,
query,
parameters,
overrides,
handleDiagnostics
)
def lookup(partitionKey: K, id: I2): F[Option[V]] =
base.lookup(partitionKey, contra(id))
def insert(partitionKey: K, value: V): F[Option[V]] =
Expand All @@ -367,33 +438,43 @@ object IndexedCosmosContainer {
def query(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, V2] =
base
.query(partitionKey, query, overrides)
.query(partitionKey, query, parameters, overrides)
.evalMapChunk(f)
def queryWithDiagnostics(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, V2] =
base
.queryWithDiagnostics(partitionKey, query, overrides, handleDiagnostics)
.queryWithDiagnostics(partitionKey, query, parameters, overrides, handleDiagnostics)
.evalMapChunk(f)
def queryCustom[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, A] =
base.queryCustom(partitionKey, query, overrides)
base.queryCustom(partitionKey, query, parameters, overrides)
def queryCustomWithDiagnostics[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, A] =
base.queryCustomWithDiagnostics[A](partitionKey, query, overrides, handleDiagnostics)
base.queryCustomWithDiagnostics[A](
partitionKey,
query,
parameters,
overrides,
handleDiagnostics
)
def lookup(partitionKey: K, id: I): F[Option[V2]] =
base.lookup(partitionKey, id).flatMap(_.traverse(f))
def insert(partitionKey: K, value: V2): F[Option[V2]] =
Expand All @@ -414,33 +495,37 @@ object IndexedCosmosContainer {
def query(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, V2] =
base
.query(partitionKey, query, overrides)
.query(partitionKey, query, parameters, overrides)
.map(f)
def queryWithDiagnostics(
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, V2] =
base
.queryWithDiagnostics(partitionKey, query, overrides, handleDiagnostics)
.queryWithDiagnostics(partitionKey, query, parameters, overrides, handleDiagnostics)
.map(f)
def queryCustom[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions
): Stream[F, A] =
base.queryCustom(partitionKey, query, overrides)
base.queryCustom(partitionKey, query, parameters, overrides)
def queryCustomWithDiagnostics[A: Decoder](
partitionKey: K,
query: String,
parameters: Map[String, Any],
overrides: QueryOptions => QueryOptions,
handleDiagnostics: CosmosDiagnostics => F[Unit]
): Stream[F, A] =
base.queryCustomWithDiagnostics(partitionKey, query, overrides, handleDiagnostics)
base.queryCustomWithDiagnostics(partitionKey, query, parameters, overrides, handleDiagnostics)
def lookup(partitionKey: K, id: I): F[Option[V2]] =
base.lookup(partitionKey, id).map(_.map(f))
def insert(partitionKey: K, value: V2): F[Option[V2]] =
Expand Down
Loading

0 comments on commit bbc5ccf

Please sign in to comment.