diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py
index bfcd39040..e2008a068 100644
--- a/tests/worker_coordinator/runner_test.py
+++ b/tests/worker_coordinator/runner_test.py
@@ -64,7 +64,8 @@ async def runner_function(*args):
             await returned_runner({"default": "default_client", "other": "other_client"}, "param"))
 
     @run_async
-    async def test_single_cluster_runner_class_with_context_manager_should_be_wrapped_with_context_manager_enabled(self):
+    async def test_single_cluster_runner_class_with_context_manager_should_be_wrapped_with_context_manager_enabled(
+            self):
         class UnitTestSingleClusterContextManagerRunner(BaseUnitTestContextManagerRunner):
             async def __call__(self, *args):
                 return args
@@ -189,7 +190,7 @@ async def test_asserts_equal_succeeds(self):
 
     @run_async
     async def test_asserts_equal_fails(self):
-        opensearch = None 
+        opensearch = None
         response = {
             "hits": {
                 "hits": {
@@ -429,7 +430,7 @@ async def test_simple_bulk_with_timeout_and_headers(self, opensearch):
             "type": "_doc",
             "index": "test1",
             "request-timeout": 3.0,
-            "headers": { "x-test-id": "1234"},
+            "headers": {"x-test-id": "1234"},
             "opaque-id": "DESIRED-OPAQUE-ID",
             "bulk-size": 3,
             "unit": "docs"
@@ -445,12 +446,12 @@ async def test_simple_bulk_with_timeout_and_headers(self, opensearch):
         self.assertFalse("error-type" in result)
 
         opensearch.bulk.assert_called_with(doc_type="_doc",
-                                   params={},
-                                   body="index_line\nindex_line\nindex_line\n",
-                                   headers={"x-test-id": "1234"},
-                                   index="test1",
-                                   opaque_id="DESIRED-OPAQUE-ID",
-                                   request_timeout=3.0)
+                                           params={},
+                                           body="index_line\nindex_line\nindex_line\n",
+                                           headers={"x-test-id": "1234"},
+                                           index="test1",
+                                           opaque_id="DESIRED-OPAQUE-ID",
+                                           request_timeout=3.0)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -599,7 +600,8 @@ async def test_bulk_index_error_no_shards(self, opensearch):
                         "_type": "doc",
                         "_id": "1",
                         "status": 429,
-                        "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@1]" # pylint: disable=line-too-long
+                        "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@1]"
+                        # pylint: disable=line-too-long
                     }
                 },
                 {
@@ -608,7 +610,8 @@ async def test_bulk_index_error_no_shards(self, opensearch):
                         "_type": "doc",
                         "_id": "2",
                         "status": 429,
-                        "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@2]" # pylint: disable=line-too-long
+                        "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@2]"
+                        # pylint: disable=line-too-long
                     }
                 },
                 {
@@ -617,7 +620,8 @@ async def test_bulk_index_error_no_shards(self, opensearch):
                         "_type": "doc",
                         "_id": "3",
                         "status": 429,
-                        "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@3]" # pylint: disable=line-too-long
+                        "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@3]"
+                        # pylint: disable=line-too-long
                     }
                 }
             ]
@@ -1091,14 +1095,14 @@ async def test_force_merge_with_timeout_and_headers(self, opensearch):
         opensearch.indices.forcemerge.return_value = as_future()
         force_merge = runner.ForceMerge()
         await force_merge(opensearch, params={"index": "_all",
-                                      "opaque-id": "test-id",
-                                      "request-timeout": 3.0,
-                                      "headers": {"header1": "value1"}})
+                                              "opaque-id": "test-id",
+                                              "request-timeout": 3.0,
+                                              "headers": {"header1": "value1"}})
 
         opensearch.indices.forcemerge.assert_called_once_with(headers={"header1": "value1"},
-                                                      index="_all",
-                                                      opaque_id="test-id",
-                                                      request_timeout=3.0)
+                                                              index="_all",
+                                                              opaque_id="test-id",
+                                                              request_timeout=3.0)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -1126,7 +1130,7 @@ async def test_force_merge_with_polling_no_timeout(self, opensearch):
         opensearch.indices.forcemerge.return_value = as_future()
         force_merge = runner.ForceMerge()
-        await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0})
+        await force_merge(opensearch, params={"index": "_all", "mode": "polling", 'poll-period': 0})
         opensearch.indices.forcemerge.assert_called_once_with(index="_all")
 
     @mock.patch("opensearchpy.OpenSearch")
@@ -1227,8 +1231,8 @@ async def test_force_merge_with_polling_and_params(self, opensearch):
         ]
         force_merge = runner.ForceMerge()
         # request-timeout should be ignored as mode:polling
-        await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1,
-                                      "request-timeout": 50000, "poll-period": 0})
+        await force_merge(opensearch, params={"index": "_all", "mode": "polling", "max-num-segments": 1,
+                                              "request-timeout": 50000, "poll-period": 0})
         opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1,
                                                               request_timeout=50000)
 
@@ -1251,17 +1255,17 @@ async def test_indices_stats_with_timeout_and_headers(self, opensearch):
         opensearch.indices.stats.return_value = as_future({})
         indices_stats = runner.IndicesStats()
         result = await indices_stats(opensearch, params={"request-timeout": 3.0,
-                                                 "headers": {"header1": "value1"},
-                                                 "opaque-id": "test-id1"})
+                                                         "headers": {"header1": "value1"},
+                                                         "opaque-id": "test-id1"})
         self.assertEqual(1, result["weight"])
         self.assertEqual("ops", result["unit"])
         self.assertTrue(result["success"])
 
         opensearch.indices.stats.assert_called_once_with(index="_all",
-                                                 metric="_all",
-                                                 headers={"header1": "value1"},
-                                                 opaque_id="test-id1",
-                                                 request_timeout=3.0)
+                                                         metric="_all",
+                                                         headers={"header1": "value1"},
+                                                         opaque_id="test-id1",
+                                                         request_timeout=3.0)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -2207,11 +2211,12 @@ async def test_search_pipeline_using_request_params(self, opensearch):
         )
         opensearch.clear_scroll.assert_not_called()
 
+
 class CreateIngestPipelineRunnerTests(TestCase):
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
     async def test_create_pipeline(self, opensearch):
-        opensearch.ingest.put_pipeline.return_value = as_future()
+        opensearch.ingest.create_ingest_pipeline.return_value = as_future()
 
         r = runner.CreateIngestPipeline()
@@ -2232,12 +2237,13 @@ async def test_create_pipeline(self, opensearch):
 
         await r(opensearch, params)
 
-        opensearch.ingest.put_pipeline.assert_called_once_with(id="rename", body=params["body"], master_timeout=None, timeout=None)
+        opensearch.ingest.create_ingest_pipeline.assert_called_once_with(id="rename", body=params["body"], master_timeout=None,
+                                                                         timeout=None)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
     async def test_param_body_mandatory(self, opensearch):
-        opensearch.ingest.put_pipeline.return_value = as_future()
+        opensearch.ingest.create_ingest_pipeline.return_value = as_future()
 
         r = runner.CreateIngestPipeline()
@@ -2245,16 +2251,16 @@ async def test_param_body_mandatory(self, opensearch):
             "id": "rename"
         }
         with self.assertRaisesRegex(exceptions.DataError,
-                                    "Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. "
+                                    "Parameter source for operation 'create-ingest-pipeline' did not provide the mandatory parameter 'body'. "
                                     "Add it to your parameter source and try again."):
             await r(opensearch, params)
 
-        self.assertEqual(0, opensearch.ingest.put_pipeline.call_count)
+        self.assertEqual(0, opensearch.ingest.create_ingest_pipeline.call_count)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
     async def test_param_id_mandatory(self, opensearch):
-        opensearch.ingest.put_pipeline.return_value = as_future()
+        opensearch.ingest.create_ingest_pipeline.return_value = as_future()
 
         r = runner.CreateIngestPipeline()
@@ -2266,7 +2272,7 @@ async def test_param_id_mandatory(self, opensearch):
                                     "Add it to your parameter source and try again."):
             await r(opensearch, params)
 
-        self.assertEqual(0, opensearch.ingest.put_pipeline.call_count)
+        self.assertEqual(0, opensearch.ingest.create_ingest_pipeline.call_count)
 
 
 class ClusterHealthRunnerTests(TestCase):
@@ -2353,9 +2359,9 @@ async def test_cluster_health_with_timeout_and_headers(self, opensearch):
         }, result)
 
         opensearch.cluster.health.assert_called_once_with(headers={"header1": "value1"},
-                                                  opaque_id="testid-1",
-                                                  params={"wait_for_status": "yellow"},
-                                                  request_timeout=3.0)
+                                                          opaque_id="testid-1",
+                                                          params={"wait_for_status": "yellow"},
+                                                          request_timeout=3.0)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -2385,7 +2391,8 @@ async def test_rejects_relocating_shards(self, opensearch):
         }, result)
 
         opensearch.cluster.health.assert_called_once_with(index="logs-*",
-                                                  params={"wait_for_status": "red", "wait_for_no_relocating_shards": True})
+                                                          params={"wait_for_status": "red",
+                                                                  "wait_for_no_relocating_shards": True})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -2477,13 +2484,12 @@ async def test_create_with_timeout_and_headers(self, opensearch):
             "success": True
         }, result)
 
-
         opensearch.indices.create.assert_called_once_with(index="indexA",
-                                                  body={"settings": {}},
-                                                  headers={"header1": "value1"},
-                                                  opaque_id="test-id1",
-                                                  params={"wait_for_active_shards": "true"},
-                                                  request_timeout=3.0)
+                                                          body={"settings": {}},
+                                                          headers={"header1": "value1"},
+                                                          opaque_id="test-id1",
+                                                          params={"wait_for_active_shards": "true"},
+                                                          request_timeout=3.0)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -2514,8 +2520,8 @@ async def test_ignore_invalid_params(self, opensearch):
         }, result)
 
         opensearch.indices.create.assert_called_once_with(index="indexA",
-                                                  body={"settings": {}},
-                                                  params={"wait_for_active_shards": "true"})
+                                                          body={"settings": {}},
+                                                          params={"wait_for_active_shards": "true"})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -2826,8 +2832,8 @@ async def test_create_index_templates(self, opensearch):
         r = runner.CreateComponentTemplate()
         params = {
             "templates": [
-                ("templateA", {"template":{"mappings":{"properties":{"@timestamp":{"type": "date"}}}}}),
-                ("templateB", {"template":{"settings": {"index.number_of_shards": 1,"index.number_of_replicas": 1}}}),
+                ("templateA", {"template": {"mappings": {"properties": {"@timestamp": {"type": "date"}}}}}),
+                ("templateB", {"template": {"settings": {"index.number_of_shards": 1, "index.number_of_replicas": 1}}}),
             ],
             "request-params": {
                 "timeout": 50,
@@ -2842,9 +2848,11 @@ async def test_create_index_templates(self, opensearch):
             "success": True
         }, result)
         opensearch.cluster.put_component_template.assert_has_calls([
-            mock.call(name="templateA", body={"template":{"mappings":{"properties":{"@timestamp":{"type": "date"}}}}},
+            mock.call(name="templateA",
+                      body={"template": {"mappings": {"properties": {"@timestamp": {"type": "date"}}}}},
                       params=params["request-params"]),
-            mock.call(name="templateB", body={"template":{"settings": {"index.number_of_shards": 1,"index.number_of_replicas": 1}}},
+            mock.call(name="templateB",
+                      body={"template": {"settings": {"index.number_of_shards": 1, "index.number_of_replicas": 1}}},
                       params=params["request-params"])
         ])
 
@@ -2898,7 +2906,6 @@ async def test_deletes_all_index_templates(self, opensearch):
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
     async def test_deletes_only_existing_index_templates(self, opensearch):
-
         def _side_effect(http_method, path):
             if http_method == "HEAD":
                 return as_future(path == "/_component_template/templateB")
@@ -2927,7 +2934,8 @@ def _side_effect(http_method, path):
             "success": True
         }, result)
 
-        opensearch.cluster.delete_component_template.assert_called_once_with(name="templateB", params=params["request-params"])
+        opensearch.cluster.delete_component_template.assert_called_once_with(name="templateB",
+                                                                             params=params["request-params"])
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -2951,10 +2959,10 @@ async def test_create_index_templates(self, opensearch):
         r = runner.CreateComposableTemplate()
         params = {
             "templates": [
-                ("templateA", {"index_patterns":["logs-*"],"template":{"settings":{"index.number_of_shards":3}},
-                               "composed_of":["ct1","ct2"]}),
-                ("templateB", {"index_patterns":["metrics-*"],"template":{"settings":{"index.number_of_shards":2}},
-                               "composed_of":["ct3","ct4"]}),
+                ("templateA", {"index_patterns": ["logs-*"], "template": {"settings": {"index.number_of_shards": 3}},
+                               "composed_of": ["ct1", "ct2"]}),
+                ("templateB", {"index_patterns": ["metrics-*"], "template": {"settings": {"index.number_of_shards": 2}},
+                               "composed_of": ["ct3", "ct4"]}),
             ],
             "request-params": {
                 "timeout": 50
@@ -2968,10 +2976,12 @@ async def test_create_index_templates(self, opensearch):
             "success": True
         }, result)
         opensearch.cluster.put_index_template.assert_has_calls([
-            mock.call(name="templateA", body={"index_patterns":["logs-*"],"template":{"settings":{"index.number_of_shards":3}},
-                                              "composed_of":["ct1","ct2"]}, params=params["request-params"]),
-            mock.call(name="templateB", body={"index_patterns":["metrics-*"],"template":{"settings":{"index.number_of_shards":2}},
-                                              "composed_of":["ct3","ct4"]}, params=params["request-params"])
+            mock.call(name="templateA",
+                      body={"index_patterns": ["logs-*"], "template": {"settings": {"index.number_of_shards": 3}},
+                            "composed_of": ["ct1", "ct2"]}, params=params["request-params"]),
+            mock.call(name="templateB",
+                      body={"index_patterns": ["metrics-*"], "template": {"settings": {"index.number_of_shards": 2}},
+                            "composed_of": ["ct3", "ct4"]}, params=params["request-params"])
         ])
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3052,7 +3062,8 @@ async def test_deletes_only_existing_index_templates(self, opensearch):
             "success": True
         }, result)
 
-        opensearch.indices.delete_index_template.assert_called_once_with(name="templateB", params=params["request-params"])
+        opensearch.indices.delete_index_template.assert_called_once_with(name="templateB",
+                                                                         params=params["request-params"])
         # not called because the matching index is empty.
         self.assertEqual(0, opensearch.indices.delete.call_count)
@@ -3084,7 +3095,8 @@ async def test_raises_missing_slash(self, opensearch):
         with mock.patch.object(r.logger, "error") as mocked_error_logger:
             with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx:
                 await r(opensearch, params)
-            self.assertEqual("RawRequest [_cat/count] failed. Path parameter must begin with a '/'.", ctx.exception.args[0])
+            self.assertEqual("RawRequest [_cat/count] failed. Path parameter must begin with a '/'.",
+                             ctx.exception.args[0])
             mocked_error_logger.assert_has_calls([
                 mock.call("RawRequest failed. Path parameter: [%s] must begin with a '/'.", params["path"])
             ])
@@ -3101,10 +3113,10 @@ async def test_issue_request_with_defaults(self, opensearch):
         await r(opensearch, params)
 
         opensearch.transport.perform_request.assert_called_once_with(method="GET",
-                                                             url="/_cat/count",
-                                                             headers=None,
-                                                             body=None,
-                                                             params={})
+                                                                     url="/_cat/count",
+                                                                     headers=None,
+                                                                     body=None,
+                                                                     params={})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3123,10 +3135,10 @@ async def test_issue_delete_index(self, opensearch):
         await r(opensearch, params)
 
         opensearch.transport.perform_request.assert_called_once_with(method="DELETE",
-                                                             url="/twitter",
-                                                             headers=None,
-                                                             body=None,
-                                                             params={"ignore": [400, 404], "pretty": "true"})
+                                                                     url="/twitter",
+                                                                     headers=None,
+                                                                     body=None,
+                                                                     params={"ignore": [400, 404], "pretty": "true"})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3148,16 +3160,16 @@ async def test_issue_create_index(self, opensearch):
         await r(opensearch, params)
 
         opensearch.transport.perform_request.assert_called_once_with(method="POST",
-                                                             url="/twitter",
-                                                             headers=None,
-                                                             body={
-                                                                 "settings": {
-                                                                     "index": {
-                                                                         "number_of_replicas": 0
-                                                                     }
-                                                                 }
-                                                             },
-                                                             params={})
+                                                                     url="/twitter",
+                                                                     headers=None,
+                                                                     body={
+                                                                         "settings": {
+                                                                             "index": {
+                                                                                 "number_of_replicas": 0
+                                                                             }
+                                                                         }
+                                                                     },
+                                                                     params={})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3180,15 +3192,17 @@ async def test_issue_msearch(self, opensearch):
         await r(opensearch, params)
 
         opensearch.transport.perform_request.assert_called_once_with(method="GET",
-                                                             url="/_msearch",
-                                                             headers={"Content-Type": "application/x-ndjson"},
-                                                             body=[
-                                                                 {"index": "test"},
-                                                                 {"query": {"match_all": {}}, "from": 0, "size": 10},
-                                                                 {"index": "test", "search_type": "dfs_query_then_fetch"},
-                                                                 {"query": {"match_all": {}}}
-                                                             ],
-                                                             params={})
+                                                                     url="/_msearch",
+                                                                     headers={"Content-Type": "application/x-ndjson"},
+                                                                     body=[
+                                                                         {"index": "test"},
+                                                                         {"query": {"match_all": {}}, "from": 0,
+                                                                          "size": 10},
+                                                                         {"index": "test",
+                                                                          "search_type": "dfs_query_then_fetch"},
+                                                                         {"query": {"match_all": {}}}
+                                                                     ],
+                                                                     params={})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3213,16 +3227,18 @@ async def test_raw_with_timeout_and_opaqueid(self, opensearch):
         await r(opensearch, params)
 
         opensearch.transport.perform_request.assert_called_once_with(method="GET",
-                                                             url="/_msearch",
-                                                             headers={"Content-Type": "application/x-ndjson",
-                                                                      "x-opaque-id": "test-id1"},
-                                                             body=[
-                                                                 {"index": "test"},
-                                                                 {"query": {"match_all": {}}, "from": 0, "size": 10},
-                                                                 {"index": "test", "search_type": "dfs_query_then_fetch"},
-                                                                 {"query": {"match_all": {}}}
-                                                             ],
-                                                             params={"request_timeout": 3.0})
+                                                                     url="/_msearch",
+                                                                     headers={"Content-Type": "application/x-ndjson",
+                                                                              "x-opaque-id": "test-id1"},
+                                                                     body=[
+                                                                         {"index": "test"},
+                                                                         {"query": {"match_all": {}}, "from": 0,
+                                                                          "size": 10},
+                                                                         {"index": "test",
+                                                                          "search_type": "dfs_query_then_fetch"},
+                                                                         {"query": {"match_all": {}}}
+                                                                     ],
+                                                                     params={"request_timeout": 3.0})
 
 
 class SleepTests(TestCase):
@@ -3290,13 +3306,13 @@ async def test_create_snapshot_repository(self, opensearch):
         await r(opensearch, params)
 
         opensearch.snapshot.create_repository.assert_called_once_with(repository="backups",
-                                                              body={
-                                                                  "type": "fs",
-                                                                  "settings": {
-                                                                      "location": "/var/backups"
-                                                                  }
-                                                              },
-                                                              params={})
+                                                                      body={
+                                                                          "type": "fs",
+                                                                          "settings": {
+                                                                              "location": "/var/backups"
+                                                                          }
+                                                                      },
+                                                                      params={})
 
 
 class CreateSnapshotTests(TestCase):
@@ -3321,12 +3337,12 @@ async def test_create_snapshot_no_wait(self, opensearch):
         await r(opensearch, params)
 
         opensearch.snapshot.create.assert_called_once_with(repository="backups",
-                                                   snapshot="snapshot-001",
-                                                   body={
-                                                       "indices": "logs-*"
-                                                   },
-                                                   params={"request_timeout": 7200},
-                                                   wait_for_completion=False)
+                                                           snapshot="snapshot-001",
+                                                           body={
+                                                               "indices": "logs-*"
+                                                           },
+                                                           params={"request_timeout": 7200},
+                                                           wait_for_completion=False)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3372,12 +3388,12 @@ async def test_create_snapshot_wait_for_completion(self, opensearch):
         await r(opensearch, params)
 
         opensearch.snapshot.create.assert_called_once_with(repository="backups",
-                                                   snapshot="snapshot-001",
-                                                   body={
-                                                       "indices": "logs-*"
-                                                   },
-                                                   params={"request_timeout": 7200},
-                                                   wait_for_completion=True)
+                                                           snapshot="snapshot-001",
+                                                           body={
+                                                               "indices": "logs-*"
+                                                           },
+                                                           params={"request_timeout": 7200},
+                                                           wait_for_completion=True)
 
 
 class WaitForSnapshotCreateTests(TestCase):
@@ -3530,8 +3546,8 @@ async def test_wait_for_snapshot_create_immediate_success(self, opensearch):
         }, result)
 
         opensearch.snapshot.status.assert_called_once_with(repository="backups",
-                                                   snapshot="snapshot-001",
-                                                   ignore_unavailable=True)
+                                                           snapshot="snapshot-001",
+                                                           ignore_unavailable=True)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3583,9 +3599,9 @@ async def test_restore_snapshot(self, opensearch):
         await r(opensearch, params)
 
         opensearch.snapshot.restore.assert_called_once_with(repository="backups",
-                                                    snapshot="snapshot-001",
-                                                    wait_for_completion=True,
-                                                    params={"request_timeout": 7200})
+                                                            snapshot="snapshot-001",
+                                                            wait_for_completion=True,
+                                                            params={"request_timeout": 7200})
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -3611,16 +3627,16 @@ async def test_restore_snapshot_with_body(self, opensearch):
         await r(opensearch, params)
 
         opensearch.snapshot.restore.assert_called_once_with(repository="backups",
-                                                    snapshot="snapshot-001",
-                                                    body={
-                                                        "indices": "index1,index2",
-                                                        "include_global_state": False,
-                                                        "index_settings": {
-                                                            "index.number_of_replicas": 0
-                                                        }
-                                                    },
-                                                    wait_for_completion=True,
-                                                    params={"request_timeout": 7200})
+                                                            snapshot="snapshot-001",
+                                                            body={
+                                                                "indices": "index1,index2",
+                                                                "include_global_state": False,
+                                                                "index_settings": {
+                                                                    "index.number_of_replicas": 0
+                                                                }
+                                                            },
+                                                            wait_for_completion=True,
+                                                            params={"request_timeout": 7200})
 
 
 class IndicesRecoveryTests(TestCase):
@@ -3804,13 +3820,13 @@ async def test_shrink_index_with_shrink_node(self, sleep, opensearch):
         await r(opensearch, params)
 
         opensearch.indices.put_settings.assert_called_once_with(index="src",
-                                                        body={
-                                                            "settings": {
-                                                                "index.routing.allocation.require._name": "benchmark-node-0",
-                                                                "index.blocks.write": "true"
-                                                            }
-                                                        },
-                                                        preserve_existing=True)
+                                                                body={
+                                                                    "settings": {
+                                                                        "index.routing.allocation.require._name": "benchmark-node-0",
+                                                                        "index.blocks.write": "true"
+                                                                    }
+                                                                },
+                                                                preserve_existing=True)
 
         opensearch.cluster.health.assert_has_calls([
             mock.call(index="src", params={"wait_for_no_relocating_shards": "true"}),
@@ -3887,14 +3903,14 @@ async def test_shrink_index_derives_shrink_node(self, sleep, opensearch):
         await r(opensearch, params)
 
         opensearch.indices.put_settings.assert_called_once_with(index="src",
-                                                        body={
-                                                            "settings": {
-                                                                # the only data node in the cluster was chosen
-                                                                "index.routing.allocation.require._name": "node0",
-                                                                "index.blocks.write": "true"
-                                                            }
-                                                        },
-                                                        preserve_existing=True)
+                                                                body={
+                                                                    "settings": {
+                                                                        # the only data node in the cluster was chosen
+                                                                        "index.routing.allocation.require._name": "node0",
+                                                                        "index.blocks.write": "true"
+                                                                    }
+                                                                },
+                                                                preserve_existing=True)
 
         opensearch.cluster.health.assert_has_calls([
             mock.call(index="src", params={"wait_for_no_relocating_shards": "true"}),
@@ -4068,8 +4084,9 @@ async def test_create_transform(self, opensearch):
         r = runner.CreateTransform()
         await r(opensearch, params)
 
-        opensearch.transform.put_transform.assert_called_once_with(transform_id=params["transform-id"], body=params["body"],
-                                                           defer_validation=params["defer-validation"])
+        opensearch.transform.put_transform.assert_called_once_with(transform_id=params["transform-id"],
+                                                                   body=params["body"],
+                                                                   defer_validation=params["defer-validation"])
 
 
 class StartTransformTests(TestCase):
@@ -4087,7 +4104,8 @@ async def test_start_transform(self, opensearch):
         r = runner.StartTransform()
         await r(opensearch, params)
 
-        opensearch.transform.start_transform.assert_called_once_with(transform_id=transform_id, timeout=params["timeout"])
+        opensearch.transform.start_transform.assert_called_once_with(transform_id=transform_id,
+                                                                     timeout=params["timeout"])
 
 
 class WaitForTransformTests(TestCase):
@@ -4150,10 +4168,10 @@ async def test_wait_for_transform(self, opensearch):
         self.assertEqual(result["unit"], "docs")
 
         opensearch.transform.stop_transform.assert_called_once_with(transform_id=transform_id, force=params["force"],
-                                                            timeout=params["timeout"],
-                                                            wait_for_completion=False,
-                                                            wait_for_checkpoint=params["wait-for-checkpoint"]
-                                                            )
+                                                                    timeout=params["timeout"],
+                                                                    wait_for_completion=False,
+                                                                    wait_for_checkpoint=params["wait-for-checkpoint"]
+                                                                    )
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -4332,10 +4350,10 @@ async def test_wait_for_transform_progress(self, opensearch):
         self.assertEqual(result["unit"], "docs")
 
         opensearch.transform.stop_transform.assert_called_once_with(transform_id=transform_id, force=params["force"],
-                                                            timeout=params["timeout"],
-                                                            wait_for_completion=False,
-                                                            wait_for_checkpoint=True
-                                                            )
+                                                                    timeout=params["timeout"],
+                                                                    wait_for_completion=False,
+                                                                    wait_for_checkpoint=True
+                                                                    )
 
 
 class DeleteTransformTests(TestCase):
@@ -4354,7 +4372,7 @@ async def test_delete_transform(self, opensearch):
         await r(opensearch, params)
 
         opensearch.transform.delete_transform.assert_called_once_with(transform_id=transform_id, force=params["force"],
-                                                              ignore=[404])
+                                                                      ignore=[404])
 
 
 class SubmitAsyncSearchTests(TestCase):
@@ -4486,6 +4504,7 @@ async def test_can_only_be_run_in_composite(self, opensearch):
 
         self.assertEqual("This operation is only allowed inside a composite operation.", ctx.exception.args[0])
 
+
 class DeletePointInTimeTests(TestCase):
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -4495,13 +4514,14 @@ async def test_delete_point_in_time(self, opensearch):
             "name": "close-pit-test",
             "with-point-in-time-from": "open-pit-task1",
         }
-        opensearch.delete_point_in_time.return_value=(as_future())
+        opensearch.delete_point_in_time.return_value = (as_future())
         r = runner.DeletePointInTime()
         async with runner.CompositeContext():
             runner.CompositeContext.put("open-pit-task1", pit_id)
             await r(opensearch, params)
 
-        opensearch.delete_point_in_time.assert_called_once_with(body={"pit_id": ["0123456789abcdef"]}, params={}, headers=None)
+        opensearch.delete_point_in_time.assert_called_once_with(body={"pit_id": ["0123456789abcdef"]}, params={},
+                                                                headers=None)
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -4509,11 +4529,12 @@ async def test_delete_point_in_time_without_context(self, opensearch):
         params = {
             "name": "close-pit-test",
         }
-        opensearch.delete_point_in_time.return_value=(as_future())
+        opensearch.delete_point_in_time.return_value = (as_future())
         r = runner.DeletePointInTime()
         await r(opensearch, params)
         opensearch.delete_point_in_time.assert_called_once_with(body=None, all=True, params={}, headers=None)
 
+
 class ListAllPointInTimeTests(TestCase):
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -4585,36 +4606,36 @@ async def test_search_after_with_pit(self, opensearch):
                 "hits": [
                     {
                         "_id": "1",
-                         "timestamp": 1609780186,
-                         "sort": [1609780186, "1"]
+                        "timestamp": 1609780186,
+                        "sort": [1609780186, "1"]
                     },
                     {
                         "_id": "2",
-                         "timestamp": 1609780186,
-                         "sort": [1609780186, "2"]
+                        "timestamp": 1609780186,
+                        "sort": [1609780186, "2"]
                     }
                 ]
             }
         }
         page_2 = {"pit_id": "fedcba9876543211",
-            "took": 10,
-            "timed_out": False,
-            "hits": {
-                "total": {
-                    "value": "3",
-                    "relation": "eq"
-                },
-                "hits": [
-                    {"_id": "3",
-                     "timestamp": 1609780187,
-                     "sort": [1609780187, "3"]
-                    }
-                ]
-            }}
+                  "took": 10,
+                  "timed_out": False,
+                  "hits": {
+                      "total": {
+                          "value": "3",
+                          "relation": "eq"
+                      },
+                      "hits": [
+                          {"_id": "3",
+                           "timestamp": 1609780187,
+                           "sort": [1609780187, "3"]
+                           }
+                      ]
+                  }}
         opensearch.transport.perform_request.side_effect = [as_future(io.BytesIO(json.dumps(page_1).encode())),
-                                                    as_future(io.BytesIO(json.dumps(page_2).encode()))]
+                                                            as_future(io.BytesIO(json.dumps(page_2).encode()))]
 
         r = runner.Query()
@@ -4625,38 +4646,38 @@ async def test_search_after_with_pit(self, opensearch):
             self.assertEqual("fedcba9876543211", runner.CompositeContext.get(pit_op))
 
         opensearch.transport.perform_request.assert_has_calls([mock.call("GET", "/_search", params={},
-                                                      body={
-                                                          "query": {
-                                                              "match-all": {}
-                                                          },
-                                                          "sort": [{
-                                                              "timestamp": "asc",
-                                                              "tie_breaker_id": "asc"
-                                                          }],
-                                                          "size": 2,
-                                                          "pit": {
-                                                              "id": "0123456789abcdef",
-                                                              "keep_alive": "1m"
-                                                          }
-                                                      },
-                                                      headers=None),
-                                            mock.call("GET", "/_search", params={},
-                                                      body={
-                                                          "query": {
-                                                              "match-all": {}
-                                                          },
-                                                          "sort": [{
-                                                              "timestamp": "asc",
-                                                              "tie_breaker_id": "asc"
-                                                          }],
-                                                          "size": 2,
-                                                          "pit": {
-                                                              "id": "fedcba9876543210",
-                                                              "keep_alive": "1m"
-                                                          },
-                                                          "search_after": [1609780186, "2"]
-                                                      },
-                                                      headers=None)])
+                                                               body={
+                                                                   "query": {
+                                                                       "match-all": {}
+                                                                   },
+                                                                   "sort": [{
+                                                                       "timestamp": "asc",
+                                                                       "tie_breaker_id": "asc"
+                                                                   }],
+                                                                   "size": 2,
+                                                                   "pit": {
+                                                                       "id": "0123456789abcdef",
+                                                                       "keep_alive": "1m"
+                                                                   }
+                                                               },
+                                                               headers=None),
+                                                     mock.call("GET", "/_search", params={},
+                                                               body={
+                                                                   "query": {
+                                                                       "match-all": {}
+                                                                   },
+                                                                   "sort": [{
+                                                                       "timestamp": "asc",
+                                                                       "tie_breaker_id": "asc"
+                                                                   }],
+                                                                   "size": 2,
+                                                                   "pit": {
+                                                                       "id": "fedcba9876543210",
+                                                                       "keep_alive": "1m"
+                                                                   },
+                                                                   "search_after": [1609780186, "2"]
+                                                               },
+                                                               headers=None)])
 
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
@@ -4699,40 +4720,40 @@ async def test_search_after_without_pit(self, opensearch):
             "took": 10,
             "timed_out": False,
             "hits": {
-                 "total": {
-                     "value": 3,
-                     "relation": "eq"
-                 },
-                 "hits": [
-                     {"_id": "3",
-                      "timestamp": 1609780187,
-                      "sort": [1609780187, "3"]
-                     }
-                 ]
+                "total": {
+                    "value": 3,
+                    "relation": "eq"
+                },
+                "hits": [
+                    {"_id": "3",
+                     "timestamp": 1609780187,
+                     "sort": [1609780187, "3"]
+                     }
+                ]
             }
         }
 
         opensearch.transport.perform_request.side_effect = [as_future(io.BytesIO(json.dumps(page_1).encode())),
-                                                    as_future(io.BytesIO(json.dumps(page_2).encode()))]
+                                                            as_future(io.BytesIO(json.dumps(page_2).encode()))]
 
         r = runner.Query()
 
         await r(opensearch, params)
 
         opensearch.transport.perform_request.assert_has_calls([mock.call("GET", "/test-index-1/_search", params={},
-                                                      body={"query": {"match-all": {}},
-                                                            "sort": [
-                                                                {"timestamp": "asc",
-                                                                 "tie_breaker_id": "asc"}],
-                                                            "size": 2},
-                                                      headers=None),
-                                            mock.call("GET", "/test-index-1/_search", params={},
-                                                      body={"query": {"match-all": {}},
-                                                            "sort": [
-                                                                {"timestamp": "asc",
-                                                                 "tie_breaker_id": "asc"}],
-                                                            "size": 2,
-                                                            "search_after": [1609780186, "2"]},
-                                                      headers=None)]
-        )
+                                                               body={"query": {"match-all": {}},
+                                                                     "sort": [
+                                                                         {"timestamp": "asc",
+                                                                          "tie_breaker_id": "asc"}],
+                                                                     "size": 2},
+                                                               headers=None),
+                                                               mock.call("GET", "/test-index-1/_search", params={},
+                                                                         body={"query": {"match-all": {}},
+                                                                               "sort": [
+                                                                                   {"timestamp": "asc",
+                                                                                    "tie_breaker_id": "asc"}],
+                                                                               "size": 2,
+                                                                               "search_after": [1609780186, "2"]},
+                                                                         headers=None)]
+                                                              )
 
 
 class SearchAfterExtractorTests(TestCase):
@@ -4763,10 +4784,10 @@ def test_extract_all_properties(self):
         target = runner.SearchAfterExtractor()
         props, last_sort = target(response=self.response, get_point_in_time=True, hits_total=None)
         expected_props = {"hits.total.relation": "eq",
-                  "hits.total.value": 2,
-                  "pit_id": "fedcba9876543210",
-                  "timed_out": False,
-                  "took": 10}
+                          "hits.total.value": 2,
+                          "pit_id": "fedcba9876543210",
+                          "timed_out": False,
+                          "took": 10}
         expected_sort_value = [1609780186, "2"]
         self.assertEqual(expected_props, props)
         self.assertEqual(expected_sort_value, last_sort)
@@ -5675,9 +5696,10 @@ async def test_param_body_mandatory(self, opensearch):
             "id": "test_pipeline",
         }
         with self.assertRaisesRegex(
-            exceptions.DataError,
-            "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'body'. "
-            "Add it to your parameter source and try again."):
+                exceptions.DataError,
+                "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter "
+                "'body'. "
+                "Add it to your parameter source and try again."):
             await r(opensearch, params)
 
         self.assertEqual(0, opensearch.transport.perform_request.call_count)
@@ -5693,9 +5715,9 @@ async def test_param_id_mandatory(self, opensearch):
             "body": {}
         }
         with self.assertRaisesRegex(
-            exceptions.DataError,
-            "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'id'. "
-            "Add it to your parameter source and try again."):
+                exceptions.DataError,
+                "Parameter source for operation 'create-search-pipeline' did not provide the mandatory parameter 'id'. "
+                "Add it to your parameter source and try again."):
             await r(opensearch, params)
 
         self.assertEqual(0, opensearch.transport.perform_request.call_count)