remove dependency on primary index by converting queries that are against the id field to gets. (#335)

remove dependency on primary index by converting queries that are
against the id field to gets. These are queries for job documents, which
aren't all that large. Changing them from queries to direct key-value
fetches makes them much more efficient and removes the need for either
a special index or a primary index.
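
The pattern behind the change, as a minimal sketch using the Couchbase Python SDK. The connection details, bucket/scope/collection names, and the job document key below are hypothetical, for illustration only:

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions


def ids_via_query(cluster, job_document_id):
    # Before: a N1QL query whose only predicate is meta().id. The query
    # service still has to plan and execute it, and it fails outright
    # unless a primary (or specially built) index exists.
    stmnt = (
        "SELECT ingest_document_ids FROM `vxdata`._default.METAR "
        f'WHERE meta().id = "{job_document_id}"'
    )
    return list(cluster.query(stmnt))[0]["ingest_document_ids"]


def ids_via_get(collection, job_document_id):
    # After: a key-value get. The key routes straight to the data
    # service, so no index of any kind is consulted, and the whole job
    # document comes back in one fetch.
    return collection.get(job_document_id).content_as[dict]["ingest_document_ids"]


# Hypothetical connection details, for illustration only.
cluster = Cluster(
    "couchbase://localhost",
    ClusterOptions(PasswordAuthenticator("user", "password")),
)
collection = cluster.bucket("vxdata").scope("_default").collection("METAR")
ids = ids_via_get(collection, "JOB:V01:EXAMPLE")  # hypothetical document key

The get also returns the whole job document, which is why the grib2 and netcdf variants below can pull file_mask and input_data_path out of the same fetch instead of issuing a second query.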
randytpierce authored Feb 21, 2024
2 parents 7f4539c + 375a4be commit 1d7240a
Showing 6 changed files with 21 additions and 36 deletions.
10 changes: 3 additions & 7 deletions src/vxingest/ctc_to_cb/run_ingest_threads.py
@@ -195,14 +195,10 @@ def runit(self, args, log_queue: Queue, log_configurer: Callable[[Queue], None])
  self.cb_credentials = self.get_credentials(self.load_spec)
  # establish connections to cb, collection
  self.connect_cb()
- bucket = self.load_spec["cb_connection"]["bucket"]
- scope = self.load_spec["cb_connection"]["scope"]
- collection = self.load_spec["cb_connection"]["collection"]
-
  # load the ingest document ids into the load_spec (this might be redundant)
- stmnt = f'Select ingest_document_ids from `{bucket}`.{scope}.{collection} where meta().id = "{self.job_document_id}"'
- result = self.cluster.query(stmnt)
- self.load_spec["ingest_document_ids"] = list(result)[0][
+ ingest_document_result = self.collection.get(self.job_document_id)
+ ingest_document = ingest_document_result.content_as[dict]
+ self.load_spec["ingest_document_ids"] = ingest_document[
      "ingest_document_ids"
  ]
  # put all the ingest documents into the load_spec too
14 changes: 6 additions & 8 deletions src/vxingest/grib2_to_cb/run_ingest_threads.py
@@ -188,23 +188,21 @@ def runit(self, args, log_queue: Queue, log_configurer: Callable[[Queue], None])
  # establish connections to cb, collection
  self.connect_cb()
  # load the ingest document ids into the load_spec (this might be redundant)
- stmnt = f"Select ingest_document_ids from `{self.cb_credentials['bucket']}`.{self.cb_credentials['scope']}.{self.cb_credentials['collection']} where meta().id = \"{self.job_document_id}\""
- result = self.cluster.query(stmnt)
- self.load_spec["ingest_document_ids"] = list(result)[0][
+ ingest_document_result = self.collection.get(self.job_document_id)
+ ingest_document = ingest_document_result.content_as[dict]
+ self.load_spec["ingest_document_ids"] = ingest_document[
      "ingest_document_ids"
  ]
+
  # put all the ingest documents into the load_spec too
  self.load_spec["ingest_documents"] = {}
  for _id in self.load_spec["ingest_document_ids"]:
      self.load_spec["ingest_documents"][_id] = self.collection.get(
          _id
      ).content_as[dict]
  # load the fmask and input_data_path into the load_spec
- stmnt = f"Select file_mask, input_data_path from `{self.cb_credentials['bucket']}`.{self.cb_credentials['scope']}.{self.cb_credentials['collection']} where meta().id = \"{self.job_document_id}\""
- result = self.cluster.query(stmnt)
- result_list = list(result)
- self.fmask = result_list[0]["file_mask"]
- self.path = result_list[0]["input_data_path"]
+ self.fmask = ingest_document["file_mask"]
+ self.path = ingest_document["input_data_path"]
  self.load_spec["fmask"] = self.fmask
  self.load_spec["input_data_path"] = self.path
  # stash the load_job in the load_spec
6 changes: 2 additions & 4 deletions src/vxingest/main.py
@@ -228,8 +228,7 @@ def get_job_docs(
"LOWER(META().id) as name, "
"run_priority, "
"offset_minutes, "
"LOWER(subType) as sub_type, "
"input_data_path as input_data_path "
"LOWER(subType) as sub_type "
f"FROM {creds['cb_bucket']}.{creds['cb_scope']}.{creds['cb_collection']} "
f"WHERE id='{job_id}' "
"AND (type = 'JOB-TEST' or type = 'JOB') "
@@ -247,8 +246,7 @@ def get_job_docs(
"LOWER(META().id) as name, "
"run_priority, "
"offset_minutes, "
"LOWER(subType) as sub_type, "
"input_data_path as input_data_path "
"LOWER(subType) as sub_type "
f"FROM {creds['cb_bucket']}.{creds['cb_scope']}.{creds['cb_collection']} "
"LET millis = ROUND(CLOCK_MILLIS()), "
"sched = SPLIT(schedule,' '), "
15 changes: 6 additions & 9 deletions src/vxingest/netcdf_to_cb/run_ingest_threads.py
@@ -171,13 +171,13 @@ def runit(self, args, log_queue: Queue, log_configurer: Callable[[Queue], None])
  # establish connections to cb, collection
  self.connect_cb()
  logger.info("connected to cb")
- collection = self.load_spec["cb_connection"]["collection"]
  bucket = self.load_spec["cb_connection"]["bucket"]
  scope = self.load_spec["cb_connection"]["scope"]
+ collection = self.load_spec["cb_connection"]["collection"]
  # load the ingest document ids into the load_spec (this might be redundant)
- stmnt = f'Select ingest_document_ids from `{bucket}`.{scope}.{collection} where meta().id = "{self.job_document_id}"'
- result = self.cluster.query(stmnt)
- self.load_spec["ingest_document_ids"] = list(result)[0][
+ ingest_document_result = self.collection.get(self.job_document_id)
+ ingest_document = ingest_document_result.content_as[dict]
+ self.load_spec["ingest_document_ids"] = ingest_document[
      "ingest_document_ids"
  ]
  # put all the ingest documents into the load_spec too
@@ -187,11 +187,8 @@ def runit(self, args, log_queue: Queue, log_configurer: Callable[[Queue], None])
          _id
      ).content_as[dict]
  # load the fmask and input_data_path into the load_spec
- stmnt = f'Select file_mask, input_data_path from `{bucket}`.{scope}.{collection} where meta().id = "{self.job_document_id}"'
- result = self.cluster.query(stmnt)
- result_list = list(result)
- self.fmask = result_list[0]["file_mask"]
- self.path = result_list[0]["input_data_path"]
+ self.fmask = ingest_document["file_mask"]
+ self.path = ingest_document["input_data_path"]
  self.load_spec["fmask"] = self.fmask
  self.load_spec["input_data_path"] = self.path
  # stash the load_job in the load_spec
10 changes: 3 additions & 7 deletions src/vxingest/partial_sums_to_cb/run_ingest_threads.py
@@ -192,14 +192,10 @@ def runit(self, args, log_queue: Queue, log_configurer: Callable[[Queue], None])
  self.cb_credentials = self.get_credentials(self.load_spec)
  # establish connections to cb, collection
  self.connect_cb()
- bucket = self.load_spec["cb_connection"]["bucket"]
- scope = self.load_spec["cb_connection"]["scope"]
- collection = self.load_spec["cb_connection"]["collection"]
-
  # load the ingest document ids into the load_spec (this might be redundant)
- stmnt = f'Select ingest_document_ids from `{bucket}`.{scope}.{collection} where meta().id = "{self.job_document_id}"'
- result = self.cluster.query(stmnt)
- self.load_spec["ingest_document_ids"] = list(result)[0][
+ ingest_document_result = self.collection.get(self.job_document_id)
+ ingest_document = ingest_document_result.content_as[dict]
+ self.load_spec["ingest_document_ids"] = ingest_document[
      "ingest_document_ids"
  ]
  # put all the ingest documents into the load_spec too
@@ -64,4 +64,4 @@ WHEN ov.name = mv.name
  END
  ) FOR mv IN m.m0data
  END
- WHERE m.mfve = o.ofve
+ WHERE m.mfve = o.ofve
