-
Notifications
You must be signed in to change notification settings - Fork 1
/
DBManager.py
190 lines (162 loc) · 8.46 KB
/
DBManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import sqlalchemy
from database.requestInfo import RequestInfo
from database.models import Award, AwardYear, AwardName, AwardReceiving, Address, Organization, Person, KnowledgeField
from sqlalchemy import and_, or_
class DataBaseManager:
    """Stateless collection of query helpers for the award database.

    Every helper is a static method, so it can be called either through the
    class (``DataBaseManager.get_people(...)``) or through an instance.
    Helpers that receive ``app`` run their queries inside
    ``app.app_context()``.
    """

    # Number of records returned per call by get_piece_of_people.
    _PAGE_SIZE = 100

    @staticmethod
    def get_data(request_info: "RequestInfo", db):
        """Build the query of AwardReceiving rows that match ``request_info``.

        The AwardReceiving query is joined against three filtered queries:

        * organizations (joined with their address), filtered by
          ``request_info.organization`` and ``request_info.city``;
        * people, filtered by ``request_info.username``;
        * awards (joined with knowledge field, award name and award year),
          filtered by ``request_info.award``, ``request_info.area``,
          ``request_info.start_year`` / ``request_info.end_year`` and
          ``request_info.rank``.

        Args:
            request_info: Object carrying the filter values listed above.
            db: Database handle exposing ``db.session``.

        Returns:
            A query object that is not executed yet; callers iterate it or
            call ``.count()`` on it.
        """
        organizations = db.session.query(Organization).join(Address).filter(and_(
            DataBaseManager.get_organization_filter(request_info.organization),
            DataBaseManager.get_city_filter(request_info.city)))
        people = db.session.query(Person).filter(
            DataBaseManager.get_username_filter(request_info.username))
        awards = db.session.query(Award).join(KnowledgeField).join(AwardName).join(AwardYear).filter(and_(
            DataBaseManager.get_award_name_filter(request_info.award),
            DataBaseManager.get_area_filter(request_info.area),
            DataBaseManager.get_year_filter(request_info.start_year, request_info.end_year),
            DataBaseManager.get_rank_filter(request_info.rank)))
        # Ordering key: the year of the award each AwardReceiving row refers to.
        award_year_of_row = db.session.query(AwardYear.award_year).join(Award).filter(
            Award.id_award == AwardReceiving.id_award)
        return db.session.query(AwardReceiving).join(
            organizations).join(
            people).join(
            awards).order_by(
            award_year_of_row)

    @staticmethod
    def check_rank(rank):
        """Return ``rank`` unchanged, or ``""`` when it is falsy (None, 0, "")."""
        return rank or ""

    @staticmethod
    def get_full_info(id_award_receiving: int, app):
        """Return the detailed description of one award-receiving record.

        Args:
            id_award_receiving: Primary key of the AwardReceiving row.
            app: Application object providing ``app_context()``.

        Returns:
            ``None`` when no such record exists, otherwise a dict with the
            keys ``organization``, ``rank``, ``achievement`` and ``team``.
            ``team`` holds one ``{"person": full_name}`` entry for every
            AwardReceiving row that shares the same award.
        """
        with app.app_context():
            receiving = AwardReceiving.query.get(id_award_receiving)
            if not receiving:
                return None
            # Fetch the award once instead of once per field that needs it.
            award = Award.query.get(receiving.id_award)
            organization = Organization.query.get(receiving.id_organization)
            same_award_rows = AwardReceiving.query.filter(
                AwardReceiving.id_award == receiving.id_award)
            return {
                "organization": organization.organization_name,
                "rank": DataBaseManager.check_rank(award.award_rank),
                "achievement": award.award_achievement,
                "team": [{"person": row.person.person_full_name} for row in same_award_rows],
            }

    @staticmethod
    def _summarize(record):
        """Turn one AwardReceiving row into the short dict used in listings."""
        award = Award.query.get(record.id_award)
        return {
            "id_award_receiving": record.id_award_receiving,
            "person_full_name": Person.query.get(record.id_person).person_full_name,
            "award_year": award.award_year.award_year,
            "award_name": award.award_name.award_name,
            "area": award.knowledge_field.knowledge_field_name,
        }

    @staticmethod
    def get_people(request_info: "RequestInfo", app, db):
        """Return the summary dict of every record matching ``request_info``.

        Args:
            request_info: Filter values, see ``get_data``.
            app: Application object providing ``app_context()``.
            db: Database handle exposing ``db.session``.

        Returns:
            A list of dicts with the keys ``id_award_receiving``,
            ``person_full_name``, ``award_year``, ``award_name`` and ``area``,
            in the order produced by ``get_data``.
        """
        with app.app_context():
            return [DataBaseManager._summarize(record)
                    for record in DataBaseManager.get_data(request_info, db)]

    @staticmethod
    def get_piece_of_people(request_info: "RequestInfo", app, db, start_index):
        """Return one page (at most 100 records) of ``get_people`` results.

        Args:
            request_info: Filter values, see ``get_data``.
            app: Application object providing ``app_context()``.
            db: Database handle exposing ``db.session``.
            start_index: Zero-based position of the first record to return.

        Returns:
            Summary dicts for the records at positions
            ``start_index <= position < start_index + 100``.
        """
        with app.app_context():
            stop_index = start_index + DataBaseManager._PAGE_SIZE
            page = []
            for position, record in enumerate(DataBaseManager.get_data(request_info, db)):
                if start_index <= position < stop_index:
                    page.append(DataBaseManager._summarize(record))
                # Stop as soon as the page is complete; later rows are not needed.
                if position + 1 >= stop_index:
                    break
            return page

    @staticmethod
    def _city_entry(address, city, count):
        """Build the per-city dict returned by ``get_cities_info``."""
        return {
            "id_address": address.id_address,
            "city": city,
            "latitude": address.latitude,
            "longitude": address.longitude,
            "count": count,
        }

    @staticmethod
    def get_cities_info(request_info: "RequestInfo", app, db):
        """Return the location and number of matching records per city.

        Args:
            request_info: Filter values, see ``get_data``. Its ``city``
                attribute selects one city, or every city when it is
                ``"all"``.
            app: Application object providing ``app_context()``.
            db: Database handle exposing ``db.session``.

        Returns:
            For a specific city: a single dict (``id_address``, ``city``,
            ``latitude``, ``longitude``, ``count``), or ``None`` when no
            address with that city name exists.
            For ``"all"``: a list of such dicts, one per city name that has
            at least one matching record.
        """
        with app.app_context():
            if request_info.city != "all":
                address = db.session.query(Address).filter(
                    Address.city_name == request_info.city).first()
                if not address:
                    return None
                count = DataBaseManager.get_data(request_info, db).count()
                return DataBaseManager._city_entry(address, request_info.city, count)

            requested_city = request_info.city
            cities = []
            try:
                for city, in db.session.query(Address.city_name):
                    # get_data reads the city from request_info, so it is
                    # switched temporarily to count the records of each city.
                    request_info.city = city
                    count = DataBaseManager.get_data(request_info, db).count()
                    if count == 0:
                        continue
                    address = db.session.query(Address).filter(
                        Address.city_name == city).first()
                    cities.append(DataBaseManager._city_entry(address, city, count))
            finally:
                # Hand the caller's object back exactly as it was received.
                request_info.city = requested_city
            return cities

    @staticmethod
    def get_usernames_for_filter(app, db):
        """Return ``{"usernames": [...]}`` with every selectable person name.

        Names containing 'нет данных' ("no data") are left out.
        """
        with app.app_context():
            names = db.session.query(Person.person_full_name).filter(
                sqlalchemy.not_(Person.person_full_name.contains('нет данных')))
            return {"usernames": [name for name, in names]}

    @staticmethod
    def get_years_for_filter(app, db):
        """Return ``{"years": [...]}`` with every stored award year except -1.

        NOTE(review): -1 looks like a placeholder for an unknown year; confirm.
        """
        with app.app_context():
            years = db.session.query(AwardYear.award_year).filter(
                AwardYear.award_year != -1)
            return {"years": [year for year, in years]}

    @staticmethod
    def get_ranks_for_filter(app, db):
        """Return ``{"ranks": [...]}`` with the distinct non-NULL award ranks.

        Ranks appear in the order in which they are first returned by the
        database.
        """
        with app.app_context():
            ranks = db.session.query(Award.award_rank).filter(
                Award.award_rank.isnot(None))
            # dict.fromkeys drops duplicates while keeping first-seen order.
            return {"ranks": list(dict.fromkeys(rank for rank, in ranks))}

    @staticmethod
    def get_knowledge_areas_for_filter(app, db):
        """Return ``{"knowledge_areas": [...]}`` with every selectable area name.

        Names containing 'Нет данных' ("no data") are left out.
        """
        with app.app_context():
            areas = db.session.query(KnowledgeField.knowledge_field_name).filter(
                sqlalchemy.not_(KnowledgeField.knowledge_field_name.contains('Нет данных')))
            return {"knowledge_areas": [area for area, in areas]}

    @staticmethod
    def get_awards_names_for_filter(app, db):
        """Return ``{"awards_names": [...]}`` with every stored award name."""
        with app.app_context():
            award_names = db.session.query(AwardName.award_name)
            return {"awards_names": [award_name for award_name, in award_names]}

    @staticmethod
    def get_organizations_names_for_filter(app, db):
        """Return ``{"organizations_names": [...]}`` with every selectable organization.

        Names containing 'Нет данных' ("no data") are left out.
        """
        with app.app_context():
            organizations = db.session.query(Organization.organization_name).filter(
                sqlalchemy.not_(Organization.organization_name.contains('Нет данных')))
            return {"organizations_names": [name for name, in organizations]}

    @staticmethod
    def get_city_filter(city):
        """Return the city condition: exact name match, or no restriction for "all".

        For "all" the condition is ``city_name != "all"``.
        """
        if city == "all":
            return Address.city_name != city
        return Address.city_name == city

    @staticmethod
    def get_organization_filter(org):
        """Return the organization condition: substring match, or no restriction for "all".

        For "all" the condition is ``organization_name != "all"``.
        """
        if org == "all":
            return Organization.organization_name != org
        return Organization.organization_name.like("%{}%".format(org))

    @staticmethod
    def get_year_filter(start, end):
        """Return the award-year condition.

        When ``start`` is "all", every non-negative year matches and ``end``
        is ignored; otherwise the year must lie in ``[start, end]``.
        """
        if start == "all":
            return AwardYear.award_year >= 0
        return and_(AwardYear.award_year >= start, AwardYear.award_year <= end)

    @staticmethod
    def get_award_name_filter(awards_names):
        """Return the award-name condition for a list of selected names.

        A list whose first element is 'all' places no restriction (the
        condition is ``award_name != 'all'``). An empty list matches nothing.
        """
        if awards_names and awards_names[0] == 'all':
            return AwardName.award_name != 'all'
        return AwardName.award_name.in_(awards_names)

    @staticmethod
    def get_area_filter(areas):
        """Return the knowledge-field condition for a list of selected areas.

        A list whose first element is "all" matches every non-empty area
        name. An empty list matches nothing.
        """
        if areas and areas[0] == "all":
            return KnowledgeField.knowledge_field_name != ""
        return KnowledgeField.knowledge_field_name.in_(areas)

    @staticmethod
    def get_rank_filter(ranks):
        """Return the award-rank condition for a list of selected ranks.

        * First element "all": ranks ``>= 0`` and awards without a rank.
        * List containing 'null': the listed ranks plus awards without a rank.
        * Otherwise: only the listed ranks. An empty list matches nothing.
        """
        if ranks and ranks[0] == "all":
            return or_(Award.award_rank >= 0, Award.award_rank.is_(None))
        # 'null' is a request-side marker for "no rank", not a stored value,
        # so it is translated to IS NULL and kept out of the IN list.
        listed_ranks = [rank for rank in ranks if rank != 'null']
        if len(listed_ranks) != len(ranks):
            return or_(Award.award_rank.in_(listed_ranks), Award.award_rank.is_(None))
        return Award.award_rank.in_(listed_ranks)

    @staticmethod
    def get_username_filter(username):
        """Return the person-name condition: substring match, or no restriction for "all".

        For "all" the condition is ``person_full_name != "all"``.
        """
        if username != "all":
            return Person.person_full_name.like("%{}%".format(username))
        return Person.person_full_name != username