main.py
import re
import wget
import zipfile
import os
import logging

from parallel import run_parallel_batches

logging.basicConfig(level=logging.DEBUG)

def is_numeric(gram: str):
    try:
        int(gram)
        return True
    except ValueError:
        pass
    try:
        float(gram)
        return True
    except ValueError:
        pass
    return False

def is_valid_gram(gram: str, n: int):
    components = gram.split(" ")
    if len(components) != n:
        return False
    if any(is_numeric(component) for component in components):
        return False
    if all(component.isalnum() for component in components):
        return True
    # otherwise accept the gram only if every component contains at least one letter
    return all(any(c.isalpha() for c in component) for component in components)

def download_ngram_file(url, tmp_dir):
    logging.info(f"start download url {url}")
    # get zip name from url
    zip_name = os.path.basename(url)
    # download zip to tmp
    zip_path = os.path.join(tmp_dir, zip_name)
    wget.download(url, zip_path)
    logging.info(f"unzip file {zip_path}")
    # extract zip
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(tmp_dir)
    # remove zip
    os.remove(zip_path)
    csv_path = os.path.join(tmp_dir, zip_name.replace(".zip", ""))
    return csv_path

def parse_gram_results(csv_files, gram_dictionary, cutoff_year, to_lower):
    for csv_file in csv_files:
        logging.info(f"read csv {csv_file}")
        try:
            # read csv lines
            with open(csv_file, "r") as f:
                gram_lines = f.readlines()
        except Exception as e:
            logging.info(f"exception reading csv {csv_file}: {e}")
            os.remove(csv_file)
            continue
        # remove csv file
        os.remove(csv_file)
        logging.info("parse ngram lines")
        # parse ngram lines: each line starts with "<gram>\t<year>\t<count>"
        for gram_line in gram_lines:
            components = gram_line.split("\t")
            gram = components[0]
            if to_lower:
                gram = gram.lower()
            year = int(components[1])
            count = int(components[2])
            if year > cutoff_year:
                if gram in gram_dictionary:
                    gram_dictionary[gram] += count
                else:
                    gram_dictionary[gram] = count

def parse_google_ngram_files(num_grams: int, cutoff_year: int, max_entries: int, to_lower: bool, source_path: str, tmp_dir: str):
    url_seeker = re.compile("<a href='(.*?)'>")
    # open source html path
    with open(source_path, "r") as f:
        lines = f.readlines()
    jobs = []
    gram_dictionary = {}
    for line in lines:
        # get url from html line; skip lines that do not contain a link
        match = url_seeker.match(line)
        if match is None:
            continue
        url = match.group(1)
        # append job
        jobs.append((download_ngram_file, (url, tmp_dir)))
    run_parallel_batches(jobs, 4, lambda results: parse_gram_results(results, gram_dictionary, cutoff_year, to_lower))
    logging.info("sort entries")
    # get all entries, filter for valid grams and sort on frequency
    all_entries = list(gram_dictionary.items())
    all_entries = list(filter(lambda tup: is_valid_gram(tup[0], num_grams), all_entries))
    all_entries.sort(key=lambda tup: tup[1], reverse=True)
    return all_entries[:max_entries]

def parse(ngrams, cutoff_year, max_entries, lower_case, input_html, output_file):
    tmp_dir = "tmp"
    dest_dir = "results"
    if not os.path.exists(tmp_dir):
        os.mkdir(tmp_dir)
    if not os.path.exists(dest_dir):
        os.mkdir(dest_dir)
    entries = parse_google_ngram_files(ngrams, cutoff_year, max_entries, lower_case, input_html, tmp_dir)
    dest_path = os.path.join(dest_dir, output_file)
    with open(dest_path, "w") as f:
        for entry in entries:
            grams = "\t".join(entry[0].split(" "))
            f.write(f"{entry[1]}\t{grams}\n")


parse(1, 1980, 100_000, True, "sources/spanish_unigrams_sources.html", "grams.1")
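
# ---------------------------------------------------------------------------
# Note: `run_parallel_batches` comes from the local `parallel` module, which
# is not part of this file. Judging only from how it is called above, it takes
# a list of (function, args) jobs, a batch size, and a callback that receives
# each batch's results. A minimal sketch of such a helper, under those assumed
# semantics (not the project's actual implementation), could look like this:
#
#   from concurrent.futures import ThreadPoolExecutor
#
#   def run_parallel_batches(jobs, batch_size, on_batch_done):
#       # process the jobs in slices of `batch_size`
#       for start in range(0, len(jobs), batch_size):
#           batch = jobs[start:start + batch_size]
#           with ThreadPoolExecutor(max_workers=batch_size) as executor:
#               futures = [executor.submit(func, *args) for func, args in batch]
#               # collect each job's return value (here: the extracted csv path)
#               results = [future.result() for future in futures]
#           # hand the finished batch to the caller's callback
#           on_batch_done(results)
# ---------------------------------------------------------------------------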