-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdeputy.py
1573 lines (1286 loc) · 67 KB
/
deputy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# Copyright (c) 2016-2019 Tony Allan
# This script is all a bit of a hack.
# Some effort has been made to put communication with the Deputy api into the Deputy class.
# The script can be used to:
# - convert the CSV file from Synergetic to a Duputy CSV useful for importing employee's into Deputy.
# - add a year level for each student using the training module employee field to conveniently store the data.
# https://www.deputy.com/api-doc/API
import argparse
import collections
import configparser
import csv
import datetime
import http.client
import json
import os
import re
import socket
import sys
import urllib.parse
import gspread
from oauth2client.service_account import ServiceAccountCredentials
def open_import_csv_reader(args):
return csv.DictReader(open(args.import_csv, encoding='utf-8-sig'))
# Five classes as defined:
# Counter is a Dict subclass to simplify counters
# DeputyException for API errors
# Deputy to provide API access
# Printx to facilitate CSV output to stdout for some commands
# College, which extends Deputy and adds a number of college specific functions and methods.
class Counter(object):
"""
A class of counters with a total and a set for each key.
"""
def __init__(self):
self.counters = collections.OrderedDict()
self.data = collections.OrderedDict()
self.Desc = collections.namedtuple('Desc', ['title', 'initial', 'increment'])
self.Counter = collections.namedtuple('Counter', ['id', 'title', 'count'])
self.total = collections.OrderedDict()
def add_counter(self, id, title=None, initial=0, increment=1):
self.counters[id] = self.Desc(title, initial, increment)
self.total[id] = initial
def count(self, key='', id=None, increment=None):
if key not in self.data:
self.data[key] = {}
for c in self.counters:
self.data[key][c] = self.counters[c].initial
if increment is None:
self.data[key][id] += self.counters[id].increment
self.total[id] += self.counters[id].increment
else:
self.data[key][id] += increment
self.total[id] += increment
def get_count(self, key, id=None):
if id is None:
return self.data[key]
else:
return self.data[key][id]
def get_total(self, id=None):
if id is None:
return self.total
else:
return self.total[id]
def get_totals(self):
result = []
for id in self.counters:
result.append(self.Counter(id, self.counters[id].title, self.total[id]))
return result
def __len__(self):
return len(self.data)
def __repr__(self):
return repr(self.data)
def __getitem__(self, key):
return self.data[key]
def __iter__(self):
return self.data.__iter__()
def __contains__(self, item):
return item in self.data
def __delitem__(self, key):
# totals are not reduced
del self.data[key]
class DeputyException(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
def __str__(self):
if self.code == 'user_exit':
return '\n{0}'.format(self.message)
else:
return '[Exception: {0}] {1}'.format(self.code, self.message)
class Deputy(object):
"""
This class and its subclasses are the only place the Deputy API is invoked.
Raises a DeputyException if there is a problem, otherwise returns a decoded JSON response.
"""
# Deputy columns needed for the Deputy bulk user creation upload file. This may change in the future.
DEPUTY_COLS = ('First Name', 'Last Name', 'Time Card Number', 'Email', 'Mobile Number',
'Birth Date', 'Employment Date', 'Weekday', 'Saturday', 'Sunday', 'Public Holiday')
def __init__(self, endpoint, token, timeout):
self.endpoint = endpoint
self.token = token
self.timeout = timeout
self.progress = Deputy.sample_progress
@staticmethod
def sample_progress(ptype, function, position):
"""
A callback could be used to show progress if this class were used to generate web content.
student_report(), in particular, takes a while to gather its data.
Set the callback with something like deputy.progress = your_function_name
"""
#print('[Fetching {0}={1} {2}]'.format(ptype, function, position))
pass
def api(self, api, method='GET', data=None, dp_meta=False):
"""
At least for Resource calls, api_resp is a list of results.
The dp-meta-option header is passed if dp_meta is set. This adds additional resonse data.
Returns the API data.
"""
#self.progress('api', api, 0)
#print('API', api)
url = urllib.parse.urlparse(urllib.parse.urljoin(self.endpoint, api))
try:
conn = http.client.HTTPSConnection(url.hostname, url.port, timeout=self.timeout)
except:
raise DeputyException('invalid_url' 'Invalid URL: {0}'.format(self.endpoint))
# format POST or PUT data as JSON and create the appripriate headers
body = json.dumps(data)
headers = {
'Authorization': 'OAuth {0}'.format(self.token),
'Content-type': 'application/json',
'Accept': 'application/json',
}
if dp_meta is False:
headers['dp-meta-option'] = 'none'
try:
conn.request(method, url.path, body, headers)
resp = conn.getresponse()
except KeyboardInterrupt:
raise DeputyException('user_exit', 'Ctrl-C - User requested exit.')
except socket.timeout:
raise DeputyException('socket_timeout', 'Socket timeout for API {0}'.format(api))
except socket.error as e:
# This exception is raised for socket-related errors.
raise DeputyException('sockey_error', 'Socket error ({0}) for API {1}.'.format(e.errno, api))
#print(resp.status, resp.reason, dict(resp.getheaders()), resp.read())
if resp.status == 302:
raise DeputyException('unexpected_api', 'Unexpected API {0} response {1} {2} using API URL {3}.'.format(api, resp.status, resp.reason, url.geturl()))
if resp.status != 200:
raise DeputyException('http_error', 'API {0} failed with {1} {2}.'.format(api, resp.status, resp.reason))
try:
resp = resp.read().decode('utf-8')
api_resp = json.loads(resp)
except ValueError:
if len(resp) == 0:
raise DeputyException('json_response_empty', 'Error parsing JSON API Response for {0} (zero length)'.format(api))
else:
raise DeputyException('json_response_parse', 'Error parsing JSON API Response for {0}'.format(api))
conn.close()
return api_resp
def resource(self, resource_name, key='Id', sort='Id', join=[], select=None):
"""
Get all resources where there might be more than 500 resources.
Resource name is just 'Employee' or 'Contact' -- just the name of the resource.
'key' is the dict key to fetch items from the dictionary.
'sort' is they data key to sort by.
'join' is a list of objects to include in the record, such as ['ContactObject']
'select' is one or more additional search terms select=[(field, type, data)]
The result an OrderedDict of namedtuple with the key as specified in the call order by 'sort'.
QUERY is very powerful by only the simplest features are used here.
See: http://api-doc.deputy.com/API/Resource_Calls -- /QUERY
May raise DeputyException from the API call.
"""
window = 500 # hardcoded in deputy's API.
position = 0
result = collections.OrderedDict()
while True:
self.progress('resource', resource_name, position)
query = {
'search': {
'f1':{'field':key, 'type':'is', 'data':''}
},
'sort': {sort: 'asc'},
'join' : join,
'start': position
}
if select is not None:
for s_field, s_type, s_data in select:
query['search'][s_field+'_'+str(s_data)] = {'field':s_field, 'type':s_type, 'data':s_data}
api_resp = self.api('resource/{0}/QUERY'.format(resource_name), method='POST', data=query)
for record in api_resp:
result[record[key]] = record
#print(len(api_resp), resource_name, position)
if len(api_resp) == window:
position += window
else:
break
self.progress('resource', resource_name, len(result))
return result
def employees(self, key='Id', sort='LastName', join=[]):
"""
Return OrderedDict of Active employees sorted by LastName.
May raise DeputyException.
"""
return self.resource('Employee', key=key, sort=sort, join=join, select=[('Active', 'eq', True)])
def employee_by_email(self):
"""
Return an OrderedDict of employee's with email as the key.
May raise DeputyException.
"""
employees = self.employees(join=['ContactObject'])
email_employees = collections.OrderedDict()
for id in employees:
employee = employees[id]
email_address = employee['ContactObject']['Email']
email_employees[email_address] = employee
return email_employees
def discarded_employees(self, key='Id', sort='LastName', join=[]):
"""
Return OrderedDict of Discarded employees sorted by LastName.
May raise DeputyException.
"""
return self.resource('Employee', key=key, sort=sort, join=join, select=[('Active', 'eq', False)])
def discarded_employee_by_email(self):
"""
Return an OrderedDict of Discarded employee's with email as the key.
May raise DeputyException.
"""
employees = self.discarded_employees(join=['ContactObject'])
#print(json.dumps(employees, sort_keys=True, indent=4, separators=(',', ': ')))
email_employees = collections.OrderedDict()
for id in employees:
employee = employees[id]
try:
email_address = employee['ContactObject']['Email']
except KeyError:
print('Could no process {} (id={})'.format(employee['DisplayName'], employee['Id']))
email_employees[email_address] = employee
return email_employees
class Printx(object):
"""
This is a helper class to allows outout to be formated as text or a CSV record.
"""
def __init__(self, title=None, csv_flag=False):
#Create a writer on stdout if csv selected
self.csv = csv_flag
if self.csv:
self.writer = csv.writer(sys.stdout, quoting=csv.QUOTE_MINIMAL)
if title is not None:
self.text(title)
def text(self, text, *values):
# if csv, write text to stderr because stdout is used for the csv output
try:
if self.csv:
print(text.format(*values), file=sys.stderr)
else:
print(text.format(*values))
except KeyError:
print('[Keyerror]', text, str(*values))
def headers(self, *values):
# only write a header if CSV output is required.
if self.csv:
self.writer.writerow(values)
def data(self, text, *values):
# write to CSV or normal text.
if self.csv:
self.writer.writerow(values)
else:
self.text(text, *values)
def stats(self, c):
self.text('')
for stat in c.stats:
self.text('{0}: {1}', stat.text, stat.value)
class College(Deputy):
"""
This class extends Deputy and adds a number of functions and methods.
Static Methods:
parse_student_record — to read an input CSV row, perform fixups, and create a Deputy user creation CSV row.
add_years_to_student_records — to add a student year level as a new TrainingRecord resource.
Methods:
"""
def __init__(self, endpoint, token, timeout):
# A function may also return some statistics.
self.stats = []
self.Stat = collections.namedtuple('Stat', ['id', 'text', 'value'])
super().__init__(endpoint, token, timeout)
@staticmethod
def parse_student_record(row, include_mobile=False):
"""
Function to parse a row from the input CSV file and create a row for the Deputy compatible
user CSV file, applying college specifc business rules.
Returns a tuple (messages, row), where messages is an array of parse processing messages,
and row is the Deputy output record.
synergetic_csv deputy_csv
---------------------------------- -----------------------------
Student ID
Title
Surname Last Name
Given1
Student Preferred First Name
BirthDate
Address1
Address2
Address3
Suburb
State
PostCode
Country
StudentTertiaryCode
Course -> used to create year level
StudentLegalFullName
CourseStatus
FileSemester
FileYear
StudentCampus
StudentBoarder
Email
NetworkLogin Time Card Number
OccupEmail Email
YearatUni -> used to create year level
Boarder=Non Res Special -> used to create year level
StudentPreviousSchool
Description
MobilePhoneActual Mobile Number
Deputy fields not used:
Birth Date / Employment Date / Weekday / Saturday / Sunday / Public Holiday
Indicates Postgrad:
1/2/3/1NR Year 1,2,3,1NR unless Course field suggests a higher degree.
"Study Abroad & Exchange" do bursary.
Exceptions:
3 yes. Missing YearatUni for Mr Wayne C Z for course Biomedicine
"""
messages = []
synergetic_id = row['Student ID'].strip()
first_name = row['Student Preferred'].strip()
last_name = row['Surname'].strip()
student_id = row['Network Login'].strip()
email = row['Trinity Email'].strip()
course = row['Course Description'].strip()
year_at_uni = row['UOMYear'].strip()
boarder = row['Boarder'].strip()
mobile = row['Mobile Phone'].strip()
name = '{0} {1}'.format(first_name, last_name).strip() # allow for students with a single name
# Fixup's to cater for poor quality and inconsistent input data.
# Empty row in the spreadsheet
if len(first_name) + len(last_name) + len(student_id) == 0:
return (messages, None)
# Fix missing NetworkLogin (assume email is OK in this instance)
if len(student_id) == 0:
if len(email) == 0:
student_id = None
messages.append('Excluded {0} (Student ID: {1}) for Missing NetworkLogin and/or Trinity Email.'.format(name, synergetic_id))
return (messages, None)
else:
student_id = email.split('@')[0]
messages.append('Missing NetworkLogin for {0}. Setting to {1} using email {2}.'.format(name, student_id, email))
# exclude some users (with potential include exceptions)
if student_id in exclude_list:
if student_id in include_list:
print('include (1)', student_id)
else:
messages.append('Excluded {0} ({1}) who is on the exclude list.'.format(name, student_id, course))
return (messages, None)
if year_at_uni == 'Not Selected':
# Make this a fatal error -- need to fix the data rather than ignore the student.
sys.exit('Fatal Error. {0} ({1}), Year at Uni = Not Selected.'.format(name, student_id, year_at_uni, course))
year = None
else:
# year 1,2,3,1NR assignment
try:
# year_at_uni data contains "4 Years" and "1 Year"
year_at_uni = year_at_uni.split(' ')[0]
if boarder == 'Non Res Special':
year ='Year1NR'
else:
if int(year_at_uni) > 3:
messages.append('Excluded {0} ({1}), Year at Uni {2} > 3 in course {3}.'.format(name, student_id, year_at_uni, course))
return (messages, None)
year = 'Year{0}'.format(year_at_uni)
except ValueError:
messages.append('Missing UOMYear for {0} ({1}). Setting to blank.'.format(name, student_id, course))
year = None
except:
sys.exit('Fatal Error. Unknown value for UOMYear for {0} ({1}).'.format(name, student_id, course))
# exclude postgrads
for e in exclude_postgrad:
if e in course:
messages.append('Excluded {0} ({1}) for Post Grad course {2}.'.format(name, student_id, course))
return (messages, None)
# fix Mobile phone number
if len(mobile) == 0:
messages.append('Missing phone number for {0} ({1}).'.format(name, student_id))
mobile = None
else:
mobile = mobile.replace(' ', '')
mobile = re.sub('^\+61', '0', mobile)
mobile = re.sub('^61', '0', mobile)
if mobile.startswith('00') or mobile.startswith('+'):
messages.append('International phone number for {0} ({1}): {2}. Setting to Blank.'.format(name, student_id, mobile))
mobile = ''
else:
# Excel sometimes drops the leading zero.
if len(mobile) is 9:
mobile = '0' + mobile
# An Australian mobile number must be 10 characters long.
if len(mobile) is not 10:
messages.append('Incorrect mobile number for {0} ({1}): {2}. Setting to Blank.'.format(name, student_id, mobile))
mobile = ''
else:
mobile = '{0} {1} {2}'.format(mobile[0:4],mobile[4:7], mobile[7:10])
# fix email address
if email_test is not None:
if email_test not in email:
messages.append('Missing or incorrect {0} email address {1} ({2}): {3}. Fixing.'.format(email_test, name, student_id, email))
if email_domain is not None:
email = '{0}@{1}'.format(student_id, email_domain)
else:
email = None
# create the new row
new_row = {
'synergetic_id': synergetic_id,
'first_name': first_name,
'last_name': last_name,
'student_id': student_id,
'email': email,
'year': year,
'mobile': mobile
}
return (messages, new_row)
def add_years_to_student_records(self, years, student_years, csv_reader, test=False):
"""
Add the student year level as a training module for each student found in the import_csv file.
A training module is used because it is conveniently placed in the Deputy UI for Employee's.
The year level will not be changed if it is correct. It will be changed if it is different in
the CSV. It will be added if currently not specified.
student_years = {employee_id: year_text}. For example, year_text=Year2
Anyone excluded by parse_student_record() will NOT be updated, e.g. TCAC members and co-ordinators
Returns an array of processing messages.
May raise DeputyException.
"""
messages = []
employees = self.employee_by_email()
count = 0
not_found_count = 0
already_count = 0
added_count = 0
for in_row in csv_reader:
# parse record but discard any messages
(row_messages, parsed_row) = self.parse_student_record(in_row)
if parsed_row is None:
continue
messages.extend(row_messages)
email = parsed_row['email']
name = '{0} {1}'.format(parsed_row['first_name'], parsed_row['last_name'])
if email not in employees:
messages.append('Not Found: {0} ({1})'.format(name, email))
not_found_count += 1
continue
count += 1
employee = employees[email]
employee_id = employee['Id']
year = parsed_row['year']
if year is None:
continue
# dont add year if one already exists and is the correct year
# print(employee_id, employee_id in student_years, student_years[employee_id][0], year)
if employee_id in student_years:
if student_years[employee_id][0] == year:
already_count += 1
continue
else:
# remove incorrect year
if test:
messages.append(f'[test] Deleted old year for {name} ({employee_id})')
else:
api_resp = college.api('resource/TrainingRecord/{0}'.format(student_years[employee_id][1]), method='DELETE')
training_module = years[year]
if test:
messages.append(f'[test] Student {name} ({employee_id}) is in {year}')
# Add training module Years1/2/3/1NR for each student
# TODO FIX the date...
data = {
'Employee': employee_id,
'Module': training_module,
'TrainingDate': datetime.datetime.now().isoformat(),
'Active': True
}
if not test:
api_resp = self.api('resource/TrainingRecord', method='POST', data=data)
added_count += 1
messages.append('Processed {0} students.'.format(count))
messages.append('{0} students not found in Deputy.'.format(not_found_count))
messages.append('{0} students already had a year level set.'.format(already_count))
messages.append('Added year level to {0} students.'.format(added_count))
return messages
def delete_users(self, employees_by_email, student_years, csv_reader, use_csv=True, test=False):
"""
Delete (i.e. set active to false) any student who is not in import_csv.
The student to be deleted must:
-- be an employee !
-- must not be in input_csv (student_by_email)
-- must have a Year1/Year2/Year3/Year1NR training record
Returns an array of processing messages.
May raise DeputyException.
if use_csv is False then don't check students in the CSV file. Useful for end of year processing.
"""
messages = []
deleted_count = 0
student_by_email = {}
if use_csv:
for in_row in csv_reader:
# parse record but discard any messages
(messages, parsed_row) = self.parse_student_record(in_row)
if parsed_row is None:
continue
email = parsed_row['email']
name = '{0} {1}'.format(parsed_row['first_name'], parsed_row['last_name'])
student_by_email[email] = name
for employee_email in employees_by_email:
if employee_email not in student_by_email:
student = employees_by_email[employee_email]
student_id = student['Id']
student_name = student['DisplayName']
if student_id in student_years:
#print(student_id, student_name, student_years[student_id][0])
#print(json.dumps(student, sort_keys=True, indent=4, separators=(',', ': ')))
if test:
messages.append(f'[test] Deleted student: {student_name} {student_id}')
else:
messages.append(f'Deleted student: {student_name} {student_id}')
api_resp = college.api(f'resource/Employee/{student_id}', method='POST', data={'Active': False})
#messages.append('API response: {0}'.format(api_resp))
deleted_count += 1
messages.append('Processed {0} students.'.format(len(student_by_email)))
messages.append('{0} students deleted.'.format(deleted_count))
return messages
def reinstate_users(self, employees_by_email, student_years, csv_reader, test=True):
"""
Reinstate (i.e. set active to true) any student who is in import_csv.
The student to be reinstated must:
-- be an employee !
-- must be in input_csv (student_by_email)
-- must have a Year1/Year2/Year3/Year1NR training record
Returns an array of processing messages.
May raise DeputyException.
"""
messages = []
reinstated_count = 0
# Get a list of students
student_by_email = {}
for in_row in csv_reader:
# parse record but discard any messages
(messages, parsed_row) = self.parse_student_record(in_row)
if parsed_row is None:
continue
email = parsed_row['email']
name = '{0} {1}'.format(parsed_row['first_name'], parsed_row['last_name'])
student_by_email[email] = name
for employee_email in employees_by_email:
if employee_email in student_by_email:
student = employees_by_email[employee_email]
student_id = student['Id']
student_name = student['DisplayName']
if student_id in student_years:
#print(student_id, student_name, student_years[student_id][0])
#print(json.dumps(student, sort_keys=True, indent=4, separators=(',', ': ')))
messages.append('Reinstated student: {0} {1}'.format(student_name, student_id))
api_resp = college.api('resource/Employee/{0}'.format(student_id), method='POST', data={'Active': True})
messages.append('API response: {0}'.format(api_resp))
reinstated_count += 1
messages.append('Processed {0} students.'.format(len(student_by_email)))
messages.append('{0} students reinstated.'.format(reinstated_count))
return messages
def years(self):
"""
This is a college specific method to fetch the training module id labels.
Returns something like:
{ "Year1": 4,
"Year2": 6,
"Year3": 7 }
May raise DeputyException.
"""
api_resp = self.resource('TrainingModule')
years = {}
for tmi in api_resp:
tm = api_resp[tmi]
if tm['Title'] =='Year 3':
# ignore historical error
continue
if tm['Title'].startswith('Year'):
years[tm['Title']] = tm['Id']
#self.stats.append(self.Stat('Training Modules', 'Training Modules', len(api_resp)))
self.stats.append(self.Stat('years', 'Years', len(years)))
return years
def student_years(self):
"""
This is a college specific method.
Return a hash of year levels for each student (EmployeeId) who has a year assigned:
[{employee_id:(year_level, training_record_id)}]
Assumes only one year per student.
May raise DeputyException.
"""
# invert the list
year_list = {}
years = self.years()
for year in years:
year_list[years[year]] = year
training_records = {}
api_resp = self.resource('TrainingRecord')
for record_i in api_resp:
record = api_resp[record_i]
if record['Module'] in year_list:
training_records[record['Employee']] = (year_list[record['Module']], record['Id'])
self.stats.append(self.Stat('training_records', 'Training Records', len(api_resp)))
self.stats.append(self.Stat('training_records_wm', 'Training Records (with Module)', len(training_records)))
return training_records
def bursary_student_list(self, include_list):
"""
List of Bursary Students.
A Bursary Student is an employee who has a training record that includes Year1/2/3/1NR.
"""
students = self.employees(join=['ContactObject'])
student_years = self.student_years() # e.g. for each student "709": ["Year3",7]
Student = collections.namedtuple('Student', ['Id', 'Name', 'Year', 'Email'])
result = []
no_year_count = 0
no_student_years = 0
for student_id in students:
student = students[student_id]
name = student['DisplayName']
email_address = student['ContactObject']['Email']
if email_address.split('@')[0] in include_list:
#print('include (2)', student_id)
result.append(Student(student_id, name, '', email_address))
else:
if student_id in student_years:
year = student_years[student_id][0]
else:
no_student_years += 1
continue
if year is None:
no_year_count += 1
else:
result.append(Student(student_id, name, year, email_address))
self.stats.append(self.Stat('students', 'Active Deputy Employees', len(students)))
self.stats.append(self.Stat('no_year_count', 'Employees with no Year', no_year_count))
self.stats.append(self.Stat('no_student_years', 'Student not in student_years', no_student_years))
self.stats.append(self.Stat('bursary_students', 'Bursary Students', len(result)))
return result
def deputy_journal_entries(self, start_date=None, end_date=None):
"""
List of Journal Entries for active employee's.
Journal entries are selected by Date (yyyy-mm-dd) between start_date and end_date.
"""
employees = self.employees(join=['ContactObject'])
journals = self.resource('Journal',
select=[
('Date', 'ge', start_date),
('Date', 'le', end_date)
])
Journal = collections.namedtuple('Journal', ['Date', 'Name', 'Email', 'Category', 'Comment', 'Creator'])
result = []
for journal_id in journals:
journal = journals[journal_id]
employee_id = journal['EmployeeId']
if employee_id in employees:
# only bother about employees that are still active
employee = employees[employee_id]
name = employee['DisplayName']
date = journal['Date'][0:10] # just the date
comment = journal['Comment']
if len(journal['Category']) > 0:
# assume only one used for now
category = journal['Category'][0]['Category']
else:
category = ''
email = employee['ContactObject']['Email']
if journal['Creator'] in employees:
# employee who created the record is still active
creator = employees[journal['Creator']]['DisplayName']
else:
creator = ''
result.append(Journal(date, name, email, category, comment, creator))
self.stats.append(self.Stat('journal_entries', 'Journal Entries', len(result)))
return result
def student_timesheet_count(self, location_name, start_date=None, end_date=None):
"""
Return a count of approved and non-approved, non-leave timesheets by employee_id.
Timesheets are selected by Date (yyyy-mm-dd) between start_date and end_date.
"""
timesheets = self.resource('Timesheet', join=['OperationalUnitObject'],
select=[
('Employee', 'ne', 0),
('Date', 'ge', start_date),
('Date', 'le', end_date)
])
students = Counter()
students.add_counter('timesheet', 'Timesheet')
students.add_counter('approved_timesheet', 'Approved Timesheet')
for id in timesheets:
timesheet = timesheets[id]
# ignore if there is no location or it's not a match
if location_name is not None:
if timesheet['OperationalUnitObject']['CompanyName'] != location_name:
continue
# # make sure someone approved then
# if not timesheet['TimeApproved']:
# continue
# make sure they are not a leave timesheet
if timesheet['IsLeave']:
continue
employee_id = timesheet['Employee']
students.count(employee_id, 'timesheet')
if timesheet['TimeApproved']:
students.count(employee_id, 'approved_timesheet')
return students
def student_roster_count(self, location_name, start_date=None, end_date=None):
"""
Return a count of approved, non-leave timesheets by Employee for the selected location.
Employee may be for an inactive employee.
Employee is ignored if it is zero.
Rosters are selected by Date (yyyy-mm-dd) between start_date and end_date.
"""
rosters = self.resource('Roster', join=['OperationalUnitObject'],
select=[
('Employee', 'ne', 0),
('Date', 'ge', start_date),
('Date', 'le', end_date)
])
students = Counter()
students.add_counter('rostered', 'Rosters Rostered')
students.add_counter('completed', 'Rosters Completed')
students.add_counter('open', 'Rosters Open')
for id in rosters:
roster = rosters[id]
# ignore if there is no location or it's not a match
if location_name is not None:
if roster['OperationalUnitObject']['CompanyName'] != location_name:
continue
employee_id = roster['Employee']
timesheet = roster['MatchedByTimesheet']
students.count(employee_id, 'rostered')
if timesheet > 0:
students.count(employee_id, 'completed')
if roster['Open']:
students.count(employee_id, 'open')
# if employee_id == 1022:
# print(employee_id, students[1022])
# print(roster, '\n\n')
self.stats.append(self.Stat('rosters', 'Rosters (for all locations)', len(rosters)))
self.stats.append(self.Stat('students', 'Rosters with Students', len(students)))
for total in students.get_totals():
self.stats.append(self.Stat(*total))
return students
def student_report(self, obligation_by_year, location_name, include_list, start_date=None, end_date=None):
"""
Student roster data.
Timesheet data exists but is not currently used.
Rosters are selected by Date between start_date and end_date.
"""
# fetch 'Student', ['Id', 'Name', 'Year', 'Email']
students = self.bursary_student_list(include_list)
# count approved and non-approved, non-leave 'timesheet'
student_timesheet_count = self.student_timesheet_count(location_name, start_date=start_date, end_date=end_date)
# count 'rostered', 'completed', 'open' rosters
student_roster_count = self.student_roster_count(location_name, start_date=start_date, end_date=end_date)
roster_stats = self.stats
#print(json.dumps(roster_stats, indent=4, separators=(',', ': ')))
Report = collections.namedtuple('Report', ['Name', 'Year', 'Obligation', 'Rostered', 'Open', 'Completed',
'PercentRostered', 'PercentCompleted', 'Issues', 'Email', 'Timesheets', 'ApprovedTimesheets'])
counts = Counter()
counts.add_counter('Year1', 'Students in Year 1')
counts.add_counter('Year2', 'Students in Year 2')
counts.add_counter('Year3', 'Students in Year 3')
counts.add_counter('Year1NR', 'Students in Year 1NR')
counts.add_counter('roster_rostered_count', 'Rostered')
counts.add_counter('roster_completed_count', 'Completed Rosters')
counts.add_counter('roster_open_count', 'Open Rosters')
counts.add_counter('timesheet_count', 'Timesheets')
counts.add_counter('approved_timesheet_count', 'Approved Timesheets')
# write out the sorted list of results with a percentage complete
# loop using student_list because it is sorted and therefore the report will be sorted.
result = []
for student in students:
if student.Id in student_roster_count:
src = student_roster_count[student.Id]
else:
src = {'rostered': 0, 'completed': 0, 'open': 0}
if student.Id in student_timesheet_count:
stc = student_timesheet_count[student.Id]
else:
stc = {'approved_timesheet': 0, 'timesheet': 0}
counts.count(id='roster_rostered_count', increment=src['rostered'])
counts.count(id='roster_completed_count', increment=src['completed'])
counts.count(id='roster_open_count', increment=src['open'])
counts.count(id='timesheet_count', increment=stc['timesheet'])
counts.count(id='approved_timesheet_count', increment=stc['approved_timesheet'])
if student.Email.split('@')[0] in include_list:
obligation = 0
else:
try:
obligation = int(obligation_by_year[student.Year])
except KeyError:
print('Year Level data error ({}) for {}'.format(student.Year, student.Name))
print('Fix the error before proceeding.')
sys.exit(1)
counts.count(id=student.Year)
issues = ''
if student.Email.split('@')[0] in include_list:
percentage_rostered = ''
percentage_complete = ''
else:
percentage_rostered = '{0:.0f}%'.format(((0.0+src['rostered'])/obligation)*100.0)
if (0.0+src['rostered'])/obligation < 1:
issues = 'Incomplete roster. '
percentage_complete = '{0:.0f}%'.format(((0.0+src['completed'])/obligation)*100.0)
if (0.0+src['completed'])/obligation < 1:
issues += 'Outstanding Shifts.'
result.append(Report(student.Name, student.Year, obligation,
src['rostered'], src['open'], src['completed'], percentage_rostered,
percentage_complete, issues, student.Email, stc['timesheet'], stc['approved_timesheet']))
# and some summary info