Skip to content

Commit

Permalink
sharder: always get ranges from root while shrinking
Browse files Browse the repository at this point in the history
While auditing a shard container that is in a shrinking state, the
sharder will merge any shard ranges fetched from the root that cover
the shard's namespace. Previously this was conditional upon the
sharder also fetching the shard's *own* shard range from the
root. However, in extreme circumstances, the shard's own shard range
could become deleted and reclaimed from the root while the shard DB
still needs to shrink to its acceptor ranges. This patch therefore
allows the shard to be updated with potential acceptor ranges from the
root even when its own shard range is not fetched from the root.

Also change the log level from debug to info when logging an update to
a shard's own shard range from root.

Change-Id: I17957cf0ef4936f91e69c6d9ae21551972f0df31
  • Loading branch information
alistairncoles committed Sep 30, 2022
1 parent 429702e commit f15b920
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 57 deletions.
1 change: 1 addition & 0 deletions swift/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5316,6 +5316,7 @@ class ShardRange(object):
SHARDED: 'sharded',
SHRUNK: 'shrunk'}
STATES_BY_NAME = dict((v, k) for k, v in STATES.items())
SHRINKING_STATES = (SHRINKING, SHRUNK)

@functools.total_ordering
class MaxBound(ShardRangeOuterBound):
Expand Down
14 changes: 6 additions & 8 deletions swift/container/sharder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,15 +1219,14 @@ def _merge_shard_ranges_from_root(self, broker, shard_ranges,
own_shard_range = broker.get_own_shard_range()
if (orig_own_shard_range != own_shard_range or
orig_own_shard_range.state != own_shard_range.state):
self.logger.debug(
self.logger.info(
'Updated own shard range from %s to %s',
orig_own_shard_range, own_shard_range)
else:
other_shard_ranges.append(shard_range)

if (other_shard_ranges and own_shard_range_from_root and
own_shard_range.state in
(ShardRange.SHRINKING, ShardRange.SHRUNK)):
if (other_shard_ranges and
own_shard_range.state in ShardRange.SHRINKING_STATES):
# If own_shard_range state is shrinking, save off *all* shards
# returned because these may contain shards into which this
# shard is to shrink itself; shrinking is the only case when we
Expand Down Expand Up @@ -1259,7 +1258,7 @@ def _delete_shard_container(self, broker, own_shard_range):
own_shard_range.timestamp < delete_age and
broker.empty()):
broker.delete_db(Timestamp.now().internal)
self.logger.debug('Deleted shard container %s (%s)',
self.logger.debug('Marked shard container as deleted %s (%s)',
broker.db_file, quote(broker.path))

def _do_audit_shard_container(self, broker):
Expand Down Expand Up @@ -1796,7 +1795,7 @@ def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
quote(broker.path), shard_range)

replication_quorum = self.existing_shard_replication_quorum
if own_shard_range.state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
if own_shard_range.state in ShardRange.SHRINKING_STATES:
if shard_range.includes(own_shard_range):
# When shrinking to a single acceptor that completely encloses
# this shard's namespace, include deleted own (donor) shard
Expand Down Expand Up @@ -2001,8 +2000,7 @@ def _complete_sharding(self, broker):
quote(broker.path))
return False
own_shard_range.update_meta(0, 0)
if own_shard_range.state in (ShardRange.SHRINKING,
ShardRange.SHRUNK):
if own_shard_range.state in ShardRange.SHRINKING_STATES:
own_shard_range.update_state(ShardRange.SHRUNK)
modified_shard_ranges = []
else:
Expand Down
235 changes: 186 additions & 49 deletions test/unit/container/test_sharder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf, \
find_paths_with_gaps
from swift.common.utils import ShardRange, Timestamp, hash_path, \
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
encode_timestamps, parse_db_filename, quorum_size, Everything, md5, \
ShardName
from test import annotate_failure

from test.debug_logger import debug_logger
Expand Down Expand Up @@ -5605,10 +5606,17 @@ def _do_test_audit_shard_container(self, *args):
self.assertTrue(shard_ranges[1].update_state(ShardRange.ACTIVE,
state_timestamp=root_ts))
shard_ranges[1].timestamp = root_ts
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
with mock_timestamp_now() as ts_now:
sharder, mock_swift = self.call_audit_container(
broker, shard_ranges)
self._assert_stats(expected_stats, sharder, 'audit_shard')
self.assertEqual(['Updating own shard range from root', mock.ANY],
self.assertEqual(['Updating own shard range from root'],
sharder.logger.get_lines_for_level('debug'))
own_shard_range.meta_timestamp = ts_now
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
self.assertEqual(['Updated own shard range from %s to %s'
% (own_shard_range, expected)],
sharder.logger.get_lines_for_level('info'))
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
self.assertFalse(broker.is_deleted())
Expand Down Expand Up @@ -5695,19 +5703,27 @@ def _do_test_audit_shard_container(self, *args):
# make own shard range match one in root, but different state
own_ts = next(self.ts_iter)
shard_ranges[1].timestamp = own_ts
broker.merge_shard_ranges([shard_ranges[1]])
own_shard_range = shard_ranges[1].copy()
broker.merge_shard_ranges([own_shard_range])
root_ts = next(self.ts_iter)
shard_ranges[1].update_state(ShardRange.SHARDING,
state_timestamp=root_ts)
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
with mock_timestamp_now() as ts_now:
sharder, mock_swift = self.call_audit_container(
broker, shard_ranges)
self.assert_no_audit_messages(sharder, mock_swift)
self.assertFalse(broker.is_deleted())
self.assertEqual(['Updating own shard range from root'],
sharder.logger.get_lines_for_level('debug'))
own_shard_range.meta_timestamp = ts_now
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
self.assertEqual(['Updated own shard range from %s to %s'
% (own_shard_range, expected)],
sharder.logger.get_lines_for_level('info'))
# own shard range state is updated from root version
own_shard_range = broker.get_own_shard_range()
self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
self.assertEqual(root_ts, own_shard_range.state_timestamp)
self.assertEqual(['Updating own shard range from root', mock.ANY],
sharder.logger.get_lines_for_level('debug'))

own_shard_range.update_state(ShardRange.SHARDED,
state_timestamp=next(self.ts_iter))
Expand Down Expand Up @@ -5834,60 +5850,181 @@ def test_audit_shard_container_merge_other_ranges(self):
self._do_test_audit_shard_container_merge_other_ranges('Quoted-Root',
'a/c')

def _do_test_audit_shard_container_with_root_ranges(self, *args):
# shards may merge acceptors and the root range when shrinking; verify
# that shard audit is ok with merged ranges
def check_audit(own_state, acceptor_state, root_state):
broker = self._make_broker(
account='.shards_a',
container='shard_c_%s' % next(self.ts_iter).normal)
broker.set_sharding_sysmeta(*args)
own_sr = broker.get_own_shard_range().copy(
state=own_state, state_timestamp=next(self.ts_iter),
lower='a', upper='b', timestamp=next(self.ts_iter))
broker.merge_shard_ranges([own_sr])
def _assert_merge_into_shard(self, own_shard_range, shard_ranges,
root_shard_ranges, expected, *args):
# create a shard broker, initialise with shard_ranges, run audit on it
# supplying given root_shard_ranges and verify that the broker ends up
# with expected shard ranges.
broker = self._make_broker(account=own_shard_range.account,
container=own_shard_range.container)
broker.set_sharding_sysmeta(*args)
broker.merge_shard_ranges([own_shard_range] + shard_ranges)
self.assertFalse(broker.is_root_container())

# make acceptor and root ranges that overlap with the shard
overlaps = self._make_shard_ranges([('a', 'c'), ('', '')],
[acceptor_state, root_state])
sharder, mock_swift = self.call_audit_container(
broker, [own_sr] + overlaps)
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
params=params)
if own_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
# check acceptor & root are merged into audited shard
self.assertEqual(
[dict(sr) for sr in overlaps],
[dict(sr) for sr in broker.get_shard_ranges()])
return sharder
sharder, mock_swift = self.call_audit_container(
broker, root_shard_ranges)
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
params=params)

def assert_ok(own_state, acceptor_state, root_state):
sharder = check_audit(own_state, acceptor_state, root_state)
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
self._assert_shard_ranges_equal(expected, broker.get_shard_ranges())
self.assertEqual(own_shard_range,
broker.get_own_shard_range(no_default=True))
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
self._assert_stats(expected_stats, sharder, 'audit_shard')
return sharder

def _do_test_audit_shard_root_ranges_not_merged(self, *args):
# Make root and other ranges that fully contain the shard namespace...
root_own_sr = ShardRange('a/c', next(self.ts_iter))
acceptor = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c',
next(self.ts_iter), 1)),
next(self.ts_iter), 'a', 'c')

def do_test(own_state, acceptor_state, root_state):
acceptor_from_root = acceptor.copy(
timestamp=next(self.ts_iter), state=acceptor_state)
root_from_root = root_own_sr.copy(
timestamp=next(self.ts_iter), state=root_state)
with annotate_failure('with states %s %s %s'
% (own_state, acceptor_state, root_state)):
self._assert_stats(expected_stats, sharder, 'audit_shard')
own_sr_name = ShardName.create(
'.shards_a', 'c', 'c', next(self.ts_iter), 0)
own_sr = ShardRange(
str(own_sr_name), next(self.ts_iter), state=own_state,
state_timestamp=next(self.ts_iter), lower='a', upper='b')
expected = existing = []
sharder = self._assert_merge_into_shard(
own_sr, existing,
[own_sr, acceptor_from_root, root_from_root],
expected, *args)
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))

for own_state in ShardRange.STATES:
if own_state in ShardRange.SHRINKING_STATES:
# shrinking states are covered by other tests
continue
for acceptor_state in ShardRange.STATES:
for root_state in ShardRange.STATES:
do_test(own_state, acceptor_state, root_state)

def test_audit_old_style_shard_root_ranges_not_merged_not_shrinking(self):
# verify that other shard ranges from root are NOT merged into shard
# when it is NOT in a shrinking state
self._do_test_audit_shard_root_ranges_not_merged('Root', 'a/c')

def test_audit_shard_root_ranges_not_merged_not_shrinking(self):
# verify that other shard ranges from root are NOT merged into shard
# when it is NOT in a shrinking state
self._do_test_audit_shard_root_ranges_not_merged('Quoted-Root', 'a/c')

def test_audit_shard_root_ranges_with_own_merged_while_shrinking(self):
# Verify that shrinking shard will merge root and other ranges,
# including root range.
# Make root and other ranges that fully contain the shard namespace...
root_own_sr = ShardRange('a/c', next(self.ts_iter))
acceptor = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c',
next(self.ts_iter), 1)),
next(self.ts_iter), 'a', 'c')

def do_test(own_state, acceptor_state, root_state):
acceptor_from_root = acceptor.copy(
timestamp=next(self.ts_iter), state=acceptor_state)
root_from_root = root_own_sr.copy(
timestamp=next(self.ts_iter), state=root_state)
ts = next(self.ts_iter)
own_sr = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
ts, lower='a', upper='b', state=own_state, state_timestamp=ts)
expected = [acceptor_from_root, root_from_root]
with annotate_failure('with states %s %s %s'
% (own_state, acceptor_state, root_state)):
sharder = self._assert_merge_into_shard(
own_sr, [],
# own sr is in ranges fetched from root
[own_sr, acceptor_from_root, root_from_root],
expected, 'Quoted-Root', 'a/c')
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))

for own_state in ShardRange.SHRINKING_STATES:
for acceptor_state in ShardRange.STATES:
for root_state in ShardRange.STATES:
assert_ok(own_state, acceptor_state, root_state)
do_test(own_state, acceptor_state, root_state)

def test_audit_shard_root_ranges_missing_own_merged_while_shrinking(self):
# Verify that shrinking shard will merge root and other ranges,
# including root range.
# Make root and other ranges that fully contain the shard namespace...
root_own_sr = ShardRange('a/c', next(self.ts_iter))
acceptor = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c',
next(self.ts_iter), 1)),
next(self.ts_iter), 'a', 'c')

def do_test(own_state, acceptor_state, root_state):
acceptor_from_root = acceptor.copy(
timestamp=next(self.ts_iter), state=acceptor_state)
root_from_root = root_own_sr.copy(
timestamp=next(self.ts_iter), state=root_state)
ts = next(self.ts_iter)
own_sr = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
ts, lower='a', upper='b', state=own_state, state_timestamp=ts)
expected = [acceptor_from_root, root_from_root]
with annotate_failure('with states %s %s %s'
% (own_state, acceptor_state, root_state)):
sharder = self._assert_merge_into_shard(
own_sr, [],
# own sr is NOT in ranges fetched from root
[acceptor_from_root, root_from_root],
expected, 'Quoted-Root', 'a/c')
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines))
self.assertIn('root has no matching shard range',
warning_lines[0])
self.assertFalse(sharder.logger.get_lines_for_level('error'))

def test_audit_old_style_shard_container_with_root_ranges(self):
self._do_test_audit_shard_container_with_root_ranges('Root', 'a/c')
for own_state in ShardRange.SHRINKING_STATES:
for acceptor_state in ShardRange.STATES:
for root_state in ShardRange.STATES:
do_test(own_state, acceptor_state, root_state)

def test_audit_shard_root_ranges_fetch_fails_while_shrinking(self):
# check audit copes with failed response while shard is shrinking
ts = next(self.ts_iter)
own_sr = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
ts, lower='a', upper='b', state=ShardRange.SHRINKING,
state_timestamp=ts)
broker = self._make_broker(account=own_sr.account,
container=own_sr.container)
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
broker.merge_shard_ranges(own_sr)
self.assertFalse(broker.is_root_container())

def test_audit_shard_container_with_root_ranges(self):
self._do_test_audit_shard_container_with_root_ranges('Quoted-Root',
'a/c')
sharder, mock_swift = self.call_audit_container(
broker, [], exc=internal_client.UnexpectedResponse('bad', 'resp'))
self.assertEqual([], broker.get_shard_ranges())
self.assertEqual(own_sr, broker.get_own_shard_range(no_default=True))
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
self._assert_stats(expected_stats, sharder, 'audit_shard')
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertEqual(2, len(warning_lines))
self.assertIn('Failed to get shard ranges from a/c: bad',
warning_lines[0])
self.assertIn('unable to get shard ranges from root',
warning_lines[1])
self.assertFalse(sharder.logger.get_lines_for_level('error'))

def test_audit_shard_deleted_range_in_root_container(self):
# verify that shard DB is marked deleted when its own shard range is
Expand Down

0 comments on commit f15b920

Please sign in to comment.