From a522a341cd1c1abbde4ab3106c989b1ec1e22e39 Mon Sep 17 00:00:00 2001 From: kari Barry Date: Thu, 1 Feb 2024 17:30:26 -0500 Subject: [PATCH] =?UTF-8?q?Linter=20formatting=20after=20adding=20hypothet?= =?UTF-8?q?ical=20EventStream=20descriptor=20support=20=F0=9F=AB=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- suitcase/mongo_normalized/__init__.py | 178 +++++++++++++++----------- 1 file changed, 101 insertions(+), 77 deletions(-) diff --git a/suitcase/mongo_normalized/__init__.py b/suitcase/mongo_normalized/__init__.py index 33e6963..4e0085b 100644 --- a/suitcase/mongo_normalized/__init__.py +++ b/suitcase/mongo_normalized/__init__.py @@ -2,14 +2,19 @@ import pymongo from ._version import get_versions -__version__ = get_versions()['version'] +__version__ = get_versions()["version"] del get_versions class Serializer(event_model.DocumentRouter): - def __init__(self, metadatastore_db, asset_registry_db, - ignore_duplicates=True, resource_uid_unique=False, - tls=False): + def __init__( + self, + metadatastore_db, + asset_registry_db, + ignore_duplicates=True, + resource_uid_unique=False, + tls=False, + ): """ Insert documents into MongoDB using layout v1. @@ -48,23 +53,28 @@ def __init__(self, metadatastore_db, asset_registry_db, assets_db = _get_database(asset_registry_db, tls) else: assets_db = asset_registry_db - self._run_start_collection = mds_db.get_collection('run_start') - self._run_start_collection_revisions = mds_db.get_collection('run_start_revisions') - self._run_stop_collection = mds_db.get_collection('run_stop') - self._run_stop_collection_revisions = mds_db.get_collection('run_stop_revisions') - self._event_descriptor_collection = mds_db.get_collection( - 'event_descriptor') - self._event_collection = mds_db.get_collection('event') - - self._resource_collection = assets_db.get_collection('resource') - self._datum_collection = assets_db.get_collection('datum') - - self._collections = {'start': self._run_start_collection, - 'stop': self._run_stop_collection, - 'resource': self._resource_collection, - 'descriptor': self._event_descriptor_collection, - 'event': self._event_collection, - 'datum': self._datum_collection} + self._run_start_collection = mds_db.get_collection("run_start") + self._run_start_collection_revisions = mds_db.get_collection( + "run_start_revisions" + ) + self._run_stop_collection = mds_db.get_collection("run_stop") + self._run_stop_collection_revisions = mds_db.get_collection( + "run_stop_revisions" + ) + self._event_descriptor_collection = mds_db.get_collection("event_descriptor") + self._event_collection = mds_db.get_collection("event") + + self._resource_collection = assets_db.get_collection("resource") + self._datum_collection = assets_db.get_collection("datum") + + self._collections = { + "start": self._run_start_collection, + "stop": self._run_stop_collection, + "resource": self._resource_collection, + "descriptor": self._event_descriptor_collection, + "event": self._event_collection, + "datum": self._datum_collection, + } self._metadatastore_db = mds_db self._asset_registry_db = assets_db @@ -78,47 +88,54 @@ def _create_indexes(self): If the index already exists, this has no effect. """ - self._resource_collection.create_index( - 'uid', unique=self._resource_uid_unique) - self._resource_collection.create_index('resource_id') # legacy + self._resource_collection.create_index("uid", unique=self._resource_uid_unique) + self._resource_collection.create_index("resource_id") # legacy # TODO: Migrate all Resources to have a RunStart UID, and then make a # unique index on: # [('uid', pymongo.ASCENDING), ('run_start', pymongo.ASCENDING)] - self._datum_collection.create_index('datum_id', unique=True) - self._datum_collection.create_index('resource') - self._run_start_collection.create_index('uid', unique=True) - self._run_start_collection.create_index('scan_id', unique=False) + self._datum_collection.create_index("datum_id", unique=True) + self._datum_collection.create_index("resource") + self._run_start_collection.create_index("uid", unique=True) + self._run_start_collection.create_index("scan_id", unique=False) self._run_start_collection.create_index( - [('scan_id', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)], - unique=True) + [("scan_id", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True + ) self._run_start_collection.create_index( - [('time', pymongo.ASCENDING), ('_id', pymongo.DESCENDING)], - unique=True) + [("time", pymongo.ASCENDING), ("_id", pymongo.DESCENDING)], unique=True + ) self._run_start_collection.create_index( - [('time', pymongo.DESCENDING), ('_id', pymongo.DESCENDING)], - unique=True) + [("time", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)], unique=True + ) self._run_start_collection.create_index( - [('time', pymongo.DESCENDING), ('scan_id', pymongo.DESCENDING)], - unique=False, background=True) + [("time", pymongo.DESCENDING), ("scan_id", pymongo.DESCENDING)], + unique=False, + background=True, + ) self._run_start_collection.create_index([("$**", "text")]) - self._run_start_collection.create_index('data_session', unique=False) - self._run_start_collection.create_index('data_groups', unique=False) - self._run_stop_collection.create_index('run_start', unique=True) - self._run_stop_collection.create_index('uid', unique=True) + self._run_start_collection.create_index("data_session", unique=False) + self._run_start_collection.create_index("data_groups", unique=False) + self._run_stop_collection.create_index("run_start", unique=True) + self._run_stop_collection.create_index("uid", unique=True) self._run_stop_collection.create_index( - [('time', pymongo.DESCENDING)], unique=False, background=True) + [("time", pymongo.DESCENDING)], unique=False, background=True + ) self._run_stop_collection.create_index([("$**", "text")]) - self._event_descriptor_collection.create_index('uid', unique=True) + self._event_descriptor_collection.create_index("uid", unique=True) self._event_descriptor_collection.create_index( - [('run_start', pymongo.DESCENDING), ('time', pymongo.DESCENDING)], - unique=False, background=True) + [("run_start", pymongo.DESCENDING), ("time", pymongo.DESCENDING)], + unique=False, + background=True, + ) self._event_descriptor_collection.create_index( - [('time', pymongo.DESCENDING)], unique=False, background=True) + [("time", pymongo.DESCENDING)], unique=False, background=True + ) self._event_descriptor_collection.create_index([("$**", "text")]) - self._event_collection.create_index('uid', unique=True) + self._event_collection.create_index("uid", unique=True) self._event_collection.create_index( - [('descriptor', pymongo.DESCENDING), ('time', pymongo.ASCENDING)], - unique=False, background=True) + [("descriptor", pymongo.DESCENDING), ("time", pymongo.ASCENDING)], + unique=False, + background=True, + ) def __call__(self, name, doc): # Before inserting into mongo, convert any numpy objects into built-in @@ -136,12 +153,14 @@ def _insert(self, name, doc): f"already exists in the database. Document:\n{doc}" ) from err else: - doc.pop('_id') + doc.pop("_id") if name == "datum": id_name = "datum_id" else: id_name = "uid" - existing = self._collections[name].find_one({id_name: doc[id_name]}, {'_id': False}) + existing = self._collections[name].find_one( + {id_name: doc[id_name]}, {"_id": False} + ) if existing != doc: raise DuplicateUniqueID( "A document with the same unique id as this one " @@ -166,39 +185,40 @@ def update(self, name, doc): The new version of the document. Its uid will be used to match it to the current version, the one to be updated. """ - if name in ['start', 'stop', 'descriptor']: + if name in ["start", "stop", "descriptor"]: # Keys and collection names differ slightly between start, stop and descriptor - key = 'uid' if name == 'start' else 'run_start' - name = f'_event_{name}' if name == 'descriptor' else f'_run_{name}' + key = "uid" if name == "start" else "run_start" + name = f"_event_{name}" if name == "descriptor" else f"_run_{name}" event_model.schema_validators[event_model.DocumentNames.start].validate(doc) - current_col = getattr(self, f'{name}_collection') - revisions_col = getattr(self, f'{name}_collection_revisions') + current_col = getattr(self, f"{name}_collection") + revisions_col = getattr(self, f"{name}_collection_revisions") old = current_col.find_one({key: doc[key]}) - if (old is None and (name == '_run_stop' or name == '_event_descriptor')): + if old is None and (name == "_run_stop" or name == "_event_descriptor"): # New stop or descriptor document : insert it current_col.insert_one(doc) else: - old.pop('_id') - target_uid_docs = revisions_col.find({'document.uid': doc[key]}) - cur = target_uid_docs.sort([('revision', pymongo.DESCENDING)]).limit(1) + old.pop("_id") + target_uid_docs = revisions_col.find({"document.uid": doc[key]}) + cur = target_uid_docs.sort([("revision", pymongo.DESCENDING)]).limit(1) wrapped = dict() try: - wrapped['revision'] = next(cur)['revision'] + 1 + wrapped["revision"] = next(cur)["revision"] + 1 except StopIteration: - wrapped['revision'] = 0 - wrapped['document'] = old + wrapped["revision"] = 0 + wrapped["document"] = old revisions_col.insert_one(wrapped) - current_col.find_one_and_replace({key : doc[key]}, doc) + current_col.find_one_and_replace({key: doc[key]}, doc) else: raise NotImplementedError( f"Updating a {name} document is not currently supported. " - f"Only updates to 'start' documents are supported.") + f"Only updates to 'start' documents are supported." + ) def start(self, doc): - self._insert('start', doc) + self._insert("start", doc) def descriptor(self, doc): - self._insert('descriptor', doc) + self._insert("descriptor", doc) def resource(self, doc): # In old databases, we know there are duplicates Resources. Until we @@ -209,9 +229,11 @@ def resource(self, doc): # is slow, but since there are never a large number of Resources per # Run, this is acceptable. if self._resource_uid_unique: - self._insert('resource', doc) + self._insert("resource", doc) else: - existing = self._collections["resource"].find_one({'uid': doc['uid']}, {'_id': False}) + existing = self._collections["resource"].find_one( + {"uid": doc["uid"]}, {"_id": False} + ) if existing is not None: if existing != doc: raise DuplicateUniqueID( @@ -223,7 +245,7 @@ def resource(self, doc): self._collections["resource"].insert_one(doc) def event(self, doc): - self._insert('event', doc) + self._insert("event", doc) def event_page(self, doc): # Unpack an EventPage into Events and do the actual insert inside @@ -237,7 +259,7 @@ def event_page(self, doc): filled_events.append(event_method(event_doc)) def datum(self, doc): - self._insert('datum', doc) + self._insert("datum", doc) def datum_page(self, doc): # Unpack an DatumPage into Datum and do the actual insert inside @@ -251,20 +273,22 @@ def datum_page(self, doc): filled_datums.append(datum_method(datum_doc)) def stop(self, doc): - self._insert('stop', doc) + self._insert("stop", doc) def __repr__(self): # Display connection info in eval-able repr. - return (f'{type(self).__name__}(' - f'metadatastore_db={self._metadatastore_db!r}, ' - f'asset_registry_db={self._asset_registry_db!r})') + return ( + f"{type(self).__name__}(" + f"metadatastore_db={self._metadatastore_db!r}, " + f"asset_registry_db={self._asset_registry_db!r})" + ) def _get_database(uri, tls): - if not pymongo.uri_parser.parse_uri(uri)['database']: + if not pymongo.uri_parser.parse_uri(uri)["database"]: raise ValueError( - f"Invalid URI: {uri} " - f"Did you forget to include a database?") + f"Invalid URI: {uri} " f"Did you forget to include a database?" + ) else: client = pymongo.MongoClient(uri, tls=tls) return client.get_database()