Skip to content

Commit

Permalink
Produce import_metadata_mcf in executor (#1144)
Browse files Browse the repository at this point in the history
* Produce import_metadata_mcf in executor

* fix

* fix
  • Loading branch information
n-h-diaz authored Dec 19, 2024
1 parent 5c49edd commit f8c47fb
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 20 deletions.
3 changes: 3 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class ExecutorConfig:
# The content of latest_version.txt would be a single line of
# '2020_07_15T12_07_17_365264_07_00'.
storage_version_filename: str = 'latest_version.txt'
# Name of the file that contains the import_metadata_mcf for the import.
# These files are stored at the same level as the storage_version_filename.
import_metadata_mcf_filename: str = 'import_metadata_mcf.mcf'
# Types of inputs accepted by the Data Commons importer. These are
# also the accepted fields of an import_inputs value in the manifest.
import_input_types: List[str] = ('template_mcf', 'cleaned_csv', 'node_mcf')
Expand Down
51 changes: 45 additions & 6 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
'Please find logs in the Logs Explorer of the GCP project associated with'
' Import Automation.')

_IMPORT_METADATA_MCF_TEMPLATE = """
Node: dcid:dc/base/{import_name}
typeOf: dcid:Provenance
lastDataRefeshDate: "{last_data_refresh_date}"
"""


@dataclasses.dataclass
class ExecutionResult:
Expand Down Expand Up @@ -379,7 +385,7 @@ def _import_one_helper(
import_dir=absolute_import_dir,
output_dir=f'{relative_import_dir}/{import_spec["import_name"]}',
version=version,
import_inputs=import_spec.get('import_inputs', []),
import_spec=import_spec,
)

if self.importer:
Expand Down Expand Up @@ -410,7 +416,7 @@ def _upload_import_inputs(
import_dir: str,
output_dir: str,
version: str,
import_inputs: List[Dict[str, str]],
import_spec: dict,
) -> import_service.ImportInputs:
"""Uploads the generated import data files.
Expand All @@ -422,14 +428,13 @@ def _upload_import_inputs(
import_dir: Absolute path to the directory with the manifest, as a
string.
output_dir: Path to the output directory, as a string.
import_inputs: List of import inputs each as a dict mapping import types
to relative paths within the repository. This is parsed from the
'import_inputs' field in the manifest.
import_inputs: Specification of the import as a dict.
Returns:
ImportInputs object containing the paths to the uploaded inputs.
"""
uploaded = import_service.ImportInputs()
import_inputs = import_spec.get('import_inputs', [])
for import_input in import_inputs:
for input_type in self.config.import_input_types:
path = import_input.get(input_type)
Expand All @@ -443,6 +448,9 @@ def _upload_import_inputs(
self.uploader.upload_string(
version,
os.path.join(output_dir, self.config.storage_version_filename))
self.uploader.upload_string(
self._import_metadata_mcf_helper(import_spec),
os.path.join(output_dir, self.config.import_metadata_mcf_filename))
return uploaded

def _upload_file_helper(self, src: str, dest: str) -> None:
Expand All @@ -454,6 +462,25 @@ def _upload_file_helper(self, src: str, dest: str) -> None:
"""
self.uploader.upload_file(src, dest)

def _import_metadata_mcf_helper(self, import_spec: dict) -> str:
"""Generates import_metadata_mcf node for import.
Args:
import_spec: Specification of the import as a dict.
Returns:
import_metadata_mcf node.
"""
node = _IMPORT_METADATA_MCF_TEMPLATE.format_map({
"import_name": import_spec.get('import_name'),
"last_data_refresh_date": _clean_date(utils.pacific_time())
})
next_data_refresh_date = utils.next_pacific_date(
import_spec.get('cron_schedule'))
if next_data_refresh_date:
node += f'nextDataRefreshDate: "{next_data_refresh_date}"\n'
return node


def parse_manifest(path: str) -> dict:
"""Parses the import manifest.
Expand Down Expand Up @@ -600,7 +627,7 @@ def _run_with_timeout(args: List[str],
logging.exception(
f'An unexpected exception was thrown: {e} when running {args}:'
f' {message}')
return None
raise e


def _create_venv(requirements_path: Iterable[str], venv_dir: str,
Expand Down Expand Up @@ -691,6 +718,18 @@ def _clean_time(
return time


def _clean_date(time: str) -> str:
"""Converts ISO8601 time string to YYYY-MM-DD format.
Args:
time: Time string in ISO8601 format.
Returns:
YYYY-MM-DD date.
"""
return time[:10]


def _construct_process_message(message: str,
process: subprocess.CompletedProcess) -> str:
"""Constructs a log message describing the result of a subprocess.
Expand Down
27 changes: 25 additions & 2 deletions import-automation/executor/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import os
import re
import datetime
from croniter import croniter
from typing import List

import pytz
import requests

_PACIFIC_TIME = 'America/Los_Angeles'


def utctime():
"""Returns the current time string in ISO 8601 with timezone UTC+0, e.g.
Expand All @@ -34,8 +37,28 @@ def utctime():
def pacific_time():
"""Returns the current time string in ISO 8601 with timezone
America/Los_Angeles, e.g. '2020-06-30T04:28:53.717569-07:00'."""
return datetime.datetime.now(
pytz.timezone('America/Los_Angeles')).isoformat()
return datetime.datetime.now(pytz.timezone(_PACIFIC_TIME)).isoformat()


def next_pacific_date(cron_expression: str, from_time: str = None) -> str:
"""Returns the next date from today in ISO8601 with timezone
America/Los_Angeles, given a cron schedule.
Args:
cron_expression: Expression for cron schedule.
from_time: Optional time to start from. Default is now.
Returns:
The next date based on the schedule.
"""
try:
if not from_time:
from_time = datetime.datetime.now(pytz.timezone(_PACIFIC_TIME))
iter = croniter(cron_expression, from_time)
return iter.get_next(datetime.datetime).date().isoformat()
except Exception as e:
print(f"Error calculating next date: {e}")
return ""


def list_to_str(a_list: List, sep: str = ', ') -> str:
Expand Down
1 change: 1 addition & 0 deletions import-automation/executor/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ flask
gunicorn
pytz
absl-py
croniter
5 changes: 2 additions & 3 deletions import-automation/executor/test/file_uploader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ class LocalFileUploaderTest(unittest.TestCase):
def test_upload_file(self):
with tempfile.TemporaryDirectory() as tmp_dir:
uploader = file_uploader.LocalFileUploader(tmp_dir)
src = os.path.join(
os.getcwd(),
'import-automation/executor/test/data/COVIDTracking_States.csv')
src = os.path.join(os.path.dirname(__file__),
'data/COVIDTracking_States.csv')
uploader.upload_file(src, 'foo/bar/data.csv')
self.assertTrue(
utils.compare_lines(src,
Expand Down
20 changes: 13 additions & 7 deletions import-automation/executor/test/github_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ def test_find_dirs_in_commit_empty(self, dir_exists, query_files_in_dir,

@mock.patch('requests.get')
def test_download_repo(self, get):
tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
tar_path = os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tar.gz')
with open(tar_path, 'rb') as tar:
headers = {'Content-Disposition': 'attachment; filename=abc'}
get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
Expand All @@ -307,24 +308,28 @@ def test_download_repo(self, get):
file = os.path.join(downloaded,
'treasury_constant_maturity_rates.csv')
assert test.utils.compare_lines(
'import-automation/executor/test/data/treasury_constant_maturity_rates.csv',
os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.csv'),
file, integration_test.NUM_LINES_TO_CHECK)

file = os.path.join(downloaded,
'treasury_constant_maturity_rates.mcf')
assert test.utils.compare_lines(
'import-automation/executor/test/data/treasury_constant_maturity_rates.mcf',
os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.mcf'),
file, integration_test.NUM_LINES_TO_CHECK)

file = os.path.join(downloaded,
'treasury_constant_maturity_rates.tmcf')
assert test.utils.compare_lines(
'import-automation/executor/test/data/treasury_constant_maturity_rates.tmcf',
os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tmcf'),
file, integration_test.NUM_LINES_TO_CHECK)

@mock.patch('requests.get')
def test_download_repo_timeout(self, get):
tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
tar_path = os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tar.gz')
with open(tar_path, 'rb') as tar:
headers = {'Content-Disposition': 'attachment; filename=abc'}
get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
Expand All @@ -335,7 +340,7 @@ def test_download_repo_timeout(self, get):

@mock.patch('requests.get')
def test_download_repo_empty(self, get):
with open('import-automation/executor/test/data/empty.tar.gz',
with open(os.path.join(os.path.dirname(__file__), 'data/empty.tar.gz'),
'rb') as tar:
headers = {'Content-Disposition': 'attachment; filename=abc'}
get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
Expand All @@ -346,7 +351,8 @@ def test_download_repo_empty(self, get):

@mock.patch('requests.get')
def test_download_repo_http_error(self, get):
tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
tar_path = os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tar.gz')
with open(tar_path, 'rb') as tar:
get.return_value = utils.ResponseMock(400, raw=tar)

Expand Down
8 changes: 6 additions & 2 deletions import-automation/executor/test/import_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def test_clean_time(self):
'2020_07_15T12_07_17_365264_07_00',
import_executor._clean_time('2020-07-15T12:07:17.365264-07:00'))

def test_clean_date(self):
self.assertEqual(
'2020-07-15',
import_executor._clean_date('2020-07-15T12:07:17.365264+00:00'))

def test_run_with_timeout(self):
self.assertRaises(subprocess.TimeoutExpired,
import_executor._run_with_timeout, ['sleep', '5'],
Expand Down Expand Up @@ -62,8 +67,7 @@ def test_run_and_handle_exception(self):
def raise_exception():
raise Exception

result = import_executor.run_and_handle_exception(
'run', raise_exception)
result = import_executor.run_and_handle_exception(raise_exception)
self.assertEqual('failed', result.status)
self.assertEqual([], result.imports_executed)
self.assertIn('Exception', result.message)
Expand Down
10 changes: 10 additions & 0 deletions import-automation/executor/test/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ def test_pacific_time_to_datetime_then_back(self):
time_datetime = datetime.datetime.fromisoformat(time_iso)
self.assertEqual(time_iso, time_datetime.isoformat())

def test_next_pacific_date(self):
"""Tests next_pacific_date."""
# At 00:00 on Friday.
cron_expression = '0 0 * * FRI'
# Friday.
from_time = datetime.datetime(2024, 12, 13)
self.assertEqual(
app.utils.next_pacific_date(cron_expression, from_time),
'2024-12-20')

def test_download_file(self):
"""Response does not have a Content-Disposition header."""
url = ('https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/'
Expand Down

0 comments on commit f8c47fb

Please sign in to comment.