Skip to content

Commit

Permalink
Fix wait-for-snapshot-create action (elastic#1445)
Browse files Browse the repository at this point in the history
Even if snapshot reports success through GetSnapshotStatus API it might still not finished at that point (it means only that all shards are in the store but not that finalization went ok in cluster state).
This change adds additional check to wait-for-snapshot-create action that checks if target snapshot is in list of currently running snapshot with GetSnapshot _current API
  • Loading branch information
probakowski authored Feb 24, 2022
1 parent e382ba9 commit b1708b6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
5 changes: 5 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,11 @@ async def __call__(self, es, params):
stats = {}

while not snapshot_done:
response = await es.snapshot.get(repository=repository, snapshot="_current", verbose=False)
if snapshot in [s.get("snapshot") for s in response.get("snapshots", [])]:
await asyncio.sleep(wait_period)
continue

response = await es.snapshot.status(repository=repository, snapshot=snapshot, ignore_unavailable=True)

if "snapshots" in response:
Expand Down
23 changes: 23 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3763,6 +3763,16 @@ class TestWaitForSnapshotCreate:
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
es.snapshot.get = mock.AsyncMock(
side_effect=[
# target snapshot running
{"snapshots": [{"snapshot": "restore_speed_snapshot"}]},
# different snapshot running
{"snapshots": [{"snapshot": "different_snapshot"}]},
{},
{},
]
)
es.snapshot.status = mock.AsyncMock(
side_effect=[
# empty response
Expand Down Expand Up @@ -3854,6 +3864,7 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
result = await r(es, basic_params)

es.snapshot.status.assert_awaited_with(repository="restore_speed", snapshot="restore_speed_snapshot", ignore_unavailable=True)
es.snapshot.get.assert_awaited_with(repository="restore_speed", snapshot="_current", verbose=False)

assert result == {
"weight": 243468188055,
Expand All @@ -3867,10 +3878,16 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es):
}

assert es.snapshot.status.await_count == 3
assert es.snapshot.get.await_count == 4

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_immediate_success(self, es):
es.snapshot.get = mock.AsyncMock(
side_effect=[
{},
]
)
es.snapshot.status = mock.AsyncMock(
return_value={
"snapshots": [
Expand Down Expand Up @@ -3913,10 +3930,16 @@ async def test_wait_for_snapshot_create_immediate_success(self, es):
}

es.snapshot.status.assert_awaited_once_with(repository="backups", snapshot="snapshot-001", ignore_unavailable=True)
es.snapshot.get.assert_awaited_once_with(repository="backups", snapshot="_current", verbose=False)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_wait_for_snapshot_create_failure(self, es):
es.snapshot.get = mock.AsyncMock(
side_effect=[
{},
]
)
snapshot_status = {
"snapshots": [
{
Expand Down

0 comments on commit b1708b6

Please sign in to comment.