Skip to content

Commit

Permalink
Add API for filtered positional operator (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
SakulK authored Oct 9, 2024
1 parent 8378608 commit 612173d
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 5 deletions.
9 changes: 9 additions & 0 deletions cc/src/main/scala/me/sgrouples/rogue/SelectFields.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,15 @@ class CClassSeqModifyField[C, M <: CcMeta[C], O, CC[_] <: Seq[_]](
fld.childMeta,
fld.owner
)

/** @see
* https://www.mongodb.com/docs/manual/reference/operator/update/positional-filtered
*/
def $(identifier: String) = new SelectableDummyCCField[C, M, O](
fld.name + s".$$[$identifier]",
fld.childMeta,
fld.owner
)
}

class CClassArrayModifyField[C, M <: CcMeta[C], O](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.bson.codecs.{
}
import org.bson.codecs.configuration.CodecRegistries
import org.reactivestreams.Publisher
import scala.jdk.CollectionConverters.*

import scala.reflect._
//rename to reactive
Expand Down Expand Up @@ -292,14 +293,17 @@ class MongoAsyncBsonJavaDriverAdapter[MB](
writeConcern: WriteConcern
)(implicit dba: MongoDatabase): Future[UpdateResult] = {
val modClause = mod
if (!modClause.mod.clauses.isEmpty) {
if (modClause.mod.clauses.nonEmpty) {
val q: Bson = buildCondition(modClause.query.condition)
val m: Bson = buildModify(modClause.mod)
val coll = dbCollectionFactory
.getPrimaryDBCollection(modClause.query)
.withWriteConcern(writeConcern)
val updateOptions = new UpdateOptions().upsert(upsert)

if mod.arrayFilters.nonEmpty then
updateOptions.arrayFilters(mod.arrayFilters.asJava)

val updater = if (multi) {
coll.updateMany(q, m, updateOptions)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import me.sgrouples.rogue.{ListField, VectorField}
import io.fsq.field.Field
import munit.FunSuite
import org.bson.types.ObjectId
import org.mongodb.scala._
import org.mongodb.scala.*
import org.mongodb.scala.model.Filters

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.duration.*
import com.softwaremill.tagging.*

import java.time.temporal.ChronoUnit
import scala.concurrent.ExecutionContext.Implicits.global

class MacroEndToEndSpec extends FunSuite {
Expand Down Expand Up @@ -876,4 +878,83 @@ class MacroEndToEndSpec extends FunSuite {
assert(aggregatedFold.keySet == Set(0L, 1L, 2L, 3L, 4L))
}
}

test("filtered positional operator updates to subarray documents") {
val v = baseTestVenue()
val date1: LocalDateTime = LocalDateTime.now().minusDays(10)
val date2: LocalDateTime = LocalDateTime.now().minusDays(11)
for
_ <- VenueR.insertOneAsync(v)
_ <- VenueR
.where(_.id eqs v._id)
.modify(
_.claims.$("test1").subfield(_.date).setTo(date1)
)
.and(
_.claims.$("test2").subfield(_.date).setTo(date2)
)
.and(
_.claims.$("test2").subfield(_.status).setTo(ClaimStatus.pending)
)
.withArrayFilter("test1", _.claims.childMeta)(_.uid eqs 1234L)
.withArrayFilter("test2", _.claims.childMeta)(_.uid eqs 5678L)
.updateOneAsync()
claimsAfterUpdate <- VenueR
.where(_.id eqs v._id)
.select(_.claims)
.fetchAsync()
.map(_.flatten)
yield assertEquals(
claimsAfterUpdate,
List(
VenueClaimBson(
uid = 1234L,
status = ClaimStatus.pending,
date = date1.truncatedTo(ChronoUnit.MILLIS)
),
VenueClaimBson(
uid = 5678L,
status = ClaimStatus.pending,
date = date2.truncatedTo(ChronoUnit.MILLIS)
)
)
)
}

test("filtered positional operator fail when array filter is missing") {
VenueR
.modify(
_.claims.$("test1").subfield(_.status).setTo(ClaimStatus.approved)
)
.updateOneAsync()
.failed
.map(exception => assert(exception.isInstanceOf[MongoWriteException]))
}

test(
"filtered positional operator fail when array filter has different identifier"
) {
VenueR
.modify(
_.claims.$("test1").subfield(_.status).setTo(ClaimStatus.approved)
)
.withArrayFilter("test2", _.claims.childMeta)(_.uid eqs 1L)
.updateOneAsync()
.failed
.map(exception => assert(exception.isInstanceOf[MongoWriteException]))
}

test(
"filtered positional operator fail when an unused array filter exists"
) {
VenueR
.modify(
_.claims.$("test1").subfield(_.status).setTo(ClaimStatus.approved)
)
.withArrayFilter("test1", _.claims.childMeta)(_.uid eqs 1L)
.withArrayFilter("test2", _.claims.childMeta)(_.uid eqs 2L)
.updateOneAsync()
.failed
.map(exception => assert(exception.isInstanceOf[MongoWriteException]))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,19 @@ assertEquals( Comment.where(_.comments.unsafeField[String]("comment") conta
)
)

assertEquals(
VenueR
.where(_.legacyid eqs 1)
.and(_.claims.subfield(_.uid) contains 2)
.modify(
_.claims.$("test").subfield(_.status) setTo ClaimStatus.approved
)
.q,
pq(
"""db.venues.update({"legId": {"$numberLong": "1"}, "claims.uid": {"$numberLong": "2"}}, {"$set": {"claims.$[test].status": "Approved"}}, false, false)"""
)
)

assertEquals(
VenueR
.where(_.legacyid eqs 1)
Expand Down
32 changes: 30 additions & 2 deletions core/src/main/scala/io/fsq/rogue/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

package io.fsq.rogue

import com.mongodb.{BasicDBObjectBuilder, DBObject, ReadPreference}
import com.mongodb.{
BasicDBObject,
BasicDBObjectBuilder,
DBObject,
ReadPreference
}
import io.fsq.rogue.MongoHelpers.{
AndCondition,
FieldOrderTerm,
Expand All @@ -15,6 +20,7 @@ import io.fsq.rogue.MongoHelpers.{
SearchCondition
}
import io.fsq.rogue.index.MongoIndex

import scala.concurrent.duration.FiniteDuration

// ***************************************************************************
Expand Down Expand Up @@ -3397,7 +3403,11 @@ case class Query[M, R, +State](
// *** Modify Queries
// *******************************************************

case class ModifyQuery[M, +State](query: Query[M, _, State], mod: MongoModify) {
case class ModifyQuery[M, +State](
query: Query[M, _, State],
mod: MongoModify,
arrayFilters: List[BasicDBObject] = List.empty
) {

private def addClause(clause: M => ModifyClause) = {
this.copy(mod = MongoModify(clause(query.meta) :: mod.clauses))
Expand Down Expand Up @@ -3425,6 +3435,24 @@ case class ModifyQuery[M, +State](query: Query[M, _, State], mod: MongoModify) {
MongoBuilder.buildModifyString(query.collectionName, this)

def asDBObject = (this.query.asDBObject, MongoBuilder.buildModify(this.mod))

/** Use to add array filter referenced by identifier in
* `me.sgrouples.rogue.CClassSeqModifyField.$(identifier)`
* @see
* https://www.mongodb.com/docs/manual/reference/operator/update/positional-filtered
*/
def withArrayFilter[CM](identifier: String, childMeta: M => CM)(
filter: CM => QueryClause[?]
): ModifyQuery[M, State] =
val filterQuery = filter(childMeta(query.meta))
val builder = BasicDBObjectBuilder.start()
filterQuery.extend(builder, false)
val bsonFilter = builder.get().asInstanceOf[BasicDBObject]
bsonFilter.put(
s"$identifier.${filterQuery.fieldName}",
bsonFilter.removeField(filterQuery.fieldName)
)
this.copy(arrayFilters = bsonFilter :: arrayFilters)
}

// *******************************************************
Expand Down

0 comments on commit 612173d

Please sign in to comment.