-
Notifications
You must be signed in to change notification settings - Fork 0
/
influx_tools.py
131 lines (101 loc) · 4.34 KB
/
influx_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import datetime
import logging
import cfg4py
from coretypes import FrameType, bars_dtype
from omicron.dal.influx.flux import Flux
from omicron.dal.influx.influxclient import InfluxClient
from omicron.dal.influx.serialize import EPOCH, DataframeDeserializer
from omicron.models import get_influx_client
from omicron.models.security import Security
from omicron.models.timeframe import TimeFrame
from omicron.models.timeframe import TimeFrame as tf
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
async def remove_security_list():
    """Issue one delete per year against the ``security_list`` measurement.

    For every year from 2007 through 2023 this calls ``client.delete`` with
    January 1st of that year as the second positional argument, printing a
    progress line before and after each call.

    NOTE(review): the second positional argument of ``delete`` is presumably
    the stop bound of the deletion window -- confirm against InfluxClient.
    """
    influx = get_influx_client()
    target = "security_list"

    for yr in range(2007, 2024):
        print("deleting in security_list ", yr)
        boundary = datetime.datetime(yr, 1, 1)
        await influx.delete(target, boundary)
        print("data deleted in security_list: ", yr)
async def drop_bars_board_1d(board: str):
    """Issue a single delete against the ``board_bars_1d`` (daily) measurement.

    Calls ``client.delete`` with 2023-01-01 as the second positional
    argument, then prints two completion messages.

    Args:
        board: Only echoed in the completion message.

    NOTE(review): ``board`` is not passed to ``delete``, so the call is not
    filtered by it -- confirm that deleting for every board is intended.
    """
    influx = get_influx_client()
    boundary = datetime.datetime(2023, 1, 1)
    await influx.delete("board_bars_1d", boundary)

    print("all data deleted in bars:1d ", board)
    print("board_bars_1d: all finished.")
async def drop_bars_1d():
    """Issue year-by-year deletes against the ``stock_bars_1d`` (daily) measurement.

    The year range is hard-coded to start at 2006 and stop before 2007, so
    exactly one ``client.delete`` call is made, with 2006-01-01 as the second
    positional argument. Progress is printed before and after the call.
    """
    influx = get_influx_client()
    target = "stock_bars_1d"

    # Half-open range [2006, 2007): a single iteration for 2006.
    for yr in range(2006, 2007):
        print("deleting in bars:1d ", yr)
        boundary = datetime.datetime(yr, 1, 1)
        await influx.delete(target, boundary)
        print("data deleted in bars:1d, ", yr)

    print("drop_bars_1d: all finished.")
async def drop_bars_1w():
    """Issue year-by-year deletes against the ``stock_bars_1w`` (weekly) measurement.

    For every year from 2006 through 2023 this calls ``client.delete`` with
    January 1st of that year as the second positional argument. The progress
    messages report ``year - 1``, i.e. the year preceding that boundary.
    """
    influx = get_influx_client()
    target = "stock_bars_1w"

    for yr in range(2006, 2024):
        reported = yr - 1  # the messages name the year before the boundary
        print("deleting in bars:1w ", reported)
        boundary = datetime.datetime(yr, 1, 1)
        await influx.delete(target, boundary)
        print("data deleted in bars:1w, ", reported)

    print("drop_bars_1w: all finished.")
async def drop_bars_1M():
    """Issue year-by-year deletes against the ``stock_bars_1M`` (monthly) measurement.

    For every year from 2006 through 2023 this calls ``client.delete`` with
    January 1st of that year as the second positional argument. The progress
    messages report ``year - 1``, i.e. the year preceding that boundary.
    """
    influx = get_influx_client()
    target = "stock_bars_1M"

    for yr in range(2006, 2024):
        reported = yr - 1  # the messages name the year before the boundary
        print("deleting in bars:1M ", reported)
        boundary = datetime.datetime(yr, 1, 1)
        await influx.delete(target, boundary)
        print("data deleted in bars:1M, ", reported)

    print("drop_bars_1M: all finished.")
async def drop_bars_via_scope(target_year, ft: FrameType):
    """Delete one calendar year of bars from the measurement matching ``ft``.

    Args:
        target_year: Year whose window is deleted; the window runs from
            January 1st 00:00:00 to December 31st 23:59:59 of that year.
        ft: Frame type; DAY, WEEK and MONTH map to ``stock_bars_1d``,
            ``stock_bars_1w`` and ``stock_bars_1M`` respectively.

    Returns:
        ``False`` when ``ft`` is none of the three supported frame types (no
        delete is issued). Otherwise the function falls off the end and
        returns ``None``.
    """
    supported = (
        (FrameType.DAY, "stock_bars_1d"),
        (FrameType.WEEK, "stock_bars_1w"),
        (FrameType.MONTH, "stock_bars_1M"),
    )
    for frame, name in supported:
        if ft == frame:
            measurement = name
            break
    else:
        # Unsupported frame type: nothing to delete.
        return False

    client = get_influx_client()

    window_open = datetime.datetime(target_year, 1, 1)
    window_close = datetime.datetime(target_year, 12, 31, 23, 59, 59)
    # The start bound is handed over as a pre-formatted string with a "Z"
    # suffix, while the stop bound is passed as a datetime object.
    start_str = window_open.isoformat(timespec="seconds") + "Z"

    print("deleting in ", measurement, target_year)
    await client.delete(measurement, stop=window_close, start=start_str)
    print("data deleted in ", measurement, target_year)

    print("drop ", measurement, " all finished.")
async def remove_sec_in_bars1d(
    code: str, dt_start: datetime.date, dt_end: datetime.date
):
    """Delete ``stock_bars_1d`` records tagged with ``code`` in a date window.

    Args:
        code: Value used for the ``code`` tag filter passed to ``delete``.
        dt_start: First day of the window; expanded to 00:00:00 of that day.
        dt_end: Last day of the window; expanded to 23:59:59 of that day.
    """
    midnight = datetime.time(0, 0, 0)
    last_second = datetime.time(23, 59, 59)
    window_open = datetime.datetime.combine(dt_start, midnight)
    window_close = datetime.datetime.combine(dt_end, last_second)
    # The start bound is handed over as a pre-formatted string with a "Z"
    # suffix, while the stop bound is passed as a datetime object.
    start_str = window_open.isoformat(timespec="seconds") + "Z"

    influx = get_influx_client()
    await influx.delete(
        "stock_bars_1d",
        stop=window_close,
        start=start_str,
        tags={"code": code},
    )
async def remove_allsecs_in_bars1d(target_date: datetime.date):
    """Delete ``stock_bars_1d`` records for a single day, with no tag filter.

    Args:
        target_date: Day to clear; the window runs from 00:00:00 to 23:59:59
            of that day.
    """
    midnight = datetime.time(0, 0, 0)
    last_second = datetime.time(23, 59, 59)
    window_open = datetime.datetime.combine(target_date, midnight)
    window_close = datetime.datetime.combine(target_date, last_second)
    # The start bound is handed over as a pre-formatted string with a "Z"
    # suffix, while the stop bound is passed as a datetime object.
    start_str = window_open.isoformat(timespec="seconds") + "Z"

    influx = get_influx_client()
    await influx.delete("stock_bars_1d", stop=window_close, start=start_str)
async def remove_sec_in_bars_min(code: str, target_date: datetime.date, ft: FrameType):
    """Delete one day of records tagged with ``code`` from a per-frame measurement.

    The measurement name is ``stock_bars_`` followed by ``ft.value``. After
    the delete call an info-level log line records the measurement, code and
    date.

    Args:
        code: Value used for the ``code`` tag filter passed to ``delete``.
        target_date: Day to clear; the window runs from 00:00:00 to 23:59:59
            of that day.
        ft: Frame type whose ``value`` forms the measurement-name suffix.
    """
    midnight = datetime.time(0, 0, 0)
    last_second = datetime.time(23, 59, 59)
    window_open = datetime.datetime.combine(target_date, midnight)
    window_close = datetime.datetime.combine(target_date, last_second)
    # The start bound is handed over as a pre-formatted string with a "Z"
    # suffix, while the stop bound is passed as a datetime object.
    start_str = window_open.isoformat(timespec="seconds") + "Z"

    influx = get_influx_client()
    measurement = f"stock_bars_{ft.value}"
    await influx.delete(
        measurement,
        stop=window_close,
        start=start_str,
        tags={"code": code},
    )

    logger.info("remove sec from %s: %s, %s", measurement, code, target_date)