From b1708b626529aff9e63b33d9dcc34d4ef93ac1a0 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Thu, 24 Feb 2022 11:17:24 +0100 Subject: [PATCH] Fix wait-for-snapshot-create action (#1445) Even if snapshot reports success through GetSnapshotStatus API it might still not finished at that point (it means only that all shards are in the store but not that finalization went ok in cluster state). This change adds additional check to wait-for-snapshot-create action that checks if target snapshot is in list of currently running snapshot with GetSnapshot _current API --- esrally/driver/runner.py | 5 +++++ tests/driver/runner_test.py | 23 +++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 01d87a44e..f8899167e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1846,6 +1846,11 @@ async def __call__(self, es, params): stats = {} while not snapshot_done: + response = await es.snapshot.get(repository=repository, snapshot="_current", verbose=False) + if snapshot in [s.get("snapshot") for s in response.get("snapshots", [])]: + await asyncio.sleep(wait_period) + continue + response = await es.snapshot.status(repository=repository, snapshot=snapshot, ignore_unavailable=True) if "snapshots" in response: diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 24a888b2d..2c6c60963 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -3763,6 +3763,16 @@ class TestWaitForSnapshotCreate: @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_snapshot_create_entire_lifecycle(self, es): + es.snapshot.get = mock.AsyncMock( + side_effect=[ + # target snapshot running + {"snapshots": [{"snapshot": "restore_speed_snapshot"}]}, + # different snapshot running + {"snapshots": [{"snapshot": "different_snapshot"}]}, + {}, + {}, + ] + ) es.snapshot.status = mock.AsyncMock( side_effect=[ # empty response @@ -3854,6 +3864,7 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es): result = await r(es, basic_params) es.snapshot.status.assert_awaited_with(repository="restore_speed", snapshot="restore_speed_snapshot", ignore_unavailable=True) + es.snapshot.get.assert_awaited_with(repository="restore_speed", snapshot="_current", verbose=False) assert result == { "weight": 243468188055, @@ -3867,10 +3878,16 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es): } assert es.snapshot.status.await_count == 3 + assert es.snapshot.get.await_count == 4 @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_snapshot_create_immediate_success(self, es): + es.snapshot.get = mock.AsyncMock( + side_effect=[ + {}, + ] + ) es.snapshot.status = mock.AsyncMock( return_value={ "snapshots": [ @@ -3913,10 +3930,16 @@ async def test_wait_for_snapshot_create_immediate_success(self, es): } es.snapshot.status.assert_awaited_once_with(repository="backups", snapshot="snapshot-001", ignore_unavailable=True) + es.snapshot.get.assert_awaited_once_with(repository="backups", snapshot="_current", verbose=False) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_snapshot_create_failure(self, es): + es.snapshot.get = mock.AsyncMock( + side_effect=[ + {}, + ] + ) snapshot_status = { "snapshots": [ {