# -*- coding: utf-8 -*-
import concurrent.futures
import requests
import queue
import threading
import datetime
import mysql.connector
from mysql.connector import Error
import configparser
import json_parse
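# urlpool.py polls a fixed list of exchange API endpoints on a timer,
# stores each parsed response as a snapshot row in MySQL, and raises an
# alarm whenever a fresh response differs from the latest stored one.
# json_parse is a local helper module of this repo, and dbconfig.ini
# holds the DB credentials (see the sketch further down).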
# Time interval between two fetches of the same URL (in seconds)
INTERVAL = 10 * 60
# The number of worker threads
MAX_WORKERS = 4
# Set request headers so the probes look like a regular browser,
# which helps evade naive anti-spider checks
HEADERS = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.9',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
#'Host': None,
'If-Modified-Since': '0',
#'Referer': None,
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.62 Safari/537.36',
}
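# Snapshot table assumed by the queries below (a sketch inferred from the
# column names used in this file; the real DDL may differ):
#
#   CREATE TABLE api_snapshot (
#       url         VARCHAR(512),
#       response    MEDIUMTEXT,
#       create_time DATETIME,
#       version     INT NULL    -- 0 = latest, NULL = superseded
#   );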
############################
def db_connect(config):
    # https://stackoverflow.com/questions/42906665/import-my-database-connection-with-python
    try:
        dbconn = mysql.connector.connect(host=config['mysqlDB']['host'],
                                         database=config['mysqlDB']['database'],
                                         user=config['mysqlDB']['user'],
                                         password=config['mysqlDB']['password'],
                                         auth_plugin='mysql_native_password')
    except Exception as err:
        print("db_connect exception: " + str(err))
        return None
    if dbconn.is_connected():
        print('Connected to MySQL database')
    return dbconn
def handle_response(response, url, dbconn):
    # Compare the fresh response with the latest stored snapshot and
    # write a new snapshot (plus alarm) only when the content changed.
    print("handle")
    query = ("select response, create_time from api_snapshot "
             "where url=%s and version=0 order by create_time desc")
    cursor = dbconn.cursor(buffered=True)
    cursor.execute(query, (url,))
    result = cursor.fetchone()
    if result is not None:
        print("old value: " + result[0][:20])
        print(response[:20])
        if result[0] != response:
            write_db(url, response, datetime.datetime.now(), 0, dbconn)
            alarm(response, result)
    else:
        print("first write: " + response[:20])
        write_db(url, response, datetime.datetime.now(), 0, dbconn)
def get_info_db(sql, addr, dbconn, version=0, date='1990-01-01'):
    # version=0 means latest. This helper is currently unused (its call
    # site above is commented out); rewritten here as a best-effort
    # sketch of the intended balance-history lookup.
    results = []
    if date > '1990-01-01':
        sql += (" select balance, create_time from balance_history"
                " where address=%s and create_time > %s"
                " order by create_time desc")
    try:
        cursor = dbconn.cursor(buffered=True)
        cursor.execute(sql, (addr, date))
        results = cursor.fetchall()
        for row in results:
            print(row[0])
    except Exception as err:
        print("get_info_db exception: " + str(err))
    # Return the requested entry when enough history exists
    if len(results) > version:
        return results[version]
    return None
def alarm(msg, msg2):
    # Placeholder: called when a stored snapshot differs from the fresh
    # response. TODO: hook up a real notification channel here.
    print("alarm")
def write_db(url, response, dt, version, conn):
    # Demote any previous snapshot for this URL, then insert the new
    # response as the latest (version=0) row.
    query0 = "UPDATE api_snapshot set version=NULL where url=%s"
    args0 = (url,)
    query = ("INSERT INTO api_snapshot (url,response,create_time,version) "
             "VALUES(%s,%s,%s,%s)")
    args = (url, response, dt, version)
    try:
        print(query0 + " <- " + str(url))
        cursor = conn.cursor(buffered=True)
        cursor.execute(query0, args0)
        print("overwrite version")
        cursor.execute(query, args)
        conn.commit()
        print("writedb")
        if cursor.lastrowid:
            print('last insert id', cursor.lastrowid)
        else:
            print('last insert id not found')
    except Error as error:
        print("write Db err: " + str(error))
# Retrieve a single page, parse it, and hand the result to handle_response
def load_url(session, url, dbconn):
    print("now " + url)
    raw_response = session.get(url)
    print(url + " http code %d" % raw_response.status_code)
    if raw_response.status_code == 200:
        # decode the binary body b'...' before parsing
        parsed = json_parse.parsejson(raw_response.content.decode("utf-8"), url)
        # This part could be refactored to run in a thread devoted to
        # handling local IO tasks, to reduce the burden on the Net IO
        # worker threads.
        return handle_response(parsed, url, dbconn)
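# Factory for the worker pool (note: this shadows the
# concurrent.futures class name of the same spelling)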
def ThreadPoolExecutor():
return concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
# Generate a session object
def Session():
session = requests.Session()
session.headers.update(HEADERS)
return session
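# Expected layout of dbconfig.ini (section and keys inferred from
# db_connect above; the values here are placeholders, not real settings):
#
#   [mysqlDB]
#   host = localhost
#   database = <database name>
#   user = <db user>
#   password = <db password>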
config = configparser.ConfigParser()
config.read('dbconfig.ini')
dbconn = db_connect(config)
#load_url(Session(),'https://www.binance.com/api/v1//exchangeInfo',dbconn)
#load_url(Session(),'https://www.binance.com/api/v1/ticker/allBookTickers',dbconn)
#load_url(Session(),'https://api.kucoin.com/v1/market/open/symbols',dbconn)
URLS = [
    ["https://api.kucoin.com/v1/market/open/symbols"],
    ["http://api.huobi.pro/v1/common/currencys"],
    ["https://api.coinex.com/v1/market/list"],
    ["https://data.gate.io/api2/1/pairs"],
    ["https://api.bithumb.com/public/ticker/All"],
    ["https://www.binance.com/api/v1/ticker/allBookTickers"],
    ["https://www.binance.com/api/v1//exchangeInfo"],
]
# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor() as executor, Session() as session:
    if not URLS:
        raise RuntimeError('Please fill in the array `URLS` to start probing!')
    tasks = queue.Queue()
    for urlArray in URLS:
        url = urlArray[0]
        tasks.put_nowait(url)
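    # Scheduling model: each URL pulled off the queue is submitted to the
    # worker pool, and a one-shot threading.Timer re-queues it INTERVAL
    # seconds later, so every endpoint is re-polled roughly every 10 minutes.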
    def wind_up(url):
        # Re-queue this URL once its interval has elapsed
        tasks.put(url)
    i = 0
    while True:
        url = tasks.get()
        # Work
        executor.submit(load_url, session, url, dbconn)
        i = i + 1
        print(i)
        threading.Timer(interval=INTERVAL, function=wind_up, args=(url,)).start()