diff --git a/hail/python/test/hailtop/batch/test_batch_service_backend.py b/hail/python/test/hailtop/batch/test_batch_service_backend.py
index 2998d163589..15989d361ca 100644
--- a/hail/python/test/hailtop/batch/test_batch_service_backend.py
+++ b/hail/python/test/hailtop/batch/test_batch_service_backend.py
@@ -35,8 +35,8 @@
 )
 
 
-def test_single_task_no_io(backend: ServiceBackend):
-    b = batch(backend)
+def test_single_task_no_io(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.command('echo hello')
     res = b.run()
@@ -46,10 +46,10 @@ def test_single_task_no_io(backend: ServiceBackend):
 
 
 def test_single_task_input(
-    backend: ServiceBackend, upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]]
+    service_backend: ServiceBackend, upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]]
 ):
     (url1, data1), _, _ = upload_test_files
-    b = batch(backend)
+    b = batch(service_backend)
     input = b.read_input(url1)
     j = b.new_job()
     j.command(f'cat {input}')
@@ -60,10 +60,10 @@ def test_single_task_input(
 
 
 def test_single_task_input_resource_group(
-    backend: ServiceBackend, upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]]
+    service_backend: ServiceBackend, upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]]
 ):
     (url1, data1), _, _ = upload_test_files
-    b = batch(backend)
+    b = batch(service_backend)
     input = b.read_input_group(foo=url1)
     j = b.new_job()
     j.storage('10Gi')
@@ -76,7 +76,7 @@ def test_single_task_input_resource_group(
 
 
-def test_single_task_output(backend: ServiceBackend):
-    b = batch(backend)
+def test_single_task_output(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job(attributes={'a': 'bar', 'b': 'foo'})
     j.command(f'echo hello > {j.ofile}')
     res = b.run()
@@ -85,8 +85,8 @@ def test_single_task_output(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_single_task_write_output(backend: ServiceBackend, output_tmpdir: str):
-    b = batch(backend)
+def test_single_task_write_output(service_backend: ServiceBackend, output_tmpdir: str):
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'echo hello > {j.ofile}')
     b.write_output(j.ofile, os.path.join(output_tmpdir, 'test_single_task_output.txt'))
@@ -97,7 +97,7 @@ def test_single_task_write_output(backend: ServiceBackend, output_tmpdir: str):
 
 
-def test_single_task_resource_group(backend: ServiceBackend):
-    b = batch(backend)
+def test_single_task_resource_group(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.declare_resource_group(output={'foo': '{root}.foo'})
     assert isinstance(j.output, ResourceGroup)
@@ -108,8 +108,8 @@ def test_single_task_resource_group(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_single_task_write_resource_group(backend: ServiceBackend, output_tmpdir: str):
-    b = batch(backend)
+def test_single_task_write_resource_group(service_backend: ServiceBackend, output_tmpdir: str):
+    b = batch(service_backend)
     j = b.new_job()
     j.declare_resource_group(output={'foo': '{root}.foo'})
     assert isinstance(j.output, ResourceGroup)
@@ -122,9 +122,9 @@ def test_single_task_write_resource_group(backend: ServiceBackend, output_tmpdir
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_multiple_dependent_tasks(backend: ServiceBackend, output_tmpdir: str):
+def test_multiple_dependent_tasks(service_backend: ServiceBackend, output_tmpdir: str):
     output_file = os.path.join(output_tmpdir, 'test_multiple_dependent_tasks.txt')
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'echo "0" >> {j.ofile}')
@@ -141,8 +141,8 @@ def test_multiple_dependent_tasks(backend: ServiceBackend, output_tmpdir: str):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_specify_cpu(backend: ServiceBackend):
-    b = batch(backend)
+def test_specify_cpu(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.cpu('0.5')
     j.command(f'echo "hello" > {j.ofile}')
@@ -152,8 +152,8 @@ def test_specify_cpu(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_specify_memory(backend: ServiceBackend):
-    b = batch(backend)
+def test_specify_memory(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.memory('100M')
     j.command(f'echo "hello" > {j.ofile}')
@@ -163,8 +163,8 @@ def test_specify_memory(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_scatter_gather(backend: ServiceBackend):
-    b = batch(backend)
+def test_scatter_gather(service_backend: ServiceBackend):
+    b = batch(service_backend)
 
     for i in range(3):
         j = b.new_job(name=f'foo{i}')
@@ -187,12 +187,12 @@ def test_scatter_gather(backend: ServiceBackend):
 
 
 def test_file_name_space(
-    backend: ServiceBackend,
+    service_backend: ServiceBackend,
     upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]],
     output_tmpdir: str,
 ):
     _, _, (url3, data3) = upload_test_files
-    b = batch(backend)
+    b = batch(service_backend)
     input = b.read_input(url3)
     j = b.new_job()
     j.command(f'cat {input} > {j.ofile}')
@@ -203,8 +203,8 @@ def test_file_name_space(
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_dry_run(backend: ServiceBackend, output_tmpdir: str):
-    b = batch(backend)
+def test_dry_run(service_backend: ServiceBackend, output_tmpdir: str):
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'echo hello > {j.ofile}')
     b.write_output(j.ofile, os.path.join(output_tmpdir, 'test_single_job_output.txt'))
@@ -212,12 +212,12 @@ def test_dry_run(backend: ServiceBackend, output_tmpdir: str):
 
 
 def test_verbose(
-    backend: ServiceBackend,
+    service_backend: ServiceBackend,
     upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]],
     output_tmpdir: str,
 ):
     (url1, data1), _, _ = upload_test_files
-    b = batch(backend)
+    b = batch(service_backend)
     input = b.read_input(url1)
     j = b.new_job()
     j.command(f'cat {input}')
@@ -228,10 +228,12 @@ def test_verbose(
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_cloudfuse_fails_with_read_write_mount_option(fs: RouterAsyncFS, backend: ServiceBackend, output_bucket_path):
+def test_cloudfuse_fails_with_read_write_mount_option(
+    fs: RouterAsyncFS, service_backend: ServiceBackend, output_bucket_path
+):
     bucket, path, output_tmpdir = output_bucket_path
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'mkdir -p {path}; echo head > {path}/cloudfuse_test_1')
     j.cloudfuse(bucket, f'/{bucket}', read_only=False)
@@ -244,10 +246,10 @@ def test_cloudfuse_fails_with_read_write_mount_option(fs: RouterAsyncFS, backend
         assert False
 
 
-def test_cloudfuse_fails_with_io_mount_point(fs: RouterAsyncFS, backend: ServiceBackend, output_bucket_path):
+def test_cloudfuse_fails_with_io_mount_point(fs: RouterAsyncFS, service_backend: ServiceBackend, output_bucket_path):
     bucket, path, output_tmpdir = output_bucket_path
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'mkdir -p {path}; echo head > {path}/cloudfuse_test_1')
     j.cloudfuse(bucket, f'/io', read_only=True)
@@ -260,10 +262,10 @@ def test_cloudfuse_fails_with_io_mount_point(fs: RouterAsyncFS, backend: Service
         assert False
 
 
-def test_cloudfuse_read_only(backend: ServiceBackend, output_bucket_path):
+def test_cloudfuse_read_only(service_backend: ServiceBackend, output_bucket_path):
     bucket, path, output_tmpdir = output_bucket_path
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'mkdir -p {path}; echo head > {path}/cloudfuse_test_1')
     j.cloudfuse(bucket, f'/{bucket}', read_only=True)
@@ -274,13 +276,13 @@ def test_cloudfuse_read_only(backend: ServiceBackend, output_bucket_path):
     assert res_status['state'] == 'failure', str((res_status, res.debug_info()))
 
 
-def test_cloudfuse_implicit_dirs(fs: RouterAsyncFS, backend: ServiceBackend, upload_test_files):
+def test_cloudfuse_implicit_dirs(fs: RouterAsyncFS, service_backend: ServiceBackend, upload_test_files):
     (url1, data1), _, _ = upload_test_files
     parsed_url1 = fs.parse_url(url1)
     object_name = parsed_url1.path
     bucket_name = '/'.join(parsed_url1.bucket_parts)
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'cat ' + os.path.join('/cloudfuse', object_name))
     j.cloudfuse(bucket_name, f'/cloudfuse', read_only=True)
@@ -292,10 +294,10 @@ def test_cloudfuse_implicit_dirs(fs: RouterAsyncFS, backend: ServiceBackend, upl
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_cloudfuse_empty_string_bucket_fails(backend: ServiceBackend, output_bucket_path):
+def test_cloudfuse_empty_string_bucket_fails(service_backend: ServiceBackend, output_bucket_path):
     bucket, path, output_tmpdir = output_bucket_path
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     with pytest.raises(BatchException):
         j.cloudfuse('', '/empty_bucket')
@@ -304,14 +306,14 @@ def test_cloudfuse_empty_string_bucket_fails(backend: ServiceBackend, output_buc
 
 
 async def test_cloudfuse_submount_in_io_doesnt_rm_bucket(
-    fs: RouterAsyncFS, backend: ServiceBackend, output_bucket_path
+    fs: RouterAsyncFS, service_backend: ServiceBackend, output_bucket_path
 ):
     bucket, path, output_tmpdir = output_bucket_path
 
     should_still_exist_url = os.path.join(output_tmpdir, 'should-still-exist')
     await fs.write(should_still_exist_url, b'should-still-exist')
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.cloudfuse(bucket, '/io/cloudfuse')
     j.command(f'ls /io/cloudfuse/')
@@ -323,9 +325,9 @@ async def test_cloudfuse_submount_in_io_doesnt_rm_bucket(
 
 
 @skip_in_azure
-def test_fuse_requester_pays(backend: ServiceBackend):
+def test_fuse_requester_pays(service_backend: ServiceBackend):
     assert REQUESTER_PAYS_PROJECT
-    b = batch(backend, requester_pays_project=REQUESTER_PAYS_PROJECT)
+    b = batch(service_backend, requester_pays_project=REQUESTER_PAYS_PROJECT)
     j = b.new_job()
     j.cloudfuse('hail-test-requester-pays-fds32', '/fuse-bucket')
     j.command('cat /fuse-bucket/hello')
@@ -337,12 +339,12 @@ def test_fuse_requester_pays(backend: ServiceBackend):
 
 @skip_in_azure
 def test_fuse_non_requester_pays_bucket_when_requester_pays_project_specified(
-    backend: ServiceBackend, output_bucket_path
+    service_backend: ServiceBackend, output_bucket_path
 ):
     bucket, path, output_tmpdir = output_bucket_path
     assert REQUESTER_PAYS_PROJECT
 
-    b = batch(backend, requester_pays_project=REQUESTER_PAYS_PROJECT)
+    b = batch(service_backend, requester_pays_project=REQUESTER_PAYS_PROJECT)
     j = b.new_job()
     j.command(f'ls /fuse-bucket')
     j.cloudfuse(bucket, f'/fuse-bucket', read_only=True)
@@ -354,9 +356,9 @@ def test_fuse_non_requester_pays_bucket_when_requester_pays_project_specified(
 
 
 @skip_in_azure
-def test_requester_pays(backend: ServiceBackend):
+def test_requester_pays(service_backend: ServiceBackend):
     assert REQUESTER_PAYS_PROJECT
-    b = batch(backend, requester_pays_project=REQUESTER_PAYS_PROJECT)
+    b = batch(service_backend, requester_pays_project=REQUESTER_PAYS_PROJECT)
     input = b.read_input('gs://hail-test-requester-pays-fds32/hello')
     j = b.new_job()
     j.command(f'cat {input}')
@@ -366,8 +368,8 @@ def test_requester_pays(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_benchmark_lookalike_workflow(backend: ServiceBackend, output_tmpdir):
-    b = batch(backend)
+def test_benchmark_lookalike_workflow(service_backend: ServiceBackend, output_tmpdir):
+    b = batch(service_backend)
 
     setup_jobs = []
     for i in range(10):
@@ -390,8 +392,8 @@ def test_benchmark_lookalike_workflow(backend: ServiceBackend, output_tmpdir):
     # assert b.run().status()['state'] == 'success'
 
 
-def test_envvar(backend: ServiceBackend):
-    b = batch(backend)
+def test_envvar(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.env('SOME_VARIABLE', '123abcdef')
     j.command('[ $SOME_VARIABLE = "123abcdef" ]')
@@ -401,9 +403,9 @@ def test_envvar(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_single_job_with_shell(backend: ServiceBackend):
+def test_single_job_with_shell(service_backend: ServiceBackend):
     msg = 'hello world'
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job(shell='/bin/sh')
     j.command(f'echo "{msg}"')
     res = b.run()
@@ -412,8 +414,8 @@ def test_single_job_with_shell(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_single_job_with_nonsense_shell(backend: ServiceBackend):
-    b = batch(backend)
+def test_single_job_with_nonsense_shell(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job(shell='/bin/ajdsfoijasidojf')
     j.command(f'echo "hello"')
     res = b.run()
@@ -422,8 +424,8 @@ def test_single_job_with_nonsense_shell(backend: ServiceBackend):
     assert res_status['state'] == 'failure', str((res_status, res.debug_info()))
 
 
-def test_single_job_with_intermediate_failure(backend: ServiceBackend):
-    b = batch(backend)
+def test_single_job_with_intermediate_failure(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'echoddd "hello"')
     j2 = b.new_job()
@@ -436,10 +438,10 @@ def test_single_job_with_intermediate_failure(backend: ServiceBackend):
 
 
 def test_input_directory(
-    backend: ServiceBackend, upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]]
+    service_backend: ServiceBackend, upload_test_files: Tuple[Tuple[str, bytes], Tuple[str, bytes], Tuple[str, bytes]]
 ):
     (url1, data1), _, _ = upload_test_files
-    b = batch(backend)
+    b = batch(service_backend)
     containing_folder = '/'.join(url1.rstrip('/').split('/')[:-1])
     input1 = b.read_input(containing_folder)
     input2 = b.read_input(containing_folder + '/')
@@ -452,8 +454,8 @@ def test_input_directory(
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_python_job(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_python_job(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
     head = b.new_job()
     head.command(f'echo "5" > {head.r5}')
     head.command(f'echo "3" > {head.r3}')
@@ -488,8 +490,8 @@ def reformat(x, y):
     assert res.get_job_log(4)['main'] == "3\n5\n30\n{\"x\": 3, \"y\": 5}\n", str(res.debug_info())
 
 
-def test_python_job_w_resource_group_unpack_individually(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_python_job_w_resource_group_unpack_individually(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
     head = b.new_job()
     head.declare_resource_group(count={'r5': '{root}.r5', 'r3': '{root}.r3'})
     assert isinstance(head.count, ResourceGroup)
@@ -527,8 +529,8 @@ def reformat(x, y):
     assert res.get_job_log(4)['main'] == "3\n5\n30\n{\"x\": 3, \"y\": 5}\n", str(res.debug_info())
 
 
-def test_python_job_can_write_to_resource_path(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_python_job_can_write_to_resource_path(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
 
     def write(path):
         with open(path, 'w') as f:
@@ -548,8 +550,8 @@ def write(path):
     assert res.get_job_log(tail._job_id)['main'] == 'foo', str(res.debug_info())
 
 
-def test_python_job_w_resource_group_unpack_jointly(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_python_job_w_resource_group_unpack_jointly(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
     head = b.new_job()
     head.declare_resource_group(count={'r5': '{root}.r5', 'r3': '{root}.r3'})
     assert isinstance(head.count, ResourceGroup)
@@ -583,8 +585,8 @@ def multiply(r):
     assert job_log_3['main'] == "15\n", str((job_log_3, res.debug_info()))
 
 
-def test_python_job_w_non_zero_ec(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_python_job_w_non_zero_ec(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
     j = b.new_python_job()
 
     def error():
@@ -597,8 +599,8 @@ def error():
     assert res_status['state'] == 'failure', str((res_status, res.debug_info()))
 
 
-def test_python_job_incorrect_signature(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_python_job_incorrect_signature(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
 
     def foo(pos_arg1, pos_arg2, *, kwarg1, kwarg2=1):
         print(pos_arg1, pos_arg2, kwarg1, kwarg2)
@@ -628,8 +630,8 @@ def foo(pos_arg1, pos_arg2, *, kwarg1, kwarg2=1):
         j.call(abs, -1, 5)
 
 
-def test_fail_fast(backend: ServiceBackend):
-    b = batch(backend, cancel_after_n_failures=1)
+def test_fail_fast(service_backend: ServiceBackend):
+    b = batch(service_backend, cancel_after_n_failures=1)
 
     j1 = b.new_job()
     j1.command('false')
@@ -643,8 +645,8 @@ def test_fail_fast(backend: ServiceBackend):
     assert job_status['state'] == 'Cancelled', str((job_status, res.debug_info()))
 
 
-def test_service_backend_remote_tempdir_with_trailing_slash(backend):
-    b = Batch(backend=backend)
+def test_service_backend_remote_tempdir_with_trailing_slash(service_backend: ServiceBackend):
+    b = Batch(backend=service_backend)
     j1 = b.new_job()
     j1.command(f'echo hello > {j1.ofile}')
     j2 = b.new_job()
@@ -652,8 +654,8 @@ def test_service_backend_remote_tempdir_with_trailing_slash(backend):
     b.run()
 
 
-def test_service_backend_remote_tempdir_with_no_trailing_slash(backend):
-    b = Batch(backend=backend)
+def test_service_backend_remote_tempdir_with_no_trailing_slash(service_backend: ServiceBackend):
+    b = Batch(backend=service_backend)
     j1 = b.new_job()
     j1.command(f'echo hello > {j1.ofile}')
     j2 = b.new_job()
@@ -661,16 +663,16 @@ def test_service_backend_remote_tempdir_with_no_trailing_slash(backend):
     b.run()
 
 
-def test_large_command(backend: ServiceBackend):
-    b = Batch(backend=backend)
+def test_large_command(service_backend: ServiceBackend):
+    b = Batch(backend=service_backend)
     j1 = b.new_job()
     long_str = secrets.token_urlsafe(15 * 1024)
    j1.command(f'echo "{long_str}"')
     b.run()
 
 
-def test_big_batch_which_uses_slow_path(backend: ServiceBackend):
-    b = Batch(backend=backend)
+def test_big_batch_which_uses_slow_path(service_backend: ServiceBackend):
+    b = Batch(backend=service_backend)
     # 8 * 256 * 1024 = 2 MiB > 1 MiB max bunch size
     for _ in range(8):
         j1 = b.new_job()
@@ -683,8 +685,8 @@ def test_big_batch_which_uses_slow_path(backend: ServiceBackend):
     assert batch_status['state'] == 'success', str((res.debug_info()))
 
 
-def test_query_on_batch_in_batch(backend: ServiceBackend, output_tmpdir: str):
-    bb = Batch(backend=backend, default_python_image=HAIL_GENETICS_HAIL_IMAGE)
+def test_query_on_batch_in_batch(service_backend: ServiceBackend, output_tmpdir: str):
+    bb = Batch(backend=service_backend, default_python_image=HAIL_GENETICS_HAIL_IMAGE)
 
     tmp_ht_path = os.path.join(output_tmpdir, secrets.token_urlsafe(32))
 
@@ -702,8 +704,8 @@ def qob_in_batch():
     bb.run()
 
 
-def test_basic_async_fun(backend: ServiceBackend):
-    b = Batch(backend=backend)
+def test_basic_async_fun(service_backend: ServiceBackend):
+    b = Batch(backend=service_backend)
 
     j = b.new_python_job()
     j.call(asyncio.sleep, 1)
@@ -714,8 +716,8 @@ def test_basic_async_fun(backend: ServiceBackend):
     assert batch_status['state'] == 'success', str((res.debug_info()))
 
 
-def test_async_fun_returns_value(backend: ServiceBackend):
-    b = Batch(backend=backend)
+def test_async_fun_returns_value(service_backend: ServiceBackend):
+    b = Batch(backend=service_backend)
 
     async def foo(i, j):
         await asyncio.sleep(1)
@@ -735,10 +737,10 @@ async def foo(i, j):
     assert job_log_2['main'] == "6\n", str((job_log_2, res.debug_info()))
 
 
-def test_specify_job_region(backend: ServiceBackend):
-    b = batch(backend)
+def test_specify_job_region(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job('region')
-    possible_regions = backend.supported_regions()
+    possible_regions = service_backend.supported_regions()
     j.regions(possible_regions)
     j.command('true')
     res = b.run()
@@ -747,10 +749,10 @@ def test_specify_job_region(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_job_regions_controls_job_execution_region(backend: ServiceBackend):
-    the_region = backend.supported_regions()[0]
+def test_job_regions_controls_job_execution_region(service_backend: ServiceBackend):
+    the_region = service_backend.supported_regions()[0]
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.regions([the_region])
     j.command('true')
@@ -761,10 +763,10 @@ def test_job_regions_controls_job_execution_region(backend: ServiceBackend):
     assert job_status['status']['region'] == the_region, str((job_status, res.debug_info()))
 
 
-def test_job_regions_overrides_batch_regions(backend: ServiceBackend):
-    the_region = backend.supported_regions()[0]
+def test_job_regions_overrides_batch_regions(service_backend: ServiceBackend):
+    the_region = service_backend.supported_regions()[0]
 
-    b = batch(backend, default_regions=['some-other-region'])
+    b = batch(service_backend, default_regions=['some-other-region'])
     j = b.new_job()
     j.regions([the_region])
     j.command('true')
@@ -775,10 +777,10 @@ def test_job_regions_overrides_batch_regions(backend: ServiceBackend):
     assert job_status['status']['region'] == the_region, str((job_status, res.debug_info()))
 
 
-def test_always_copy_output(backend: ServiceBackend, output_tmpdir: str):
+def test_always_copy_output(service_backend: ServiceBackend, output_tmpdir: str):
     output_path = os.path.join(output_tmpdir, 'test_always_copy_output.txt')
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.always_copy_output()
     j.command(f'echo "hello" > {j.ofile} && false')
@@ -789,7 +791,7 @@ def test_always_copy_output(backend: ServiceBackend, output_tmpdir: str):
     res_status = res.status()
     assert res_status['state'] == 'failure', str((res_status, res.debug_info()))
 
-    b2 = batch(backend)
+    b2 = batch(service_backend)
     input = b2.read_input(output_path)
     file_exists_j = b2.new_job()
     file_exists_j.command(f'cat {input}')
@@ -801,10 +803,10 @@ def test_always_copy_output(backend: ServiceBackend, output_tmpdir: str):
     assert res.get_job_log(1)['main'] == "hello\n", str(res.debug_info())
 
 
-def test_no_copy_output_on_failure(backend: ServiceBackend, output_tmpdir: str):
+def test_no_copy_output_on_failure(service_backend: ServiceBackend, output_tmpdir: str):
     output_path = os.path.join(output_tmpdir, 'test_no_copy_output.txt')
 
-    b = batch(backend)
+    b = batch(service_backend)
     j = b.new_job()
     j.command(f'echo "hello" > {j.ofile} && false')
 
@@ -814,7 +816,7 @@ def test_no_copy_output_on_failure(backend: ServiceBackend, output_tmpdir: str):
     res_status = res.status()
     assert res_status['state'] == 'failure', str((res_status, res.debug_info()))
 
-    b2 = batch(backend)
+    b2 = batch(service_backend)
     input = b2.read_input(output_path)
     file_exists_j = b2.new_job()
     file_exists_j.command(f'cat {input}')
@@ -825,8 +827,8 @@ def test_no_copy_output_on_failure(backend: ServiceBackend, output_tmpdir: str):
     assert res_status['state'] == 'failure', str((res_status, res.debug_info()))
 
 
-def test_update_batch(backend: ServiceBackend):
-    b = batch(backend)
+def test_update_batch(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.command('true')
     res = b.run()
@@ -843,8 +845,8 @@ def test_update_batch(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-def test_update_batch_with_dependencies(backend: ServiceBackend):
-    b = batch(backend)
+def test_update_batch_with_dependencies(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j1 = b.new_job()
     j1.command('true')
     j2 = b.new_job()
@@ -872,8 +874,8 @@ def test_update_batch_with_dependencies(backend: ServiceBackend):
     assert res.get_job(4).status()['state'] == 'Cancelled', str((res_status, res.debug_info()))
 
 
-def test_update_batch_with_python_job_dependencies(backend: ServiceBackend):
-    b = batch(backend)
+def test_update_batch_with_python_job_dependencies(service_backend: ServiceBackend):
+    b = batch(service_backend)
 
     async def foo(i, j):
         await asyncio.sleep(1)
@@ -905,8 +907,8 @@ async def foo(i, j):
     assert batch_status['state'] == 'success', str((batch_status, res.debug_info()))
 
 
-def test_update_batch_from_batch_id(backend: ServiceBackend):
-    b = batch(backend)
+def test_update_batch_from_batch_id(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.command('true')
     res = b.run()
@@ -924,11 +926,11 @@ def test_update_batch_from_batch_id(backend: ServiceBackend):
     assert res_status['state'] == 'success', str((res_status, res.debug_info()))
 
 
-async def test_python_job_with_kwarg(fs: RouterAsyncFS, backend: ServiceBackend, output_tmpdir: str):
+async def test_python_job_with_kwarg(fs: RouterAsyncFS, service_backend: ServiceBackend, output_tmpdir: str):
     def foo(*, kwarg):
         return kwarg
 
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
     j = b.new_python_job()
     r = j.call(foo, kwarg='hello world')
 
@@ -941,8 +943,8 @@ def foo(*, kwarg):
     assert orjson.loads(await fs.read(output_path)) == 'hello world'
 
 
-def test_tuple_recursive_resource_extraction_in_python_jobs(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_tuple_recursive_resource_extraction_in_python_jobs(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
 
     def write(paths):
         if not isinstance(paths, tuple):
@@ -966,8 +968,8 @@ def write(paths):
     assert res.get_job_log(tail._job_id)['main'] == '01', str(res.debug_info())
 
 
-def test_list_recursive_resource_extraction_in_python_jobs(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_list_recursive_resource_extraction_in_python_jobs(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
 
     def write(paths):
         for i, path in enumerate(paths):
@@ -989,8 +991,8 @@ def write(paths):
     assert res.get_job_log(tail._job_id)['main'] == '01', str(res.debug_info())
 
 
-def test_dict_recursive_resource_extraction_in_python_jobs(backend: ServiceBackend):
-    b = batch(backend, default_python_image=PYTHON_DILL_IMAGE)
+def test_dict_recursive_resource_extraction_in_python_jobs(service_backend: ServiceBackend):
+    b = batch(service_backend, default_python_image=PYTHON_DILL_IMAGE)
 
     def write(kwargs):
         for k, v in kwargs.items():
@@ -1012,14 +1014,14 @@ def write(kwargs):
     assert res.get_job_log(tail._job_id)['main'] == 'ab', str(res.debug_info())
 
 
-def test_wait_on_empty_batch_update(backend: ServiceBackend):
-    b = batch(backend)
+def test_wait_on_empty_batch_update(service_backend: ServiceBackend):
+    b = batch(service_backend)
     b.run(wait=True)
     b.run(wait=True)
 
 
-def test_non_spot_job(backend: ServiceBackend):
-    b = batch(backend)
+def test_non_spot_job(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.spot(False)
     j.command('echo hello')
@@ -1028,8 +1030,8 @@ def test_non_spot_job(backend: ServiceBackend):
     assert res.get_job(1).status()['spec']['resources']['preemptible'] == False
 
 
-def test_spot_unspecified_job(backend: ServiceBackend):
-    b = batch(backend)
+def test_spot_unspecified_job(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.command('echo hello')
     res = b.run()
@@ -1037,8 +1039,8 @@ def test_spot_unspecified_job(backend: ServiceBackend):
     assert res.get_job(1).status()['spec']['resources']['preemptible'] == True
 
 
-def test_spot_true_job(backend: ServiceBackend):
-    b = batch(backend)
+def test_spot_true_job(service_backend: ServiceBackend):
+    b = batch(service_backend)
     j = b.new_job()
     j.spot(True)
     j.command('echo hello')
@@ -1047,8 +1049,8 @@ def test_spot_true_job(backend: ServiceBackend):
     assert res.get_job(1).status()['spec']['resources']['preemptible'] == True
 
 
-def test_non_spot_batch(backend: ServiceBackend):
-    b = batch(backend, default_spot=False)
+def test_non_spot_batch(service_backend: ServiceBackend):
+    b = batch(service_backend, default_spot=False)
    j1 = b.new_job()
     j1.command('echo hello')
     j2 = b.new_job()
@@ -1063,8 +1065,8 @@ def test_non_spot_batch(backend: ServiceBackend):
     assert res.get_job(3).status()['spec']['resources']['preemptible'] == True
 
 
-def test_local_file_paths_error(backend: ServiceBackend):
-    b = batch(backend)
+def test_local_file_paths_error(service_backend: ServiceBackend):
+    b = batch(service_backend)
     b.new_job()
     for input in ["hi.txt", "~/hello.csv", "./hey.tsv", "/sup.json", "file://yo.yaml"]:
         with pytest.raises(ValueError) as e:
@@ -1073,7 +1075,7 @@ def test_local_file_paths_error(backend: ServiceBackend):
 
 
 @skip_in_azure
-def test_validate_cloud_storage_policy(backend, monkeypatch):
+def test_validate_cloud_storage_policy(service_backend: ServiceBackend, monkeypatch):
     # buckets do not exist (bucket names can't contain the string "google" per
     # https://cloud.google.com/storage/docs/buckets)
     fake_bucket1 = "google"
@@ -1113,7 +1115,7 @@ def _test_raises_cold_error(func):
     # no configuration, cold bucket errors
     _test_raises_cold_error(lambda: ServiceBackend(remote_tmpdir=cold_uri))
 
-    b = batch(backend)
+    b = batch(service_backend)
     _test_raises_cold_error(lambda: b.read_input(cold_uri))
     j = b.new_job()
     j.command(f"echo hello > {j.ofile}")
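---

Note: this rename assumes a `service_backend` pytest fixture is defined in the
suite's conftest. For orientation only, a minimal sketch of such a fixture; the
actual definition is not part of this diff and may differ in scope and setup:

    # conftest.py (hypothetical sketch)
    import pytest

    from hailtop.batch import ServiceBackend


    @pytest.fixture(scope='session')
    def service_backend() -> ServiceBackend:
        # With no arguments, ServiceBackend reads the billing project and
        # remote tmpdir from the user's Hail configuration.
        backend = ServiceBackend()
        try:
            yield backend
        finally:
            backend.close()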