-
Notifications
You must be signed in to change notification settings - Fork 0
/
models.py
529 lines (436 loc) · 15.8 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
"""Models module to interact with a sqlite database"""
import sqlite3
import bcrypt
from datetime import datetime
# Define the name of the database and a salt value for password hashing.
if __name__ == '__main__':
DATABASE = 'social_media.db'
else:
DATABASE = 'test.db'
SALT = b'$2b$12$bCkhk/dnjeaHnxqYLh39be'
def valid_user(username, password):
"""Checks if a user with the given username and password exists in the database.
Args:
username (str): The username of the user.
password (str): The password of the user.
Returns:
bool: True if a user with the given username and password exists, False otherwise.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute('''
SELECT
people.username,
people.encrypted_password
FROM people
WHERE people.username = ? ''', (username,))
result = cursor.fetchone()
if result is None:
return False
conn.close()
check = bcrypt.checkpw(password.encode("utf-8"), result[1])
return check
def user_exists(username):
"""Checks if a user with the given username exists in the database.
Args:
username (str): The username of the user.
Returns:
bool: True if a user with the given username exists, False otherwise.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute('''
SELECT
people.username
FROM people
WHERE people.username = ? ''', (username,))
result = cursor.fetchone()
conn.close()
return not not result
def create_user(name, username, password):
"""Creates a new user in the database.
Args:
name (str): The name of the user.
username (str): The username of the user.
password (str): The password of the user.
Returns:
int: The ID of the newly created user.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
hashed_password = bcrypt.hashpw(password.encode("utf-8"), SALT)
result = cursor.execute('''
INSERT INTO people (name, username, encrypted_password)
VALUES (?,?,?)''', (name, username, hashed_password))
conn.commit()
conn.close()
return result.lastrowid
def delete_user(username, password):
"""Deletes a user from the database.
Args:
username (str): The username of the user.
password (str): The password of the user.
Returns:
bool: True if a user with the given username and password exists, False otherwise.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
hashed_password = bcrypt.hashpw(password.encode("utf-8"), SALT)
cursor.execute('''
DELETE FROM people
WHERE people.username =? AND people.encrypted_password =?''', (username, hashed_password))
conn.commit()
conn.close()
return not user_exists(username)
def get_users():
"""Gets all users from the database.
Returns:
list: A list of all users tuples.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute('''
SELECT
people._id,
people.username,
people.name
FROM people
ORDER BY people.username''')
result = cursor.fetchall()
conn.close()
return result
def get_user_id(username):
"""Returns the id of the user with the given username.
Args:
username (str): The username of the user.
Returns:
int: The id of the user.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Retrieve the id of the user with the given username
cursor.execute('SELECT _id FROM people WHERE username = ?', (username,))
row = cursor.fetchone()
if row:
user_id = row[0]
else:
user_id = None
# Close the database connection and return the user ID
conn.close()
return user_id
def get_username_by_id(user_id):
"""Returns the username of a user given their ID.
Args:
user_id (int): The ID of the user to lookup.
Returns:
str: The username of the user.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute('SELECT username FROM people WHERE _id = ?', (user_id,))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def get_comments_by_post_id(post_id):
"""Fetches all comments for a given post from the database.
Args:
post_id (int): The id of the post for which to fetch comments.
Returns:
list of dict: A list of dictionaries, where each dictionary represents a comment.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Fetch comments for given post ID
cursor.execute('SELECT * FROM comments WHERE post_id = ?', (post_id,))
comments = cursor.fetchall()
# Close database connection and return comments
conn.close()
return comments
def add_comment(post_id, author_id, content):
"""Adds a comment to the database.
Args:
post_id (int): The id of the post being commented on.
author_id (int): The id of the user adding the comment.
content (str): The content of the comment.
Returns:
str: Comment added
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Check if post with given ID exists
cursor.execute('SELECT * FROM posts WHERE _id = ?', (post_id,))
post = cursor.fetchone()
if not post:
return None # Return None if post with given ID does not exist
# Check if user with given ID exists
cursor.execute('SELECT * FROM people WHERE _id =?', (author_id,))
user = cursor.fetchone()
if not user:
return None # Return None if user with given ID does not exist
# Insert new comment into comments table
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('INSERT INTO comments (post_id, author_id, content, timestamp) VALUES (?, ?, ?, ?)',
(post_id, author_id, content, timestamp))
conn.commit()
# Get ID of new comment
comment_id = cursor.lastrowid
# Fetch newly created comment
cursor.execute('SELECT * FROM comments WHERE _id = ?', (comment_id,))
comment = cursor.fetchone()
# Close database connection and return newly created comment
conn.close()
return comment
def remove_comment(comment_id):
"""removes a comment from the database.
Args:
comment_id (int): The id of the comment being deleted.
Returns:
bool: If comment was deleted
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Delete the comment with the given ID
cursor.execute("DELETE FROM comments WHERE _id=?", (comment_id,))
conn.commit()
# Close the connection and return success
changes = conn.total_changes
conn.close()
return not not changes
def get_feed_with_comments():
"""Returns all posts and their associated comments."""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Join posts and comments tables
cursor.execute('SELECT posts._id, posts.author_id, posts.title, posts.content, comments._id, comments.author_id, comments.content '
'FROM posts LEFT JOIN comments ON posts._id = comments.post_id')
# Group results by post ID to combine posts with their comments
results = cursor.fetchall()
feed = {}
for row in results:
post_id = row[0]
if post_id not in feed:
feed[post_id] = (row[0], row[1], row[2], row[3], [])
if row[4]:
feed[post_id][4].append((row[4], row[5], row[6]))
# Close database connection and return feed
conn.close()
return list(feed.values())
def create_post(author_id, title, content):
"""Creates a new post in the database."""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('INSERT INTO posts (author_id, title, content, timestamp) VALUES (?,?,?,?)',
(author_id, title, content, timestamp))
conn.commit()
post_id = cursor.lastrowid
conn.close()
return post_id
def delete_post(post_id):
"""Deletes a post from the database."""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute('DELETE FROM posts WHERE _id =?', (post_id,))
conn.commit()
changes = conn.total_changes
conn.close()
return not changes
def fetch_post_feed():
"""Fetches all posts from the database."""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute('SELECT _id, author_id, title, content FROM posts')
posts = cursor.fetchall()
conn.close()
return posts
def get_most_active_user():
"""Returns the user with the most posts and comments on other posts."""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Get all users with their posts and comments count
cursor.execute('''
SELECT people.username, COUNT(DISTINCT posts._id) AS num_posts,
COUNT(DISTINCT comments._id) AS num_comments
FROM people
LEFT JOIN posts ON people._id = posts.author_id
LEFT JOIN comments ON people._id = comments.author_id
GROUP BY people.username
''')
users = cursor.fetchall()
# Find the user with the highest sum of posts and comments count
most_active_user = None
max_posts_comments = 0
for user in users:
posts_comments = user[1] + user[2]
if posts_comments > max_posts_comments:
max_posts_comments = posts_comments
most_active_user = user[0]
# Close database connection and return most active user
conn.close()
return most_active_user
def add_follower(user_id, follower_id):
"""Adds a follower to a user in the database.
Args:
user_id (int): The ID of the user being followed.
follower_id (int): The ID of the follower.
Returns:
str: Message indicating whether the follower was added.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Check if user with given ID exists
cursor.execute('SELECT * FROM people WHERE _id = ?', (user_id,))
user = cursor.fetchone()
if not user:
return "User not found"
# Check if follower with given ID exists
cursor.execute('SELECT * FROM people WHERE _id = ?', (follower_id,))
follower = cursor.fetchone()
if not follower:
return "Follower not found"
# Check if follower is already following user
cursor.execute('SELECT * FROM following WHERE _id = ? AND following_id = ?',
(user_id, follower_id))
existing_follower = cursor.fetchone()
if existing_follower:
return "Follower already exists"
# Insert new follower into followers table
cursor.execute('INSERT INTO following (_id, following_id) VALUES (?, ?)',
(user_id, follower_id))
conn.commit()
# Close database connection and return success message
conn.close()
return "Follower added"
def remove_follower(follower_id, followed_id):
"""Removes a follower from a user's list of followers.
Args:
follower_id (int): The ID of the user who is following.
followed_id (int): The ID of the user being followed.
Returns:
str: Message indicating whether follower was removed or not.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Check if follower exists
cursor.execute('SELECT * FROM people WHERE _id = ?', (follower_id,))
follower = cursor.fetchone()
if not follower:
conn.close()
return f"User with ID {follower_id} does not exist."
# Check if followed user exists
cursor.execute('SELECT * FROM people WHERE _id = ?', (followed_id,))
followed = cursor.fetchone()
if not followed:
conn.close()
return f"User with ID {followed_id} does not exist."
# Check if follower is already following followed user
cursor.execute('SELECT * FROM following WHERE _id = ? AND following_id = ?',
(follower_id, followed_id))
existing_follower = cursor.fetchone()
if not existing_follower:
conn.close()
return f"User with ID {follower_id} is not following user with ID {followed_id}."
# Remove follower from database
cursor.execute('DELETE FROM following WHERE _id = ? AND following_id = ?',
(follower_id, followed_id))
conn.commit()
conn.close()
return f"User with ID {follower_id} successfully unfollowed user with ID {followed_id}."
def get_followers_and_following(user_id):
"""Get list of followers and users being followed by user with given ID.
Args:
user_id (int): The ID of the user.
Returns:
Tuple[List[str], List[str]]: The first list contains the usernames of followers, the second list contains the usernames of users being followed.
"""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
# Get list of followers
cursor.execute(
'SELECT people.username FROM people INNER JOIN following ON people._id = following._id WHERE following.following_id = ?', (user_id,))
followers = [row[0] for row in cursor.fetchall()]
# Get list of users being followed
cursor.execute(
'SELECT people.username FROM people INNER JOIN following ON people._id = following.following_id WHERE following._id = ?', (user_id,))
following = [row[0] for row in cursor.fetchall()]
# Close database connection and return results
conn.close()
return followers, following
def get_bacon_number(lookup_user_id):
"""Returns the bacon number of other users to a lookup user.
bacon number is calculated by the number of followers awway from
the lookup user."""
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
people_table_size = cursor.execute(
"SELECT COUNT(*) FROM people").fetchone()[0] - 1
cursor.execute('''
SELECT
_id,
name,
MIN(bacon_number) as n
FROM(
SELECT
*
FROM(
WITH RECURSIVE bacon_numbers(_id, name, bacon_number) AS(
SELECT
_id,
name,
0 AS bacon_number
FROM people
WHERE _id= {lookup_user_id}
UNION
SELECT
p._id,
p.name,
bn.bacon_number + 1
FROM following f1
JOIN bacon_numbers bn ON
bn._id=f1._id
JOIN people p ON
f1.following_id=p._id
WHERE bn.bacon_number < {table_size}
)
SELECT
*
FROM bacon_numbers
)
UNION
SELECT
*
FROM(
WITH RECURSIVE bacon_numbers(_id, name, bacon_number) AS(
SELECT
_id,
name,
0 AS bacon_number
FROM people
WHERE _id= {lookup_user_id}
UNION
SELECT
p._id,
p.name,
bn.bacon_number + 1
FROM following f1
JOIN bacon_numbers bn ON
bn._id=f1.following_id
JOIN people p ON
f1._id=p._id
WHERE bn.bacon_number < {table_size}
)
SELECT
*
FROM bacon_numbers
)
)
GROUP BY
_id
ORDER BY
bacon_number,
name DESC
'''.format(lookup_user_id=lookup_user_id, table_size=people_table_size))
bacon_numbers = cursor.fetchall()
# Close database connection and return bacon number
conn.close()
return bacon_numbers