-
Notifications
You must be signed in to change notification settings - Fork 0
/
model.py
258 lines (217 loc) · 8.45 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# -*- coding: utf-8 -*-
'''The file contains functions for working with the database'''
import base64
import time
from os.path import isfile
import sqlalchemy as sa
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from envparse import env
# Load settings from a '.env' file when one exists at that relative path,
# so the env.str(...) lookups further down can read them
if isfile('.env'):
    env.read_envfile('.env')
# Database connection parameters obtained from .env
def _quote_dsn_value(value):
    '''Quote one value for a keyword/value connection string

    :param value: raw parameter value
    :return: the value unchanged when it can be written bare, otherwise the
        value wrapped in single quotes with backslashes and quotes escaped
    '''
    text = str(value)
    # Bare form is only valid for non-empty values without whitespace,
    # single quotes or backslashes
    if text and not any(ch.isspace() or ch in "'\\" for ch in text):
        return text
    escaped = text.replace('\\', '\\\\').replace("'", "\\'")
    return f"'{escaped}'"


def get_dsn():
    '''DB connection string built from the PG_* settings

    Values that are empty or contain whitespace, quotes or backslashes
    (typically the password) are quoted so the string stays parseable;
    ordinary values are emitted exactly as before.

    :return: string of the form "dbname=... user=... password=... host=..."
    '''
    params = (
        ('dbname', 'PG_DATABASE'),
        ('user', 'PG_USERNAME'),
        ('password', 'PG_PASSWORD'),
        ('host', 'PG_SERVER'),
    )
    return ' '.join(
        f'{keyword}={_quote_dsn_value(env.str(setting))}'
        for keyword, setting in params)
def get_sekret_key():
    '''Encrypted cookie storage for the session

    Reads the SECRET_KEY setting, decodes it from URL-safe base64 and hands
    the decoded bytes to EncryptedCookieStorage.

    :return: EncryptedCookieStorage instance built from SECRET_KEY
    '''
    encoded_key = env.str('SECRET_KEY')
    raw_key = base64.urlsafe_b64decode(encoded_key)
    return EncryptedCookieStorage(raw_key)
def get_timestamp_str():
    '''Current local time formatted as a timestamp string for the base

    :return: string of the form 'YYYY-MM-DD HH:MM:SS'
    '''
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    # localtime() without an argument converts the current time
    return time.strftime(timestamp_format, time.localtime())
# Shared MetaData object that every table in this module is registered on
metadata = sa.MetaData()
# Table user: one row per account
tb_user = sa.Table(
    'user',
    metadata,
    sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
    sa.Column('email', sa.String(255)),
    # NOTE(review): stored exactly as passed in by create_user / update_user;
    # confirm the caller hashes it before it gets here
    sa.Column('password', sa.String(255)),
    sa.Column('name', sa.String(255)),
    sa.Column('surname', sa.String(255)),
    # Creation time, written by create_user
    sa.Column('create_at', sa.TIMESTAMP),
    # Soft-delete marker: null while the user is active, a timestamp once deleted
    sa.Column('delete_at', sa.TIMESTAMP))
# Table user_rule: link table that assigns rules (rights) to users
tb_user_rule = sa.Table(
    'user_rule',
    metadata,
    sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
    # A ForeignKey target is '<table name>.<column>', i.e. the names given to
    # sa.Table ('rule', 'user'), not the Python variables holding the tables.
    # The column type None is inherited from the referenced column.
    sa.Column('rule', None, sa.ForeignKey('rule.id')),
    sa.Column('user', None, sa.ForeignKey('user.id')))
# Table rule: catalogue of the rights that can be assigned to users
tb_rule = sa.Table(
    'rule',
    metadata,
    sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
    # Rule name (e.g. 'admin'); get_rules uses it as the dictionary key
    sa.Column('rule', sa.String(255)),
    sa.Column('comment', sa.String(255)))
async def get_user_by_email(engine, email):
    '''Look up a user by email

    :param engine: DB connection
    :param email: user email
    :return: dict with the fields of the first matching user,
        or None when no user has this email
    '''
    # Keys of the result, in the positional order the row is read in
    field_names = ('id', 'email', 'password', 'name', 'surname',
                   'create_at', 'delete_at')
    query = tb_user.select().where(tb_user.c.email == email)
    async with engine.acquire() as conn:
        async for row in conn.execute(query):
            # Only the first match is of interest
            return {name: row[pos] for pos, name in enumerate(field_names)}
    return None
async def get_user_rules(engine, user_id):
    '''Obtaining user rights by id

    :param engine: DB connection
    :param user_id: user id
    :return: list with the names of the rules assigned to the user
    '''
    rule_to_user = sa.join(
        tb_rule, tb_user_rule, tb_rule.c.id == tb_user_rule.c.rule)
    query = (
        tb_rule.select()
        .select_from(rule_to_user)
        .where(tb_user_rule.c.user == user_id))
    async with engine.acquire() as conn:
        # Position 1 of a rule row is the rule name
        return [row[1] async for row in conn.execute(query)]
async def get_user_info(engine, user_id):
    '''Getting user data by id

    :param engine: DB connection
    :param user_id: user id
    :return: dict with id, email, password, name, surname and rules,
        or None when there is no user with this id
    '''
    query = tb_user.select().where(tb_user.c.id == user_id)
    user = None
    async with engine.acquire() as conn:
        async for row in conn.execute(query):
            user = {
                'id': row[0],
                'email': row[1],
                'password': row[2],
                'name': row[3],
                'surname': row[4],
            }
            # Only the first match is of interest
            break
    if user is None:
        return None
    # get_user_rules acquires its own connection, so it is called only after
    # the one above has been released: holding one connection while waiting
    # for a second can stall once the pool is exhausted
    user['rules'] = await get_user_rules(engine=engine, user_id=user_id)
    return user
async def get_users(engine, admin):
    '''Retrieving user data

    :param engine: DB connection
    :param admin: truthy when the list is requested by an admin user; an
        admin also sees deleted users and users holding the 'admin' rule
    :return: a list of user dicts (id, email, password, name, surname,
        delete, rules) ordered by id
    '''
    # Non-admins only get users that are not marked as deleted
    if admin:
        where = ''
    else:
        where = 'WHERE u.delete_at is null'
    query = f'''SELECT u.id, u.email, u.password, u.name, u.surname, u.delete_at,
ARRAY(
SELECT r.rule
FROM "user_rule" as ur
LEFT JOIN "rule" as r on ur.rule = r.id
WHERE ur.user = u.id
) as "rules"
FROM "user" as u
{where}
ORDER BY u.id;'''
    users = []
    async with engine.acquire() as conn:
        async for row in await conn.execute(query):
            # Admin accounts are listed only when an admin is asking
            if admin or 'admin' not in row[6]:
                users.append({
                    'id': row[0],
                    'email': row[1],
                    'password': row[2],
                    'name': row[3],
                    'surname': row[4],
                    'delete': row[5] is not None,
                    'rules': row[6]
                })
    return users
async def get_rules(engine):
    '''Obtaining rights data

    :param engine: DB connection
    :return: dict mapping rule name to rule id, e.g. {'admin': 0}
    '''
    query = tb_rule.select()
    async with engine.acquire() as conn:
        # Position 0 of a rule row is the id, position 1 the rule name
        return {row[1]: row[0] async for row in conn.execute(query)}
async def set_rules_for_user(engine, user_id, data):
    '''Setting / changing user rights

    Compares the rules the user currently holds with the flags in ``data``
    and applies only the differences.

    :param engine: DB connection
    :param user_id: user id
    :param data: mapping of rule name to flag; a rule is added only for the
        boolean True and removed only for False (a missing key counts as False)
    :return:
    '''
    rules = await get_rules(engine)
    # Set membership instead of scanning a list once per known rule
    user_rules = set(await get_user_rules(engine, user_id))

    statements = []
    for rule, rule_id in rules.items():
        requested = data.get(rule, False)
        has_rule = rule in user_rules
        if has_rule and requested is False:
            # The user has the rule, but False has arrived from the form - delete
            statements.append(
                tb_user_rule.delete()
                .where(tb_user_rule.c.user == user_id)
                .where(tb_user_rule.c.rule == rule_id))
        elif not has_rule and requested is True:
            # The user does not have the rule, but True has arrived - add
            statements.append(
                tb_user_rule.insert().values(user=user_id, rule=rule_id))
        # Otherwise the stored state already matches the form - nothing to do

    if not statements:
        return
    # One connection for all changes instead of acquiring one per changed rule
    async with engine.acquire() as conn:
        for statement in statements:
            await conn.execute(statement)
async def set_delete_at_for_user(engine, user_id, restore=False):
    '''Mark a user as deleted, or restore it, by id

    The row is never removed: deletion stores the current timestamp in
    delete_at and restoring resets it to NULL.

    :param engine: DB connection
    :param user_id: id of the user to be deleted / restored
    :param restore: when True clear delete_at instead of setting it
    :return:
    '''
    delete_at = None if restore else get_timestamp_str()
    # Built with SQLAlchemy so user_id travels as a bound parameter rather
    # than being pasted into the SQL text; None is sent as NULL
    query = (
        sa.update(tb_user)
        .values(delete_at=delete_at)
        .where(tb_user.c.id == user_id))
    async with engine.acquire() as conn:
        await conn.execute(query)
async def create_user(engine, data):
    '''User creation

    :param engine: DB connection
    :param data: new user data: 'email', 'password', 'name', 'surname' plus
        the rule flags consumed by set_rules_for_user
    :return:
    :raises Warning: when a user with this email already exists
    '''
    # The uniqueness check acquires its own connection, so it runs before
    # one is taken here: holding a connection while waiting for a second
    # can stall once the pool is exhausted
    user = await get_user_by_email(engine=engine, email=data['email'])
    if user is not None:
        raise Warning('A user with this email already exists.')

    query = tb_user.insert().values(
        email=data['email'],
        password=data['password'],
        name=data['name'],
        surname=data['surname'],
        create_at=get_timestamp_str())
    async with engine.acquire() as conn:
        # presumably the id of the inserted row - relies on the insert
        # returning the primary key; verify against the driver in use
        user_id = await conn.scalar(query)

    await set_rules_for_user(engine=engine, user_id=user_id, data=data)
async def update_user(engine, data):
    '''User data update

    :param engine: DB connection
    :param data: user data to update: 'id', 'email', 'password', 'name',
        'surname' plus the rule flags consumed by set_rules_for_user
    :return:
    :raises Warning: when the email already belongs to a different user
    '''
    # Check that the email matches the current one, or that it is unique in
    # the database. The lookup acquires its own connection, so it runs before
    # one is taken here: holding a connection while waiting for a second can
    # stall once the pool is exhausted
    user = await get_user_by_email(engine=engine, email=data['email'])
    user_id = int(data['id'])
    if user is not None and int(user['id']) != user_id:
        raise Warning('A user with this email already exists')

    query = (
        sa.update(tb_user)
        .values({
            'email': data['email'],
            'password': data['password'],
            'name': data['name'],
            'surname': data['surname']
        })
        .where(tb_user.c.id == user_id))
    async with engine.acquire() as conn:
        await conn.execute(query)

    await set_rules_for_user(engine=engine, user_id=user_id, data=data)