Architecture: engine
Engine contains several components, mainly connected by a RabbitMQ message bus:
(NOTE: This can be edited - grab the raw file from /docs/Engine Layout.drawio.svg and edit it in https://app.diagrams.net/)
API is mainly a bridge for sm-graphql to talk with the Python codebase.
- Most simple requests are handled directly (e.g. optical image upload and calculating isotopic peaks for webapp's Diagnostics view)
- Most complex requests are added to one of the processing queues to be handled by a daemon. Some pre-processing is occasionally done in API first, e.g. on dataset creation, API saves the DB record before enqueuing an annotation message.
- "Update" requests can optionally wait for an ElasticSearch refresh cycle for the Dataset record by passing `async_es_update=True`
- HTTP requests are processed in parallel thanks to the "cherrypy" backend, but API is still often slow enough to be a bottleneck. E.g. if you rename a group that contains 500 datasets, GraphQL will send 500 update requests to API and some of them will likely time out.
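The "save first, then enqueue" pattern for complex requests can be sketched as follows. This is only an illustration, not the engine's actual code: `fake_db`, `annotate_queue`, `create_dataset`, the `"QUEUED"` status and the message fields are all hypothetical stand-ins for the Postgres record and the RabbitMQ queue.

```python
import json
import queue

# Hypothetical in-memory stand-ins for Postgres and the RabbitMQ annotate queue.
fake_db = {}
annotate_queue = queue.Queue()


def create_dataset(ds_id, metadata):
    """Handle a 'complex' request: persist the record, then hand off to a daemon."""
    # Pre-processing done in API: the DB record exists before any daemon sees the message.
    fake_db[ds_id] = {"metadata": metadata, "status": "QUEUED"}
    # The slow work is deferred by enqueuing a message (field names are assumptions).
    annotate_queue.put(json.dumps({"ds_id": ds_id, "action": "annotate"}))
    # The HTTP response returns immediately; annotation happens later.
    return {"status": "success", "ds_id": ds_id}
```

Saving the record before enqueuing means a daemon that picks up the message can always find the dataset it refers to.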
The Update Daemon performs several functions, all of which are triggered by Update Queue messages:
- (Re-)index a dataset and annotations in ElasticSearch
- Partially update a dataset and annotations in ElasticSearch, e.g. to change a metadata field without adding/removing any annotations. This is much faster than a full reindex
- Classify off-sample images. This downloads images from unclassified annotations, sends them to the off-sample daemon, then writes the results back to the database.
The Update Daemon runs multiple threads (usually 4), each of which can process 1 message at a time.
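A rough sketch of this threading model, using the standard library's `queue.Queue` in place of RabbitMQ. The message shape (`action`, `ds_id`), `handle_update_message` and `run_workers` are hypothetical names chosen for the illustration; only the action names `INDEX`, `UPDATE` and `CLASSIFY_OFFSAMPLE` come from this document.

```python
import queue
import threading


def handle_update_message(msg):
    """Dispatch on the message action; the returned strings just describe the work."""
    action = msg["action"]
    if action == "INDEX":
        return f"reindexed {msg['ds_id']}"
    if action == "UPDATE":
        return f"partially updated {msg['ds_id']}"
    if action == "CLASSIFY_OFFSAMPLE":
        return f"classified {msg['ds_id']}"
    raise ValueError(f"unknown action: {action}")


def run_workers(messages, handler, num_threads=4):
    """Process all messages with num_threads workers, one message per thread at a time."""
    pending = queue.Queue()
    for message in messages:
        pending.put(message)
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            try:
                msg = pending.get_nowait()
            except queue.Empty:
                return  # nothing left to do
            outcome = handler(msg)
            with results_lock:
                results.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

With 4 threads, at most 4 messages are in flight at once, so a slow full reindex on one thread does not block a quick partial update on another.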
Some
When a message is added to the annotate queue:
- Cluster Auto-Start detects the message without consuming it, and starts the Spark cluster if it's not already running
- The Spark cluster starts Annotate Daemon
- Annotate daemon processes the message:
  - it sets a `cluster-busy: yes` lock in Redis
  - it fetches the dataset information from postgres/S3
  - it sets the dataset status to "Processing" (including sending an `UPDATE` message to the Update queue)
  - it runs the annotation pipeline against the dataset
  - it saves the results to postgres/S3
  - it adds an `INDEX` message and a `CLASSIFY_OFFSAMPLE` message to the Update queue
  - it releases the lock by setting `cluster-busy: no` in Redis
- Cluster Auto-Start waits for the queue to become empty AND for `cluster-busy: no`, then destroys the Spark cluster.
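The interaction between the lock and the Cluster Auto-Start decisions can be sketched like this. A plain dict stands in for Redis and a list for the Update queue; the function names and message fields are hypothetical. Releasing the lock in a `finally` block is an assumption of this sketch, as the source does not say what happens to the lock when annotation fails.

```python
def process_annotate_message(msg, redis, update_queue, run_pipeline):
    """Mimic the Annotate Daemon steps around the cluster-busy lock."""
    ds_id = msg["ds_id"]
    redis["cluster-busy"] = "yes"  # take the lock
    try:
        # Status change to "Processing" is announced via the Update queue.
        update_queue.append({"action": "UPDATE", "ds_id": ds_id})
        run_pipeline(ds_id)  # fetching data and saving results are folded in here
        update_queue.append({"action": "INDEX", "ds_id": ds_id})
        update_queue.append({"action": "CLASSIFY_OFFSAMPLE", "ds_id": ds_id})
    finally:
        redis["cluster-busy"] = "no"  # release the lock (assumed to happen on failure too)


def should_start_cluster(queue_length, cluster_running):
    """Start the cluster when a message is waiting and no cluster is up."""
    return queue_length > 0 and not cluster_running


def should_destroy_cluster(queue_length, redis, cluster_running):
    """Destroy only when the queue is empty AND the daemon has released the lock."""
    return cluster_running and queue_length == 0 and redis.get("cluster-busy") == "no"
```

Both conditions are needed for teardown: the queue is already empty while the last message is still being processed, so the lock is what prevents the cluster from being destroyed mid-annotation.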
When running in a development docker setup, Cluster Auto-Start isn't used at all. Annotate Daemon runs continuously.
The Lithops Daemon listens to the Lithops Queue and processes messages one at a time. It is parallelized by running multiple instances of the daemon, managed via supervisor.
Lithops has historically had resource/memory leak issues, so this daemon is configured to kill itself after every failed annotation, or after running for 24 hours. Supervisor restarts it after it exits.
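The exit policy described above can be sketched as a small loop. This is an illustration only: `should_exit`, `run_daemon` and the returned reason strings are hypothetical, and a real daemon would exit the process (leaving supervisor to restart it) rather than return a value.

```python
import time

MAX_LIFETIME_SECONDS = 24 * 60 * 60  # the 24-hour limit mentioned above


def should_exit(last_annotation_failed, started_at, now):
    """Exit after any failed annotation, or once the process is 24 hours old."""
    return last_annotation_failed or (now - started_at) >= MAX_LIFETIME_SECONDS


def run_daemon(messages, annotate, clock=time.monotonic):
    """Process messages one at a time and report why the loop stopped."""
    started_at = clock()
    for msg in messages:
        try:
            annotate(msg)
            failed = False
        except Exception:
            failed = True
        if should_exit(failed, started_at, clock()):
            return "failed" if failed else "max_lifetime"
    return "queue_drained"
```

Checking the age only between messages means a running annotation is never interrupted by the 24-hour limit in this sketch.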
This queue is for signalling to graphql that a dataset has changed, so that webapp can show notifications and refresh any data that is out of date. All modules that can modify the dataset (API, Update Daemon, Lithops Daemon, Annotate Daemon) can potentially create messages on this queue.
The GraphQL side is handled in `graphql/src/dataset/controller/Subscription.ts`. Because ElasticSearch updates are async, graphql will wait for ES to finish updating before notifying webapp. It does this by repeatedly querying ElasticSearch until it sees that the dataset's status in ES matches the status in the message.
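The polling logic can be sketched as below. Note that the real implementation lives in TypeScript; this Python version only illustrates the idea, and `wait_for_es_status`, the timeout and the polling interval are assumptions rather than values taken from the codebase.

```python
import time


def wait_for_es_status(fetch_status, expected, timeout=30.0, interval=0.5,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns `expected`; give up after `timeout` seconds.

    fetch_status is any callable that returns the dataset's current status in ES.
    Returns True if the statuses matched, False if the timeout was reached first.
    """
    deadline = clock() + timeout
    while True:
        if fetch_status() == expected:
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

A timeout is included so that a lost or superseded update cannot make the caller poll forever; whether the real code has one is not stated here.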
Update Daemon sends the Off-Sample Service batches of base64-encoded images in a JSON request, and the service returns their off-sample classification, e.g. `{"label": "on", "prob": 0.123}`, which indicates that the image is classified as on-sample and the raw model output was 0.123 (values <0.5 are on-sample, >0.5 are off-sample).
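A minimal sketch of both sides of this exchange. The request field names (`images`, `content`) and the helper names are hypothetical, since the actual request schema is not given here. The source also does not say how a value of exactly 0.5 is labelled; this sketch arbitrarily treats it as on-sample.

```python
import base64
import json


def build_request(images):
    """Encode a batch of raw image bytes as a JSON body (field names are assumptions)."""
    encoded = [{"content": base64.b64encode(raw).decode("ascii")} for raw in images]
    return json.dumps({"images": encoded})


def label_from_prob(prob):
    """Map the raw model output to a label: above 0.5 is off-sample, otherwise on-sample."""
    return "off" if prob > 0.5 else "on"


def to_result(prob):
    """Build a result in the shape shown above, e.g. {"label": "on", "prob": 0.123}."""
    return {"label": label_from_prob(prob), "prob": prob}
```

For instance, `to_result(0.123)` gives `{"label": "on", "prob": 0.123}`, matching the example response above.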
Off-Sample Service does not connect directly to any other database or storage. It just responds to HTTP requests.