diff --git a/poetry.lock b/poetry.lock index 8d468adf1295..58fd22018b66 100644 --- a/poetry.lock +++ b/poetry.lock @@ -888,17 +888,17 @@ tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"] [[package]] name = "pyopenssl" -version = "22.0.0" +version = "22.1.0" description = "Python wrapper module around the OpenSSL library" category = "main" optional = false python-versions = ">=3.6" [package.dependencies] -cryptography = ">=35.0" +cryptography = ">=38.0.0,<39" [package.extras] -docs = ["sphinx", "sphinx-rtd-theme"] +docs = ["sphinx (!=5.2.0,!=5.2.0.post0)", "sphinx-rtd-theme"] test = ["flaky", "pretend", "pytest (>=3.0.1)"] [[package]] @@ -2451,8 +2451,8 @@ pynacl = [ {file = "PyNaCl-1.5.0.tar.gz", hash = "sha256:8ac7448f09ab85811607bdd21ec2464495ac8b7c66d146bf545b0f08fb9220ba"}, ] pyopenssl = [ - {file = "pyOpenSSL-22.0.0-py2.py3-none-any.whl", hash = "sha256:ea252b38c87425b64116f808355e8da644ef9b07e429398bfece610f893ee2e0"}, - {file = "pyOpenSSL-22.0.0.tar.gz", hash = "sha256:660b1b1425aac4a1bea1d94168a85d99f0b3144c869dd4390d27629d0087f1bf"}, + {file = "pyOpenSSL-22.1.0-py3-none-any.whl", hash = "sha256:b28437c9773bb6c6958628cf9c3bebe585de661dba6f63df17111966363dd15e"}, + {file = "pyOpenSSL-22.1.0.tar.gz", hash = "sha256:7a83b7b272dd595222d672f5ce29aa030f1fb837630ef229f62e72e395ce8968"}, ] pyparsing = [ {file = "pyparsing-3.0.7-py3-none-any.whl", hash = "sha256:a6c06a88f252e6c322f65faf8f418b16213b51bdfaece0524c1c1bc30c63c484"}, diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 60774b240d9f..11c5b10c9546 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import time import urllib.parse from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Tuple @@ -324,6 +325,10 @@ async def push_bulk( "left": list(device_list_summary.left), } + if len(serialized_events) == 0 and len(ephemeral) == 0: + logger.info("Returning early on transaction: no events to send") + return True + try: await self.put_json( uri=uri, @@ -365,7 +370,19 @@ async def push_bulk( def _serialize( self, service: "ApplicationService", events: Iterable[EventBase] ) -> List[JsonDict]: + new_events = [] time_now = self.clock.time_msec() + + for event in events: + if int(round(time.time() * 1000)) - event.origin_server_ts > (15 * 60 * 1000): + logger.warning("Dropping event (due to age) %s" % event.event_id) + continue + if service.id != "github" and service.is_interested_in_user(event.sender) and event.sender.endswith(":t2bot.io"): + logger.warning("Dropping event (due to echo) %s" % event.event_id) + continue + logger.info("Allowing @ fallback: %s" % event.event_id) + new_events.append(event) + return [ serialize_event( e, @@ -384,5 +401,5 @@ def _serialize( ), ), ) - for e in events + for e in new_events ] diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c4c0bc7315b4..2402bbbc9ba5 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -369,7 +369,7 @@ async def get_pdu( destinations: Collection[str], event_id: str, room_version: RoomVersion, - timeout: Optional[int] = None, + timeout: Optional[int] = 15000, ) -> Optional[PulledPduInfo]: """Requests the PDU with given origin and ID from the remote home servers. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bb20af6e91ed..1b4cfc89d9b0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -87,7 +87,7 @@ # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. -TRANSACTION_CONCURRENCY_LIMIT = 10 +TRANSACTION_CONCURRENCY_LIMIT = 50 # T2B: Raise from 10 logger = logging.getLogger(__name__) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 3ad483efe079..a58cca4f826f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -705,6 +705,11 @@ def send_presence_to_destinations( for destination in destinations: if destination == self.server_name: continue + + # T2B: Skip sending presence to servers we know don't support it + if destination == "matrix.org": + continue + if not self._federation_shard_config.should_handle( self._instance_name, destination ): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd39d4d1113a..74d6897aa8ea 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -106,7 +106,7 @@ async def get_room_state( ) async def get_event( - self, destination: str, event_id: str, timeout: Optional[int] = None + self, destination: str, event_id: str, timeout: Optional[int] = 15000 ) -> JsonDict: """Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 205fd16daa98..4e2d1b2331fa 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -100,14 +100,32 @@ async def on_PUT( logger.debug("Decoded %s: %s", transaction_id, str(transaction_data)) + edus_before_filter = len(transaction_data.get("edus", [])) + + filtered_edus = [] + for edu in transaction_data.get("edus", []): + edu_type = edu.get('edu_type', 'io.t2bot.ignored') + if edu_type == 'io.t2bot.ignored': + continue + if edu_type == 'm.presence': + continue + if edu_type == 'm.receipt': + continue + if edu_type == 'm.typing': + continue + filtered_edus.append(edu) + logger.info( - "Received txn %s from %s. (PDUs: %d, EDUs: %d)", + "Received txn %s from %s. (PDUs: %d, Accepted EDUs: %d, Ignored EDUs: %d)", transaction_id, origin, len(transaction_data.get("pdus", [])), - len(transaction_data.get("edus", [])), + len(filtered_edus), + edus_before_filter - len(filtered_edus), ) + transaction_data["edus"] = filtered_edus + if issue_8631_logger.isEnabledFor(logging.DEBUG): DEVICE_UPDATE_EDUS = [ EduTypes.DEVICE_LIST_UPDATE, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f7223b03c364..b8c9080f447c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2151,7 +2151,7 @@ async def _run_push_actions_and_persist_event( # persist_events_and_notify directly.) assert not event.internal_metadata.outlier - if not backfilled and not context.rejected: + if False and not backfilled and not context.rejected: min_depth = await self._store.get_min_depth(event.room_id) if min_depth is None or min_depth > event.depth: # XXX richvdh 2021/10/07: I don't really understand what this diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4cf593cfdcbc..ff4f1c1b2a0b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1462,9 +1462,10 @@ async def _persist_events( a room that has been un-partial stated. """ - await self._bulk_push_rule_evaluator.action_for_events_by_user( - events_and_context - ) + # T2B: Disable push processing. + #await self._bulk_push_rule_evaluator.action_for_events_by_user( + # events_and_context + #) try: # If we're a worker we need to hit out to the master. diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 3610b6bf785e..bdda5e2f0f6e 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -70,7 +70,8 @@ def __init__(self, hs: "HomeServer"): # Guard to ensure we only process deltas one at a time self._is_processing = False - if self.update_user_directory: + # T2B: Disable user directory + if self.update_user_directory and False: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before @@ -109,6 +110,11 @@ async def search_users( def notify_new_event(self) -> None: """Called when there may be more deltas to process""" + + # T2B: Disable user directory + if True: + return + if not self.update_user_directory: return @@ -133,6 +139,10 @@ async def handle_local_profile_change( # FIXME(#3714): We should probably do this in the same worker as all # the other changes. + # T2B: Disable user directory + if True: + return + if await self.store.should_include_local_user_in_dir(user_id): await self.store.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url @@ -142,6 +152,11 @@ async def handle_local_user_deactivated(self, user_id: str) -> None: """Called when a user ID is deactivated""" # FIXME(#3714): We should probably do this in the same worker as all # the other changes. + + # T2B: Disable user directory + if True: + return + await self.store.remove_from_user_dir(user_id) async def _unsafe_process(self) -> None: