-
Notifications
You must be signed in to change notification settings - Fork 1
/
eb_user.py
executable file
·254 lines (199 loc) · 8.25 KB
/
eb_user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#!/usr/bin/env python3
"eb_user.py by Christer Enfors (c) 2016"
from __future__ import print_function
import datetime
class User(object):
"Keep track of the same human across multiple protocols."
known_protocols = ("Twitter", "Telegram", "IRC")
def __init__(self, config, database, user_id=None, name=None,
password=None, protocols={}):
# pylint: disable=dangerous-default-value
self.config = config
self.database = database
self.user_id = user_id
self.name = name
self.password = password
for protocol in self.known_protocols:
try:
protocols[protocol]
except KeyError:
protocols[protocol] = None
self.protocols = protocols
self.activities = []
def identify(self, name, password=None):
"Called when the user manually identifies themselves."
self.name = name.lower()
self.password = password
def find_identifier_by_protocol(self, protocol):
"Given a protocol, return the identifier."
try:
return self.protocols[protocol]
except KeyError:
return None
def add_protocol(self, protocol, identifier):
"""
Called to connect this user to a new protcol.
Example:
# Add the Twitter user @enfors to this user:
user.add_protocol("Twitter", "Enfors")
"""
self.protocols[protocol] = identifier
def insert_activity(self, activity):
"Add an activity to the start of the queue."
self.activities.insert(0, activity)
def push_activity(self, activity):
"Add an activity to the end of the queue."
self.activities.append(activity)
def remove_activity(self):
"Remove and return the first activity on the queue."
if len(self.activities) < 1:
return None
activity = self.activities[0]
del self.activities[0]
return activity
def pop_activity(self):
"Remove and return the activity at the end of the list."
if len(self.activities) < 1:
return None
return self.activities.pop()
def current_activity(self):
"Return the current (read: first) activity on the queue."
if len(self.activities) < 1:
return None
else:
return self.activities[0]
def save(self):
"Save the user."
print("Saving user '%s'" % self.name)
with self.config.lock, self.database:
cur = self.database.cursor()
cur.execute("insert or replace into USER "
"(USER_ID, NAME, TWITTER_ID, TELEGRAM_ID, IRC_ID, "
"CREATED) values "
"((select USER_ID from USER where NAME = '?'), "
"?, ?, ?, ?, ?);",
(self.name,
self.protocols["Twitter"],
self.protocols["Telegram"],
self.protocols["IRC"],
datetime.datetime.now()))
# The user is saved. Now, find it's user_id number.
cur.execute("select USER_ID from USER "
"where NAME=?", (self.name,))
self.user_id = cur.fetchone()
def save_data(self, field, val):
"Save arbitrary value for the user to the database."
field = str(field)
val = str(val)
print("Saving %s=%s for user %s" % (field, val, self.name))
with self.config.lock, self.database:
cur = self.database.cursor()
cur.execute("select FIELD from USER_DATA "
"where USER_ID=? and FIELD=?",
(self.user_id, field))
row = cur.fetchone()
if row is None:
cur.execute("insert into USER_DATA "
"values (?, ?, ?, ?)",
(self.user_id, field, val,
datetime.datetime.now()))
else:
cur.execute("update USER_DATA "
"set VAL=?, LAST_UPDATE=? "
"where USER_ID=? and FIELD=?",
(val, datetime.datetime.now(),
self.user_id, field))
def load_data(self, field, default=None):
"Load arbitrary user data, previously saved with save_data."
field = str(field)
with self.config.lock, self.database:
cur = self.database.cursor()
cur.execute("select VAL from USER_DATA "
"where USER_ID=? and FIELD=?",
(self.user_id, field))
val = cur.fetchone()
if val is None:
return default
else:
return val[0]
def __repr__(self):
output = 'User(config, database, name="%s", userid=%s)' % \
(self.name, str(self.user_id))
# output += "\n- Protocols:" % self.name
# for protocol in self.protocols.keys():
# output += "\n - %s:%s" % (protocol, self.protocols[protocol])
return output
class UserHandler(object):
"Keep track of all users."
def __init__(self, config, database):
self.config = config
self.database = database
christer = User(self.config, self.database, None, "Christer",
protocols={"Twitter": "Enfors",
"Telegram": "167773515",
"IRC": "Enfors"})
indra = User(self.config, self.database, None, "Indra",
protocols={"Twitter": "IndraEnfors"})
# self.users = []
self.users = [christer, indra]
def find_user_by_identifier(self, protocol, identifier):
"Given a protocol and identifier, find and return a user."
identifier = str(identifier)
# If the user is online, return them.
user = self.find_online_user_by_identifier(protocol, identifier)
if user:
return user
# The user is not online. Load them from the database, or create
# them if they're not already there.
name = None
twitter_id = None
telegram_id = None
irc_id = None
password = None
if protocol.lower() == "twitter":
col = "TWITTER_ID"
elif protocol.lower() == "telegram":
col = "TELEGRAM_ID"
elif protocol.lower() == "irc":
col = "IRC_ID"
else:
print("find_user_by_identifier(): Unknown protocol '%s'" %
protocol)
query = "select USER_ID, NAME, PASSWORD, TWITTER_ID, TELEGRAM_ID, " \
"IRC_ID from USER where %s=?" % col
with self.config.lock, self.database:
cur = self.database.cursor()
cur.execute(query, (identifier,))
try:
(user_id, name, password, twitter_id, telegram_id,
irc_id) = cur.fetchone()
user = User(self.config, self.database, user_id, name,
password,
{"Twitter": twitter_id,
"Telegram": telegram_id,
"IRC": irc_id})
except TypeError:
protocols = {"Twitter": twitter_id,
"Telegram": telegram_id,
"IRC": irc_id}
protocols[protocol] = identifier
user = User(self.config, self.database, None, name, password,
protocols)
# Don't save the user now; wait until they have given
# us their name.
self.users.append(user)
return user
def find_online_user_by_identifier(self, protocol, identifier):
"Find a user we've talked to previously, since last restart."
for user in self.users:
if user.protocols[protocol] == identifier:
return user
return None
def test(self):
"For testing only."
for user in self.users:
print(user)
print("Looking for Enfors:")
print(self.find_user_by_identifier("Twitter", "Enfors"))
print("Looking for Indra:")
print(self.find_user_by_identifier("Twitter", "IndraEnfors"))