diff --git a/core/src/main/scala/com/banno/cosmos4s/package.scala b/core/src/main/scala/com/banno/cosmos4s/package.scala index 17a27c8..c6cc5d7 100644 --- a/core/src/main/scala/com/banno/cosmos4s/package.scala +++ b/core/src/main/scala/com/banno/cosmos4s/package.scala @@ -42,12 +42,15 @@ package object cosmos4s { implicit class ResultStream[F[_], A](private val stream: Stream[F, Either[FeedResponse, A]]) extends AnyVal { def handleResultMeta(f: FeedResponse => F[Unit])(implicit ev: Applicative[F]): Stream[F, A] = - stream - .evalMapChunk { - case Left(response) => f(response).as[Option[A]](None) - case Right(a) => a.some.pure + stream.chunks + .flatMap { c => + Stream.evalUnChunk( + c.traverseFilter { + case Left(response) => f(response).as[Option[A]](None) + case Right(a) => a.some.pure + } + ) } - .collect { case Some(a) => a } def handleDiagnostics(f: CosmosDiagnostics => F[Unit])(implicit ev: Applicative[F] diff --git a/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala b/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala index fd81db4..1a745b4 100644 --- a/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala +++ b/core/src/main/scala/com/banno/cosmos4s/types/QueryOptions.scala @@ -21,15 +21,22 @@ import com.azure.cosmos.models.{CosmosQueryRequestOptions, PartitionKey} final class QueryOptions private ( maxDegreeOfParallelism: Option[Int], maxBufferedItemCount: Option[Int], - partitionKey: Option[PartitionKey] + partitionKey: Option[PartitionKey], + private val indexMetricsEnabled: Option[Boolean] ) extends Serializable { private[this] def copy( maxDegreeOfParallelism: Option[Int] = maxDegreeOfParallelism, maxBufferedItemCount: Option[Int] = maxBufferedItemCount, - partitionKey: Option[PartitionKey] = partitionKey + partitionKey: Option[PartitionKey] = partitionKey, + indexMetricsEnabled: Option[Boolean] = indexMetricsEnabled ): QueryOptions = - new QueryOptions(maxDegreeOfParallelism, maxBufferedItemCount, partitionKey) + new QueryOptions( + maxDegreeOfParallelism, + maxBufferedItemCount, + partitionKey, + indexMetricsEnabled + ) def withMaxDegreeOfParallelism(value: Option[Int]): QueryOptions = this.copy(maxDegreeOfParallelism = value) @@ -40,30 +47,37 @@ final class QueryOptions private ( def withPartitionKey(value: Option[PartitionKey]): QueryOptions = this.copy(partitionKey = value) + def enableIndexMetrics: QueryOptions = copy(indexMetricsEnabled = Some(true)) + private val getMaxDegreeOfParallelism: Option[Int] = maxDegreeOfParallelism private val getMaxBufferedItemCount: Option[Int] = maxBufferedItemCount + private def getPartitionKey: Option[PartitionKey] = partitionKey override def toString: String = s"QueryOptions($maxDegreeOfParallelism, $maxBufferedItemCount)" override def equals(o: Any): Boolean = o match { case x: QueryOptions => - (this.maxDegreeOfParallelism == x.getMaxDegreeOfParallelism) && (this.maxBufferedItemCount == x.getMaxBufferedItemCount) + (this.maxDegreeOfParallelism == x.getMaxDegreeOfParallelism) && + (this.maxBufferedItemCount == x.getMaxBufferedItemCount) + (this.partitionKey == x.getPartitionKey) && + (this.indexMetricsEnabled == x.indexMetricsEnabled) case _ => false } override def hashCode: Int = - 37 * (37 * (17 + partitionKey.## + (37 * (17 + maxDegreeOfParallelism.##) + maxBufferedItemCount.##))) + 37 * (37 * (17 + indexMetricsEnabled.## + (37 * (17 + partitionKey.## + (37 * (17 + maxDegreeOfParallelism.##) + maxBufferedItemCount.##))))) private[cosmos4s] def build(): CosmosQueryRequestOptions = { val cosmosQueryOptions = new CosmosQueryRequestOptions() maxDegreeOfParallelism.foreach(cosmosQueryOptions.setMaxDegreeOfParallelism) maxBufferedItemCount.foreach(cosmosQueryOptions.setMaxBufferedItemCount) partitionKey.foreach(cosmosQueryOptions.setPartitionKey) + indexMetricsEnabled.foreach(cosmosQueryOptions.setIndexMetricsEnabled) cosmosQueryOptions } } object QueryOptions { - val default: QueryOptions = new QueryOptions(None, None, None) + val default: QueryOptions = new QueryOptions(None, None, None, None) }