-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
160 lines (123 loc) · 4.61 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import asyncio
import datetime
import logging
import os
import sys
import time
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import List
import arrow
import cfg4py
import omicron
from coretypes import FrameType
from omicron.dal.cache import cache
from data_fix.download_days import redownload_bars1d_for_target_day
from data_fix.download_mins import redownload_bars_mins_for_target_day
from data_fix.download_week import redownload_bars1w_for_target_day
from datascan.validate_data_in_week import reverse_scanner_handler
from download_bars.day_handler import scanner_handler_day
from download_bars.week_handler import month_download_handler, week_download_handler
from fetchers.abstract_quotes_fetcher import AbstractQuotesFetcher
from influx_tools import (
drop_bars_1M,
drop_bars_board_1d,
remove_allsecs_in_bars1d,
remove_sec_in_bars1d,
)
from pack_data.pack_bars import pack_data_from_db
from pricestats.sum_history import sum_price_stats
from rapidscan.main import get_cache_keyname
from rebuild_minio.build_min_data import rebuild_minio_for_min
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def get_config_dir():
    """Return the normalised path of the ``config`` folder next to this file.

    The location is derived from ``__file__``, so it does not depend on the
    current working directory.
    """
    module_dir = path.dirname(__file__)
    return path.normpath(path.join(module_dir, "config"))
def init_log_path(log_dir):
    """Make sure the log directory exists, creating it when necessary.

    Args:
        log_dir: Path of the directory that will hold the log files.

    Returns:
        0 once the directory exists.

    Raises:
        SystemExit: With the message ``"failed to create log folder"`` when
            the directory cannot be created (including the case where
            ``log_dir`` already exists but is not a directory).
    """
    try:
        # exist_ok=True replaces the separate exists() check, so there is no
        # window between checking and creating, and a non-directory at
        # log_dir is reported instead of being silently accepted.
        os.makedirs(log_dir, exist_ok=True)
    except OSError as e:
        # print() rather than logging: this function is called by start()
        # before init_logger() has configured any handlers.
        print(e)
        # sys.exit is always available; the bare ``exit`` helper is injected
        # by the ``site`` module and may be missing.
        sys.exit("failed to create log folder")
    return 0
def init_logger(filename: str, loglevel: int):
    """Configure root logging with a rotating file handler and a stdout handler.

    Both handlers share the same level and formatter. The file handler is a
    ``TimedRotatingFileHandler`` created with ``when="D"``, ``interval=1`` and
    ``backupCount=7``.

    Args:
        filename: Path of the log file to write.
        loglevel: Logging level applied to both handlers and passed to
            ``logging.basicConfig``.
    """
    record_layout = (
        r"%(asctime)s %(levelname)s %(filename)s[line:%(lineno)d] %(message)s"
    )
    timestamp_layout = r"%Y-%m-%d %H:%M:%S %a"

    file_handler = TimedRotatingFileHandler(
        filename, when="D", interval=1, backupCount=7, encoding="utf-8"
    )
    stdout_handler = logging.StreamHandler(sys.stdout)

    shared_formatter = logging.Formatter(
        fmt=record_layout, datefmt=timestamp_layout
    )
    for handler in (file_handler, stdout_handler):
        handler.setLevel(loglevel)
        handler.setFormatter(shared_formatter)

    logging.basicConfig(
        level=loglevel,
        format=record_layout,
        datefmt=timestamp_layout,
        handlers=[stdout_handler, file_handler],
    )
class Omega(object):
    """One-shot runner: initialise omicron, execute a maintenance task, close.

    The task that runs is chosen by editing the body of :meth:`init`; the
    alternatives are kept there as commented-out calls.
    """

    def __init__(self, fetcher_impl: str, **kwargs):
        """Remember the fetcher settings.

        Args:
            fetcher_impl: Stored as ``self.fetcher_impl``. Within this class
                it is only referenced by the commented-out
                ``AbstractQuotesFetcher.create_instance`` call.
            **kwargs: Stored unchanged as ``self.params``.
        """
        self.fetcher_impl = fetcher_impl
        self.params = kwargs

    async def init(self, *args):
        """Initialise omicron, run the enabled task, then close omicron.

        Args:
            *args: Accepted but not used.

        Returns:
            ``False`` when the task raises an exception; ``None`` otherwise.
            If ``omicron.init()`` fails the process is terminated with exit
            status 1 and this coroutine never returns.
        """
        logger.info("init %s", self.__class__.__name__)

        await omicron.cache.init()
        try:
            await omicron.init()
        except Exception as e:
            logger.error("No calendar and securities in cache, %s", e)
            # NOTE(review): blocking sleep inside a coroutine; presumably a
            # short grace period before the hard exit -- confirm intent.
            time.sleep(5)
            # os._exit terminates immediately, skipping normal interpreter
            # cleanup.
            os._exit(1)

        logger.info("<<< init %s process done", self.__class__.__name__)

        try:
            # Task menu: only pack_data_from_db() is currently enabled; the
            # other calls are left commented out so they can be switched on.
            # await drop_bars_board_1d("boards")
            # await sum_price_stats()
            # await remove_allsecs_in_bars1d(datetime.date(2022, 8, 15))
            # await drop_bars_1w()
            # await drop_bars_1M()
            # await drop_bars_via_scope(target_year, FrameType.WEEK)
            # await remove_sec_in_bars1d("000985.XSHG", datetime.date(2022,1,1), datetime.date(2023,3,1))

            # pack history data into pickle file
            await pack_data_from_db()

            # await AbstractQuotesFetcher.create_instance(self.fetcher_impl, **self.params)
            # await week_download_handler()
            # await month_download_handler()
            # await scanner_handler_day()
            # await scanner_handler_minutes(ft, False)
            # await reverse_scanner_handler(scanning_type=0)
            # await redownload_bars1w_for_target_day()
            # await redownload_bars1d_for_target_day()
            # await redownload_bars_mins_for_target_day()
            # await rebuild_minio_for_min()
        except Exception as e:
            logger.exception(e)
            logger.info("failed to execution: %s", e)
            return False
        else:
            logger.info("all tasks finished.")
        finally:
            # Close omicron on every path; previously the failure branch
            # returned without releasing it.
            await omicron.close()
def start():
    """Script entry point: load config, set up logging, run ``Omega.init``.

    Configuration is read from the directory returned by
    ``get_config_dir()``; logs are written to ``logs/datascan.log`` under the
    current working directory.
    """
    working_dir = os.getcwd()
    print("current dir:", working_dir)

    cfg4py.init(get_config_dir(), False)
    settings = cfg4py.get_instance()

    # Logging goes to <cwd>/logs/datascan.log at INFO level.
    logs_folder = path.normpath(path.join(working_dir, "logs"))
    init_log_path(logs_folder)
    init_logger(path.normpath(path.join(logs_folder, "datascan.log")), logging.INFO)

    fetcher_cfg = settings.quotes_fetcher
    runner = Omega(
        fetcher_cfg.impl,
        account=fetcher_cfg.account,
        password=fetcher_cfg.password,
    )

    # NOTE(review): asyncio.get_event_loop() without a running loop is
    # deprecated in recent Python releases -- confirm the target version
    # before upgrading the interpreter.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(runner.init())
    # event_loop.run_forever()
# Run start() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    start()