Skip to content

Commit

Permalink
Background update in a single transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods committed Oct 16, 2024
1 parent 5c6178b commit 6bd638e
Showing 1 changed file with 75 additions and 70 deletions.
145 changes: 75 additions & 70 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -2451,96 +2451,101 @@ async def _sliding_sync_membership_snapshots_fix_forgotten_column_bg_update(
"last_event_stream_ordering", -(1 << 31)
)

# To simplify things, we can just recheck for any row in
# `sliding_sync_membership_snapshots` with `forgotten=1`
def _find_memberships_to_update_txn(
def _txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int]]:
) -> int:
"""
Returns:
The number of rows updated.
"""

# To simplify things, we can just recheck any row in
# `sliding_sync_membership_snapshots` with `forgotten=1`
txn.execute(
"""
SELECT
room_id,
user_id,
membership_event_id,
event_stream_ordering
FROM sliding_sync_membership_snapshots
WHERE event_stream_ordering > ?
AND forgotten = 1
ORDER BY event_stream_ordering ASC
s.room_id,
s.user_id,
s.membership_event_id,
s.event_stream_ordering,
m.forgotten
FROM sliding_sync_membership_snapshots AS s
INNER JOIN room_memberships AS m ON (s.membership_event_id = m.event_id)
WHERE s.event_stream_ordering > ?
AND s.forgotten = 1
ORDER BY s.event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
)

memberships_to_update_rows = cast(
List[Tuple[str, str, str, int]],
List[Tuple[str, str, str, int, int]],
txn.fetchall(),
)
if not memberships_to_update_rows:
return 0

return memberships_to_update_rows

memberships_to_update_rows = await self.db_pool.runInteraction(
"_sliding_sync_membership_snapshots_fix_forgotten_column_bg_update._find_memberships_to_update_txn",
_find_memberships_to_update_txn,
)

if not memberships_to_update_rows:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE
)
return 0

forgotten_update_query_args: List[Tuple[str, str, str, str]] = []
for (
room_id,
user_id,
membership_event_id,
_event_stream_ordering,
) in memberships_to_update_rows:
forgotten_update_query_args.append(
(
membership_event_id,
room_id,
user_id,
membership_event_id,
# Assemble the values to update
#
# (room_id, user_id)
key_values: List[Tuple[str, str]] = []
# (forgotten,)
value_values: List[Tuple[int]] = []
for (
room_id,
user_id,
_membership_event_id,
_event_stream_ordering,
forgotten,
) in memberships_to_update_rows:
key_values.append(
(
room_id,
user_id,
)
)
)
value_values.append((forgotten,))

def _fill_table_txn(txn: LoggingTransaction) -> None:
# Handle updating the `sliding_sync_membership_snapshots` table
#
# We need to find the `forgotten` value during the transaction because
# we can't risk inserting stale data.
txn.execute_batch(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ? AND membership_event_id = ?
""",
forgotten_update_query_args,
# Update all of the rows in one go
self.db_pool.simple_update_many_txn(
txn,
table="sliding_sync_membership_snapshots",
key_names=("room_id", "user_id"),
key_values=key_values,
value_names=("forgotten",),
value_values=value_values,
)

# Update the progress
(
_room_id,
_user_id,
_membership_event_id,
event_stream_ordering,
_forgotten,
) = memberships_to_update_rows[-1]
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
{
"last_event_stream_ordering": event_stream_ordering,
},
)

await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update",
_fill_table_txn,
)
return len(memberships_to_update_rows)

# Update the progress
(
_room_id,
_user_id,
_membership_event_id,
event_stream_ordering,
) = memberships_to_update_rows[-1]
await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
{
"last_event_stream_ordering": event_stream_ordering,
},
num_rows = await self.db_pool.runInteraction(
"_sliding_sync_membership_snapshots_fix_forgotten_column_bg_update",
_txn,
)

return len(memberships_to_update_rows)
if not num_rows:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE
)

return num_rows


def _resolve_stale_data_in_sliding_sync_tables(
Expand Down

0 comments on commit 6bd638e

Please sign in to comment.