Skip to content

Commit

Permalink
Merge pull request #47 from scalableminds/multi-delete-multi-put
Browse files Browse the repository at this point in the history
Add DeleteAllByPrefix and PutMultipleVersions API endpoints
  • Loading branch information
fm3 authored Dec 16, 2024
2 parents 67d3a92 + b1fe6b9 commit a423a48
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## Added
- New API endpoints `DeleteAllByPrefix` and `PutMultipleVersions`. [#47](https://github.com/scalableminds/fossildb/pull/47)

## Breaking Changes

- The `GetMultipleKeys` call now takes a `startAfterKey` instead of a `key` for pagination. The returned list will only start *after* this key. [#38](https://github.com/scalableminds/fossildb/pull/38)
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ version := getVersionFromGit
scalaVersion := "2.13.12"

libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.4.7",
"ch.qos.logback" % "logback-classic" % "1.5.6",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" % "scalatest_2.13" % "3.2.15" % "test",
"org.scalatest" % "scalatest_2.13" % "3.2.19" % "test",
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"io.grpc" % "grpc-services" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
Expand Down
26 changes: 25 additions & 1 deletion src/main/protobuf/fossildbapi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ message PutReply {
optional string errorMessage = 2;
}

message PutMultipleVersionsRequest {
required string collection = 1;
required string key = 2;
repeated uint64 versions = 3;
repeated bytes values = 4;
}

message PutMultipleVersionsReply {
required bool success = 1;
optional string errorMessage = 2;
}

message DeleteRequest {
required string collection = 1;
required string key = 2;
Expand All @@ -45,6 +57,16 @@ message DeleteReply {
optional string errorMessage = 2;
}

message DeleteAllByPrefixRequest {
required string collection = 1;
required string prefix = 2;
}

message DeleteAllByPrefixReply {
required bool success = 1;
optional string errorMessage = 2;
}

message GetMultipleVersionsRequest {
required string collection = 1;
required string key = 2;
Expand Down Expand Up @@ -154,12 +176,14 @@ service FossilDB {
rpc GetMultipleVersions (GetMultipleVersionsRequest) returns (GetMultipleVersionsReply) {}
rpc GetMultipleKeys (GetMultipleKeysRequest) returns (GetMultipleKeysReply) {}
rpc Put (PutRequest) returns (PutReply) {}
rpc PutMultipleVersions (PutMultipleVersionsRequest) returns (PutMultipleVersionsReply) {}
rpc Delete (DeleteRequest) returns (DeleteReply) {}
rpc DeleteMultipleVersions (DeleteMultipleVersionsRequest) returns (DeleteMultipleVersionsReply) {}
rpc DeleteAllByPrefix (DeleteAllByPrefixRequest) returns (DeleteAllByPrefixReply) {}
rpc ListKeys (ListKeysRequest) returns (ListKeysReply) {}
rpc ListVersions (ListVersionsRequest) returns (ListVersionsReply) {}
rpc Backup (BackupRequest) returns (BackupReply) {}
rpc RestoreFromBackup (RestoreFromBackupRequest) returns (RestoreFromBackupReply) {}
rpc CompactAllData (CompactAllDataRequest) returns (CompactAllDataReply) {}
rpc ExportDB (ExportDBRequest) returns (ExportDBReply) {}
}
}
16 changes: 16 additions & 0 deletions src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
PutReply(success = true)
} { errorMsg => PutReply(success = false, errorMsg) }

override def putMultipleVersions(req: PutMultipleVersionsRequest): Future[PutMultipleVersionsReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
require(req.versions.length == req.values.length, s"Must supply as many versions as values, got ${req.versions.length} versions vs ${req.values.length} values.")
require(req.versions.forall(_ >= 0), "Version numbers must be non-negative")
req.versions.zip(req.values).foreach { case (version, value) =>
store.put(req.key, version, value.toByteArray)
}
PutMultipleVersionsReply(success = true)
} { errorMsg => PutMultipleVersionsReply(success = false, errorMsg)}

override def delete(req: DeleteRequest): Future[DeleteReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
store.delete(req.key, req.version)
Expand All @@ -60,6 +70,12 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
DeleteMultipleVersionsReply(success = true)
} { errorMsg => DeleteMultipleVersionsReply(success = false, errorMsg) }

override def deleteAllByPrefix(req: DeleteAllByPrefixRequest): Future[DeleteAllByPrefixReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
store.withRawRocksIterator{rocksIt => store.deleteAllByPrefix(rocksIt, req.prefix)}
DeleteAllByPrefixReply(success = true)
} { errorMsg => DeleteAllByPrefixReply(success = false, errorMsg)}

override def listKeys(req: ListKeysRequest): Future[ListKeysReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val keys = store.withRawRocksIterator{rocksIt => store.listKeys(rocksIt, req.limit, req.startAfterKey)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
}
options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
val defaultColumnFamilyOptions: ColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions)
println(defaultColumnFamilyOptions)
val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions))
val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors
logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
deleteIter(versionsIterator)
}

def deleteAllByPrefix(rocksIt: RocksIterator, prefix: String): Unit = {
RocksDBStore.scanKeysOnly(rocksIt, prefix, Some(prefix)).foreach(underlying.delete)
}

def put(key: String, version: Long, value: Array[Byte]): Unit = {
requireValidKey(key)
underlying.put(VersionedKey(key, version).toString, value)
Expand Down
25 changes: 25 additions & 0 deletions src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers
assert(testData2 == reply.value)
}

"PutMultipleVersions" should "overwrite old values, leave others untouched" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(2), testData1))
client.putMultipleVersions(PutMultipleVersionsRequest(collectionA, aKey, Seq(1,2,3), Seq(testData2, testData3, testData3)))
val reply = client.getMultipleVersions(GetMultipleVersionsRequest(collectionA, aKey))
assert(reply.values.length == 4)
assert(reply.versions == Seq(3,2,1,0))
assert(reply.values == Seq(testData3, testData3, testData2, testData1))
}

it should "fail on non-existent collection" in {
val reply = client.put(PutRequest("nonExistentCollection", aKey, Some(0), testData1))
assert(!reply.success)
Expand Down Expand Up @@ -134,6 +144,21 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers
assert(testData1 == reply.value)
}

"DeleteAllByPrefix" should "delete all versions of all values matching this prefix" in {
client.put(PutRequest(collectionA, "prefixedA", Some(0), testData1))
client.put(PutRequest(collectionA, "prefixedA", Some(1), testData1))
client.put(PutRequest(collectionA, "prefixedB", Some(0), testData2))
client.put(PutRequest(collectionA, "prefixedC", Some(0), testData2))
client.put(PutRequest(collectionA, "differentKey", Some(0), testData2))
client.put(PutRequest(collectionA, "differentKey", Some(1), testData2))
client.put(PutRequest(collectionA, "yetDifferentKey", Some(0), testData2))
client.deleteAllByPrefix(DeleteAllByPrefixRequest(collectionA, "prefixed"))
val reply = client.listKeys(ListKeysRequest(collectionA))
assert(reply.keys.length == 2)
assert(reply.keys.contains("differentKey"))
assert(reply.keys.contains("yetDifferentKey"))
}

"ListKeys" should "list all keys of a collection" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
Expand Down

0 comments on commit a423a48

Please sign in to comment.