-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scrape_utils.py
244 lines (170 loc) · 6.9 KB
/
scrape_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""A DSL to remove some of the tedium from writing BS4 scrapers.
"""
from collections import defaultdict
from datetime import datetime
from functools import reduce
import re
import bs4
from dateutil.parser import parse as dt_parse
import pytz
import usaddress
def apply_tuple(elt, instruction):
    """Run the steps of a tuple instruction one after another.

    Every step receives the outcome of the previous one. While the current
    value is a list, the step is applied to each member and any list or
    ``map`` outcomes are flattened into a single list. A falsy value that is
    not a list (``None``, for instance) is handed on untouched.

    :param elt: the starting element, list of elements, or falsy value
    :param instruction: a non-empty sequence of instructions
    :returns: the value left after the final step
    """
    # Unpacking up front makes an empty instruction raise ValueError.
    head, *tail = instruction
    current = elt
    for step in [head, *tail]:
        if isinstance(current, list):
            collected = []
            for member in current:
                outcome = apply_instruction(member, step)
                if isinstance(outcome, (list, map)):
                    collected.extend(outcome)
                else:
                    collected.append(outcome)
            current = collected
        elif current:
            current = apply_instruction(current, step)
    return current
def apply_instruction(elt, instruction):
    """Apply a scraping instruction to a bs4 Tag or document.

    The instruction specifies the content to extract and the shape it should
    have. (`scrape` is an alias of this function.)

    - Callables are called with the element and their result is returned:
        scrape(tag, text) -- return the text of `tag`

    - Strings are treated as CSS selectors.
      For example, scrape(doc, "a.link") would select and return the first
      <a> tag in `doc` with the "link" class

    - Lists are used to indicate that multiple tags should be returned, rather
      than just the first:
        scrape(doc, ["a.link"]) -- return all <a> tags with "link" class

      - Subsequent elements in the list act as filters. Each filter is an
        instruction applied to the candidate tag on its own; the tag is kept
        only if every filter gives a truthy result:
          scrape(doc, ["a.link", "img"]) -- return a.link tags containing an
            <img>
          scrape(doc, ["a.link", (text, str.strip)]) -- return a.link tags
            with non-blank text (text is a function that takes a tag)

    - Tuples group operations on a tag or tags
        scrape(doc, ("a.link", text)) -- return the text of the first a.link
          tag
        scrape(doc, (["a.link"], text)) -- return the text of each a.link tag
        scrape(doc, (["a.link", "img"], attr("href"))) -- return the href
          attribute of each a.link tag that has an img descendant

    - Dicts return dicts
        scrape(doc, {"text": ("a.link", text)})
        scrape(doc, (["a.link"], {"text": text})) - return a list of dicts, one
          for each a.link tag, where the "text" key of each dict contains the
          text of the corresponding tag.

    - Objects with a `search` method (such as compiled regular expressions)
      have `search` called on the element, which is therefore expected to be
      a string at that point:
        scrape(doc, ("a.link", stripped_text, re.compile(r"\\d+")))

    :raises TypeError: if the instruction is none of the kinds above
    """
    if callable(instruction):
        return instruction(elt)

    if isinstance(instruction, str):
        return elt.select_one(instruction)

    if isinstance(instruction, list):
        selector, *filters = instruction
        return [match for match in elt.select(selector)
                if all(apply_instruction(match, filt) for filt in filters)]

    if isinstance(instruction, dict):
        return dict_from_selectors(elt, instruction)

    if isinstance(instruction, tuple):
        return apply_tuple(elt, instruction)

    # Compiled patterns are not callable, so recognise them by their `search`
    # method. The default keeps unsupported instructions from failing with an
    # unrelated AttributeError.
    search = getattr(instruction, "search", None)
    if search:
        return search(elt)

    raise TypeError(
        f"unsupported scrape instruction: {instruction!r} "
        f"(type {type(instruction).__name__})")
def dict_from_selectors(elt: bs4.element.Tag, selectors: dict):
    """Build a dictionary by applying one instruction per output key.

    :param elt: the context element every instruction is applied to
    :param selectors: a dictionary mapping output keys to instructions. The
      instructions can be anything `apply_instruction` accepts, e.g. strings
      (selectors) or tuples of (selector, fns...)
    :returns: a dict with the same keys; ``map`` results are materialised as
      lists and bare tags are replaced by their stripped text
    """
    def finalise(value):
        # Make lazy and tag-valued results plain data.
        if isinstance(value, map):
            return list(value)
        if isinstance(value, bs4.Tag):
            return value.text.strip()
        return value

    return {key: finalise(apply_instruction(elt, spec))
            for key, spec in selectors.items()}
# Public alias: `scrape(doc, instruction)` is the spelling used in the
# examples of the apply_instruction docstring.
scrape = apply_instruction
def text(elt):
    """Return the text of a tag, or map ``text`` over a collection.

    :param elt: a bs4 Tag, a string (a NavigableString is a ``str``
      subclass), or an iterable of these
    :returns: ``elt.text`` for a tag, a plain ``str`` for a string, and
      otherwise a lazy ``map`` producing the text of each member
    """
    # Strings are iterable, so without this guard they would reach the map
    # branch and recurse over their own characters indefinitely.
    if isinstance(elt, str):
        return str(elt)
    if isinstance(elt, bs4.Tag):
        return elt.text
    return map(text, elt)
def stripped_text(elt):
    """Return ``elt.text`` with leading and trailing whitespace removed."""
    content = elt.text
    return content.strip()
def text_children(elt):
    """List the non-blank string children of ``elt``, each one stripped.

    Children that are not strings are skipped.
    """
    fragments = []
    for node in elt.children:
        if not isinstance(node, str):
            continue
        trimmed = node.strip()
        if trimmed:
            fragments.append(trimmed)
    return fragments
def attr(attrname, val=None):
    """
    Build a function that reads or tests one attribute of an element.

    With one argument, pulls an attribute into a scraped dictionary:
      dict_from_selectors(doc, {"url": ("a", attr("href"))})
    The returned function gives "" when the attribute is absent.

    With two arguments, returns an element filter that tests the attribute
    named `attrname`:
      - if `val` has a `match` method (e.g. a compiled regular expression),
        the filter returns `val.match(...)` of the attribute value, using ""
        when the attribute is absent;
      - otherwise the filter returns whether the attribute equals `val`.

    A falsy `val` is treated the same as omitting it.
    """
    if val:
        if getattr(val, "match", None):
            # Look up the requested attribute, not the literal key "attrname".
            return lambda elt: val.match(elt.attrs.get(attrname, ""))
        return lambda elt: elt.attrs.get(attrname) == val

    return lambda elt: elt.attrs.get(attrname, "")
def date(arg=None, tz=None):
    """Parse a date/time from a string or tag, optionally in a given zone.

    Usable directly, ``date(value)`` / ``date(value, tz)``, or curried:
    ``date(tz=...)`` returns a one-argument function that parses its input
    in that zone.

    :param arg: the text to parse, or a bs4 Tag whose stripped text is parsed
    :param tz: a pytz timezone object or a zone name accepted by
      ``pytz.timezone``
    :returns: with `tz`, the parsed value localized to `tz` (any zone
      information in the text itself is ignored); without `tz`, whatever
      ``dateutil.parser.parse`` returns for the text; or a function, in the
      curried form
    """
    # Resolve zone names before branching so the direct call
    # date(value, tz="...") works as well as the curried form.
    if tz and isinstance(tz, str):
        tz = pytz.timezone(tz)

    if tz and not arg:
        return lambda x: date(x, tz)

    if isinstance(arg, bs4.Tag):
        arg = stripped_text(arg)

    if tz:
        # Drop any offset found in the text and attach the requested zone.
        dt = dt_parse(arg, ignoretz=True)
        return tz.localize(dt)

    return dt_parse(arg)
def text_contains(arg):
    """Build a filter that is true when ``arg`` occurs in an element's text."""
    def contains(elt):
        return arg in elt.text

    return contains
def text_matches(patt):
    """Build a filter that searches an element's text for a pattern.

    :param patt: a regular expression, as a string or an already compiled
      pattern; strings are compiled once, when the filter is built
    :returns: a function returning ``re.search`` of the pattern in
      ``elt.text`` (a match object, or None when nothing matches)
    """
    regex = re.compile(patt) if isinstance(patt, str) else patt

    def matches(elt):
        return re.search(regex, elt.text)

    return matches
# Getter function of the `Tag.children` property, exposed as a plain callable
# so that `children(tag)` can be used wherever a function instruction fits.
children = bs4.Tag.children.fget
def either(*fns):
    """Build a predicate that is True when any of ``fns`` accepts the element.

    The functions are tried in order and evaluation stops at the first truthy
    result; with no functions the predicate is always False.
    """
    def accepts(elt):
        for predicate in fns:
            if predicate(elt):
                return True
        return False

    return accepts
def ch(*fns):
    "Chain together some functions."
    def chained(elt):
        # Feed each step the result of the one before it.
        value = elt
        for step in fns:
            value = apply_instruction(value, step)
        return value

    return chained
def to_under(s):
    "Lower-cases a string and replaces each run of whitespace with an underscore."
    lowered = s.lower()
    return re.compile(r"\s+").sub("_", lowered)
def col_names(table):
    """Return the stripped text of each <th> in the first ``thead > tr`` row.

    Raises IndexError when the table has no such row.
    """
    header_rows = table.select("thead > tr")
    first_row = header_rows[0]
    headings = first_row.find_all("th")
    return [heading.get_text().strip() for heading in headings]
def tabular(field_processors=None, convert_column=to_under, index=None):
    """Turn an HTML table into a list (or dict) of row dictionaries.

    Two calling styles are supported:
      - ``tabular(processors, ...)`` returns a function that takes an element
        and processes the table it is or contains;
      - ``tabular(tag)`` processes `tag` immediately with the default
        settings.

    :param field_processors: a mapping from converted column name to the
      instruction applied to that column's cells; columns without an entry
      use `stripped_text`. May instead be a bs4 Tag (direct-call style).
    :param convert_column: function applied to each header text to produce
      the keys of the row dictionaries
    :param index: if given, the converted column name whose value keys the
      result, which is then a dict of rows instead of a list
    :returns: the processing function, or its result in direct-call style
    """
    direct_call = isinstance(field_processors, bs4.Tag)
    # In direct-call style the first argument is the element, not a mapping.
    # Using it as the mapping would make Tag.get() return the table's own
    # HTML attributes for columns named like them (e.g. "class", "id").
    if direct_call or field_processors is None:
        processors = {}
    else:
        processors = field_processors

    def process_table(elt):
        table = elt if elt.name == "table" else elt.find("table")
        columns = [convert_column(c) for c in col_names(table)]
        rows = table.find("tbody").find_all("tr")
        # zip() pairs cells with headers positionally and stops at the
        # shorter of the two.
        data = ({col_name: apply_instruction(cell, processors.get(col_name, stripped_text))
                 for col_name, cell in zip(columns, row.find_all("td"))}
                for row in rows)

        if index:
            return {datum[index]: datum for datum in data}
        return list(data)

    if direct_call:
        return process_table(field_processors)

    return process_table
def address(text=None, defaults=None):
    """Parse a US address with `usaddress` into name/city/state/zip.

    Usable directly, ``address(text)`` / ``address(text, defaults)``, or
    curried: ``address(defaults={...})`` returns a one-argument function.

    :param text: the address as a string, or a bs4 Tag whose text is used
    :param defaults: optional mapping with fallback values for the "city",
      "state" and "zip" keys, used when the parse does not supply them;
      a missing fallback yields ""
    :returns: a dict with the keys "name", "city", "state" and "zip"
    """
    defaults = {} if defaults is None else defaults

    if not text and defaults:
        return lambda elt: address(elt, defaults)

    if isinstance(text, bs4.Tag):
        text = text.text

    tagged, _ = usaddress.tag(text)
    # Absent labels read as "" so the format string below never fails.
    tagged = defaultdict(str, tagged)

    occupancy = "{a[Recipient]}\n{a[OccupancyIdentifier]} {a[OccupancyType]}".format(a=tagged)
    # usaddress labels the city "PlaceName" and the state "StateName".
    place = tagged["PlaceName"]
    if "\n" in place:
        # Text before the last line break is treated as part of the
        # occupant's name rather than the city.
        occupancy_name, place = place.rsplit("\n", 1)
        occupancy += f" {occupancy_name}"
    # Trim both ends and collapse runs of whitespace to one character.
    occupancy = re.sub(r"(^\s+|(?<=\s)\s+|\s+$)", "", occupancy)

    return {
        "name": occupancy,
        "city": place or defaults.get("city", ""),
        "state": tagged.get("StateName", defaults.get("state", "")),
        "zip": tagged.get("ZipCode", defaults.get("zip", "")),
    }