-
Notifications
You must be signed in to change notification settings - Fork 314
/
crawler.py
239 lines (208 loc) · 8.14 KB
/
crawler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import json
import logging
import os
import signal
import sys
import time
import typing
from pathlib import Path
from threading import Lock
from types import FrameType
from typing import Any, Callable, List, Literal, Optional
import sentry_sdk
from openwpm import mp_logger
from openwpm.command_sequence import CommandSequence
from openwpm.config import BrowserParams, ManagerParams
from openwpm.storage.cloud_storage.gcp_storage import (
GcsStructuredProvider,
GcsUnstructuredProvider,
)
from openwpm.task_manager import TaskManager
from openwpm.utilities import rediswq
# Configuration via environment variables
# Crawler specific config
REDIS_HOST = os.getenv("REDIS_HOST", "redis-box")
REDIS_QUEUE_NAME = os.getenv("REDIS_QUEUE_NAME", "crawl-queue")
MAX_JOB_RETRIES = int(os.getenv("MAX_JOB_RETRIES", "2"))
DWELL_TIME = int(os.getenv("DWELL_TIME", "10"))
TIMEOUT = int(os.getenv("TIMEOUT", "60"))

# Storage Provider Params
CRAWL_DIRECTORY = os.getenv("CRAWL_DIRECTORY", "crawl-data")
GCS_BUCKET = os.getenv("GCS_BUCKET", "openwpm-crawls")
GCP_PROJECT = os.getenv("GCP_PROJECT", "")
AUTH_TOKEN = os.getenv("GCP_AUTH_TOKEN", "cloud")

# Browser Params
DISPLAY_MODE = os.getenv("DISPLAY_MODE", "headless")
# Validate with an explicit raise rather than `assert`: assertions are
# stripped when Python runs with -O, which would let an unsupported value
# through unchecked.
if DISPLAY_MODE not in ("headless", "xvfb", "native"):
    raise ValueError(
        "DISPLAY_MODE must be one of 'headless', 'xvfb' or 'native', got %r"
        % (DISPLAY_MODE,)
    )
DISPLAY_MODE = typing.cast(Literal["headless", "xvfb", "native"], DISPLAY_MODE)

# Instrumentation switches: enabled only when the variable is exactly "1"
# (which is also the default for each of them).
HTTP_INSTRUMENT = os.getenv("HTTP_INSTRUMENT", "1") == "1"
COOKIE_INSTRUMENT = os.getenv("COOKIE_INSTRUMENT", "1") == "1"
NAVIGATION_INSTRUMENT = os.getenv("NAVIGATION_INSTRUMENT", "1") == "1"
JS_INSTRUMENT = os.getenv("JS_INSTRUMENT", "1") == "1"
CALLSTACK_INSTRUMENT = os.getenv("CALLSTACK_INSTRUMENT", "1") == "1"
# JSON-encoded value; defaults to the "collection_fingerprinting" preset
JS_INSTRUMENT_SETTINGS = json.loads(
    os.getenv("JS_INSTRUMENT_SETTINGS", '["collection_fingerprinting"]')
)

# Kept as the raw string here; interpreted when the browsers are configured
SAVE_CONTENT = os.getenv("SAVE_CONTENT", "")
# Optional JSON string, parsed at the point of use
PREFS = os.getenv("PREFS", None)

SENTRY_DSN = os.getenv("SENTRY_DSN", None)
LOGGER_SETTINGS = mp_logger.parse_config_from_env()

if CALLSTACK_INSTRUMENT:
    # Must have JS_INSTRUMENT True for CALLSTACK_INSTRUMENT to work
    JS_INSTRUMENT = True

# Lease duration used when renewing the lease of a job that is still
# unfinished: twice the sum of TIMEOUT, DWELL_TIME and a 30 second margin.
EXTENDED_LEASE_TIME = 2 * (TIMEOUT + DWELL_TIME + 30)
# Start from the default manager and browser parameters.
# Only one browser per instance is supported, because the job management
# code below requires blocking commands. For more context see:
# https://github.com/openwpm/OpenWPM/issues/470
NUM_BROWSERS = 1
manager_params = ManagerParams()
browser_params = [BrowserParams() for _ in range(NUM_BROWSERS)]

# SAVE_CONTENT of "1" / "0" switches content saving on / off; any other
# value is handed to the browser parameters unchanged.
save_content_setting = {"1": True, "0": False}.get(SAVE_CONTENT, SAVE_CONTENT)

# Browser configuration: apply the environment-derived settings to each browser
for params in browser_params:
    params.display_mode = DISPLAY_MODE
    params.http_instrument = HTTP_INSTRUMENT
    params.cookie_instrument = COOKIE_INSTRUMENT
    params.navigation_instrument = NAVIGATION_INSTRUMENT
    params.callstack_instrument = CALLSTACK_INSTRUMENT
    params.js_instrument = JS_INSTRUMENT
    params.js_instrument_settings = JS_INSTRUMENT_SETTINGS
    params.save_content = save_content_setting
    if PREFS:
        # Parsed per browser so that every browser gets its own prefs object
        params.prefs = json.loads(PREFS)
# Manager configuration: the crawl data directory and the log file share
# one local directory named after the crawl.
local_crawl_dir = Path("~/Desktop/") / CRAWL_DIRECTORY
manager_params.data_directory = local_crawl_dir
manager_params.log_path = local_crawl_dir / "openwpm.log"

# Both storage providers use the same project, bucket and token; only the
# base path differs (the unstructured provider uses a "data" sub-path).
gcs_settings = dict(
    project=GCP_PROJECT,
    bucket_name=GCS_BUCKET,
    token=AUTH_TOKEN,
)
structured = GcsStructuredProvider(base_path=CRAWL_DIRECTORY, **gcs_settings)
unstructured = GcsUnstructuredProvider(
    base_path=CRAWL_DIRECTORY + "/data", **gcs_settings
)

# Instantiates the measurement platform
# Commands time out by default after 60 seconds
manager = TaskManager(
    manager_params,
    browser_params,
    structured,
    unstructured,
    logger_kwargs=LOGGER_SETTINGS,
)
# Sentry is expected to have been initialised already by this point
if SENTRY_DSN:
    # Attach crawler.py-specific context to the Sentry scope
    with sentry_sdk.configure_scope() as scope:
        # Tags generate breakdown charts and search filters.
        # They are applied in the order listed here.
        crawl_tags = (
            ("CRAWL_DIRECTORY", CRAWL_DIRECTORY),
            ("GCS_BUCKET", GCS_BUCKET),
            ("DISPLAY_MODE", DISPLAY_MODE),
            ("HTTP_INSTRUMENT", HTTP_INSTRUMENT),
            ("COOKIE_INSTRUMENT", COOKIE_INSTRUMENT),
            ("NAVIGATION_INSTRUMENT", NAVIGATION_INSTRUMENT),
            ("JS_INSTRUMENT", JS_INSTRUMENT),
            ("JS_INSTRUMENT_SETTINGS", JS_INSTRUMENT_SETTINGS),
            ("CALLSTACK_INSTRUMENT", CALLSTACK_INSTRUMENT),
            ("SAVE_CONTENT", SAVE_CONTENT),
            ("DWELL_TIME", DWELL_TIME),
            ("TIMEOUT", TIMEOUT),
            ("MAX_JOB_RETRIES", MAX_JOB_RETRIES),
            ("CRAWL_REFERENCE", "%s/%s" % (GCS_BUCKET, CRAWL_DIRECTORY)),
        )
        for tag_name, tag_value in crawl_tags:
            scope.set_tag(tag_name, tag_value)

        # Contexts carry additional information that may be of interest
        if PREFS:
            scope.set_context("PREFS", json.loads(PREFS))
        crawl_config = {"REDIS_QUEUE_NAME": REDIS_QUEUE_NAME}
        scope.set_context("crawl_config", crawl_config)

    # Send a Sentry message on startup (temporarily - to easily be able
    # to compare error frequencies to crawl worker instance count)
    sentry_sdk.capture_message("Crawl worker started")
# Shared crawl state.
# unsaved_jobs holds the jobs that were leased from the queue and whose
# completion callback has not removed them yet; unsaved_jobs_lock guards it.
unsaved_jobs: List[bytes] = []
unsaved_jobs_lock = Lock()
# Set to True by the shutdown signal handler
shutting_down = False

# Connect to job queue
job_queue = rediswq.RedisWQ(
    name=REDIS_QUEUE_NAME,
    host=REDIS_HOST,
    max_retries=MAX_JOB_RETRIES,
)
session_id = job_queue.sessionID()
manager.logger.info("Worker with sessionID: %s" % session_id)
queue_is_empty = job_queue.empty()
manager.logger.info("Initial queue state: empty=%s" % queue_is_empty)
def on_shutdown(
    manager: TaskManager,
    unsaved_jobs_lock: Lock,
    lock_timeout: float = 10.0,
) -> Callable[[int, Optional[FrameType]], None]:
    """Build a signal handler that shuts the crawl worker down.

    Args:
        manager: Task manager whose logger is used and which is closed
            (with ``relaxed=False``) when the signal arrives.
        unsaved_jobs_lock: Lock guarding the unsaved-jobs bookkeeping; the
            handler tries to take it before flagging the shutdown.
        lock_timeout: Maximum number of seconds the handler waits for
            ``unsaved_jobs_lock`` before shutting down regardless.

    Returns:
        A callable with the ``(signum, frame)`` signature expected by
        ``signal.signal``. When invoked it sets the module-level
        ``shutting_down`` flag, closes the manager and exits the process
        with status 1.
    """

    def actual_callback(s: int, _: Optional[FrameType]) -> None:
        global shutting_down
        manager.logger.error("Got interupted by %r, shutting down", s)
        # Python runs signal handlers in the main thread. If the interrupted
        # code already holds this (non-reentrant) lock, a blocking acquire
        # would never return and the worker would hang instead of shutting
        # down, so only wait for a bounded time.
        acquired = unsaved_jobs_lock.acquire(timeout=lock_timeout)
        try:
            shutting_down = True
        finally:
            if acquired:
                unsaved_jobs_lock.release()
        if not acquired:
            manager.logger.warning(
                "Could not acquire the unsaved jobs lock within %s seconds, "
                "shutting down anyway",
                lock_timeout,
            )
        manager.close(relaxed=False)
        sys.exit(1)

    return actual_callback
# Register the shutdown handler for both termination and keyboard interrupt
shutdown_handler = on_shutdown(manager, unsaved_jobs_lock)
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
def get_job_completion_callback(
    logger: logging.Logger,
    unsaved_jobs_lock: Lock,
    job_queue: rediswq.RedisWQ,
    job: bytes,
) -> Callable[[bool], None]:
    """Create the completion callback for a single leased job.

    Args:
        logger: Logger used to report the outcome of the job.
        unsaved_jobs_lock: Lock held while the job bookkeeping is updated.
        job_queue: Queue on which a successful job is marked complete.
        job: The raw job payload the callback is bound to.

    Returns:
        A function taking a success flag. On success the job is marked
        complete on the queue, otherwise a warning is logged; in both
        cases the job is removed from the module-level ``unsaved_jobs``.
    """

    def on_completion(success: bool) -> None:
        with unsaved_jobs_lock:
            if not success:
                logger.warning("Job %r got interrupted", job)
            else:
                logger.info("Job %r is done", job)
                job_queue.complete(job)
            # Finished either way, so stop tracking it as unsaved
            unsaved_jobs.remove(job)

    return on_completion
# NOTE(review): not read anywhere in the visible code; kept so the module's
# globals stay unchanged.
no_job_since = None

# Crawl sites specified in job queue until empty
while not job_queue.empty():
    job_queue.check_expired_leases()
    # Keep the leases of jobs that are still unfinished alive
    with unsaved_jobs_lock:
        manager.logger.debug("Currently unfinished jobs are: %s", unsaved_jobs)
        for unsaved_job in unsaved_jobs:
            if not job_queue.renew_lease(unsaved_job, EXTENDED_LEASE_TIME):
                manager.logger.error("Unsaved job: %s timed out", unsaved_job)

    job = job_queue.lease(
        lease_secs=TIMEOUT + DWELL_TIME + 30, block=True, timeout=5
    )
    if job is None:
        manager.logger.info("Waiting for work")
        time.sleep(5)
        continue

    unsaved_jobs.append(job)
    retry_number = job_queue.get_retry_number(job)
    # A job is "<site_rank>,<site>". Split on the first comma only, so that
    # a site URL which itself contains commas does not break the unpacking.
    site_rank, site = job.decode("utf-8").split(",", 1)
    if "://" not in site:
        site = "http://" + site
    manager.logger.info("Visiting %s...", site)
    callback = get_job_completion_callback(
        manager.logger, unsaved_jobs_lock, job_queue, job
    )
    command_sequence = CommandSequence(
        site,
        blocking=True,
        reset=True,
        retry_number=retry_number,
        callback=callback,
        site_rank=int(site_rank),
    )
    command_sequence.get(sleep=DWELL_TIME, timeout=TIMEOUT)
    manager.execute_command_sequence(command_sequence)

# The loop has no `break`, so this point is only reached once the queue
# reports empty.
manager.logger.info("Job queue finished, exiting.")
manager.close()

if SENTRY_DSN:
    sentry_sdk.capture_message("Crawl worker finished")