-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding Sync: Slight optimization when fetching state for the room (`get_events_as_list(...)`)
#17718
Changes from all commits
fbae11f
fed7502
d3e67f1
c84ee8f
4b62546
be025a4
55c5811
58550d0
ff6e51f
4d1b3e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Slight optimization when fetching state/events for Sliding Sync. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,7 +61,13 @@ | |
current_context, | ||
make_deferred_yieldable, | ||
) | ||
from synapse.logging.opentracing import start_active_span, tag_args, trace | ||
from synapse.logging.opentracing import ( | ||
SynapseTags, | ||
set_tag, | ||
start_active_span, | ||
tag_args, | ||
trace, | ||
) | ||
from synapse.metrics.background_process_metrics import ( | ||
run_as_background_process, | ||
wrap_as_background_process, | ||
|
@@ -525,6 +531,7 @@ async def get_event( | |
|
||
return event | ||
|
||
@trace | ||
async def get_events( | ||
self, | ||
event_ids: Collection[str], | ||
|
@@ -556,6 +563,11 @@ async def get_events( | |
Returns: | ||
A mapping from event_id to event. | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", | ||
str(len(event_ids)), | ||
) | ||
|
||
events = await self.get_events_as_list( | ||
event_ids, | ||
redact_behaviour=redact_behaviour, | ||
|
@@ -603,6 +615,10 @@ async def get_events_as_list( | |
Note that the returned list may be smaller than the list of event | ||
IDs if not all events could be fetched. | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", | ||
str(len(event_ids)), | ||
) | ||
|
||
if not event_ids: | ||
return [] | ||
|
@@ -723,10 +739,11 @@ async def get_events_as_list( | |
|
||
return events | ||
|
||
@trace | ||
@cancellable | ||
async def get_unredacted_events_from_cache_or_db( | ||
self, | ||
event_ids: Iterable[str], | ||
event_ids: Collection[str], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Collection because we iterate over |
||
allow_rejected: bool = False, | ||
) -> Dict[str, EventCacheEntry]: | ||
"""Fetch a bunch of events from the cache or the database. | ||
|
@@ -748,6 +765,11 @@ async def get_unredacted_events_from_cache_or_db( | |
Returns: | ||
map from event id to result | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", | ||
str(len(event_ids)), | ||
) | ||
|
||
# Shortcut: check if we have any events in the *in memory* cache - this function | ||
# may be called repeatedly for the same event so at this point we cannot reach | ||
# out to any external cache for performance reasons. The external cache is | ||
|
@@ -936,7 +958,7 @@ async def _get_events_from_cache( | |
events, update_metrics=update_metrics | ||
) | ||
|
||
missing_event_ids = (e for e in events if e not in event_map) | ||
missing_event_ids = [e for e in events if e not in event_map] | ||
event_map.update( | ||
await self._get_events_from_external_cache( | ||
events=missing_event_ids, | ||
|
@@ -946,8 +968,9 @@ async def _get_events_from_cache( | |
|
||
return event_map | ||
|
||
@trace | ||
async def _get_events_from_external_cache( | ||
self, events: Iterable[str], update_metrics: bool = True | ||
self, events: Collection[str], update_metrics: bool = True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one was a proper |
||
) -> Dict[str, EventCacheEntry]: | ||
"""Fetch events from any configured external cache. | ||
|
||
|
@@ -957,6 +980,10 @@ async def _get_events_from_external_cache( | |
events: list of event_ids to fetch | ||
update_metrics: Whether to update the cache hit ratio metrics | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "events.length", | ||
str(len(events)), | ||
) | ||
event_map = {} | ||
|
||
for event_id in events: | ||
|
@@ -1222,6 +1249,7 @@ def fire_errback(exc: Exception) -> None: | |
with PreserveLoggingContext(): | ||
self.hs.get_reactor().callFromThread(fire_errback, e) | ||
|
||
@trace | ||
async def _get_events_from_db( | ||
self, event_ids: Collection[str] | ||
) -> Dict[str, EventCacheEntry]: | ||
|
@@ -1240,6 +1268,11 @@ async def _get_events_from_db( | |
map from event id to result. May return extra events which | ||
weren't asked for. | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", | ||
str(len(event_ids)), | ||
) | ||
|
||
fetched_event_ids: Set[str] = set() | ||
fetched_events: Dict[str, _EventRow] = {} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
# | ||
import json | ||
from contextlib import contextmanager | ||
from typing import Generator, List, Tuple | ||
from typing import Generator, List, Set, Tuple | ||
from unittest import mock | ||
|
||
from twisted.enterprise.adbapi import ConnectionPool | ||
|
@@ -295,6 +295,53 @@ def test_dedupe(self) -> None: | |
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) | ||
|
||
|
||
class GetEventsTestCase(unittest.HomeserverTestCase): | ||
"""Test `get_events(...)`/`get_events_as_list(...)`""" | ||
|
||
servlets = [ | ||
admin.register_servlets, | ||
room.register_servlets, | ||
login.register_servlets, | ||
] | ||
|
||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: | ||
self.store: EventsWorkerStore = hs.get_datastores().main | ||
|
||
def test_get_lots_of_messages(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New test because I used it as a test bench to get consistent traces (previous iteration with tracing). The tracing stuff has been removed. Feel free to nit to remove the whole thing. |
||
"""Sanity check that `get_events(...)`/`get_events_as_list(...)` works""" | ||
num_events = 100 | ||
|
||
user_id = self.register_user("user", "pass") | ||
user_tok = self.login(user_id, "pass") | ||
|
||
room_id = self.helper.create_room_as(user_id, tok=user_tok) | ||
|
||
event_ids: Set[str] = set() | ||
for i in range(num_events): | ||
event = self.get_success( | ||
inject_event( | ||
self.hs, | ||
room_id=room_id, | ||
type="m.room.message", | ||
sender=user_id, | ||
content={ | ||
"body": f"foo{i}", | ||
"msgtype": "m.text", | ||
}, | ||
) | ||
) | ||
event_ids.add(event.event_id) | ||
|
||
# Sanity check that we actually created the events | ||
self.assertEqual(len(event_ids), num_events) | ||
|
||
# This is the function under test | ||
fetched_event_map = self.get_success(self.store.get_events(event_ids)) | ||
|
||
# Sanity check that we got the events back | ||
self.assertIncludes(fetched_event_map.keys(), event_ids, exact=True) | ||
|
||
|
||
class DatabaseOutageTestCase(unittest.HomeserverTestCase): | ||
"""Test event fetching during a database outage.""" | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One slight optimization: use `get_events_as_list(...)` directly instead of `get_events(...)`. `get_events(...)` just turns the result from `get_events_as_list(...)` into a dict, and since we're just iterating over the events, we don't need the dict/map.