-
Notifications
You must be signed in to change notification settings - Fork 0
/
ranking-update.py
executable file
·179 lines (156 loc) · 8.48 KB
/
ranking-update.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python3
import collections, functools, json, logging, lxml.html, os, re, sqlite3, traceback, urllib.error, urllib.request
# This assumes that ranking.sqlite3 is in the same folder as this script.
dir_path = os.path.dirname(os.path.realpath(__file__))
# FIX: os.path.join was being given a single pre-concatenated argument
# (dir_path + '/ranking.sqlite3'), which made the call a no-op; pass the
# components separately so join actually does the joining.
db_path = os.path.join(dir_path, 'ranking.sqlite3')
# Kattis seems to block urllib user agent
kattis_user_agent = 'Mozilla/5.0 (X11; CrOS x86_64 8350.68.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
# Connect to database. sqlite3.Row lets rows be addressed by column name.
db = sqlite3.connect(db_path)
db.row_factory = sqlite3.Row
def update_solved(site_id, username, solved):
    """Record a user's solved count for one judge in the site_score table.

    Raises RuntimeError if the INSERT reports no affected rows.
    """
    params = (int(site_id), username, solved)
    cur = db.execute('INSERT INTO site_score (site_id, username, solved) VALUES (?, ?, ?)',
                     params)
    if cur.rowcount == 0:
        raise RuntimeError('Failed to update solved %s %s %s' % (site_id, username, solved))
def get_http(url):
    """Fetch *url* (a str or urllib.request.Request) and return the raw body bytes."""
    response = urllib.request.urlopen(url, timeout=10)  # 10 second timeout
    return response.read()
def scrape_codeforces(site_id, usernames):
    """Count each user's distinct accepted problems via the Codeforces API."""
    # I don't see a better way than scanning all submissions of the user
    for username in usernames:
        raw = get_http('http://www.codeforces.com/api/user.status?handle=%s' % username)
        submissions = json.loads(raw.decode('utf-8'))
        # (contestId, index) is assumed to uniquely identify a problem;
        # a set de-duplicates repeated accepted submissions.
        accepted = {
            (sub['problem']['contestId'], sub['problem']['index'])
            for sub in submissions['result']
            if sub['verdict'] == 'OK'
        }
        update_solved(site_id, username, len(accepted))
def scrape_codechef(site_id, usernames):
    """Scrape each user's solved-problem count from their CodeChef profile page."""
    for username in usernames:
        page = get_http('https://www.codechef.com/users/%s' % username)
        doc = lxml.html.fromstring(page)
        cell = doc.cssselect("#problem_stats tr:nth-child(2) td")[0]
        update_solved(site_id, username, cell.text)
def scrape_coj(site_id, usernames):
    """Scrape each user's solved-problem badge from their COJ account page."""
    for username in usernames:
        page = get_http('http://coj.uci.cu/user/useraccount.xhtml?username=%s' % username)
        doc = lxml.html.fromstring(page)
        badge = doc.cssselect("div.panel-heading:contains('Solved problems') span.badge")[0]
        update_solved(site_id, username, badge.text)
def scrape_kattis(site_id, usernames):
    """Scrape Kattis scores for *usernames*.

    Users listed on the University of Calgary standings page are handled via a
    single request and removed from *usernames* (the set is mutated in place);
    everyone left over is fetched one profile page at a time.
    """
    # First, get users who are listed as University of Calgary
    # This reduces the number of requests needed
    uni_req = urllib.request.Request('https://open.kattis.com/universities/ucalgary.ca')
    uni_req.add_header('User-Agent', kattis_user_agent)
    uni_tree = lxml.html.fromstring(get_http(uni_req))
    for row in uni_tree.cssselect('.table-kattis tbody tr'):
        handle = row.cssselect('a')[0].get('href').split('/')[-1]
        score = float(row.cssselect('td:last-child')[0].text)
        if handle in usernames:
            update_solved(site_id, handle, score)
            usernames.remove(handle)
    # Then get other users
    for handle in usernames:
        user_req = urllib.request.Request('https://open.kattis.com/users/%s' % handle)
        user_req.add_header('User-Agent', kattis_user_agent)
        try:
            user_tree = lxml.html.fromstring(get_http(user_req))
            score = float(user_tree.cssselect('.rank tr:nth-child(2) td:nth-child(2)')[0].text)
            update_solved(site_id, handle, score)
        except urllib.error.HTTPError:
            # Best-effort: a missing/blocked profile shouldn't abort the others.
            logging.exception('Failed to fetch Kattis user "%s"', handle)
def scrape_poj(site_id, usernames):
    """Scrape each user's solved count from their POJ user-status page."""
    for username in usernames:
        page = get_http('http://poj.org/userstatus?user_id=%s' % username)
        doc = lxml.html.fromstring(page)
        link = doc.cssselect("tr:contains('Solved:') a")[0]
        update_solved(site_id, username, link.text)
def scrape_spoj(site_id, usernames):
    """Scrape each user's solved count from their SPOJ profile page."""
    for username in usernames:
        page = get_http('http://www.spoj.com/users/%s/' % username)
        doc = lxml.html.fromstring(page)
        stat = doc.cssselect('.profile-info-data-stats dd')[0]
        update_solved(site_id, username, stat.text)
def scrape_uva(base_url, site_id, usernames):
    """Fetch solved counts from a uHunt-style API (UVa / ICPC Live Archive)."""
    # uhunt has a weird API where it returns a list of bitsets of solved problems
    raw = get_http('%s/api/solved-bits/%s' % (base_url, ','.join(usernames)))
    for entry in json.loads(raw.decode('utf-8')):
        # Each element of 'solved' is an integer bitset; the total number of
        # set bits across all of them is the user's solved-problem count.
        total = sum(bin(bits).count('1') for bits in entry['solved'])
        update_solved(site_id, str(entry['uid']), total)
# Result record for a scraped contest page: the contest name, whether the
# contest has finished, and a dict mapping username -> list of solved
# problem ids.
KattisContest = collections.namedtuple('KattisContest', 'name is_over solved')
def parse_kattis_contest_html(html):
    """Parse a Kattis contest standings page into a KattisContest.

    html: raw HTML (as returned by get_http) of an open.kattis.com contest
    page. Returns a KattisContest whose `solved` maps each username found in
    the standings table to the list of problem ids marked solved for them.
    """
    tree = lxml.html.fromstring(html)
    contest_name = tree.cssselect('h2.title')[0].text
    # Determine columns that represent problems
    # Maps the starting column index of each problem header cell to the
    # problem id taken from the header's link href.
    cell_to_problem_id = {}
    col = 0
    for th in tree.cssselect('#standings thead tr th'):
        class_attr = th.get('class')
        if class_attr and 'problemcolheader-standings' in class_attr: # NOTE: This does not do a proper class check
            problem_id = th.cssselect('a')[0].get('href').split('/')[-1]
            cell_to_problem_id[col] = problem_id
        # Advance by the cell's colspan (missing colspan counts as 1) so the
        # column indices line up with the body rows walked below.
        colspan = th.get('colspan')
        if not colspan: colspan = '1'
        col += int(colspan)
    solved = collections.defaultdict(list)
    for tr in tree.cssselect('#standings tr'):
        user_a = tr.cssselect('a')
        if not user_a: continue # not a user row
        usernames = [] # Rows can have more than one user.
        for a in user_a:
            user_href = a.get('href')
            # Only links of the form /users/<name> identify users; other
            # links in the row (e.g. problem links) are skipped.
            user_match = re.match(r'^/users/(.+)$', user_href)
            if not user_match: continue
            usernames.append(user_match.group(1))
        if not usernames: continue
        # Walk the row's cells with the same colspan bookkeeping as the
        # header so `col` identifies which problem a cell belongs to.
        col = 0
        for td in tr.cssselect('td'):
            if col in cell_to_problem_id:
                problem_id = cell_to_problem_id[col]
                class_attr = td.get('class')
                if class_attr and 'solved' in class_attr: # NOTE: This does not do a proper class check
                    for username in usernames:
                        solved[username].append(problem_id)
            colspan = td.get('colspan')
            if not colspan: colspan = '1'
            col += int(colspan)
    # The progress widget carries the 'session-finished' class once the
    # contest is over.
    is_over = 'session-finished' in tree.cssselect('.contest-progress')[0].get('class')
    return KattisContest(name=contest_name, is_over=is_over, solved=solved)
# Registry of judges to scrape: `id` must match site_id values in the
# database, and scrape_func is invoked as scrape_func(site_id, usernames).
SupportedSite = collections.namedtuple('SupportedSite', 'id name scrape_func')
supported_sites = [
    SupportedSite(id=1, name='Caribbean Online Judge', scrape_func=scrape_coj),
    SupportedSite(id=2, name='CodeChef', scrape_func=scrape_codechef),
    SupportedSite(id=3, name='Codeforces', scrape_func=scrape_codeforces),
    # UVa and the Live Archive share the uHunt API shape, so both reuse
    # scrape_uva with the base URL pre-bound via functools.partial.
    SupportedSite(id=4, name='ICPC Live Archive', scrape_func=functools.partial(scrape_uva, 'https://icpcarchive.ecs.baylor.edu/uhunt')),
    SupportedSite(id=5, name='Kattis', scrape_func=scrape_kattis),
    SupportedSite(id=6, name='Peking Online Judge', scrape_func=scrape_poj),
    SupportedSite(id=7, name='Sphere Online Judge', scrape_func=scrape_spoj),
    SupportedSite(id=8, name='UVa Online Judge', scrape_func=functools.partial(scrape_uva, 'http://uhunt.felix-halim.net')),
]
# Scrape users on sites
for site in supported_sites:
    print('Processing %s' % site.name)
    # Note: Primary key is (site_id, username).
    usernames = {row['username'] for row in
                 db.execute('SELECT username FROM site_account WHERE site_id=?', (site.id,))}
    try:
        site.scrape_func(site.id, usernames)
    except Exception:
        # FIX: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; catch Exception so Ctrl-C still aborts.
        # One failing site is logged and the rest keep scraping.
        # (Also fixed "occured" -> "occurred" in the log message.)
        logging.exception('Fatal error occurred while scraping %s', site.name)
# Scrape Kattis contests that have not been scraped or were not yet finished at the last scrape.
for row in db.execute('SELECT kattis_contest_id AS k FROM meeting WHERE kattis_contest_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM kattis_contest WHERE kattis_contest_id=k AND is_over)'):
    # FIX: message said "Kattis content"; it is a contest.
    print('Scraping Kattis contest %s' % row['k'])
    req = urllib.request.Request('https://open.kattis.com/contests/%s' % row['k'])
    req.add_header('User-Agent', kattis_user_agent)
    try:
        html = get_http(req)
        contest = parse_kattis_contest_html(html)
        # Insert HTML to database so we have it
        db.execute('INSERT OR REPLACE INTO kattis_contest (kattis_contest_id, kattis_contest_name, html, is_over) VALUES (?, ?, ?, ?)', (row['k'], contest.name, html, contest.is_over))
        # Insert solved problems for each user.
        for username, solved in contest.solved.items():
            for problem_id in solved:
                db.execute('INSERT OR IGNORE INTO kattis_contest_solved (kattis_contest_id, kattis_username, kattis_problem_id) VALUES (?,?,?)', (row['k'], username, problem_id))
    except Exception:
        # Mirror the per-site error handling: one bad contest page previously
        # aborted the whole script before db.commit(), losing all scraped
        # site scores as well. Log it and continue.
        logging.exception('Fatal error occurred while scraping Kattis contest %s', row['k'])
# Persist everything scraped in this run in a single transaction.
db.commit()