forked from simonrob/email-oauth2-proxy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
emailproxy.py
executable file
·3454 lines (2959 loc) · 195 KB
/
emailproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# vim: ts=4 sw=4 et ai si
"""A simple IMAP/POP/SMTP proxy that intercepts authenticate and login commands, transparently replacing them with OAuth
2.0 authentication. Designed for apps/clients that don't support OAuth 2.0 but need to connect to modern servers."""
__author__ = 'Simon Robinson'
__copyright__ = 'Copyright (c) 2024 Simon Robinson'
__license__ = 'Apache 2.0'
__version__ = '2024-07-29' # ISO 8601 (YYYY-MM-DD)
# dotted form of `__version__`; int() drops the leading zero of each date component (e.g., '2024-07-29' -> '2024.7.29')
__package_version__ = '.'.join([str(int(i)) for i in __version__.split('-')]) # for pyproject.toml usage only
import abc
import argparse
import base64
import binascii
import configparser
import contextlib
import datetime
import enum
import errno
import io
import ipaddress
import json
import logging
import logging.handlers
import os
import pathlib
import platform
import plistlib
import queue
import re
import select
import signal
import socket
import ssl
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
import warnings
import wsgiref.simple_server
import wsgiref.util
import zlib
# asyncore is essential, but has been deprecated and will be removed in python 3.12 (see PEP 594)
# pyasyncore is our workaround, so suppress this warning until the proxy is rewritten in, e.g., asyncio
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
import asyncore
# for encrypting/decrypting the locally-stored credentials
from cryptography.fernet import Fernet, MultiFernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# by default the proxy is a GUI application with a menu bar/taskbar icon, but it is also useful in 'headless' contexts
# where not having to install GUI-only requirements can be helpful - see the proxy's readme (the `--no-gui` option)
MISSING_GUI_REQUIREMENTS = []  # import errors for GUI-only packages are collected here rather than raised

# only `--no-gui` is examined at this point; parse_known_args() leaves every other command line argument untouched
no_gui_parser = argparse.ArgumentParser(add_help=False)
no_gui_parser.add_argument('--no-gui', dest='gui', action='store_false')
no_gui_args = no_gui_parser.parse_known_args()[0]

if no_gui_args.gui:
    try:
        import pystray  # the menu bar/taskbar GUI
    except Exception as gui_requirement_import_error:  # see #204 - incomplete pystray installation can throw exceptions
        no_gui_args.gui = False  # we need the dummy implementation
        MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)

if not no_gui_args.gui:
    class DummyPystray:  # dummy implementation allows initialisation to complete
        class Icon:
            pass

    pystray = DummyPystray  # this is just to avoid unignorable IntelliJ warnings about naming and spacing

# the temporary parser objects are not needed beyond this point, so remove them from the module namespace
del no_gui_parser, no_gui_args
try:
# noinspection PyUnresolvedReferences
from PIL import Image, ImageDraw, ImageFont # draw the menu bar icon from the TTF font stored in APP_ICON
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
try:
# noinspection PyUnresolvedReferences
import timeago # the last authenticated activity hint
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
try:
# noinspection PyUnresolvedReferences
import webview # the popup authentication window (in default and GUI `--external-auth` modes only)
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
try:
# pylint: disable-next=ungrouped-imports
import importlib.metadata as importlib_metadata # get package version numbers - available in stdlib from python 3.8
except ImportError:
try:
# noinspection PyUnresolvedReferences
import importlib_metadata
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
try:
# noinspection PyUnresolvedReferences
import packaging.version # parse package version numbers - used to work around various GUI-only package issues
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
# for macOS-specific functionality
if sys.platform == 'darwin':
try:
# PyUnresolvedReferences; see: youtrack.jetbrains.com/issue/PY-11963 (same for others with this suppression)
# noinspection PyPackageRequirements,PyUnresolvedReferences
import PyObjCTools.MachSignals # SIGTERM handling (only needed in GUI mode; `signal` is sufficient otherwise)
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
try:
# noinspection PyPackageRequirements,PyUnresolvedReferences
import SystemConfiguration # network availability monitoring
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
try:
# noinspection PyPackageRequirements
import AppKit # retina icon, menu update on click, native notifications and receiving system events
except ImportError as gui_requirement_import_error:
MISSING_GUI_REQUIREMENTS.append(gui_requirement_import_error)
class AppKit: # dummy implementation allows initialisation to complete
class NSObject:
pass
# application identity: APP_NAME is the logger name and syslog message prefix; APP_SHORT_NAME is the basename of the
# configuration and log files; APP_PACKAGE names the start-at-login files and the macOS unified logging subsystem
APP_NAME = 'Email OAuth 2.0 Proxy'
APP_SHORT_NAME = 'emailproxy'
APP_PACKAGE = 'ac.robinson.email-oauth2-proxy'
# noinspection SpellCheckingInspection
APP_ICON = b'''eNp1Uc9rE0EUfjM7u1nyq0m72aQxpnbTbFq0TbJNNkGkNpVKb2mxtgjWsqRJU+jaQHOoeMlVeoiCHqQXrwX/gEK9efGgNy+C4MWbHjxER
DCJb3dTUdQH733zvW/ezHszQADAAy3gIFO+kdbW3lXWAUgRs2sV02igdoL8MfLctrHf6PeBAXBe5OL27r2acry6hPprdLleNbbiXfkUtRfoeh0T4gaju
O6gT9TN5gEWo5GHGNjuXsVAPET+yuKmcdAAETaRR5BfuGuYVRCs/fQjBqGxt98En80/WzpYvaN3tPsvN4eufAWPc/r707dvLPyg/PiCcMSAq1n9AgXHs
MbeedvZz+zMH0YGZ99x7v9LxwyzpuBBpA8oTg9tB8kn0IiIHQLPwT9tuba4BfNQhervPZzdMGBWp1a9hJHYyHBeS2Y2r+I/2LF/9Ku3Q7tXZ9ogJKEEN
+EWbODRqpoaFwRXUJbDvK4Xghlek+WQ5KfKDM3N0dlshiQEQVHzuYJeKMxRVMNhWRISClYmc6qaUPxUitNZTdfz2QyfcmXIOK8xoOZKt7ViUkRqYXekW
J6Sp0urC5fCken5STr0KDoUlyhjVd4nxSUvq3tCftEn8r2ro+mxUDIaCMQmQrGZGHmi53tAT3rPGH1e3qF0p9w7LtcohwuyvnRxWZ8sZUej6WvlhXSk1
7k+POJ1iR73N/+w2xN0f4+GJcHtfqoWzgfi6cuZscC54lSq3SbN1tmzC4MXtcwN/zOC78r9BIfNc3M=''' # TTF ('e') -> zlib -> base64
# NOTE(review): presumably toggles whether credentials are replaced with CENSOR_MESSAGE in log output - the code that
# reads this flag is outside this part of the file, so confirm against its usage
CENSOR_CREDENTIALS = True
CENSOR_MESSAGE = b'[[ Credentials removed from proxy log ]]' # replaces actual credentials; must be a byte-type string
# work out where the proxy's configuration file lives: next to the script/binary by default, in a fixed per-user
# directory for macOS app bundles, or in the current working directory for the packaged (PyPI) version
script_path = sys.executable if getattr(sys, 'frozen', False) else os.path.realpath(__file__)  # for pyinstaller etc
if sys.platform == 'darwin' and '.app/Contents/MacOS/' in script_path:  # pyinstaller .app binary is within the bundle
    # compare (major, minor) as integers - a float comparison misorders versions (e.g., 10.9 would be >= 10.12)
    macos_version = tuple(map(int, platform.mac_ver()[0].split('.')[:2]))
    if macos_version >= (10, 12):  # need a known path (due to App Translocation)
        script_path = pathlib.Path('~/.%s/%s' % (APP_SHORT_NAME, APP_SHORT_NAME)).expanduser()
    else:
        # NOTE(review): joining the path components with '.' (rather than '/') looks unintended, but is kept as-is
        # here because the expected pre-10.12 location cannot be confirmed from this file alone
        script_path = '.'.join(script_path.split('Contents/MacOS/')[0].split('/')[:-1])
script_path = os.getcwd() if __package__ is not None else os.path.dirname(script_path)  # for packaged version (PyPI)
CONFIG_FILE_PATH = CACHE_STORE = os.path.join(script_path, '%s.config' % APP_SHORT_NAME)
CONFIG_SERVER_MATCHER = re.compile(r'^(?P<type>(IMAP|POP|SMTP))-(?P<port>\d+)$')  # e.g., a section named `IMAP-1993`
del script_path
# connection, socket and handshake limits
MAX_CONNECTIONS = 0 # maximum concurrent IMAP/POP/SMTP connections; 0 = no limit; limit is per server
RECEIVE_BUFFER_SIZE = 65536 # number of bytes to try to read from the socket at a time (limit is per socket)
MAX_SSL_HANDSHAKE_ATTEMPTS = 1024 # number of attempts before aborting SSL/TLS handshake (max 10ms each); 0 = no limit
# IMAP/POP/SMTP require \r\n as a line terminator (we use lines only pre-authentication; afterwards just pass through)
LINE_TERMINATOR = b'\r\n'
LINE_TERMINATOR_LENGTH = len(LINE_TERMINATOR)
# seconds to wait before cancelling authentication requests (i.e., the user has this long to log in) - note that the
# actual server timeout is often around 60 seconds, so the connection may be closed in the background and immediately
# disconnect after login completes; however, the login credentials will still be saved and used for future requests
AUTHENTICATION_TIMEOUT = 600
TOKEN_EXPIRY_MARGIN = 600 # seconds before its expiry to refresh the OAuth 2.0 token
JWT_LIFETIME = 300 # seconds to add to the current time and use for the `exp` value in JWT certificate credentials
# log file rotation (these values are passed to the RotatingFileHandler created in Log.initialise)
LOG_FILE_MAX_SIZE = 32 * 1024 * 1024 # when using a log file, its maximum size in bytes before rollover (0 = no limit)
LOG_FILE_MAX_BACKUPS = 10 # the number of log files to keep when LOG_FILE_MAX_SIZE is exceeded (0 = disable rollover)
# characters accepted in an IMAP tag: `!`, `#`, `$`, `&`, `'`, the range `,` to `[`, the range `]` to `z`, `|`, `}`
# and `~` (so, e.g., space, `"`, `%`, `(`, `)`, `*`, `+`, `\` and `{` never match)
IMAP_TAG_PATTERN = r'[!#$&\',-\[\]-z|}~]+' # https://ietf.org/rfc/rfc9051.html#name-formal-syntax
# matches `<tag> LOGIN <flags>` or `<tag> AUTHENTICATE <flags>` (case-insensitively); `flags` is the rest of the line
IMAP_AUTHENTICATION_REQUEST_MATCHER = re.compile('^(?P<tag>%s) (?P<command>(LOGIN|AUTHENTICATE)) '
'(?P<flags>.*)$' % IMAP_TAG_PATTERN, flags=re.IGNORECASE)
# matches a whole-string IMAP literal marker: `{<digits>}` or `{<digits>+}`; `continuation` is '+' or an empty string
IMAP_LITERAL_MATCHER = re.compile(r'^{(?P<length>\d+)(?P<continuation>\+?)}$')
IMAP_CAPABILITY_MATCHER = re.compile(r'^\* (?:OK \[)?CAPABILITY .*$', flags=re.IGNORECASE) # note: '* ' *and* '* OK ['
REQUEST_QUEUE = queue.Queue() # requests for authentication
RESPONSE_QUEUE = queue.Queue() # responses from user
QUEUE_SENTINEL = object() # object to send to signify queues should exit loops
MENU_UPDATE = object() # object to send to trigger a force-refresh of the GUI menu (new catch-all account added)
# per-platform start-at-login file locations, all named after APP_PACKAGE and expanded relative to the user's home
PLIST_FILE_PATH = pathlib.Path('~/Library/LaunchAgents/%s.plist' % APP_PACKAGE).expanduser() # launchctl file location
CMD_FILE_PATH = pathlib.Path('~/AppData/Roaming/Microsoft/Windows/Start Menu/Programs/Startup/%s.cmd' %
APP_PACKAGE).expanduser() # Windows startup .cmd file location
AUTOSTART_FILE_PATH = pathlib.Path('~/.config/autostart/%s.desktop' % APP_PACKAGE).expanduser() # XDG Autostart file
# noinspection SpellCheckingInspection
SECURE_SERVER_ICON = '''iVBORw0KGgoAAAANSUhEUgAAABYAAAAWCAYAAADEtGw7AAAApElEQVR4Ae3VsQ2DMBBA0ZQs4NIreA03GSbyAl6DAbyN+xvh
Ovp0yY9EkQZ8XELHSa+x0S9OAm75cT+F+UFm+vhbmClQLCtF+SnMNAji11lcz5orzCQopo21KJIn3FB37iuaJ9yRd+4zuicsSINViSesyEgbMtQcZgIE
TyNBsIQrXgdVS3h2hGdf+Apf4eIIF+ub16FYBhQd4ci3IiAOBP8/z+kNGUS6hBN6UlIAAAAASUVORK5CYII=''' # 22px SF Symbols lock.fill
# %-format template for the external authorisation page. It contains six `%s` placeholders, in this order: the value
# shown after "Login authorisation request for", the link's href, the link's visible text, the argument passed to
# copyLink(), the name used in "allow the ... script" and the expected URL form shown in the <pre> element. Literal
# percent signs in the CSS are escaped as `%%`. The extra sentence about Windows is included only when sys.platform
# is 'win32' at the time this module is loaded.
EXTERNAL_AUTH_HTML = '''<html><head><script type="text/javascript">function copyLink(targetLink){
var copySource=document.createElement('textarea');copySource.value=targetLink;copySource.style.position='absolute';
copySource.style.left='-9999px';document.body.appendChild(copySource);copySource.select();
document.execCommand('copy');document.body.removeChild(copySource);
document.getElementById('copy').innerText='✔'}</script><style type="text/css">body{margin:20px auto;line-height:1.3;
font-family:sans-serif;font-size:16px;color:#444;padding:0 24px}</style></head><body>
<h3 style="margin:0.3em 0;">Login authorisation request for %s</h3>
<p style="margin-top:0">Click the following link to open your browser and approve the request:</p>
<p><a href="%s" target="_blank" style="word-wrap:break-word;word-break:break-all">%s</a>
<a id="copy" onclick="copyLink('%s')" style="margin-left:0.5em;margin-top:0.1em;font-weight:bold;font-size:150%%;
text-decoration:none;cursor:pointer;float:right" title="Copy link">⧉</a></p>
<p style="margin-top:2em">After logging in and successfully authorising your account, paste and submit the
resulting URL from the browser's address bar using the box at the bottom of this page to allow the %s script to
transparently handle login requests on your behalf in future.</p>
<p>Note that your browser may show a navigation error (e.g., <em>"localhost refused to connect"</em>) after
successfully logging in, but the final URL is the only important part, and as long as this begins with the
correct redirection URI and contains a valid authorisation code your email client's request will succeed.''' + (
' If you are using Windows, submitting can take a few seconds.' if sys.platform == 'win32' else '') + '''</p>
<p style="margin-top:2em">According to your proxy configuration file, the expected URL will be of the form:</p>
<p><pre>%s <em>[...]</em> code=<em><strong>[code]</strong> [...]</em></em></pre></p>
<form name="auth" onsubmit="window.location.assign(document.forms.auth.url.value);
document.auth.submit.value='Submitting...'; document.auth.submit.disabled=true; return false">
<div style="display:flex;flex-direction:row;margin-top:4em"><label for="url">Authorisation success URL:
</label><input type="text" name="url" id="url" style="flex:1;margin:0 5px;width:65%%"><input type="submit"
id="submit" value="Submit"></div></form></body></html>'''
EXITING = False # used to check whether to restart failed threads - is set to True if the user has requested to exit
class Log:
    """Minimal logging facade. Output goes to a rotating local file (on Windows, or whenever a log file path is given),
    to unified logging or syslog on macOS, and to syslog (or stderr if `/dev/log` is missing) elsewhere. Messages at or
    above the current level are also printed to stdout with a timestamp."""

    _LOGGER = None
    _HANDLER = None
    _DATE_FORMAT = '%Y-%m-%d %H:%M:%S:'
    _SYSLOG_MESSAGE_FORMAT = '%s: %%(message)s' % APP_NAME
    _MACOS_USE_SYSLOG = False

    @staticmethod
    def initialise(log_file=None):
        """Create the logger and attach the platform-appropriate handler, then set the level to INFO. If `log_file`
        is not given on Windows, the log is written next to the script/binary (or in the current working directory
        for the packaged version)."""
        Log._LOGGER = logging.getLogger(APP_NAME)

        if log_file or sys.platform == 'win32':
            log_path = log_file
            if not log_path:
                if __package__ is not None:
                    log_directory = os.getcwd()
                else:
                    script_file = sys.executable if getattr(sys, 'frozen', False) else os.path.realpath(__file__)
                    log_directory = os.path.dirname(script_file)
                log_path = os.path.join(log_directory, '%s.log' % APP_SHORT_NAME)
            handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=LOG_FILE_MAX_SIZE,
                                                           backupCount=LOG_FILE_MAX_BACKUPS)
            handler.setFormatter(logging.Formatter('%(asctime)s: %(message)s'))

        elif sys.platform == 'darwin':
            # noinspection PyPackageRequirements
            import pyoslog  # for macOS-specific unified logging
            Log._MACOS_USE_SYSLOG = not pyoslog.is_supported()
            if Log._MACOS_USE_SYSLOG:  # syslog prior to 10.12
                handler = logging.handlers.SysLogHandler(address='/var/run/syslog')
                handler.setFormatter(logging.Formatter(Log._SYSLOG_MESSAGE_FORMAT))
            else:  # unified logging in 10.12+
                handler = pyoslog.Handler()
                handler.setSubsystem(APP_PACKAGE)

        elif os.path.exists('/dev/log'):
            handler = logging.handlers.SysLogHandler(address='/dev/log')
            handler.setFormatter(logging.Formatter(Log._SYSLOG_MESSAGE_FORMAT))

        else:
            handler = logging.StreamHandler()

        Log._HANDLER = handler
        Log._LOGGER.addHandler(Log._HANDLER)
        Log.set_level(logging.INFO)

    @staticmethod
    def get_level():
        """Return the logger's effective level"""
        return Log._LOGGER.getEffectiveLevel()

    @staticmethod
    def set_level(level):
        """Apply `level` to both the handler and the logger, so that input maps directly to output"""
        for target in (Log._HANDLER, Log._LOGGER):
            target.setLevel(level)

    @staticmethod
    def _log(level_method, level, *args):
        """Join `args` into one space-separated message, echo it to stdout if `level` is enabled, then hand it to
        `level_method` (truncated to fit syslog when a syslog-style destination is assumed)"""
        message = ' '.join(str(argument) for argument in args)
        if level >= Log.get_level():
            print(datetime.datetime.now().strftime(Log._DATE_FORMAT), message)

        length_limited = Log._MACOS_USE_SYSLOG or sys.platform not in ['win32', 'darwin']
        if length_limited and len(message) > 2048:
            truncation_message = ' [ NOTE: message over syslog length limit truncated to 2048 characters; run `%s' \
                                 ' --debug` in a terminal to see the full output ] ' % os.path.basename(__file__)
            keep_length = 2048 - len(Log._SYSLOG_MESSAGE_FORMAT) - len(truncation_message)
            message = message[0:keep_length] + truncation_message

        # note: need LOG_ALERT (i.e., warning) or higher to show in syslog on macOS
        if Log._MACOS_USE_SYSLOG:
            Log._LOGGER.warning(message)
        else:
            level_method(message)

    @staticmethod
    def debug(*args):
        """Log `args` at DEBUG level"""
        Log._log(Log._LOGGER.debug, logging.DEBUG, *args)

    @staticmethod
    def info(*args):
        """Log `args` at INFO level"""
        Log._log(Log._LOGGER.info, logging.INFO, *args)

    @staticmethod
    def error(*args):
        """Log `args` at ERROR level"""
        Log._log(Log._LOGGER.error, logging.ERROR, *args)

    @staticmethod
    def error_string(error):
        """Return the error's `message` attribute if it has one, otherwise its repr()"""
        fallback_text = repr(error)
        return getattr(error, 'message', fallback_text)

    @staticmethod
    def format_host_port(address):
        """Format the first two items of `address` as `host:port`, wrapping IPv6 address literals in brackets"""
        host, port, *_ = address
        try:
            parsed_host = ipaddress.ip_address(host)
        except ValueError:  # not an IP address literal (e.g., a hostname) - use as-is
            parsed_host = None
        if isinstance(parsed_host, ipaddress.IPv6Address):
            host = '[%s]' % host
        return '%s:%d' % (host, port)

    @staticmethod
    def get_last_error():
        """Return `(type, value)` for the exception currently being handled, or `(None, None)` if there is none"""
        return sys.exc_info()[:2]  # the traceback is deliberately not kept
class CacheStore(abc.ABC):
    """Base class for additional cache store options holding a dictionary of OAuth 2.0 credentials. To make a new
    store available via the proxy's `--cache-store` parameter, subclass this and add an entry in AppConfig's
    `_EXTERNAL_CACHE_STORES`"""

    @staticmethod
    @abc.abstractmethod
    def load(store_id):
        """Return the cached credentials dictionary for `store_id` (this base implementation returns an empty one)"""
        return dict()

    @staticmethod
    @abc.abstractmethod
    def save(store_id, config_dict):
        """Persist `config_dict` under `store_id` (this base implementation does nothing)"""
        return None
class AWSSecretsManagerCacheStore(CacheStore):
    """Cache store that keeps the credentials dictionary as a JSON string in one AWS Secrets Manager secret. The store
    identifier is a secret name or ARN, optionally prefixed with `profile||` to choose an AWS profile"""

    # noinspection PyUnresolvedReferences
    @staticmethod
    def _get_boto3_client(store_id):
        """Return `(secret_id, client)` for `store_id`, or `(None, None)` if the AWS SDK cannot be imported. Any
        `profile||` prefix is removed from the returned secret identifier and used to select the session profile"""
        try:
            # noinspection PyGlobalUndefined
            global boto3, botocore
            import boto3
            import botocore.exceptions
        except ModuleNotFoundError:
            Log.error('Unable to load AWS SDK - please install the `boto3` module: `python -m pip install boto3`')
            return None, None

        # allow a profile to be chosen by prefixing the store_id - the separator used (`||`) will not be in an ARN
        # or secret name (see: https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_CreateSecret.html)
        split_id = store_id.split('||', maxsplit=1)
        if '||' in store_id:
            return split_id[1], boto3.session.Session(profile_name=split_id[0]).client('secretsmanager')
        return store_id, boto3.client(service_name='secretsmanager')

    @staticmethod
    def _create_secret(aws_client, store_id):
        """Try to create a new (empty) secret named `store_id`; returns True on success and False otherwise"""
        if store_id.startswith('arn:'):
            # fix: the secret identifier was previously missing from this message (a literal `%s` was logged)
            Log.info('Creating new AWS Secret "%s" failed - it is not possible to choose specific ARNs for new '
                     'secrets' % store_id)
            return False

        try:
            aws_client.create_secret(Name=store_id, ForceOverwriteReplicaSecret=False)
            Log.info('Created new AWS Secret "%s"' % store_id)
            return True

        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'AccessDeniedException':
                AWSSecretsManagerCacheStore._log_error(
                    'Creating new AWS Secret "%s" failed - access denied: does the IAM user have the '
                    '`secretsmanager:CreateSecret` permission?' % store_id, e)
            else:
                AWSSecretsManagerCacheStore._log_error('Creating new AWS Secret "%s" failed with an unexpected error; '
                                                       'see the proxy\'s debug log' % store_id, e)
        return False

    @staticmethod
    def _log_error(error_message, debug_error):
        """Log the AWS error code and message from `debug_error` at debug level, then `error_message` as an error"""
        Log.debug('AWS %s: %s' % (debug_error.response['Error']['Code'], debug_error.response['Error']['Message']))
        Log.error(error_message)

    @staticmethod
    def load(store_id):
        """Fetch and return the cached credentials dictionary from the secret identified by `store_id`. Returns an
        empty dictionary if the SDK is unavailable or the request fails; a missing secret is created (empty)"""
        store_id, aws_client = AWSSecretsManagerCacheStore._get_boto3_client(store_id)
        if aws_client:
            try:
                Log.debug('Requesting credential cache from AWS Secret "%s"' % store_id)
                retrieved_secrets = json.loads(aws_client.get_secret_value(SecretId=store_id)['SecretString'])
                Log.info('Fetched', len(retrieved_secrets), 'cached account entries from AWS Secret "%s"' % store_id)
                return retrieved_secrets

            except botocore.exceptions.ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'ResourceNotFoundException':
                    Log.info('AWS Secret "%s" does not exist - attempting to create it' % store_id)
                    AWSSecretsManagerCacheStore._create_secret(aws_client, store_id)
                elif error_code == 'AccessDeniedException':
                    AWSSecretsManagerCacheStore._log_error(
                        'Fetching AWS Secret "%s" failed - access denied: does the IAM user have the '
                        '`secretsmanager:GetSecretValue` permission?' % store_id, e)
                else:
                    AWSSecretsManagerCacheStore._log_error(
                        'Fetching AWS Secret "%s" failed - unexpected error; see the proxy debug log' % store_id, e)
        else:
            Log.error('Unable to get AWS SDK client; cannot fetch credentials from AWS Secrets Manager')
        return {}

    @staticmethod
    def save(store_id, config_dict, create_secret=True):
        """Store `config_dict` as JSON in the secret identified by `store_id`. If the secret does not exist and
        `create_secret` is True, it is created and the save is retried once; failures are logged, not raised"""
        # keep the caller's identifier intact: it may carry a `profile||` prefix that the retry below still needs
        secret_id, aws_client = AWSSecretsManagerCacheStore._get_boto3_client(store_id)
        if aws_client:
            try:
                Log.debug('Saving credential cache to AWS Secret "%s"' % secret_id)
                aws_client.put_secret_value(SecretId=secret_id, SecretString=json.dumps(config_dict))
                Log.info('Cached', len(config_dict), 'account entries to AWS Secret "%s"' % secret_id)

            except botocore.exceptions.ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code == 'ResourceNotFoundException' and create_secret:
                    Log.info('AWS Secret "%s" does not exist - attempting to create it' % secret_id)
                    if AWSSecretsManagerCacheStore._create_secret(aws_client, secret_id):
                        # fix: retry with the original identifier - passing the stripped one dropped the chosen
                        # profile, so the retry was made with the default AWS profile instead
                        AWSSecretsManagerCacheStore.save(store_id, config_dict, create_secret=False)
                elif error_code == 'AccessDeniedException':
                    AWSSecretsManagerCacheStore._log_error(
                        'Caching to AWS Secret "%s" failed - access denied: does the IAM user have the '
                        '`secretsmanager:PutSecretValue` permission?' % secret_id, e)
                else:
                    AWSSecretsManagerCacheStore._log_error(
                        'Caching to AWS Secret "%s" failed - unexpected error; see the proxy debug log' % secret_id, e)
        else:
            Log.error('Unable to get AWS SDK client; cannot cache credentials to AWS Secrets Manager')
class ConcurrentConfigParser:
    """Thin wrapper that serialises access to a ConfigParser object through a single lock (note: only the methods
    used in this script are wrapped)"""

    def __init__(self):
        self.config = configparser.ConfigParser(interpolation=None)
        self.lock = threading.Lock()

    def _locked(self, operation, *args, **kwargs):
        """Run the given parser operation while holding the lock, and pass back its result"""
        with self.lock:
            return operation(*args, **kwargs)

    # -- operations whose result is not passed back to the caller --

    def read(self, filename):
        self._locked(self.config.read, filename)

    def add_section(self, section):
        self._locked(self.config.add_section, section)

    def set(self, section, option, value):
        self._locked(self.config.set, section, option, value)

    def remove_option(self, section, option):
        self._locked(self.config.remove_option, section, option)

    def write(self, file):
        self._locked(self.config.write, file)

    # -- operations that return the parser's own result --

    def sections(self):
        return self._locked(self.config.sections)

    def get(self, section, option, fallback=None):
        return self._locked(self.config.get, section, option, fallback=fallback)

    def getint(self, section, option, fallback=None):
        return self._locked(self.config.getint, section, option, fallback=fallback)

    def getboolean(self, section, option, fallback=None):
        return self._locked(self.config.getboolean, section, option, fallback=fallback)

    def items(self):
        return self._locked(self.config.items)  # used in read_dict when saving to cache store
class AppConfig:
    """Helper wrapper around ConfigParser to cache servers/accounts, and avoid writing to the file until necessary"""

    _PARSER = None
    _PARSER_LOCK = threading.Lock()

    # note: removing the unencrypted version of `client_secret_encrypted` is not automatic with --cache-store (see docs)
    _CACHED_OPTION_KEYS = ['access_token', 'access_token_expiry', 'refresh_token', 'token_salt', 'token_iterations',
                           'client_secret_encrypted', 'last_activity']

    # additional cache stores may be implemented by extending CacheStore and adding a prefix entry in this dict
    _EXTERNAL_CACHE_STORES = {'aws:': AWSSecretsManagerCacheStore}

    @staticmethod
    def _load():
        """Read the configuration file and, when a separate cache store is in use, merge the cached option values
        of its account sections into the result. Returns a new ConcurrentConfigParser"""
        config_parser = ConcurrentConfigParser()
        config_parser.read(CONFIG_FILE_PATH)

        # cached account credentials can be stored in the configuration file (default) or, via `--cache-store`, a
        # separate local file or external service (such as a secrets manager) - we combine these sources at load time
        if CACHE_STORE != CONFIG_FILE_PATH:
            # it would be cleaner to avoid specific options here, but best to load unexpected sections only when enabled
            allow_catch_all_accounts = config_parser.getboolean(APP_SHORT_NAME, 'allow_catch_all_accounts',
                                                                fallback=False)

            cache_file_parser = AppConfig._load_cache(CACHE_STORE)
            cache_file_accounts = [s for s in cache_file_parser.sections() if '@' in s]
            known_sections = set(config_parser.sections())
            for account in cache_file_accounts:
                if account not in known_sections:
                    if not allow_catch_all_accounts:
                        # fix: a cached account with no matching configuration section (e.g., one since removed from
                        # the config file) used to reach set() below and raise NoSectionError - skip it instead
                        continue
                    config_parser.add_section(account)  # missing sub-accounts
                    known_sections.add(account)

                for option in cache_file_parser.options(account):
                    if option in AppConfig._CACHED_OPTION_KEYS:
                        config_parser.set(account, option, cache_file_parser.get(account, option))

        return config_parser

    @staticmethod
    def _load_cache(cache_store_identifier):
        """Return a ConfigParser populated from the cache store: the first external store whose prefix matches the
        identifier, otherwise a local file at that path"""
        cache_file_parser = configparser.ConfigParser(interpolation=None)
        for prefix, cache_store_handler in AppConfig._EXTERNAL_CACHE_STORES.items():
            if cache_store_identifier.startswith(prefix):
                cache_file_parser.read_dict(cache_store_handler.load(cache_store_identifier[len(prefix):]))
                return cache_file_parser
        cache_file_parser.read(cache_store_identifier)  # default cache is a local file (does not error if non-existent)
        return cache_file_parser

    @staticmethod
    def get():
        """Return the shared parser, loading it first if it is not currently loaded"""
        with AppConfig._PARSER_LOCK:
            if AppConfig._PARSER is None:
                AppConfig._PARSER = AppConfig._load()
            return AppConfig._PARSER

    @staticmethod
    def unload():
        """Discard the shared parser so that the next get() re-reads the configuration"""
        with AppConfig._PARSER_LOCK:
            AppConfig._PARSER = None

    @staticmethod
    def get_global(name, fallback):
        """Return the boolean option `name` from the proxy's own (APP_SHORT_NAME) section, or `fallback`"""
        return AppConfig.get().getboolean(APP_SHORT_NAME, name, fallback)

    @staticmethod
    def servers():
        """Return the names of all sections that match CONFIG_SERVER_MATCHER (e.g., `IMAP-1993`)"""
        return [s for s in AppConfig.get().sections() if CONFIG_SERVER_MATCHER.match(s)]

    @staticmethod
    def accounts():
        """Return the names of all sections that contain `@`"""
        return [s for s in AppConfig.get().sections() if '@' in s]

    @staticmethod
    def get_option_with_catch_all_fallback(config, account, option, fallback=None):
        """Return `option` for `account`. When catch-all accounts are enabled, a missing value is looked up in the
        account's `@domain` section, then the `@` section, before finally using `fallback`"""
        if AppConfig.get_global('allow_catch_all_accounts', fallback=False):
            user_domain = '@%s' % account.split('@')[-1]
            fallback = config.get(user_domain, option, fallback=config.get('@', option, fallback=fallback))
        return config.get(account, option, fallback=fallback)

    @staticmethod
    def save():
        """Write the current state to the cache store (only the cached option keys of account sections) or, by
        default, rewrite the whole configuration file. Does nothing if the configuration is not loaded"""
        with AppConfig._PARSER_LOCK:
            if AppConfig._PARSER is None:  # intentionally using _PARSER not get() so we don't (re-)load if unloaded
                return

            if CACHE_STORE != CONFIG_FILE_PATH:
                # in `--cache-store` mode we ignore everything except _CACHED_OPTION_KEYS (OAuth 2.0 tokens, etc.)
                output_config_parser = configparser.ConfigParser(interpolation=None)
                output_config_parser.read_dict(AppConfig._PARSER)  # a deep copy of the current configuration
                config_accounts = [s for s in output_config_parser.sections() if '@' in s]

                for account in config_accounts:
                    for option in output_config_parser.options(account):
                        if option not in AppConfig._CACHED_OPTION_KEYS:
                            output_config_parser.remove_option(account, option)

                # sections() returns a new list, so removing sections during this loop is safe
                for section in output_config_parser.sections():
                    if section not in config_accounts or len(output_config_parser.options(section)) <= 0:
                        output_config_parser.remove_section(section)

                AppConfig._save_cache(CACHE_STORE, output_config_parser)

            else:
                # by default we cache to the local configuration file, and rewrite all values each time
                try:
                    with open(CONFIG_FILE_PATH, mode='w', encoding='utf-8') as config_output:
                        AppConfig._PARSER.write(config_output)
                except IOError:
                    Log.error('Error saving state to config file at', CONFIG_FILE_PATH, '- is the file writable?')

    @staticmethod
    def _save_cache(cache_store_identifier, output_config_parser):
        """Send the given parser's sections to the first external store whose prefix matches the identifier (as a
        dictionary of per-account dictionaries), otherwise write them to a local file at that path"""
        for prefix, cache_store_handler in AppConfig._EXTERNAL_CACHE_STORES.items():
            if cache_store_identifier.startswith(prefix):
                cache_store_handler.save(cache_store_identifier[len(prefix):],
                                         {account: dict(output_config_parser.items(account)) for account in
                                          output_config_parser.sections()})
                return
        try:
            with open(cache_store_identifier, mode='w', encoding='utf-8') as config_output:
                output_config_parser.write(config_output)
        except IOError:
            Log.error('Error saving state to cache store file at', cache_store_identifier, '- is the file writable?')
class Cryptographer:
ITERATIONS = 870_000 # taken from cryptography's suggestion of using Django's defaults
LEGACY_ITERATIONS = 100_000 # fallback when the iteration count is not in the config file (versions < 2023-10-17)
def __init__(self, config, username, password):
"""Creates a cryptographer which allows encrypting and decrypting sensitive information for this account,
(such as stored tokens), and also supports increasing the encryption/decryption iterations (i.e., strength)"""
self._salt = None
# token_salt (and iterations, below) can optionally be inherited in, e.g., CCG / service account configurations
token_salt = AppConfig.get_option_with_catch_all_fallback(config, username, 'token_salt')
if token_salt:
try:
self._salt = base64.b64decode(token_salt.encode('utf-8')) # catch incorrect third-party proxy guide
except (binascii.Error, UnicodeError):
Log.info('%s: Invalid `token_salt` value found in config file entry for account %s - this value is not '
'intended to be manually created; generating new `token_salt`' % (APP_NAME, username))
if not self._salt:
self._salt = os.urandom(16) # either a failed decode or the initial run when no salt exists
# the iteration count is stored with the credentials, so could if required be user-edited (see PR #198 comments)
iterations = int(AppConfig.get_option_with_catch_all_fallback(config, username, 'token_iterations',
fallback=self.LEGACY_ITERATIONS))
# with MultiFernet each fernet is tried in order to decrypt a value, but encryption always uses the first
# fernet, so sort unique iteration counts in descending order (i.e., use the best available encryption)
self._iterations_options = sorted({self.ITERATIONS, iterations, self.LEGACY_ITERATIONS}, reverse=True)
# generate encrypter/decrypter based on the password and salt
self._fernets = [Fernet(base64.urlsafe_b64encode(
PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=self._salt, iterations=iterations,
backend=default_backend()).derive(password.encode('utf-8')))) for iterations in
self._iterations_options]
self.fernet = MultiFernet(self._fernets)
@property
def salt(self):
    """The salt used for key derivation, as a base64-encoded string"""
    encoded_salt = base64.b64encode(self._salt)
    return encoded_salt.decode('utf-8')
@property
def iterations(self):
    """The iteration count of the first (i.e., highest, as the options are sorted descending) key option"""
    strongest_option = self._iterations_options[0]
    return strongest_option
def encrypt(self, value):
    """Encrypts the string `value` with `self.fernet`, returning the resulting token as a string"""
    plaintext = value.encode('utf-8')
    token = self.fernet.encrypt(plaintext)
    return token.decode('utf-8')
def decrypt(self, value):
    """Decrypts the string token `value` with `self.fernet`, returning the original content as a string"""
    token = value.encode('utf-8')
    plaintext = self.fernet.decrypt(token)
    return plaintext.decode('utf-8')
def requires_rotation(self, value):
    """Returns True only when `value` cannot be decrypted by the first fernet but can be decrypted by
    `self.decrypt` (i.e., its encryption strength can be upgraded); otherwise returns False"""
    try:
        self._fernets[0].decrypt(value.encode('utf-8'))
    except InvalidToken:
        pass  # not encrypted with the first fernet - fall through to try the remaining options
    else:
        return False  # if the first fernet works, everything is up-to-date

    try:
        self.decrypt(value)
    except InvalidToken:
        return False  # nothing can decrypt this value, so there is nothing to rotate
    return True
def rotate(self, value):
    """Passes the string token `value` through `self.fernet.rotate`, returning the new token as a string"""
    current_token = value.encode('utf-8')
    rotated_token = self.fernet.rotate(current_token)
    return rotated_token.decode('utf-8')
class OAuth2Helper:
class TokenRefreshError(Exception):
    """Signals a failure while refreshing an access token; get_oauth2_credentials handles this by clearing the
    cached token(s) and retrying the login"""
@staticmethod
def get_oauth2_credentials(username, password, reload_remote_accounts=True):
    # noinspection GrazieInspection
    """Using the given username (i.e., email address) and password, reads account details from AppConfig and
    handles OAuth 2.0 token request and renewal, saving the updated details back to AppConfig (or removing them
    if invalid). Returns either (True, '[OAuth2 string for authentication]') or (False, '[Error message]')

    When `reload_remote_accounts` is True, no access token is found, and CACHE_STORE differs from
    CONFIG_FILE_PATH, the configuration is unloaded and the request is retried once with reloading disabled.
    Token refresh and decryption failures may also trigger a retry after clearing cached values (see the
    exception handlers at the end of this method)"""

    # we support broader catch-all account names (e.g., `@domain.com` / `@`) if enabled
    config_accounts = AppConfig.accounts()
    valid_accounts = [username in config_accounts]
    if AppConfig.get_global('allow_catch_all_accounts', fallback=False):
        user_domain = '@%s' % username.split('@')[-1]
        valid_accounts.extend([account in config_accounts for account in [user_domain, '@']])

    if not any(valid_accounts):
        Log.error('Proxy config file entry missing for account', username, '- aborting login')
        return (False, '%s: No config file entry found for account %s - please add a new section with values '
                       'for permission_url, token_url, oauth2_scope, redirect_uri, client_id and '
                       'client_secret' % (APP_NAME, username))

    config = AppConfig.get()
    permission_url = AppConfig.get_option_with_catch_all_fallback(config, username, 'permission_url')
    token_url = AppConfig.get_option_with_catch_all_fallback(config, username, 'token_url')
    oauth2_scope = AppConfig.get_option_with_catch_all_fallback(config, username, 'oauth2_scope')
    oauth2_flow = AppConfig.get_option_with_catch_all_fallback(config, username, 'oauth2_flow')
    redirect_uri = AppConfig.get_option_with_catch_all_fallback(config, username, 'redirect_uri')
    redirect_listen_address = AppConfig.get_option_with_catch_all_fallback(config, username,
                                                                           'redirect_listen_address')
    client_id = AppConfig.get_option_with_catch_all_fallback(config, username, 'client_id')
    client_secret = AppConfig.get_option_with_catch_all_fallback(config, username, 'client_secret')
    client_secret_encrypted = AppConfig.get_option_with_catch_all_fallback(config, username,
                                                                           'client_secret_encrypted')
    jwt_certificate_path = AppConfig.get_option_with_catch_all_fallback(config, username, 'jwt_certificate_path')
    jwt_key_path = AppConfig.get_option_with_catch_all_fallback(config, username, 'jwt_key_path')

    # note that we don't require permission_url here because it is not needed for the client credentials grant flow,
    # and likewise for client_secret here because it can be optional for Office 365 configurations
    if not (token_url and oauth2_scope and redirect_uri and client_id):
        Log.error('Proxy config file entry incomplete for account', username, '- aborting login')
        return (False, '%s: Incomplete config file entry found for account %s - please make sure all required '
                       'fields are added (permission_url, token_url, oauth2_scope, redirect_uri, client_id '
                       'and client_secret)' % (APP_NAME, username))

    # while not technically forbidden (RFC 6749, A.1 and A.2), it is highly unlikely the example value is valid
    example_client_value = '*** your'
    example_client_status = [example_client_value in i for i in [client_id, client_secret] if i]
    if any(example_client_status):
        if all(example_client_status) or example_client_value in client_id:
            Log.info('Warning: client configuration for account', username, 'seems to contain example values -',
                     'if authentication fails, please double-check these values are correct')
        elif example_client_value in client_secret:
            Log.info('Warning: client secret for account', username, 'seems to contain the example value - if you',
                     'are using an Office 365 setup that does not need a secret, please delete this line entirely;',
                     'otherwise, if authentication fails, please double-check this value is correct')

    current_time = int(time.time())
    access_token = config.get(username, 'access_token', fallback=None)
    access_token_expiry = config.getint(username, 'access_token_expiry', fallback=current_time)
    refresh_token = config.get(username, 'refresh_token', fallback=None)

    # try reloading remotely cached tokens if possible
    if not access_token and CACHE_STORE != CONFIG_FILE_PATH and reload_remote_accounts:
        AppConfig.unload()
        return OAuth2Helper.get_oauth2_credentials(username, password, reload_remote_accounts=False)

    cryptographer = Cryptographer(config, username, password)
    rotatable_values = {
        'access_token': access_token,
        'refresh_token': refresh_token,
        'client_secret_encrypted': client_secret_encrypted
    }
    if any(value and cryptographer.requires_rotation(value) for value in rotatable_values.values()):
        Log.info('Rotating stored secrets for account', username, 'to use new cryptographic parameters')
        for key, value in rotatable_values.items():
            if value:
                config.set(username, key, cryptographer.rotate(value))

        config.set(username, 'token_iterations', str(cryptographer.iterations))
        AppConfig.save()

    try:
        # if both secret values are present we use the unencrypted version (as it may have been user-edited)
        if client_secret_encrypted:
            if not client_secret:
                try:
                    client_secret = cryptographer.decrypt(client_secret_encrypted)
                except InvalidToken as e:  # needed to avoid looping (we don't remove secrets on decryption failure)
                    Log.error('Invalid password to decrypt `client_secret_encrypted` for account', username,
                              '- aborting login:', Log.error_string(e))
                    return False, '%s: Login failed - the password for account %s is incorrect' % (
                        APP_NAME, username)
            else:
                Log.info('Warning: found both `client_secret_encrypted` and `client_secret` for account', username,
                         '- the un-encrypted value will be used. Removing the un-encrypted value is recommended')

        # O365 certificate credentials - see: learn.microsoft.com/entra/identity-platform/certificate-credentials
        jwt_client_assertion = None
        if jwt_certificate_path and jwt_key_path:
            if client_secret or client_secret_encrypted:
                client_secret_type = '`client_secret%s`' % ('_encrypted' if client_secret_encrypted else '')
                Log.info('Warning: found both certificate credentials and', client_secret_type, 'for account',
                         username, '- the', client_secret_type, 'value will be used. To use certificate',
                         'credentials, remove the client secret value')

            else:
                try:
                    # noinspection PyUnresolvedReferences
                    import jwt
                except ImportError:
                    return False, ('Unable to load jwt, which is a requirement when using certificate credentials '
                                   '(`jwt_` options). Please run `python -m pip install -r requirements-core.txt`')
                import uuid
                from cryptography import x509
                from cryptography.hazmat.primitives import serialization

                try:
                    jwt_now = datetime.datetime.now(datetime.timezone.utc)
                    jwt_certificate_fingerprint = x509.load_pem_x509_certificate(
                        pathlib.Path(jwt_certificate_path).read_bytes()).fingerprint(hashes.SHA256())
                    jwt_client_assertion = jwt.encode(
                        {
                            'aud': token_url,
                            'exp': jwt_now + datetime.timedelta(seconds=JWT_LIFETIME),
                            'iss': client_id,
                            'jti': str(uuid.uuid4()),
                            'nbf': jwt_now,
                            'sub': client_id
                        },
                        serialization.load_pem_private_key(pathlib.Path(jwt_key_path).read_bytes(), password=None),
                        algorithm='RS256',
                        headers={
                            'x5t#S256': base64.urlsafe_b64encode(jwt_certificate_fingerprint).decode('utf-8')
                        })
                except (FileNotFoundError, OSError):  # catch OSError due to GitHub issue 257 (quoted paths)
                    return (False, 'Unable to create credentials assertion for account %s - please check that the '
                                   '`jwt_certificate_path` and `jwt_key_path` values are correct' % username)

        if access_token or refresh_token:  # if possible, refresh the existing token(s)
            if not access_token or access_token_expiry - current_time < TOKEN_EXPIRY_MARGIN:
                if refresh_token:
                    response = OAuth2Helper.refresh_oauth2_access_token(token_url, client_id, client_secret,
                                                                        jwt_client_assertion, username,
                                                                        cryptographer.decrypt(refresh_token))

                    access_token = response['access_token']
                    config.set(username, 'access_token', cryptographer.encrypt(access_token))
                    # RFC 6749 defines `expires_in` as a number, but coerce to int so that numeric strings (or
                    # floats) from a token endpoint neither break the addition nor the later `getint` read
                    config.set(username, 'access_token_expiry', str(current_time + int(response['expires_in'])))
                    if 'refresh_token' in response:
                        config.set(username, 'refresh_token', cryptographer.encrypt(response['refresh_token']))
                    AppConfig.save()

                else:
                    # avoid trying invalid (or soon to be) tokens - we used to keep tokens until the last possible
                    # moment here, but it is simpler to just obtain a new one within TOKEN_EXPIRY_MARGIN, especially
                    # when in CCG/ROPCG/Google Cloud service account modes, for all of which getting a new token
                    # involves no interaction from the user (note that in interactive mode it would be better to
                    # request a new token via the user before discarding the existing one, but since this happens
                    # very infrequently, we don't add the extra complexity for just 10 extra minutes of token life)
                    cryptographer.decrypt(access_token)  # check request is valid (raises InvalidToken on failure)
                    access_token = None
            else:
                access_token = cryptographer.decrypt(access_token)

        if not access_token:
            auth_result = None
            if permission_url:  # O365 CCG/ROPCG and Google service accounts skip authorisation; no permission_url
                oauth2_flow = 'authorization_code'
                permission_url = OAuth2Helper.construct_oauth2_permission_url(permission_url, redirect_uri,
                                                                              client_id, oauth2_scope, username)

                # note: get_oauth2_authorisation_code is a blocking call (waiting on user to provide code)
                success, auth_result = OAuth2Helper.get_oauth2_authorisation_code(permission_url, redirect_uri,
                                                                                  redirect_listen_address, username)

                if not success:
                    Log.info('Authorisation result error for account', username, '- aborting login.', auth_result)
                    return False, '%s: Login failed for account %s: %s' % (APP_NAME, username, auth_result)

            if not oauth2_flow:
                Log.error('No `oauth2_flow` value specified for account', username, '- aborting login')
                return (False, '%s: Incomplete config file entry found for account %s - please make sure an '
                               '`oauth2_flow` value is specified when using a method that does not require a '
                               '`permission_url`' % (APP_NAME, username))

            response = OAuth2Helper.get_oauth2_authorisation_tokens(token_url, redirect_uri, client_id,
                                                                    client_secret, jwt_client_assertion,
                                                                    auth_result, oauth2_scope, oauth2_flow,
                                                                    username, password)

            # the account's own section must exist before any option is written to it (including the encrypted
            # client secret below), so create it first rather than only just before the tokens are stored
            if username not in config.sections():
                config.add_section(username)  # in catch-all mode the section may not yet exist
                REQUEST_QUEUE.put(MENU_UPDATE)  # make sure the menu shows the newly-added account

            if AppConfig.get_global('encrypt_client_secret_on_first_use', fallback=False):
                if client_secret:
                    # note: save to the `username` entry even if `user_domain` exists, avoiding conflicts when
                    # using `encrypt_client_secret_on_first_use` with the `allow_catch_all_accounts` option
                    config.set(username, 'client_secret_encrypted', cryptographer.encrypt(client_secret))
                    config.remove_option(username, 'client_secret')

            access_token = response['access_token']

            config.set(username, 'token_salt', cryptographer.salt)
            config.set(username, 'token_iterations', str(cryptographer.iterations))
            config.set(username, 'access_token', cryptographer.encrypt(access_token))
            # coerce `expires_in` to int for the same reason as in the refresh branch above
            config.set(username, 'access_token_expiry', str(current_time + int(response['expires_in'])))

            if 'refresh_token' in response:
                config.set(username, 'refresh_token', cryptographer.encrypt(response['refresh_token']))
            elif permission_url:  # ignore this situation with CCG/ROPCG/service account flows - it is expected
                Log.info('Warning: no refresh token returned for account', username, '- you will need to',
                         're-authenticate each time the access token expires (does your `oauth2_scope` value allow',
                         '`offline` use?)')

            AppConfig.save()

        # send authentication command to server (response checked in ServerConnection) - note: we only support
        # single-trip authentication (SASL) without actually checking the server's capabilities - improve?
        oauth2_string = OAuth2Helper.construct_oauth2_string(username, access_token)
        return True, oauth2_string

    except OAuth2Helper.TokenRefreshError as e:
        # always clear access tokens - can easily request another via the refresh token (with no user interaction)
        has_access_token = bool(config.get(username, 'access_token', fallback=None))
        config.remove_option(username, 'access_token')
        config.remove_option(username, 'access_token_expiry')

        if not has_access_token:
            # if this is already a second failure, remove the refresh token as well, and force re-authentication
            config.remove_option(username, 'token_salt')
            config.remove_option(username, 'token_iterations')
            config.remove_option(username, 'refresh_token')

        AppConfig.save()

        Log.info('Retrying login due to exception while refreshing access token for account', username,
                 '(attempt %d):' % (1 if has_access_token else 2), Log.error_string(e))
        return OAuth2Helper.get_oauth2_credentials(username, password, reload_remote_accounts=False)

    except InvalidToken as e:
        # regardless of the `delete_account_token_on_password_error` setting, we only reset tokens for standard or
        # ROPCG flows; when using CCG or a service account it is far safer to deny account access and require a
        # config file edit in order to reset an account rather than allowing *any* password to be used for access
        if AppConfig.get_global('delete_account_token_on_password_error', fallback=True) and (
                permission_url or oauth2_flow not in ['client_credentials', 'service_account']):
            config.remove_option(username, 'access_token')
            config.remove_option(username, 'access_token_expiry')
            config.remove_option(username, 'token_salt')
            config.remove_option(username, 'token_iterations')
            config.remove_option(username, 'refresh_token')
            AppConfig.save()

            Log.info('Retrying login due to exception while decrypting OAuth 2.0 credentials for account', username,
                     '(invalid password):', Log.error_string(e))
            return OAuth2Helper.get_oauth2_credentials(username, password, reload_remote_accounts=False)

        Log.error('Invalid password to decrypt credentials for account', username, '- aborting login:',
                  Log.error_string(e))
        return False, '%s: Login failed - the password for account %s is incorrect' % (APP_NAME, username)

    except Exception as e:
        # note that we don't currently remove cached credentials here, as failures on the initial request are before
        # caching happens, and the assumption is that refresh token request exceptions are temporal (e.g., network
        # errors: URLError(OSError(50, 'Network is down'))) - access token 400 Bad Request HTTPErrors with messages
        # such as 'authorisation code was already redeemed' are caused by our support for simultaneous requests,
        # and will work from the next request; however, please report an issue if you encounter problems here
        Log.info('Caught exception while requesting OAuth 2.0 credentials for account %s:' % username,
                 Log.error_string(e))
        return False, '%s: Login failed for account %s - please check your internet connection and retry' % (
            APP_NAME, username)
@staticmethod
def oauth2_url_escape(text):
    """Percent-encodes `text` for use in a URL, additionally leaving the characters `~-._` unescaped"""
    unreserved_punctuation = '~-._'  # see https://tools.ietf.org/html/rfc3986#section-2.3
    return urllib.parse.quote(text, safe=unreserved_punctuation)
@staticmethod
def oauth2_url_unescape(text):
    """Replaces percent-encoded sequences in `text` with the characters they represent"""
    decoded_text = urllib.parse.unquote(text)
    return decoded_text
@staticmethod
def match_redirect_uri(config, received):
    """Checks whether the `received` URI corresponds to the configured redirect URI (`config`): the host:port
    and path (ignoring trailing slashes) must be equal; the scheme and any query string are not compared"""
    expected = urllib.parse.urlparse(config)
    actual = urllib.parse.urlparse(received)

    # match host:port and path (except trailing slashes), but allow mismatch of the scheme (i.e., http/https) (#96)
    if expected.netloc != actual.netloc:
        return False
    return expected.path.rstrip('/') == actual.path.rstrip('/')
@staticmethod
def start_redirection_receiver_server(token_request):
"""Starts a local WSGI web server to receive OAuth responses"""
redirect_listen_type = 'redirect_listen_address' if token_request['redirect_listen_address'] else 'redirect_uri'
parsed_uri = urllib.parse.urlparse(token_request[redirect_listen_type])
parsed_port = 80 if parsed_uri.port is None else parsed_uri.port
Log.debug('Local server auth mode (%s): starting server to listen for authentication response' %
Log.format_host_port((parsed_uri.hostname, parsed_port)))
class LoggingWSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
# pylint: disable-next=arguments-differ
def log_message(self, _format_string, *args):
Log.debug('Local server auth mode (%s): received authentication response' % Log.format_host_port(
(parsed_uri.hostname, parsed_port)), *args)
class RedirectionReceiverWSGIApplication:
def __call__(self, environ, start_response):
start_response('200 OK', [('Content-type', 'text/html; charset=utf-8')])
token_request['response_url'] = '/'.join(token_request['redirect_uri'].split('/')[0:3]) + environ.get(
'PATH_INFO') + '?' + environ.get('QUERY_STRING')
return [('<html><head><title>%s authentication complete (%s)</title><style type="text/css">body{margin:'