Commit 0e5509e

Merge pull request #50 from Kezzsim/metadata_revisions
Add support for updating `stop` and `descriptor` documents
danielballan authored Feb 5, 2024
2 parents 2f0520b + 34f19df commit 0e5509e
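
For context, this change extends Serializer.update, previously limited to 'start' documents, so that 'stop' and 'descriptor' documents can also be revised, with the displaced version preserved in a *_revisions collection. A minimal sketch of how a caller might exercise the new path; the connection URIs and the stop uid are hypothetical, not part of this commit:

    import pymongo
    from suitcase.mongo_normalized import Serializer

    MDS_URI = "mongodb://localhost:27017/mds"        # hypothetical deployment
    ASSETS_URI = "mongodb://localhost:27017/assets"  # hypothetical deployment

    serializer = Serializer(MDS_URI, ASSETS_URI)

    # Fetch a previously inserted stop document, drop Mongo's internal _id,
    # amend a field, and push the new version through update(). The uid is
    # what matches the replacement to the stored document.
    mds = pymongo.MongoClient(MDS_URI).get_database()
    stop_doc = mds["run_stop"].find_one({"uid": "some-stop-uid"}, {"_id": False})
    stop_doc["reason"] = "corrected operator note"
    serializer.update("stop", stop_doc)

    # Descriptor documents now follow the same path:
    # serializer.update("descriptor", revised_descriptor_doc)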
Showing 2 changed files with 237 additions and 130 deletions.
207 changes: 125 additions & 82 deletions suitcase/mongo_normalized/__init__.py
@@ -2,14 +2,19 @@
 import pymongo
 from ._version import get_versions
 
-__version__ = get_versions()['version']
+__version__ = get_versions()["version"]
 del get_versions
 
 
 class Serializer(event_model.DocumentRouter):
-    def __init__(self, metadatastore_db, asset_registry_db,
-                 ignore_duplicates=True, resource_uid_unique=False,
-                 tls=False):
+    def __init__(
+        self,
+        metadatastore_db,
+        asset_registry_db,
+        ignore_duplicates=True,
+        resource_uid_unique=False,
+        tls=False,
+    ):
         """
         Insert documents into MongoDB using layout v1.
@@ -48,22 +53,31 @@ def __init__(self, metadatastore_db, asset_registry_db,
             assets_db = _get_database(asset_registry_db, tls)
         else:
             assets_db = asset_registry_db
-        self._run_start_collection = mds_db.get_collection('run_start')
-        self._run_start_collection_revisions = mds_db.get_collection('run_start_revisions')
-        self._run_stop_collection = mds_db.get_collection('run_stop')
-        self._event_descriptor_collection = mds_db.get_collection(
-            'event_descriptor')
-        self._event_collection = mds_db.get_collection('event')
-
-        self._resource_collection = assets_db.get_collection('resource')
-        self._datum_collection = assets_db.get_collection('datum')
-
-        self._collections = {'start': self._run_start_collection,
-                             'stop': self._run_stop_collection,
-                             'resource': self._resource_collection,
-                             'descriptor': self._event_descriptor_collection,
-                             'event': self._event_collection,
-                             'datum': self._datum_collection}
+        self._run_start_collection = mds_db.get_collection("run_start")
+        self._run_start_collection_revisions = mds_db.get_collection(
+            "run_start_revisions"
+        )
+        self._run_stop_collection = mds_db.get_collection("run_stop")
+        self._run_stop_collection_revisions = mds_db.get_collection(
+            "run_stop_revisions"
+        )
+        self._event_descriptor_collection_revisions = mds_db.get_collection(
+            "event_descriptor_revisions"
+        )
+        self._event_descriptor_collection = mds_db.get_collection("event_descriptor")
+        self._event_collection = mds_db.get_collection("event")
+
+        self._resource_collection = assets_db.get_collection("resource")
+        self._datum_collection = assets_db.get_collection("datum")
+
+        self._collections = {
+            "start": self._run_start_collection,
+            "stop": self._run_stop_collection,
+            "resource": self._resource_collection,
+            "descriptor": self._event_descriptor_collection,
+            "event": self._event_collection,
+            "datum": self._datum_collection,
+        }
 
         self._metadatastore_db = mds_db
         self._asset_registry_db = assets_db
@@ -77,47 +91,54 @@ def _create_indexes(self):
         If the index already exists, this has no effect.
         """
-        self._resource_collection.create_index(
-            'uid', unique=self._resource_uid_unique)
-        self._resource_collection.create_index('resource_id')  # legacy
+        self._resource_collection.create_index("uid", unique=self._resource_uid_unique)
+        self._resource_collection.create_index("resource_id")  # legacy
         # TODO: Migrate all Resources to have a RunStart UID, and then make a
         # unique index on:
         # [('uid', pymongo.ASCENDING), ('run_start', pymongo.ASCENDING)]
-        self._datum_collection.create_index('datum_id', unique=True)
-        self._datum_collection.create_index('resource')
-        self._run_start_collection.create_index('uid', unique=True)
-        self._run_start_collection.create_index('scan_id', unique=False)
+        self._datum_collection.create_index("datum_id", unique=True)
+        self._datum_collection.create_index("resource")
+        self._run_start_collection.create_index("uid", unique=True)
+        self._run_start_collection.create_index("scan_id", unique=False)
         self._run_start_collection.create_index(
-            [('scan_id', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("scan_id", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.ASCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("time", pymongo.ASCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("time", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.DESCENDING), ('scan_id', pymongo.DESCENDING)],
-            unique=False, background=True)
+            [("time", pymongo.DESCENDING), ("scan_id", pymongo.DESCENDING)],
+            unique=False,
+            background=True,
+        )
         self._run_start_collection.create_index([("$**", "text")])
-        self._run_start_collection.create_index('data_session', unique=False)
-        self._run_start_collection.create_index('data_groups', unique=False)
-        self._run_stop_collection.create_index('run_start', unique=True)
-        self._run_stop_collection.create_index('uid', unique=True)
+        self._run_start_collection.create_index("data_session", unique=False)
+        self._run_start_collection.create_index("data_groups", unique=False)
+        self._run_stop_collection.create_index("run_start", unique=True)
+        self._run_stop_collection.create_index("uid", unique=True)
         self._run_stop_collection.create_index(
-            [('time', pymongo.DESCENDING)], unique=False, background=True)
+            [("time", pymongo.DESCENDING)], unique=False, background=True
+        )
         self._run_stop_collection.create_index([("$**", "text")])
-        self._event_descriptor_collection.create_index('uid', unique=True)
+        self._event_descriptor_collection.create_index("uid", unique=True)
         self._event_descriptor_collection.create_index(
-            [('run_start', pymongo.DESCENDING), ('time', pymongo.DESCENDING)],
-            unique=False, background=True)
+            [("run_start", pymongo.DESCENDING), ("time", pymongo.DESCENDING)],
+            unique=False,
+            background=True,
+        )
         self._event_descriptor_collection.create_index(
-            [('time', pymongo.DESCENDING)], unique=False, background=True)
+            [("time", pymongo.DESCENDING)], unique=False, background=True
+        )
         self._event_descriptor_collection.create_index([("$**", "text")])
-        self._event_collection.create_index('uid', unique=True)
+        self._event_collection.create_index("uid", unique=True)
         self._event_collection.create_index(
-            [('descriptor', pymongo.DESCENDING), ('time', pymongo.ASCENDING)],
-            unique=False, background=True)
+            [("descriptor", pymongo.DESCENDING), ("time", pymongo.ASCENDING)],
+            unique=False,
+            background=True,
+        )
 
     def __call__(self, name, doc):
         # Before inserting into mongo, convert any numpy objects into built-in
@@ -135,12 +156,14 @@ def _insert(self, name, doc):
                     f"already exists in the database. Document:\n{doc}"
                 ) from err
             else:
-                doc.pop('_id')
+                doc.pop("_id")
                 if name == "datum":
                     id_name = "datum_id"
                 else:
                     id_name = "uid"
-                existing = self._collections[name].find_one({id_name: doc[id_name]}, {'_id': False})
+                existing = self._collections[name].find_one(
+                    {id_name: doc[id_name]}, {"_id": False}
+                )
                 if existing != doc:
                     raise DuplicateUniqueID(
                         "A document with the same unique id as this one "
@@ -159,39 +182,55 @@ def update(self, name, doc):
         Parameters
         ----------
-        name: {'start'}
-            The type of document being updated. Currently, only 'start' is
-            supported and any other value here will raise NotImplementedError.
+        name: {'start', 'stop', 'descriptor'}
+            The type of document being updated.
         doc: dict
             The new version of the document. Its uid will be used to match it
             to the current version, the one to be updated.
         """
-        if name == 'start':
-            event_model.schema_validators[event_model.DocumentNames.start].validate(doc)
-            current_col = self._run_start_collection
-            revisions_col = self._run_start_collection_revisions
-            old = current_col.find_one({'uid': doc['uid']})
-            old.pop('_id')
-            target_uid_docs = revisions_col.find({'document.uid': doc['uid']})
-            cur = target_uid_docs.sort([('revision', pymongo.DESCENDING)]).limit(1)
-            wrapped = dict()
-            try:
-                wrapped['revision'] = next(cur)['revision'] + 1
-            except StopIteration:
-                wrapped['revision'] = 0
-            wrapped['document'] = old
-            revisions_col.insert_one(wrapped)
-            current_col.find_one_and_replace({'uid': doc['uid']}, doc)
+        if name in {"start", "stop", "descriptor"}:
+            event_model.schema_validators[
+                getattr(event_model.DocumentNames, name)
+            ].validate(doc)
+            # Keys and collection names differ slightly between start, stop and descriptor.
+            key = "uid"
+            name = f"_event_{name}" if name == "descriptor" else f"_run_{name}"
+            current_col = getattr(self, f"{name}_collection")
+            revisions_col = getattr(self, f"{name}_collection_revisions")
+            old = current_col.find_one({key: doc[key]})
+            if old is None and (name == "_run_stop" or name == "_event_descriptor"):
+                # New stop or descriptor document: insert it.
+                current_col.insert_one(doc)
+            else:
+                old.pop("_id")
+                # Field Safety Enforcement: prevent restricted fields from changing.
+                restricted_fields = ["run_start"]
+                for field in restricted_fields:
+                    if field in old and field in doc:
+                        if old[field] != doc[field]:
+                            raise ValueError(
+                                f"Field '{field}' is restricted and cannot be changed."
+                            )
+                target_uid_docs = revisions_col.find({"document.uid": doc["uid"]})
+                cur = target_uid_docs.sort([("revision", pymongo.DESCENDING)]).limit(1)
+                wrapped = dict()
+                try:
+                    wrapped["revision"] = next(cur)["revision"] + 1
+                except StopIteration:
+                    wrapped["revision"] = 0
+                wrapped["document"] = old
+                revisions_col.insert_one(wrapped)
+                current_col.find_one_and_replace({"uid": doc["uid"]}, doc)
         else:
             raise NotImplementedError(
                 f"Updating a {name} document is not currently supported. "
-                f"Only updates to 'start' documents are supported.")
+            )
 
     def start(self, doc):
-        self._insert('start', doc)
+        self._insert("start", doc)
 
     def descriptor(self, doc):
-        self._insert('descriptor', doc)
+        self._insert("descriptor", doc)
 
     def resource(self, doc):
         # In old databases, we know there are duplicate Resources. Until we
@@ -202,9 +241,11 @@ def resource(self, doc):
         # is slow, but since there are never a large number of Resources per
         # Run, this is acceptable.
         if self._resource_uid_unique:
-            self._insert('resource', doc)
+            self._insert("resource", doc)
         else:
-            existing = self._collections["resource"].find_one({'uid': doc['uid']}, {'_id': False})
+            existing = self._collections["resource"].find_one(
+                {"uid": doc["uid"]}, {"_id": False}
+            )
             if existing is not None:
                 if existing != doc:
                     raise DuplicateUniqueID(
@@ -216,7 +257,7 @@ def resource(self, doc):
                 self._collections["resource"].insert_one(doc)
 
     def event(self, doc):
-        self._insert('event', doc)
+        self._insert("event", doc)
 
     def event_page(self, doc):
         # Unpack an EventPage into Events and do the actual insert inside
@@ -230,7 +271,7 @@ def event_page(self, doc):
             filled_events.append(event_method(event_doc))
 
     def datum(self, doc):
-        self._insert('datum', doc)
+        self._insert("datum", doc)
 
     def datum_page(self, doc):
         # Unpack a DatumPage into Datum and do the actual insert inside
@@ -244,20 +285,22 @@ def datum_page(self, doc):
             filled_datums.append(datum_method(datum_doc))
 
     def stop(self, doc):
-        self._insert('stop', doc)
+        self._insert("stop", doc)
 
     def __repr__(self):
         # Display connection info in eval-able repr.
-        return (f'{type(self).__name__}('
-                f'metadatastore_db={self._metadatastore_db!r}, '
-                f'asset_registry_db={self._asset_registry_db!r})')
+        return (
+            f"{type(self).__name__}("
+            f"metadatastore_db={self._metadatastore_db!r}, "
+            f"asset_registry_db={self._asset_registry_db!r})"
+        )
 
 
 def _get_database(uri, tls):
-    if not pymongo.uri_parser.parse_uri(uri)['database']:
+    if not pymongo.uri_parser.parse_uri(uri)["database"]:
         raise ValueError(
-            f"Invalid URI: {uri} "
-            f"Did you forget to include a database?")
+            f"Invalid URI: {uri} " f"Did you forget to include a database?"
+        )
     else:
         client = pymongo.MongoClient(uri, tls=tls)
         return client.get_database()
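
The update path above wraps each displaced document as {"revision": n, "document": old} before the replacement lands, so prior versions stay queryable. A sketch of reading that history back with plain pymongo, reusing the hypothetical URI and uid from the earlier sketch:

    import pymongo

    mds = pymongo.MongoClient("mongodb://localhost:27017/mds").get_database()

    # Revisions of one stop document, newest first. Revision numbers start
    # at 0 for the first displaced version and count upward from there.
    history = (
        mds["run_stop_revisions"]
        .find({"document.uid": "some-stop-uid"})
        .sort([("revision", pymongo.DESCENDING)])
    )
    for wrapped in history:
        print(wrapped["revision"], wrapped["document"].get("reason"))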
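Two behaviors of the new branch are easy to miss in the diff: a stop or descriptor uid with no stored counterpart is simply inserted rather than rejected, and an attempt to change the restricted run_start field raises ValueError. A sketch of the guard, assuming the serializer and stop_doc objects from the first sketch:

    tampered = dict(stop_doc)
    tampered["run_start"] = "a-different-run-start-uid"  # hypothetical uid
    try:
        serializer.update("stop", tampered)
    except ValueError as err:
        print(err)  # Field 'run_start' is restricted and cannot be changed.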
[Second changed file (112 additions, 48 deletions) not rendered.]
