
Commit

Linter formatting after adding hypothetical EventStream descriptor support 🫧

Kezzsim committed Feb 1, 2024
1 parent 8737d3a commit a522a34
Showing 1 changed file with 101 additions and 77 deletions.
178 changes: 101 additions & 77 deletions suitcase/mongo_normalized/__init__.py
@@ -2,14 +2,19 @@
 import pymongo
 from ._version import get_versions
 
-__version__ = get_versions()['version']
+__version__ = get_versions()["version"]
 del get_versions
 
 
 class Serializer(event_model.DocumentRouter):
-    def __init__(self, metadatastore_db, asset_registry_db,
-                 ignore_duplicates=True, resource_uid_unique=False,
-                 tls=False):
+    def __init__(
+        self,
+        metadatastore_db,
+        asset_registry_db,
+        ignore_duplicates=True,
+        resource_uid_unique=False,
+        tls=False,
+    ):
         """
         Insert documents into MongoDB using layout v1.
@@ -48,23 +53,28 @@ def __init__(self, metadatastore_db,
             assets_db = _get_database(asset_registry_db, tls)
         else:
             assets_db = asset_registry_db
-        self._run_start_collection = mds_db.get_collection('run_start')
-        self._run_start_collection_revisions = mds_db.get_collection('run_start_revisions')
-        self._run_stop_collection = mds_db.get_collection('run_stop')
-        self._run_stop_collection_revisions = mds_db.get_collection('run_stop_revisions')
-        self._event_descriptor_collection = mds_db.get_collection(
-            'event_descriptor')
-        self._event_collection = mds_db.get_collection('event')
-
-        self._resource_collection = assets_db.get_collection('resource')
-        self._datum_collection = assets_db.get_collection('datum')
-
-        self._collections = {'start': self._run_start_collection,
-                             'stop': self._run_stop_collection,
-                             'resource': self._resource_collection,
-                             'descriptor': self._event_descriptor_collection,
-                             'event': self._event_collection,
-                             'datum': self._datum_collection}
+        self._run_start_collection = mds_db.get_collection("run_start")
+        self._run_start_collection_revisions = mds_db.get_collection(
+            "run_start_revisions"
+        )
+        self._run_stop_collection = mds_db.get_collection("run_stop")
+        self._run_stop_collection_revisions = mds_db.get_collection(
+            "run_stop_revisions"
+        )
+        self._event_descriptor_collection = mds_db.get_collection("event_descriptor")
+        self._event_collection = mds_db.get_collection("event")
+
+        self._resource_collection = assets_db.get_collection("resource")
+        self._datum_collection = assets_db.get_collection("datum")
+
+        self._collections = {
+            "start": self._run_start_collection,
+            "stop": self._run_stop_collection,
+            "resource": self._resource_collection,
+            "descriptor": self._event_descriptor_collection,
+            "event": self._event_collection,
+            "datum": self._datum_collection,
+        }
 
         self._metadatastore_db = mds_db
         self._asset_registry_db = assets_db
@@ -78,47 +88,54 @@ def _create_indexes(self):
         If the index already exists, this has no effect.
         """
-        self._resource_collection.create_index(
-            'uid', unique=self._resource_uid_unique)
-        self._resource_collection.create_index('resource_id')  # legacy
+        self._resource_collection.create_index("uid", unique=self._resource_uid_unique)
+        self._resource_collection.create_index("resource_id")  # legacy
         # TODO: Migrate all Resources to have a RunStart UID, and then make a
         # unique index on:
         # [('uid', pymongo.ASCENDING), ('run_start', pymongo.ASCENDING)]
-        self._datum_collection.create_index('datum_id', unique=True)
-        self._datum_collection.create_index('resource')
-        self._run_start_collection.create_index('uid', unique=True)
-        self._run_start_collection.create_index('scan_id', unique=False)
+        self._datum_collection.create_index("datum_id", unique=True)
+        self._datum_collection.create_index("resource")
+        self._run_start_collection.create_index("uid", unique=True)
+        self._run_start_collection.create_index("scan_id", unique=False)
         self._run_start_collection.create_index(
-            [('scan_id', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("scan_id", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.ASCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("time", pymongo.ASCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)],
-            unique=True)
+            [("time", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True
+        )
         self._run_start_collection.create_index(
-            [('time', pymongo.DESCENDING), ('scan_id', pymongo.DESCENDING)],
-            unique=False, background=True)
+            [("time", pymongo.DESCENDING), ("scan_id", pymongo.DESCENDING)],
+            unique=False,
+            background=True,
+        )
         self._run_start_collection.create_index([("$**", "text")])
-        self._run_start_collection.create_index('data_session', unique=False)
-        self._run_start_collection.create_index('data_groups', unique=False)
-        self._run_stop_collection.create_index('run_start', unique=True)
-        self._run_stop_collection.create_index('uid', unique=True)
+        self._run_start_collection.create_index("data_session", unique=False)
+        self._run_start_collection.create_index("data_groups", unique=False)
+        self._run_stop_collection.create_index("run_start", unique=True)
+        self._run_stop_collection.create_index("uid", unique=True)
         self._run_stop_collection.create_index(
-            [('time', pymongo.DESCENDING)], unique=False, background=True)
+            [("time", pymongo.DESCENDING)], unique=False, background=True
+        )
         self._run_stop_collection.create_index([("$**", "text")])
-        self._event_descriptor_collection.create_index('uid', unique=True)
+        self._event_descriptor_collection.create_index("uid", unique=True)
         self._event_descriptor_collection.create_index(
-            [('run_start', pymongo.DESCENDING), ('time', pymongo.DESCENDING)],
-            unique=False, background=True)
+            [("run_start", pymongo.DESCENDING), ("time", pymongo.DESCENDING)],
+            unique=False,
+            background=True,
+        )
         self._event_descriptor_collection.create_index(
-            [('time', pymongo.DESCENDING)], unique=False, background=True)
+            [("time", pymongo.DESCENDING)], unique=False, background=True
+        )
         self._event_descriptor_collection.create_index([("$**", "text")])
-        self._event_collection.create_index('uid', unique=True)
+        self._event_collection.create_index("uid", unique=True)
         self._event_collection.create_index(
-            [('descriptor', pymongo.DESCENDING), ('time', pymongo.ASCENDING)],
-            unique=False, background=True)
+            [("descriptor", pymongo.DESCENDING), ("time", pymongo.ASCENDING)],
+            unique=False,
+            background=True,
+        )
 
     def __call__(self, name, doc):
         # Before inserting into mongo, convert any numpy objects into built-in
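
Aside: the paired descending (time, _id) indexes created above serve newest-first listings with a stable tiebreaker. A minimal sketch of the kind of query they accelerate; the URI, database name, and limit are illustrative assumptions, not part of this commit:

    import pymongo

    # Hypothetical connection; any metadatastore database laid out as above works.
    mds_db = pymongo.MongoClient("mongodb://localhost:27017")["metadatastore"]

    # Page through runs newest-first; "_id" breaks ties between runs
    # that share the same timestamp.
    runs = (
        mds_db["run_start"]
        .find()
        .sort([("time", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)])
        .limit(10)
    )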
@@ -136,12 +153,14 @@ def _insert(self, name, doc):
f"already exists in the database. Document:\n{doc}"
) from err
else:
doc.pop('_id')
doc.pop("_id")
if name == "datum":
id_name = "datum_id"
else:
id_name = "uid"
existing = self._collections[name].find_one({id_name: doc[id_name]}, {'_id': False})
existing = self._collections[name].find_one(
{id_name: doc[id_name]}, {"_id": False}
)
if existing != doc:
raise DuplicateUniqueID(
"A document with the same unique id as this one "
@@ -166,39 +185,40 @@ def update(self, name, doc):
         The new version of the document. Its uid will be used to match it
         to the current version, the one to be updated.
         """
-        if name in ['start', 'stop', 'descriptor']:
+        if name in ["start", "stop", "descriptor"]:
             # Keys and collection names differ slightly between start, stop and descriptor
-            key = 'uid' if name == 'start' else 'run_start'
-            name = f'_event_{name}' if name == 'descriptor' else f'_run_{name}'
+            key = "uid" if name == "start" else "run_start"
+            name = f"_event_{name}" if name == "descriptor" else f"_run_{name}"
             event_model.schema_validators[event_model.DocumentNames.start].validate(doc)
-            current_col = getattr(self, f'{name}_collection')
-            revisions_col = getattr(self, f'{name}_collection_revisions')
+            current_col = getattr(self, f"{name}_collection")
+            revisions_col = getattr(self, f"{name}_collection_revisions")
             old = current_col.find_one({key: doc[key]})
-            if (old is None and (name == '_run_stop' or name == '_event_descriptor')):
+            if old is None and (name == "_run_stop" or name == "_event_descriptor"):
                 # New stop or descriptor document : insert it
                 current_col.insert_one(doc)
             else:
-                old.pop('_id')
-                target_uid_docs = revisions_col.find({'document.uid': doc[key]})
-                cur = target_uid_docs.sort([('revision', pymongo.DESCENDING)]).limit(1)
+                old.pop("_id")
+                target_uid_docs = revisions_col.find({"document.uid": doc[key]})
+                cur = target_uid_docs.sort([("revision", pymongo.DESCENDING)]).limit(1)
                 wrapped = dict()
                 try:
-                    wrapped['revision'] = next(cur)['revision'] + 1
+                    wrapped["revision"] = next(cur)["revision"] + 1
                 except StopIteration:
-                    wrapped['revision'] = 0
-                wrapped['document'] = old
+                    wrapped["revision"] = 0
+                wrapped["document"] = old
                 revisions_col.insert_one(wrapped)
-                current_col.find_one_and_replace({key : doc[key]}, doc)
+                current_col.find_one_and_replace({key: doc[key]}, doc)
         else:
             raise NotImplementedError(
                 f"Updating a {name} document is not currently supported. "
-                f"Only updates to 'start' documents are supported.")
+                f"Only updates to 'start' documents are supported."
+            )
 
     def start(self, doc):
-        self._insert('start', doc)
+        self._insert("start", doc)
 
     def descriptor(self, doc):
-        self._insert('descriptor', doc)
+        self._insert("descriptor", doc)
 
     def resource(self, doc):
         # In old databases, we know there are duplicates Resources. Until we
@@ -209,9 +229,11 @@ def resource(self, doc):
         # is slow, but since there are never a large number of Resources per
         # Run, this is acceptable.
         if self._resource_uid_unique:
-            self._insert('resource', doc)
+            self._insert("resource", doc)
         else:
-            existing = self._collections["resource"].find_one({'uid': doc['uid']}, {'_id': False})
+            existing = self._collections["resource"].find_one(
+                {"uid": doc["uid"]}, {"_id": False}
+            )
             if existing is not None:
                 if existing != doc:
                     raise DuplicateUniqueID(
@@ -223,7 +245,7 @@ def resource(self, doc):
                 self._collections["resource"].insert_one(doc)
 
     def event(self, doc):
-        self._insert('event', doc)
+        self._insert("event", doc)
 
     def event_page(self, doc):
         # Unpack an EventPage into Events and do the actual insert inside
@@ -237,7 +259,7 @@ def event_page(self, doc):
             filled_events.append(event_method(event_doc))
 
     def datum(self, doc):
-        self._insert('datum', doc)
+        self._insert("datum", doc)
 
     def datum_page(self, doc):
         # Unpack an DatumPage into Datum and do the actual insert inside
@@ -251,20 +273,22 @@ def datum_page(self, doc):
             filled_datums.append(datum_method(datum_doc))
 
     def stop(self, doc):
-        self._insert('stop', doc)
+        self._insert("stop", doc)
 
     def __repr__(self):
         # Display connection info in eval-able repr.
-        return (f'{type(self).__name__}('
-                f'metadatastore_db={self._metadatastore_db!r}, '
-                f'asset_registry_db={self._asset_registry_db!r})')
+        return (
+            f"{type(self).__name__}("
+            f"metadatastore_db={self._metadatastore_db!r}, "
+            f"asset_registry_db={self._asset_registry_db!r})"
+        )
 
 
 def _get_database(uri, tls):
-    if not pymongo.uri_parser.parse_uri(uri)['database']:
+    if not pymongo.uri_parser.parse_uri(uri)["database"]:
         raise ValueError(
-            f"Invalid URI: {uri} "
-            f"Did you forget to include a database?")
+            f"Invalid URI: {uri} " f"Did you forget to include a database?"
+        )
     else:
         client = pymongo.MongoClient(uri, tls=tls)
         return client.get_database()
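
For context, a minimal usage sketch of this Serializer; the connection URIs are hypothetical, and each must name a database or _get_database raises ValueError:

    from suitcase.mongo_normalized import Serializer

    serializer = Serializer(
        "mongodb://localhost:27017/metadatastore",
        "mongodb://localhost:27017/asset_registry",
    )

    # Feed (name, doc) pairs, e.g. by subscribing to a bluesky RunEngine:
    #     RE.subscribe(serializer)
    serializer("start", {"uid": "abc123", "time": 0.0, "scan_id": 1})
    serializer("stop", {"uid": "def456", "run_start": "abc123",
                        "time": 1.0, "exit_status": "success"})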

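The update() method shown in this diff archives the previous version of a document in a *_revisions collection with an incrementing revision counter before replacing the live copy. A sketch of reading that bookkeeping back, mirroring the sort used in update(); the URI and uid are hypothetical:

    import pymongo

    client = pymongo.MongoClient("mongodb://localhost:27017")
    revisions = client["metadatastore"]["run_start_revisions"]

    # Most recently archived version of a given start document.
    cur = (
        revisions.find({"document.uid": "abc123"})
        .sort([("revision", pymongo.DESCENDING)])
        .limit(1)
    )
    for wrapped in cur:
        print(wrapped["revision"], wrapped["document"])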