Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronise 2023.1 with upstream #121

Merged
merged 9 commits into from
Oct 21, 2024
35 changes: 25 additions & 10 deletions nova/cmd/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ def version(self):
print(migration.db_version())

@args('--max_rows', type=int, metavar='<number>', dest='max_rows',
help='Maximum number of deleted rows to archive. Defaults to 1000. '
'Note that this number does not include the corresponding '
'rows, if any, that are removed from the API database for '
'deleted instances.')
help='Maximum number of deleted rows to archive per table. Defaults '
'to 1000. Note that this number is a soft limit and does not '
'include the corresponding rows, if any, that are removed '
'from the API database for deleted instances.')
@args('--before', metavar='<date>',
help=('Archive rows that have been deleted before this date. '
'Accepts date strings in the default format output by the '
Expand Down Expand Up @@ -432,7 +432,10 @@ def _do_archive(
'cell1.instances': 5}
:param cctxt: Cell-targeted nova.context.RequestContext if archiving
across all cells
:param max_rows: Maximum number of deleted rows to archive
:param max_rows: Maximum number of deleted rows to archive per table.
Note that this number is a soft limit and does not include the
corresponding rows, if any, that are removed from the API database
for deleted instances.
:param until_complete: Whether to run continuously until all deleted
rows are archived
:param verbose: Whether to print how many rows were archived per table
Expand All @@ -445,15 +448,26 @@ def _do_archive(
"""
ctxt = context.get_admin_context()
while True:
run, deleted_instance_uuids, total_rows_archived = \
# table_to_rows = {table_name: number_of_rows_archived}
# deleted_instance_uuids = ['uuid1', 'uuid2', ...]
table_to_rows, deleted_instance_uuids, total_rows_archived = \
db.archive_deleted_rows(
cctxt, max_rows, before=before_date, task_log=task_log)
for table_name, rows_archived in run.items():

for table_name, rows_archived in table_to_rows.items():
if cell_name:
table_name = cell_name + '.' + table_name
table_to_rows_archived.setdefault(table_name, 0)
table_to_rows_archived[table_name] += rows_archived
if deleted_instance_uuids:

# deleted_instance_uuids does not necessarily mean that any
# instances rows were archived because it is obtained by a query
# separate from the archive queries. For example, if a
# DBReferenceError was raised while processing the instances table,
# we would have skipped the table and had 0 rows archived even
# though deleted instances rows were found.
instances_archived = table_to_rows.get('instances', 0)
if deleted_instance_uuids and instances_archived:
table_to_rows_archived.setdefault(
'API_DB.instance_mappings', 0)
table_to_rows_archived.setdefault(
Expand All @@ -476,8 +490,9 @@ def _do_archive(

# If we're not archiving until there is nothing more to archive, we
# have reached max_rows in this cell DB or there was nothing to
# archive.
if not until_complete or not run:
# archive. We check the values() in case we get something like
# table_to_rows = {'instances': 0} back somehow.
if not until_complete or not any(table_to_rows.values()):
break
if verbose:
sys.stdout.write('.')
Expand Down
5 changes: 3 additions & 2 deletions nova/conf/pci.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@
its corresponding PF), otherwise they will be ignored and not
available for allocation.
- ``resource_class`` - optional Placement resource class name to be used
to track the matching PCI devices in Placement when [pci]device_spec is
True. It can be a standard resource class from the
to track the matching PCI devices in Placement when
[pci]report_in_placement is True.
It can be a standard resource class from the
``os-resource-classes`` lib. Or can be any string. In that case Nova will
normalize it to a proper Placement resource class by making it upper
case, replacing any consecutive character outside of ``[A-Z0-9_]`` with a
Expand Down
131 changes: 93 additions & 38 deletions nova/db/main/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4407,6 +4407,9 @@ def _archive_deleted_rows_for_table(
select = select.order_by(column).limit(max_rows)
with conn.begin():
rows = conn.execute(select).fetchall()

# This is a list of IDs of rows that should be archived from this table,
# limited to a length of max_rows.
records = [r[0] for r in rows]

# We will archive deleted rows for this table and also generate insert and
Expand All @@ -4419,51 +4422,103 @@ def _archive_deleted_rows_for_table(

# Keep track of any extra tablenames to number of rows that we archive by
# following FK relationships.
# {tablename: extra_rows_archived}
#
# extras = {tablename: number_of_extra_rows_archived}
extras = collections.defaultdict(int)
if records:
insert = shadow_table.insert().from_select(
columns, sql.select(table).where(column.in_(records))
).inline()
delete = table.delete().where(column.in_(records))

if not records:
# Nothing to archive, so return.
return rows_archived, deleted_instance_uuids, extras

# Keep track of how many rows we accumulate for the insert+delete database
# transaction and cap it as soon as it is >= max_rows. Because we will
# archive all child rows of a parent row along with the parent at the same
# time, we end up with extra rows to archive in addition to len(records).
num_rows_in_batch = 0
# The sequence of query statements we will execute in a batch. These are
# ordered: [child1, child1, parent1, child2, child2, child2, parent2, ...]
# Parent + child "trees" are kept together to avoid FK constraint
# violations.
statements_in_batch = []
# The list of records in the batch. This is used for collecting deleted
# instance UUIDs in the case of the 'instances' table.
records_in_batch = []

# (melwitt): We will gather rows related by foreign key relationship for
# each deleted row, one at a time. We do it this way to keep track of and
# limit the total number of rows that will be archived in a single database
# transaction. In a large scale database with potentially hundreds of
# thousands of deleted rows, if we don't limit the size of the transaction
# based on max_rows, we can get into a situation where we get stuck not
# able to make much progress. The value of max_rows has to be 1) small
# enough to not exceed the database's max packet size limit or timeout with
# a deadlock but 2) large enough to make progress in an environment with a
# constant high volume of create and delete traffic. By archiving each
# parent + child rows tree one at a time, we can ensure meaningful progress
# can be made while allowing the caller to predictably control the size of
# the database transaction with max_rows.
for record in records:
# Walk FK relationships and add insert/delete statements for rows that
# refer to this table via FK constraints. fk_inserts and fk_deletes
# will be prepended to by _get_fk_stmts if referring rows are found by
# FK constraints.
fk_inserts, fk_deletes = _get_fk_stmts(
metadata, conn, table, column, records)

# NOTE(tssurya): In order to facilitate the deletion of records from
# instance_mappings, request_specs and instance_group_member tables in
# the nova_api DB, the rows of deleted instances from the instances
# table are stored prior to their deletion. Basically the uuids of the
# archived instances are queried and returned.
if tablename == "instances":
query_select = sql.select(table.c.uuid).where(
table.c.id.in_(records)
)
with conn.begin():
rows = conn.execute(query_select).fetchall()
deleted_instance_uuids = [r[0] for r in rows]
metadata, conn, table, column, [record])
statements_in_batch.extend(fk_inserts + fk_deletes)
# statement to add parent row to shadow table
insert = shadow_table.insert().from_select(
columns, sql.select(table).where(column.in_([record]))).inline()
statements_in_batch.append(insert)
# statement to remove parent row from main table
delete = table.delete().where(column.in_([record]))
statements_in_batch.append(delete)

try:
# Group the insert and delete in a transaction.
with conn.begin():
for fk_insert in fk_inserts:
conn.execute(fk_insert)
for fk_delete in fk_deletes:
result_fk_delete = conn.execute(fk_delete)
extras[fk_delete.table.name] += result_fk_delete.rowcount
conn.execute(insert)
result_delete = conn.execute(delete)
rows_archived += result_delete.rowcount
except db_exc.DBReferenceError as ex:
# A foreign key constraint keeps us from deleting some of
# these rows until we clean up a dependent table. Just
# skip this table for now; we'll come back to it later.
LOG.warning("IntegrityError detected when archiving table "
"%(tablename)s: %(error)s",
{'tablename': tablename, 'error': str(ex)})
records_in_batch.append(record)

# Check whether were have a full batch >= max_rows. Rows are counted as
# the number of rows that will be moved in the database transaction.
# So each insert+delete pair represents one row that will be moved.
# 1 parent + its fks
num_rows_in_batch += 1 + len(fk_inserts)

if max_rows is not None and num_rows_in_batch >= max_rows:
break

# NOTE(tssurya): In order to facilitate the deletion of records from
# instance_mappings, request_specs and instance_group_member tables in the
# nova_api DB, the rows of deleted instances from the instances table are
# stored prior to their deletion. Basically the uuids of the archived
# instances are queried and returned.
if tablename == "instances":
query_select = sql.select(table.c.uuid).where(
table.c.id.in_(records_in_batch))
with conn.begin():
rows = conn.execute(query_select).fetchall()
# deleted_instance_uuids = ['uuid1', 'uuid2', ...]
deleted_instance_uuids = [r[0] for r in rows]

try:
# Group the insert and delete in a transaction.
with conn.begin():
for statement in statements_in_batch:
result = conn.execute(statement)
result_tablename = statement.table.name
# Add to archived row counts if not a shadow table.
if not result_tablename.startswith(_SHADOW_TABLE_PREFIX):
if result_tablename == tablename:
# Number of tablename (parent) rows archived.
rows_archived += result.rowcount
else:
# Number(s) of child rows archived.
extras[result_tablename] += result.rowcount

except db_exc.DBReferenceError as ex:
# A foreign key constraint keeps us from deleting some of these rows
# until we clean up a dependent table. Just skip this table for now;
# we'll come back to it later.
LOG.warning("IntegrityError detected when archiving table "
"%(tablename)s: %(error)s",
{'tablename': tablename, 'error': str(ex)})

conn.close()

Expand Down
4 changes: 4 additions & 0 deletions nova/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,10 @@ class FileNotFound(NotFound):
msg_fmt = _("File %(file_path)s could not be found.")


class DeviceBusy(NovaException):
msg_fmt = _("device %(file_path)s is busy.")


class ClassNotFound(NotFound):
msg_fmt = _("Class %(class_name)s could not be found: %(exception)s")

Expand Down
55 changes: 52 additions & 3 deletions nova/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,97 @@

"""Functions to address filesystem calls, particularly sysfs."""

import functools
import os
import time

from oslo_log import log as logging

from nova import exception

LOG = logging.getLogger(__name__)
SYS = '/sys'
RETRY_LIMIT = 5


SYS = '/sys'
# a retry decorator to handle EBUSY
def retry_if_busy(func):
"""Decorator to retry a function if it raises DeviceBusy.

This decorator will retry the function RETRY_LIMIT=5 times if it raises
DeviceBusy. It will sleep for 1 second on the first retry, 2 seconds on
the second retry, and so on, up to RETRY_LIMIT seconds. If the function
still raises DeviceBusy after RETRY_LIMIT retries, the exception will be
raised.
"""

@functools.wraps(func)
def wrapper(*args, **kwargs):
for i in range(RETRY_LIMIT):
try:
return func(*args, **kwargs)
except exception.DeviceBusy as e:
# if we have retried RETRY_LIMIT times, raise the exception
# otherwise, sleep and retry, i is 0-based so we need
# to add 1 to it
count = i + 1
if count < RETRY_LIMIT:
LOG.debug(
f"File {e.kwargs['file_path']} is busy, "
f"sleeping {count} seconds before retrying")
time.sleep(count)
continue
raise
return wrapper

# NOTE(bauzas): this method is deliberately not wrapped in a privsep entrypoint


@retry_if_busy
def read_sys(path: str) -> str:
"""Reads the content of a file in the sys filesystem.

:param path: relative or absolute. If relative, will be prefixed by /sys.
:returns: contents of that file.
:raises: nova.exception.FileNotFound if we can't read that file.
:raises: nova.exception.DeviceBusy if the file is busy.
"""
try:
# The path can be absolute with a /sys prefix but that's fine.
with open(os.path.join(SYS, path), mode='r') as data:
return data.read()
except (OSError, ValueError) as exc:
except OSError as exc:
# errno 16 is EBUSY, which means the file is busy.
if exc.errno == 16:
raise exception.DeviceBusy(file_path=path) from exc
# convert permission denied to file not found
raise exception.FileNotFound(file_path=path) from exc
except ValueError as exc:
raise exception.FileNotFound(file_path=path) from exc


# NOTE(bauzas): this method is deliberately not wrapped in a privsep entrypoint
# In order to correctly use it, you need to decorate the caller with a specific
# privsep entrypoint.
@retry_if_busy
def write_sys(path: str, data: str) -> None:
"""Writes the content of a file in the sys filesystem with data.

:param path: relative or absolute. If relative, will be prefixed by /sys.
:param data: the data to write.
:returns: contents of that file.
:raises: nova.exception.FileNotFound if we can't write that file.
:raises: nova.exception.DeviceBusy if the file is busy.
"""
try:
# The path can be absolute with a /sys prefix but that's fine.
with open(os.path.join(SYS, path), mode='w') as fd:
fd.write(data)
except (OSError, ValueError) as exc:
except OSError as exc:
# errno 16 is EBUSY, which means the file is busy.
if exc.errno == 16:
raise exception.DeviceBusy(file_path=path) from exc
# convert permission denied to file not found
raise exception.FileNotFound(file_path=path) from exc
except ValueError as exc:
raise exception.FileNotFound(file_path=path) from exc
44 changes: 44 additions & 0 deletions nova/tests/functional/db/test_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,50 @@ def test_archive_deleted_rows_incomplete(self):
break
self.assertFalse(exceptions)

def test_archive_deleted_rows_parent_child_trees_one_at_time(self):
"""Test that we are archiving parent + child rows "trees" one at a time
Previously, we archived deleted rows in batches of max_rows parents +
their child rows in a single database transaction. Doing it that way
limited how high a value of max_rows could be specified by the caller
because of the size of the database transaction it could generate.
For example, in a large scale deployment with hundreds of thousands of
deleted rows and constant server creation and deletion activity, a
value of max_rows=1000 might exceed the database's configured maximum
packet size or timeout due to a database deadlock, forcing the operator
to use a much lower max_rows value like 100 or 50.
And when the operator has e.g. 500,000 deleted instances rows (and
millions of deleted rows total) they are trying to archive, being
forced to use a max_rows value serveral orders of magnitude lower than
the number of rows they need to archive was a poor user experience.
This tests that we are archiving each parent plus their child rows one
at a time while limiting the total number of rows per table in a batch
as soon as the total number of archived rows is >= max_rows.
"""
# Boot two servers and delete them, then try to archive rows.
for i in range(2):
server = self._create_server()
self._delete_server(server)

# Use max_rows=5 to limit the archive to one complete parent +
# child rows tree.
table_to_rows, _, _ = db.archive_deleted_rows(max_rows=5)

# We should have archived only one instance because the parent +
# child tree will have >= max_rows of 5.
self.assertEqual(1, table_to_rows['instances'])

# Archive once more to archive the other instance (and its child rows).
table_to_rows, _, _ = db.archive_deleted_rows(max_rows=5)
self.assertEqual(1, table_to_rows['instances'])

# There should be nothing else to archive.
_, _, total_rows_archived = db.archive_deleted_rows(max_rows=100)
self.assertEqual(0, total_rows_archived)

def _get_table_counts(self):
engine = db.get_engine()
conn = engine.connect()
Expand Down
Loading