import logging
import traceback
import sqlite3
import time
import random
import json

import ChromeController

# XPath for the pagination "Next" link that Bot.walk_pages() follows.
NEXT_BUTTON_XPATH = '//a[text()="Next"]'


class DbInterface():
    '''
    Thin wrapper around a SQLite file that stores fetched pages and
    arbitrary key/value pairs.
    '''

    def __init__(self, save_path):
        self.log = logging.getLogger("Main.DB")
        self.db = sqlite3.connect(save_path)
        self.db.enable_load_extension(True)
        self.check_init_db()

    def check_init_db(self):
        cur = self.db.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS web_pages
            (url text UNIQUE, referrer text, content text, meta text);
        """)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS key_value
            (key text UNIQUE, value text);
        """)
        self.db.commit()

    def insert_kv(self, key, value):
        cur = self.db.cursor()
        cur.execute("""
            INSERT OR REPLACE INTO key_value (key, value)
            VALUES (?, ?)
        """, (key, json.dumps(value)))
        self.db.commit()

    def get_kv(self, key):
        cur = self.db.cursor()
        cur.execute("""
            SELECT value FROM key_value
            WHERE key = ?;
        """, (key, ))
        res = cur.fetchone()
        self.db.commit()
        if res:
            return json.loads(res[0])
        return None

    def have_kv(self, key):
        cur = self.db.cursor()
        cur.execute("""
            SELECT COUNT(*) FROM key_value
            WHERE key = ?;
        """, (key, ))
        res = cur.fetchone()
        self.db.commit()
        return res[0]

    def get_page(self, url):
        cur = self.db.cursor()
        cur.execute("""
            SELECT url, referrer, content, meta FROM web_pages
            WHERE url = ?;
        """, (url, ))
        res = cur.fetchone()
        self.db.commit()
        if res:
            url, referrer, content, meta = res
            return {
                "url"      : url,
                "referrer" : referrer,
                "content"  : content,
                # meta is NULL when a page was stored without metadata,
                # and json.loads(None) would raise a TypeError.
                "meta"     : json.loads(meta) if meta else None,
            }
        return None

    def have_page(self, url):
        cur = self.db.cursor()
        cur.execute("""
            SELECT COUNT(*) FROM web_pages
            WHERE url = ?;
        """, (url, ))
        res = cur.fetchone()
        self.db.commit()
        return res[0]

    def insert_page(self, url, content, meta=None):
        self.log.info("Saving page: %s", url)
        cur = self.db.cursor()
        if meta:
            cur.execute("""
                INSERT OR REPLACE INTO web_pages (url, content, meta)
                VALUES (?, ?, ?)
            """, (url, content, json.dumps(meta)))
        else:
            cur.execute("""
                INSERT OR REPLACE INTO web_pages (url, content)
                VALUES (?, ?)
            """, (url, content))
        self.db.commit()

    def get_page_count(self):
        cur = self.db.cursor()
        cur.execute("""
            SELECT COUNT(1) FROM web_pages;
        """)
        res = cur.fetchone()
        self.db.commit()
        return res[0]

    def get_all_pages(self):
        cur = self.db.cursor()
        cur.execute("""
            SELECT url, referrer, content, meta FROM web_pages;
        """)
        ret = []
        for url, referrer, content, meta in cur:
            ret.append({
                "url"      : url,
                "referrer" : referrer,
                "content"  : content,
                # Same NULL-meta guard as in get_page().
                "meta"     : json.loads(meta) if meta else None,
            })
        self.db.commit()
        return ret
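
# Standalone DbInterface usage, as a rough sketch. "example.db" and the
# values below are illustrative, not part of the scraper's flow:
#
#   db = DbInterface("example.db")
#   db.insert_page("http://example.com", "<html>...</html>", meta={"depth": 0})
#   print(db.get_page_count())                        # -> 1
#   print(db.get_page("http://example.com")["meta"])  # -> {'depth': 0}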


class LoggedChrome(ChromeController.ChromeRemoteDebugInterface):
    '''
    Chrome remote-debug interface that transparently saves every page the
    browser fetches into a DbInterface-backed SQLite file.
    '''

    def __init__(self, save_to_fn):
        self.db = DbInterface(save_to_fn)
        super().__init__(
            binary='google-chrome',
            headless=False,
        )

        # Wrap the bound method so the content listener closes over `self`.
        def closure(container, url, content, meta):
            self.content_handler(container, url, content, meta)

        self.install_listener_for_content(closure)
        self.last_save = time.time()

    def content_handler(self, container, url, content, meta):
        self.last_save = time.time()
        self.db.insert_page(url, content, meta)

    def sleep_and_process(self, duration):
        # Sleep for `duration` seconds in 100 ms slices, pumping the debug
        # event queue between slices, then keep pumping for as long as
        # content is still arriving.
        hundred_ms_intervals = int(duration * 10)
        for _ in range(hundred_ms_intervals):
            self.process_available()
            time.sleep(0.1)
        while self.is_still_active():
            self.process_available()
            time.sleep(0.1)
        self.process_available()

    def is_still_active(self):
        ACTIVE_TIME = 10  # seconds

        # Pet chromium
        self.process_available()

        # Active means something was saved within the last ACTIVE_TIME seconds.
        return time.time() < self.last_save + ACTIVE_TIME

    def get_dom_root_id(self):
        '''
        Get the NodeID for the DOM root.
        This assumes the page has fully loaded.
        '''
        # We have to find the DOM root node ID first.
        dom_attr = self.DOM_getDocument(depth=-1, pierce=False)
        assert 'result' in dom_attr
        assert 'root' in dom_attr['result']
        assert 'nodeId' in dom_attr['result']['root']

        # Now we have the root node ID.
        root_node_id = dom_attr['result']['root']['nodeId']
        return root_node_id
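
# Rough end-to-end flow for LoggedChrome on its own ("capture.db" is an
# illustrative path; these calls mirror the commented-out ones in test()):
#
#   cr = LoggedChrome("capture.db")
#   cr.blocking_navigate("https://www.wlnupdates.com")
#   cr.sleep_and_process(10)        # pump events so the content listener fires
#   print(cr.db.get_page_count())   # everything Chrome fetched is now saved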


class Bot():
    def __init__(self, filename):
        self.cr = LoggedChrome(filename)
        self.log = logging.getLogger("Main.Bot")

    def _random_sleep_scroll(self):
        # Note: random.triangular() takes (low, high, mode); the mode is the
        # last argument, not the middle one.
        steps = int(random.triangular(1, 6, 3))
        self.log.info("Performing %s sleep steps", steps)
        for x in range(steps):
            self.cr.sleep_and_process(random.triangular(1, 3, 2))
            self.cr.scroll_page(random.triangular(200, 1500, 500))
            self.log.info("Sleep step %s done", x)

    def nav_and_scroll(self, url):
        self.cr.blocking_navigate(url)

    def walk_pages(self, url):
        self.cr.blocking_navigate(url)
        self._random_sleep_scroll()
        while True:
            # I haven't figured out exactly why you need to call this before
            # find_element(), but I think get_dom_root_id() (and therefore
            # DOM.getDocument()) causes the debug tools to load the DOM in
            # the remote browser, or something. If you remove it,
            # find_element() (and as such DOM.performSearch) returns invalid,
            # but not empty, results!
            self.cr.get_dom_root_id()
            node_ids = self.cr.find_element(NEXT_BUTTON_XPATH)
            if not node_ids:
                self.log.info("No next button found; stopping.")
                return
            next_button_id = node_ids[0]
            try:
                # get_dom_item_center_coords scrolls the item into view if needed.
                x_pos, y_pos = self.cr.get_dom_item_center_coords(next_button_id)
                self.cr.sleep_and_process(random.triangular(5, 20, 10))
                self.log.info("Extracting rendered source.")
                rendered = self.cr.get_rendered_page_source()
                current_url = self.cr.get_current_url()
                self.cr.db.insert_page(url=current_url, content=rendered)
                self.log.info("Clicking next button at %s", (x_pos, y_pos))
                self.cr.click_item_at_coords(x_pos, y_pos)
            except Exception:
                # Drop into a debugger on failure so the live page state can
                # be inspected interactively.
                self.log.error("Failed to click through to the next page!")
                traceback.print_exc()
                import pdb
                pdb.set_trace()
            self._random_sleep_scroll()


def test():
    bt = Bot("test.db")  # illustrative database path; Bot() requires one
    bt.walk_pages("https://www.wlnupdates.com")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    test()
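
# To poke at a capture afterwards with the stock sqlite3 CLI (the path is
# illustrative, matching test() above):
#
#   sqlite3 test.db 'SELECT url, length(content) FROM web_pages;'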