diff --git a/hail/python/test/hailtop/batch/test_python_job_in_service.py b/hail/python/test/hailtop/batch/test_python_job_in_service.py index 46c9ce828394..28fbcc91d12c 100644 --- a/hail/python/test/hailtop/batch/test_python_job_in_service.py +++ b/hail/python/test/hailtop/batch/test_python_job_in_service.py @@ -1,3 +1,4 @@ +from typing import List, Tuple, Dict import asyncio import secrets @@ -57,6 +58,85 @@ def reformat(x, y): assert res.get_job_log(4)['main'] == "3\n5\n30\n{\"x\": 3, \"y\": 5}\n", str(res.debug_info()) +async def test_python_job_input( + service_backend: ServiceBackend, + fs: RouterAsyncFS, + upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]], + output_tmpdir: str, +): + (url1, data1), _, _ = upload_test_files + + b = batch(service_backend) + input_file = b.read_input(url1) + + def readall(path: str) -> str: + return open(path).read() + + j = b.new_python_job() + r = j.call(readall, input_file) + + out = os.path.join(output_tmpdir, secrets.token_urlsafe(5)) + b.write_output(r.as_json(), out) + res = b.run() + + assert isinstance(res, bc.Batch) + assert res.status()['state'] == 'success', str((res, res.debug_info())) + assert orjson.loads(await fs.read(out)) == data1 + + +async def test_python_job_input_in_list( + service_backend: ServiceBackend, + fs: RouterAsyncFS, + upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]], + output_tmpdir: str, +): + (url1, data1), _, _ = upload_test_files + + b = batch(service_backend) + input_file = b.read_input(url1) + + def readall(paths: List[str]) -> str: + assert len(paths) == 1 + return open(paths[0]).read() + + j = b.new_python_job() + r = j.call(readall, [input_file]) + + out = os.path.join(output_tmpdir, secrets.token_urlsafe(5)) + b.write_output(r.as_json(), out) + res = b.run() + + assert isinstance(res, bc.Batch) + assert res.status()['state'] == 'success', str((res, res.debug_info())) + assert orjson.loads(await fs.read(out)) == data1 + + +async def test_python_job_input_in_dict( + service_backend: ServiceBackend, + fs: RouterAsyncFS, + upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]], + output_tmpdir: str, +): + (url1, data1), _, _ = upload_test_files + + b = batch(service_backend) + input_file = b.read_input(url1) + + def readall(paths: Dict[str, str]) -> str: + return open(paths['hello!!!']).read() + + j = b.new_python_job() + r = j.call(readall, {'hello!!!': input_file}) + + out = os.path.join(output_tmpdir, secrets.token_urlsafe(5)) + b.write_output(r.as_json(), out) + res = b.run() + + assert isinstance(res, bc.Batch) + assert res.status()['state'] == 'success', str((res, res.debug_info())) + assert orjson.loads(await fs.read(out)) == data1 + + def test_python_job_w_resource_group_unpack_individually(service_backend: ServiceBackend): b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE) head = b.new_job()