-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextract_gradebook.py
93 lines (81 loc) · 3.11 KB
/
extract_gradebook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from zipfile import ZipFile
from re import match, split, IGNORECASE
from datetime import datetime
from collections import defaultdict
from pprint import pprint
from os.path import exists, join, basename, dirname, realpath
from os import makedirs, rename
from shutil import rmtree
from tempfile import mkdtemp
def parse_txt(txt):
    """Parse the text of a gradebook record file into a dict-like entry.

    Every line is searched for the first ``Name: ``, ``Assignment: `` or
    ``Filename: `` marker (case-sensitive; the marker may be preceded by
    other text such as indentation). Everything after that marker is the
    value. Lines without a marker are ignored.

    Args:
        txt: Full text of the record file.

    Returns:
        A ``defaultdict(list)``. ``'Name'`` and ``'Assignment'`` map to
        strings (the last occurrence wins); ``'files'`` is the list of all
        ``Filename`` values in order of appearance and is only present when
        at least one such line was seen.
    """
    pattern = '(Name|Assignment|Filename): '
    entry = defaultdict(list)
    for line in txt.splitlines():
        # maxsplit=1: only the first marker on a line is the key, so a value
        # that itself contains e.g. "Name: " is kept intact instead of being
        # truncated at the second marker.
        parts = split(pattern, line, maxsplit=1)
        if len(parts) < 3:
            # No marker on this line.
            continue
        _, key, value = parts
        if key == 'Filename':
            entry['files'].append(value)
        else:
            entry[key] = value
    return entry
def extract_flat(zip, filename, my_dir):
    """Extract one archive member directly into ``my_dir``.

    The member's directory components are dropped, so ``a/b/c.txt`` is
    written to ``my_dir/c.txt``.

    Args:
        zip: An open ``ZipFile`` to read from.
        filename: Name of the member inside the archive.
        my_dir: Destination directory.

    Raises:
        KeyError: If ``filename`` is not a member of the archive.
    """
    info = zip.getinfo(filename)
    stored_name = info.filename
    # ZipFile.extract() derives the output path from ZipInfo.filename, so the
    # name is flattened just for the duration of the call. The ZipInfo object
    # is shared with the archive; restoring it keeps namelist()/infolist()
    # reporting the real member name afterwards.
    info.filename = basename(filename)
    try:
        zip.extract(info, my_dir)
    finally:
        info.filename = stored_name
def extract_submission(zf, record, prefix="submissions/"):
    """Extract the files of one submission record into ``<prefix>/<sid>/``.

    Every file listed in the record is extracted flat (without its archive
    directory) into the student's directory. A submitted ``.zip`` file is
    additionally unpacked in place, keeping the inner archive's own directory
    layout. A failure on one file is reported on stdout and does not stop the
    remaining files. If no ``.zip`` file was listed, a notice and the list of
    files is printed.

    Args:
        zf: Open ``ZipFile`` of the gradebook archive.
        record: Mapping with ``'sid'`` and ``'dir'`` and, optionally,
            ``'files'`` (list of member names relative to ``'dir'``).
        prefix: Directory under which the per-student directory is created.
    """
    user_dir = join(prefix, record['sid'])
    if not exists(user_dir):
        makedirs(user_dir)

    # A record whose text had no "Filename" lines has no 'files' key at all.
    files = record.get('files', [])

    contained_zip = False
    for f in files:
        try:
            extract_flat(zf, join(record['dir'], f), user_dir)
            if f.lower().endswith('.zip'):
                contained_zip = True
                # extract_flat() writes the member under its base name, so
                # that is where the inner archive has to be opened from.
                with ZipFile(join(user_dir, basename(f))) as int_zip:
                    for ifn in int_zip.namelist():
                        if not ifn.endswith('/'):  # skip directory entries
                            int_zip.extract(ifn, user_dir)
        except Exception as e:
            # Best effort: report the problem and carry on with the next file.
            print("Couldn't process file %s => %s" % (f, e))
            pprint(record)

    if not contained_zip:
        print(" record for %s did not contain a zip "
              "file, instead these files were included:"
              % record['sid'])
        for sf in files:
            print(' > %s' % sf)
def parse_gradebook_file(zf):
    """Collect the submission records described in a gradebook archive.

    A record file is a member named
    ``<assignment>_<sid>_attempt_<YYYY-MM-DD-HH-MM-SS>.txt``. Its name
    supplies the assignment, student id and submission date; its text is
    parsed with ``parse_txt`` and merged into the record.

    Args:
        zf: Open ``ZipFile`` of the gradebook archive.

    Returns:
        Dict mapping ``sid`` to a record dict with the keys ``'assignment'``,
        ``'sid'``, ``'dir'`` (always ``''``), ``'date'`` (a ``datetime``) plus
        whatever ``parse_txt`` extracted. If several record files share a
        sid, the one listed last in the archive wins.
    """
    # The timestamp group is spelled out (and the dot escaped) so that
    # submitted files such as "..._attempt_<timestamp>_notes.txt" are not
    # mistaken for record files, which would make strptime() raise.
    record_name = (r'^(.*)_([0-9]*)_attempt_'
                   r'([0-9]{4}(?:-[0-9]{1,2}){5})\.txt$')
    records = {}
    for f in zf.namelist():
        m = match(record_name, f)
        if not m:
            continue
        with zf.open(f) as txt_file:
            entry = parse_txt(txt_file.read().decode('utf-8'))
        record = {
            'assignment': m.group(1),
            'sid': m.group(2),
            'dir': '',
            'date': datetime.strptime(m.group(3), '%Y-%m-%d-%H-%M-%S'),
        }
        record.update(entry)
        records[record['sid']] = record
    return records
# Script body: read 'gradebook.zip' from the current working directory, print
# the parsed records, then extract every submission while the archive is
# still open.
with ZipFile('gradebook.zip') as zf:
    gradebook = parse_gradebook_file(zf)
    print(gradebook)
    for record in gradebook.values():
        pprint(record)
        extract_submission(zf, record)