-
Notifications
You must be signed in to change notification settings - Fork 3
/
matrix.py
102 lines (74 loc) · 3.4 KB
/
matrix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import logging
from urllib.parse import quote
from matrix_client.errors import MatrixRequestError
from opsdroid.database import Database
from opsdroid.connector.matrix.events import MatrixStateEvent
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
class DatabaseMatrix(Database):
    """A module for opsdroid to allow memory to persist in matrix room state.

    Values are stored in the content of ``opsdroid.database`` state events.
    How database keys map onto Matrix state keys is controlled by the
    ``single_state_key`` config option:

    * ``True`` (default): every key lives inside one state event whose
      state key is the empty string.
    * any other truthy value: that value is used as the state key and
      every database key lives inside that one event.
    * falsy: each database key is used as its own state key, and the value
      written must itself be a dict (it becomes the event content).
    """

    def __init__(self, config, opsdroid=None):
        """Start the database connection.

        Args:
            config (dict): Database configuration. ``default_room``
                (default ``"main"``) and ``single_state_key`` (default
                ``True``) are read from it.
            opsdroid: The opsdroid instance, forwarded to the base class.

        """
        super().__init__(config, opsdroid=opsdroid)
        self.name = "matrix"
        self.room = config.get("default_room", "main")
        self._event_type = "opsdroid.database"
        self._single_state_key = config.get("single_state_key", True)
        _LOGGER.debug("Loaded matrix database connector.")

    async def connect(self):
        """Connect to the database.

        Nothing is opened here; requests go through the matrix connector's
        existing connection when ``get``/``put`` are called.
        """
        _LOGGER.info("Matrix Database connector initialised.")

    def _resolve_room_id(self):
        """Return the room id for the configured room.

        A configured value starting with ``!`` is used as-is; anything else
        is looked up in the connector's ``room_ids`` mapping.

        Raises:
            KeyError: If the room name is not present in ``room_ids``.

        """
        room = self.room or "main"
        if room.startswith("!"):
            return room
        return self.connector.room_ids[room]

    def _state_key_for(self, key):
        """Return the Matrix state key under which ``key`` is stored."""
        if self._single_state_key is True:
            return ""
        # A truthy non-True setting is itself the state key; a falsy one
        # means each database key gets its own state event.
        return self._single_state_key or key

    async def put(self, key, value):
        """Insert or replace an object into the database for a given key.

        The existing state event content is fetched, merged with the new
        value, and only sent back if the merged content differs.

        Args:
            key: The database key.
            value: The value to store. Must be a dict when
                ``single_state_key`` is falsy.

        Raises:
            ValueError: If ``single_state_key`` is falsy and ``value`` is
                not a dict.

        """
        state_key = self._state_key_for(key)
        room_id = self._resolve_room_id()

        _LOGGER.debug(f"Putting {key} into matrix room {room_id}.")

        # Validate before touching the network so bad input costs no request.
        if self._single_state_key:
            value = {key: value}
        elif not isinstance(value, dict):
            raise ValueError("When single_state_key is False value must be a dict.")

        # NOTE(review): get_state_event returns {} for *any*
        # MatrixRequestError, not just 404, so a failed read here results in
        # the event being rewritten with only the new value.
        ori_data = await self.get_state_event(room_id, state_key)
        data = {**ori_data, **value}

        if data == ori_data:
            _LOGGER.debug("Not updating matrix state, as content hasn't changed.")
            return

        _LOGGER.debug(f"===== Putting {key} into matrix room {room_id} with {data}")

        await self.opsdroid.send(
            MatrixStateEvent(
                self._event_type,
                content=data,
                target=room_id,
                connector=self.connector,
                state_key=state_key,
            )
        )

    async def get(self, key):
        """Get a document from the database for a given key.

        Args:
            key: The database key.

        Returns:
            The stored value, or ``None`` when the state event is missing or
            empty. With a truthy ``single_state_key`` this is the entry for
            ``key`` inside the event content (``None`` if absent); otherwise
            it is the whole event content.

        """
        room_id = self._resolve_room_id()
        state_key = self._state_key_for(key)

        _LOGGER.debug(f"Getting {key} from matrix room {room_id} with state_key={state_key}.")

        data = await self.get_state_event(room_id, state_key)

        _LOGGER.debug(f"Got {data} from state request.")

        if not data:
            return None

        if self._single_state_key:
            data = data.get(key)

        return data

    @property
    def connector(self):
        """Return the entry named ``"matrix"`` from opsdroid's connectors."""
        return self.opsdroid._connector_names["matrix"]

    async def get_state_event(self, room_id, key):
        """Fetch the content of this database's state event from a room.

        Args:
            room_id: The id of the room to read from.
            key: The state key; when falsy no state key segment is added to
                the request path.

        Returns:
            The response of the connector's ``_send`` call, or ``{}`` if the
            request raised ``MatrixRequestError`` (a 404 is logged at debug
            level, anything else is logged with its traceback).

        """
        segments = ["rooms", room_id, "state", self._event_type]
        if key:
            segments.append(key)
        # Quote each component on its own with no "safe" characters, so a
        # "/" inside a room id or state key cannot alter the path structure.
        url = "/" + "/".join(quote(str(part), safe="") for part in segments)
        try:
            _LOGGER.debug(f"Making request to {url}.")
            return await self.connector.connection._send("GET", url)
        except MatrixRequestError as e:
            if e.code == 404:
                _LOGGER.debug(f"Failed to get state event with code 404: {e}")
            else:
                _LOGGER.exception("Failed to get state event with unknown error.")
            return {}