Skip to content

Commit

Permalink
Matrix Server Delegation Support (m.server param)
Browse files Browse the repository at this point in the history
  • Loading branch information
caronc committed Sep 2, 2024
1 parent fa6d4e6 commit 4a08936
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 5 deletions.
106 changes: 101 additions & 5 deletions apprise/plugins/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#
import re
import requests
import socket
from markdown import markdown
from json import dumps
from json import loads
Expand Down Expand Up @@ -172,6 +173,9 @@ class NotifyMatrix(NotifyBase):
# throttle back.
default_retries = 2

# Delegate Server Lookups timeout after 5 seconds
srv_lookup_timeout_sec = 5

# The number of micro seconds to wait if we get a 429 error code and
# the server doesn't remind us how long we shoul wait for
default_wait_ms = 1000
Expand Down Expand Up @@ -256,6 +260,11 @@ class NotifyMatrix(NotifyBase):
'default': False,
'map_to': 'include_image',
},
'delegate': {
'name': _('Delegate Server Check'),
'type': 'bool',
'default': False,
},
'mode': {
'name': _('Webhook Mode'),
'type': 'choice:string',
Expand Down Expand Up @@ -283,7 +292,7 @@ class NotifyMatrix(NotifyBase):
})

def __init__(self, targets=None, mode=None, msgtype=None, version=None,
include_image=False, **kwargs):
include_image=None, delegate=None, **kwargs):
"""
Initialize Matrix Object
"""
Expand All @@ -305,7 +314,15 @@ def __init__(self, targets=None, mode=None, msgtype=None, version=None,
self.transaction_id = 0

# Place an image inline with the message body
self.include_image = include_image
self.include_image = self.template_args['image']['default'] \
if include_image is None else include_image

# Prepare Delegate Server Lookup Check
self.delegate = self.template_args['delegate']['default'] \
if delegate is None else delegate

# If delegation is used, this value is set later on
self.delegated_host = None

# Setup our mode
self.mode = self.template_args['mode']['default'] \
Expand Down Expand Up @@ -586,6 +603,17 @@ def _send_server_notification(self, body, title='',
Perform Direct Matrix Server Notification (no webhook)
"""

# Handle delegate server setup
if self.delegate and self.delegated_host is None:
self.delegated_host = self.delegated_server_lookup(
timeout=self.srv_lookup_timeout_sec)

if self.delegated_host is False:
self.logger.warning(
'Matrix delegated server could not be acquired.')
# Return; we're done
return False

if self.access_token is None:
# We need to register
if not self._login():
Expand Down Expand Up @@ -1180,7 +1208,7 @@ def _room_id(self, room):

return None

def _fetch(self, path, payload=None, params=None, attachment=None,
def _fetch(self, path, payload=None, params={}, attachment=None,
method='POST'):
"""
Wrapper to request.post() to manage it's response better and make
Expand All @@ -1202,6 +1230,10 @@ def _fetch(self, path, payload=None, params=None, attachment=None,

default_port = 443 if self.secure else 80

# Server Delegation
if self.delegated_host:
params.update({'m.server': self.delegated_host})

url = \
'{schema}://{hostname}{port}'.format(
schema='https' if self.secure else 'http',
Expand All @@ -1217,7 +1249,7 @@ def _fetch(self, path, payload=None, params=None, attachment=None,
# FUTURE url += MATRIX_V2_MEDIA_PATH + path
url += MATRIX_V2_MEDIA_PATH + path

params = {'filename': attachment.name}
params.update({'filename': attachment.name})
with open(attachment.path, 'rb') as fp:
payload = fp.read()

Expand Down Expand Up @@ -1258,7 +1290,7 @@ def _fetch(self, path, payload=None, params=None, attachment=None,
r = fn(
url,
data=dumps(payload) if not attachment else payload,
params=params,
params=None if not params else params,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
Expand Down Expand Up @@ -1426,6 +1458,7 @@ def url(self, privacy=False, *args, **kwargs):
'mode': self.mode,
'version': self.version,
'msgtype': self.msgtype,
'delegate': 'yes' if self.delegate else 'no',
}

# Extend our parameters
Expand Down Expand Up @@ -1495,6 +1528,10 @@ def parse_url(url):
results['include_image'] = parse_bool(results['qsd'].get(
'image', NotifyMatrix.template_args['image']['default']))

# Boolean to perform a delegate server lookup
results['delegate'] = parse_bool(results['qsd'].get(
'delegate', NotifyMatrix.template_args['delegate']['default']))

# Get our mode
results['mode'] = results['qsd'].get('mode')

Expand Down Expand Up @@ -1554,3 +1591,62 @@ def parse_native_url(url):
else '{}&{}'.format(result.group('params'), mode)))

return None

def delegated_server_lookup(self, timeout=5):
"""
Attempts to query delegated domain
Returns resolved delegated server if found, otherwise None if not.
Returns False if it just failed in general
"""
delegated_host = self.store.get('__delegate')
if delegated_host:
# We can use our cached value
return delegated_host

try:
query = f'_matrix._tcp.{self.host}'

# Set timeout for the socket
socket.setdefaulttimeout(timeout)

# Perform the DNS lookup using getaddrinfo
result = socket.getaddrinfo(
query, None, proto=socket.IPPROTO_TCP, type=socket.SOCK_STREAM)

if not result:
# No delegation
self.logger.warning(
'Matrix %s SRV had an empty response', query)
return None

# Parse the result to find the SRV record
for no, res in enumerate(result, start=1):
self.logger.trace('Matrix SRV %02d: %s', no, str(res))
if res[0] == socket.AF_INET:
# Prepare our host and/or port; return first match
host = res[4][0]
port = res[4][1]
delegated_host = '{}{}'.format(
host,
f':{port}' if port else '')

self.logger.debug(
'Matrix %s SRV resolved to %s',
query, delegated_host)

# Delegation determined, store cache for a while
self.store.set('__delegate', delegated_host, expires=86400)
return delegated_host

# No delegation
self.logger.warning(
'Matrix %s SRV had no delegated server in response')
return None

except (socket.gaierror, socket.timeout):
return False

finally:
# Reset timeout to default (None)
socket.setdefaulttimeout(None)
105 changes: 105 additions & 0 deletions test/test_plugin_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# POSSIBILITY OF SUCH DAMAGE.

from unittest import mock
import socket
import os
import requests
import pytest
Expand Down Expand Up @@ -1012,6 +1013,110 @@ def test_plugin_matrix_attachments_api_v3(mock_post, mock_get, mock_put):
del obj


@mock.patch('socket.getaddrinfo')
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_matrix_delegate_server(mock_post, mock_get, mock_getaddrinfo):
"""
NotifyMatrix() Delegate Server
"""

# Prepare a good response
response = mock.Mock()
response.status_code = requests.codes.ok
response.content = MATRIX_GOOD_RESPONSE.encode('utf-8')

# Prepare Mock return object
mock_post.return_value = response
mock_get.return_value = response

# Instantiate our object
obj = Apprise.instantiate(
'matrix://user:[email protected]/#general?v=2&delegate=yes')

# test throwing exception
mock_getaddrinfo.side_effect = socket.gaierror

# A failure
assert '__delegate' not in obj.store
assert obj.delegated_server_lookup() is False
assert '__delegate' not in obj.store

# Test notifications
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
) is False

mock_getaddrinfo.side_effect = None
mock_getaddrinfo.return_value = [
(
socket.AF_INET, # Address family (IPv4)
socket.SOCK_STREAM, # Socket type (TCP)
socket.IPPROTO_TCP, # Protocol (TCP)
'', # Canonical name (empty in this case)
('192.168.1.10', 8448) # Socket address (IP address, port)
),
(
socket.AF_INET, # Address family (IPv4)
socket.SOCK_STREAM, # Socket type (TCP)
socket.IPPROTO_TCP, # Protocol (TCP)
'', # Canonical name (empty in this case)
('192.168.1.11', 8448) # Socket address (IP address, port)
)
]

# Re-initialize
obj = Apprise.instantiate(
'matrix://user:[email protected]/#general?v=2&delegate=yes')

assert obj.delegated_server_lookup() == '192.168.1.10:8448'
assert '__delegate' in obj.store
assert obj.store.get('__delegate') == '192.168.1.10:8448'
# A second call uses the cached value
assert obj.delegated_server_lookup() == '192.168.1.10:8448'

# Test notifications
obj.store.clear('__delegate')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
) is True

# Now clear our cache
mock_getaddrinfo.return_value = []
obj.store.clear('__delegate')
# No entry found
assert obj.delegated_server_lookup() is None
assert '__delegate' not in obj.store

# Test notifications
obj.store.clear('__delegate')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
) is True

mock_getaddrinfo.return_value = [
(
socket.AF_INET6, # Address family (IPv6)
socket.SOCK_STREAM, # Socket type (TCP)
socket.IPPROTO_TCP, # Protocol (TCP)
'', # Canonical name (empty in this case)
# Socket address (IPv6 address, port, flow info, scope ID)
('2001:0db8:85a3:0000:0000:8a2e:0370:7334', 8448, 0, 0)
),
]
obj.store.clear('__delegate')
# No entry found that is IPv4
assert obj.delegated_server_lookup() is None
assert '__delegate' not in obj.store

# Test notifications
obj.store.clear('__delegate')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
) is True


@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_matrix_attachments_api_v2(mock_post, mock_get):
Expand Down

0 comments on commit 4a08936

Please sign in to comment.