-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
68 lines (50 loc) · 2.11 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from sqlalchemy import create_engine, text
from pathlib import Path
import utils
import fraud
dir_source = Path("./data")
dir_destination = Path("./archive")
files_transactions = sorted(list(dir_source.glob("transactions_*.txt")))
files_passport = sorted(list(dir_source.glob("passport_blacklist_*.xlsx")))
files_terminals = sorted(list(dir_source.glob("terminals_*.xlsx")))
if not files_transactions and not files_passport and not files_terminals:
exit()
#dialect+driver://username:password@host:port/database
engine = create_engine('postgresql+psycopg2://hse:[email protected]:6432/db')
sqlalchemy_conn = engine.connect()
# Отключение автокоммита
sqlalchemy_conn.autocommit = False
ddl_file = Path('./main.ddl')
sqlalchemy_conn.execute(
text(ddl_file.read_text())
)
sqlalchemy_conn.commit()
#####################################################################################################
### Загрузка в stg и fact
# table_transactions
if files_transactions:
for f in files_transactions:
utils.process_transaction_file(sqlalchemy_conn, f)
#Перемещение файла в архив
f.rename(dir_destination/f.with_suffix('.backup').name)
# table_passport
if files_passport:
for f in files_passport:
utils.process_passport_file(sqlalchemy_conn, f)
# Перемещение файла в архив
f.rename(dir_destination/f.with_suffix('.backup').name)
# table_terminals
if files_terminals:
for f in files_terminals:
utils.process_terminal_file(sqlalchemy_conn, f)
# Перемещение файла в архив
f.rename(dir_destination/f.with_suffix('.backup').name)
# table_account
utils.process_accounts_table(sqlalchemy_conn, 'info.accounts')
#table_cards
utils.process_cards_table(sqlalchemy_conn, 'info.cards')
# table_clients
utils.process_clients_table(sqlalchemy_conn, 'info.clients')
# table_fraud
fraud.process_fraud_table(sqlalchemy_conn)
sqlalchemy_conn.close()