diff --git a/cumulus_etl/loaders/fhir/bulk_export.py b/cumulus_etl/loaders/fhir/bulk_export.py index fdd1f6b..e87cfe1 100644 --- a/cumulus_etl/loaders/fhir/bulk_export.py +++ b/cumulus_etl/loaders/fhir/bulk_export.py @@ -2,7 +2,6 @@ import asyncio import datetime -import json import os import urllib.parse from collections.abc import Callable @@ -162,17 +161,11 @@ async def export(self) -> None: except ValueError: pass # server gave us a bad timestamp, ignore it :shrug: - # Were there any server-side errors during the export? - # The spec acknowledges that "error" is perhaps misleading for an array that can contain - # info messages. - error_texts, warning_texts = await self._gather_all_messages(response_json.get("error", [])) - if warning_texts: - print("\n - ".join(["Messages from server:", *warning_texts])) - # Download all the files print("Bulk FHIR export finished, now downloading resources…") - files = response_json.get("output", []) - await self._download_all_ndjson_files(files) + await self._download_all_ndjson_files(response_json, "output") + await self._download_all_ndjson_files(response_json, "error") + await self._download_all_ndjson_files(response_json, "deleted") self._log.export_complete() @@ -181,6 +174,11 @@ async def export(self) -> None: # files up there, so the user could try to manually recover. await self._delete_export(poll_location) + # Were there any server-side errors during the export? + error_texts, warning_texts = self._gather_all_messages() + if warning_texts: + print("\n - ".join(["Messages from server:", *warning_texts])) + # Make sure we're fully done before we bail because the server told us the export has # issues. We still want to DELETE the export in this case. And we still want to download # all the files the server DID give us. Servers may have lots of ignorable errors that @@ -260,7 +258,6 @@ async def _request_with_retries( headers: dict | None = None, target_status_code: int = 200, method: str = "GET", - log_begin: Callable[[], None] | None = None, log_request: Callable[[], None] | None = None, log_progress: Callable[[httpx.Response], None] | None = None, log_error: Callable[[Exception], None] | None = None, @@ -275,7 +272,6 @@ async def _request_with_retries( :param headers: headers for request :param target_status_code: retries until this status code is returned :param method: HTTP method to request - :param log_begin: method to call to report that we are about to start requests :param log_request: method to call to report every request attempt :param log_progress: method to call to report a successful request but not yet done :param log_error: method to call to report request failures @@ -291,9 +287,6 @@ async def _request_with_retries( max_errors = len(error_retry_minutes) num_errors = 0 - if log_begin: - log_begin() - # Actually loop, attempting the request multiple times as needed while self._total_wait_time < self._TIMEOUT_THRESHOLD: if log_request: @@ -361,57 +354,43 @@ async def _request_with_retries( log_error(exc) raise exc - async def _gather_all_messages(self, error_list: list[dict]) -> (list[str], list[str]): + def _gather_all_messages(self) -> (list[str], list[str]): """ - Downloads all outcome message ndjson files from the bulk export server. + Parses all error/info ndjson files from the bulk export server. - :param error_list: info about each error file from the bulk FHIR server :returns: (error messages, non-fatal messages) """ - coroutines = [] - for error in error_list: - # per spec as of writing, OperationOutcome is the only allowed type - if error.get("type") == "OperationOutcome": - coroutines.append( - self._request_with_delay_status( - error["url"], - headers={"Accept": "application/fhir+ndjson"}, - retry_errors=True, - log_begin=partial( - self._log.download_request, - error["url"], - "error", - error["type"], - ), - log_error=partial(self._log.download_error, error["url"]), - ), - ) - responses = await asyncio.gather(*coroutines) + # The spec acknowledges that "error" is perhaps misleading for an array that can contain + # info messages. + error_folder = store.Root(f"{self._destination}/error") + outcomes = common.read_resource_ndjson(error_folder, "OperationOutcome") fatal_messages = [] info_messages = [] - for response in responses: - # Create a list of OperationOutcomes - outcomes = [json.loads(x) for x in response.text.split("\n") if x] - self._log.download_complete(response.url, len(outcomes), len(response.text)) - for outcome in outcomes: - for issue in outcome.get("issue", []): - text = issue.get("diagnostics") - text = text or issue.get("details", {}).get("text") - text = text or issue.get("code") # code is required at least - if issue.get("severity") in ("fatal", "error"): - fatal_messages.append(text) - else: - info_messages.append(text) + for outcome in outcomes: + for issue in outcome.get("issue", []): + text = issue.get("diagnostics") + text = text or issue.get("details", {}).get("text") + text = text or issue.get("code") # code is required at least + if issue.get("severity") in ("fatal", "error"): + fatal_messages.append(text) + else: + info_messages.append(text) return fatal_messages, info_messages - async def _download_all_ndjson_files(self, files: list[dict]) -> None: + async def _download_all_ndjson_files(self, resource_json: dict, item_type: str) -> None: """ Downloads all exported ndjson files from the bulk export server. - :param files: info about each file from the bulk FHIR server + :param resource_json: the status response from bulk FHIR server + :param item_type: which type of object to download: output, error, or deleted """ + files = resource_json.get(item_type, []) + + # Use the same (sensible) download folder layout as bulk-data-client: + subfolder = "" if item_type == "output" else item_type + resource_counts = {} # how many of each resource we've seen coroutines = [] for file in files: @@ -422,12 +401,15 @@ async def _download_all_ndjson_files(self, files: list[dict]) -> None: self._download_ndjson_file( file["url"], file["type"], - os.path.join(self._destination, filename), + os.path.join(self._destination, subfolder, filename), + item_type, ), ) await asyncio.gather(*coroutines) - async def _download_ndjson_file(self, url: str, resource_type: str, filename: str) -> None: + async def _download_ndjson_file( + self, url: str, resource_type: str, filename: str, item_type: str + ) -> None: """ Downloads a single ndjson file from the bulk export server. @@ -442,10 +424,11 @@ async def _download_ndjson_file(self, url: str, resource_type: str, filename: st headers={"Accept": "application/fhir+ndjson"}, stream=True, retry_errors=True, - log_request=partial(self._log.download_request, url, "output", resource_type), + log_request=partial(self._log.download_request, url, item_type, resource_type), log_error=partial(self._log.download_error, url), ) try: + os.makedirs(os.path.dirname(filename), exist_ok=True) with open(filename, "w", encoding="utf8") as file: async for block in response.aiter_text(): file.write(block) diff --git a/docs/bulk-exports.md b/docs/bulk-exports.md index e69afec..209bf11 100644 --- a/docs/bulk-exports.md +++ b/docs/bulk-exports.md @@ -44,6 +44,23 @@ You can save the exported files for archiving after the fact with `--export-to=P However, bulk exports tend to be brittle and slow for many EHRs at the time of this writing. It might be wiser to separately export, make sure the data is all there and good, and then ETL it. +## Archiving Exports + +Exports can take a long time, and it's often convenient to archive the results. +For later re-processing, sanity checking, quality assurance, or whatever. + +It's recommended that you archive everything in the export folder. +This is what you may expect to archive: + +- The resource export files themselves + (these will look like `1.Patient.ndjson` or `Patient.000.ndjson` or similar) +- The `log.ndjson` log file +- The `deleted/` subfolder, if present + (this will hold a list of resources that the FHIR server says should be deleted) +- The `error/` subfolder, if present + (this will hold a list of errors from the FHIR server + as well as warnings and informational messages, despite the name) + ## Resuming an Interrupted Export Bulk exports can be brittle. diff --git a/docs/setup/sample-runs.md b/docs/setup/sample-runs.md index a2d61e5..68956fc 100644 --- a/docs/setup/sample-runs.md +++ b/docs/setup/sample-runs.md @@ -205,13 +205,9 @@ Follow the [S3 setup guide](aws.md) document for guidance there. While technically optional, it's recommended that you manually specify these arguments because their defaults are subject to change or might not match your situation. -* `--input-format`: There are two reasonable values (`ndjson` and `i2b2`). If you want to pull from - your bulk export FHIR server, pass in its URL as your input path and use `ndjson` as your input - format. Otherwise, you can use either value to point at a local folder with either FHIR ndjson - or i2b2 csv files sitting in them, respectively. - * `--output-format`: There are two reasonable values (`ndjson` and `deltalake`). - For production use, you want `deltalake` as it is supports incremental, batched updates. + For production use, you can use the default value of `deltalake` as it supports incremental, + batched updates. But `ndjson` is useful when debugging as it is human-readable. * `--batch-size`: How many resources to save in a single output file. If there are more resources diff --git a/tests/loaders/ndjson/test_bulk_export.py b/tests/loaders/ndjson/test_bulk_export.py index 5a62a8f..d5a840d 100644 --- a/tests/loaders/ndjson/test_bulk_export.py +++ b/tests/loaders/ndjson/test_bulk_export.py @@ -272,31 +272,29 @@ async def test_export_error(self): ], }, ) + err1 = ( + '{"resourceType": "OperationOutcome",' + ' "issue": [{"severity": "error", "diagnostics": "err1"}]}' + ) self.respx_mock.get( "https://example.com/err1", headers={"Accept": "application/fhir+ndjson"}, - ).respond( - json={ - "resourceType": "OperationOutcome", - "issue": [{"severity": "error", "diagnostics": "err1"}], - }, + ).respond(text=err1) + err2 = ( + '{"resourceType": "OperationOutcome",' + '"issue": [{"severity": "fatal", "details": {"text": "err2"}}]}\n' + '{"resourceType": "OperationOutcome",' + '"issue": [{"severity": "warning", "diagnostics": "warning1"}]}\n' + '{"resourceType": "OperationOutcome",' + '"issue": [' + '{"severity": "error", "code": "err3"},' + '{"severity": "fatal", "code": "err4"}' + "]}\n" ) self.respx_mock.get( "https://example.com/err2", headers={"Accept": "application/fhir+ndjson"}, - ).respond( - text=( - '{"resourceType": "OperationOutcome",' - '"issue": [{"severity": "fatal", "details": {"text": "err2"}}]}\n' - '{"resourceType": "OperationOutcome",' - '"issue": [{"severity": "warning", "diagnostics": "warning1"}]}\n' - '{"resourceType": "OperationOutcome",' - '"issue": [' - '{"severity": "error", "code": "err3"},' - '{"severity": "fatal", "code": "err4"}' - "]}\n" - ) - ) + ).respond(text=err2) self.respx_mock.get( "https://example.com/con1", headers={"Accept": "application/fhir+ndjson"}, @@ -311,6 +309,11 @@ async def test_export_error(self): self.assertIsNone(self.exporter.export_datetime) # date time couldn't be parsed + err1_file = common.read_text(f"{self.tmpdir}/error/OperationOutcome.000.ndjson") + self.assertEqual(err1_file, err1) + err2_file = common.read_text(f"{self.tmpdir}/error/OperationOutcome.001.ndjson") + self.assertEqual(err2_file, err2) + self.assert_log_equals( ("kickoff", None), ( @@ -389,6 +392,54 @@ async def test_export_warning(self): self.assertIn("Messages from server:\n - warning1\n", stdout.getvalue()) + async def test_deleted_resources(self): + """Verify that we preserve the list of resources to be deleted""" + self.mock_kickoff() + self.mock_delete() + self.respx_mock.get("https://example.com/poll").respond( + json={ + "deleted": [ + {"type": "Bundle", "url": "https://example.com/deleted1"}, + ], + }, + ) + deleted1 = { + "resourceType": "Bundle", + "type": "transaction", + "entry": [ + { + "request": {"method": "DELETE", "url": "Patient/123"}, + } + ], + } + self.respx_mock.get("https://example.com/deleted1").respond(json=deleted1) + + await self.export() + + bundle = common.read_json(f"{self.tmpdir}/deleted/Bundle.000.ndjson") + self.assertEqual(bundle, deleted1) + + self.assert_log_equals( + ("kickoff", None), + ("status_complete", None), + ( + "download_request", + { + "fileUrl": "https://example.com/deleted1", + "itemType": "deleted", + "resourceType": "Bundle", + }, + ), + ( + "download_complete", + {"fileSize": 117, "fileUrl": "https://example.com/deleted1", "resourceCount": 1}, + ), + ( + "export_complete", + {"attachments": None, "bytes": 117, "duration": 0, "files": 1, "resources": 1}, + ), + ) + async def test_file_download_error(self): """Verify that we correctly handle a resource download failure""" self.mock_kickoff()