Skip to content

Commit

Permalink
support for python 3.5 dropped
Browse files Browse the repository at this point in the history
-> using fstrings instead of format
  • Loading branch information
Schluggi committed Jan 12, 2021
1 parent d736784 commit c01c63d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 69 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.1.0 (upcoming)
- support for python 3.5 dropped

## 1.0.1
Introducing new version-system (major.minor.bugfix)

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ SSL/TLS and STARTTLS support i wrote my own python-based version. I hope you lik
- Simple usage

## Requirements
- Python >= 3.5
- Python >= 3.6

## Installation
### pip (recommended)
Expand Down
130 changes: 63 additions & 67 deletions pymap-copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
def check_encryption(value):
value = value.lower()
if value not in ['ssl', 'tls', 'starttls', 'none']:
raise ArgumentTypeError('{} is an unknown encryption. Use can use ssl, tls, starttls or none instead.'.
format(value))
raise ArgumentTypeError(f'{value} is an unknown encryption. Use can use ssl, tls, starttls or none instead.')
return value


Expand All @@ -38,12 +37,12 @@ def colorize(s, color=None, bold=False, clear=False):
return s

if clear:
s = '\r\x1b[2K{}'.format(s)
s = f'\r\x1b[2K{s}'
if bold:
s = '\x1b[1m{}'.format(s)
s = f'\x1b[1m{s}'
if color:
s = '{}{}'.format(colors[color], s)
return '{}\x1b[0m'.format(s)
s = f'{colors[color]}{s}'
return f'{s}\x1b[0m'


def connect(server, port, encryption):
Expand All @@ -63,18 +62,18 @@ def connect(server, port, encryption):
client = IMAPClient(host=server, port=port, ssl=use_ssl, ssl_context=ssl_context)
if encryption == 'starttls':
client.starttls(ssl_context=ssl_context)
client_status = '{} ({})'.format(colorize('OK', color='green'), colorize('STARTTLS', color='green'))
client_status = f'{colorize("OK", color="green")} ({colorize("STARTTLS", color="green")})'

elif encryption in ['ssl', 'tls']:
client_status = '{} ({})'.format(colorize('OK', color='green'), colorize('SSL/TLS', color='green'))
client_status = f'{colorize("OK", color="green")} ({colorize("SSL/TLS", color="green")})'

else:
client_status = '{} ({})'.format(colorize('OK', color='green'), colorize('NOT ENCRYPTED', color='yellow'))
client_status = f'{colorize("OK", color="green")} ({colorize("NOT ENCRYPTED", color="yellow")})'

return client, client_status

except Exception as e:
client_status = '{} {}'.format(colorize('Error:', color='red', bold=True), imaperror_decode(e))
client_status = f'{colorize("Error:", color="red", bold=True)} {imaperror_decode(e)}'
return None, client_status


Expand All @@ -84,15 +83,15 @@ def login(client, user, password):
client.login(user, password)
return True, colorize('OK', color='green')
except Exception as e:
return False, '{} {}'.format(colorize('Error:', color='red', bold=True), imaperror_decode(e))
return False, f'{colorize("Error:", color="red", bold=True)} {imaperror_decode(e)}'
else:
return False, '{} No active connection'.format(colorize('Error:', color='red', bold=True))
return False, f'{colorize("Error:", color="red", bold=True)} No active connection'


parser = ArgumentParser(description='Copy and transfer IMAP mailboxes',
epilog='pymap-copy by {} ({})'.format(__author__, __url__))
epilog=f'pymap-copy by {__author__} ({__url__})')
parser.add_argument('-v', '--version', help='show version and exit.', action="version",
version='pymap-copy {} by {} ({})'.format(__version__, __author__, __url__))
version=f'pymap-copy {__version__} by {__author__} ({__url__})')

#: run mode arguments
parser.add_argument('-d', '--dry-run', help='copy & creating nothing, just feign', action="store_true")
Expand Down Expand Up @@ -193,33 +192,31 @@ def login(client, user, password):
}

if args.denied_flags:
denied_flags.extend(['\\{}'.format(flag).encode() for flag in args.denied_flags.lower().split(',')])
denied_flags.extend([f'\\{flag}'.encode() for flag in args.denied_flags.lower().split(',')])


print()

#: connecting source
print('Connecting source : {}:{}, '.format(args.source_server, source_port),
end='', flush=True)
print(f'Connecting source : {args.source_server}:{source_port}, ', end='', flush=True)
source, status = connect(args.source_server, source_port, args.source_encryption)
print(status)

#: connecting destination
print('Connecting destination : {}:{}, '.format(args.destination_server, destination_port),
end='', flush=True)
print(f'Connecting destination : {args.destination_server}:{destination_port}, ', end='', flush=True)
destination, status = connect(args.destination_server, destination_port, args.destination_encryption)
print(status)

print()


#: login source
print('Login source : {}, '.format(args.source_user), end='', flush=True)
print(f'Login source : {args.source_user}, ', end='', flush=True)
source_login_ok, status = login(source, args.source_user, args.source_pass)
print(status)

#: login destination
print('Login destination : {}, '.format(args.destination_user), end='', flush=True)
print(f'Login destination : {args.destination_user}, ', end='', flush=True)
destination_login_ok, status = login(destination, args.destination_user, args.destination_pass)
print(status)

Expand All @@ -235,15 +232,16 @@ def login(client, user, password):
destination_idle = IMAPIdle(destination, interval=args.idle_interval)
source_idle.start()
destination_idle.start()
print('{} (restarts every {} seconds)'.format(colorize('OK', color='green'), args.idle_interval))
print(f'{colorize("OK", color="green")} (restarts every {args.idle_interval} seconds)')

print()

#: get quota from source
print('Getting source quota : ', end='', flush=True)
if source.has_capability('QUOTA') and args.ignore_quota is False:
source_quota = source.get_quota()[0]
print('{}/{} ({:.0f}%)'.format(beautysized(source_quota.usage*1000), beautysized(source_quota.limit*1000),
source_quota.usage / source_quota.limit * 100))
print(f'{beautysized(source_quota.usage*1000)}/{beautysized(source_quota.limit*1000)} '
f'({source_quota.usage / source_quota.limit * 100:.0f}%)')
else:
source_quota = None
print('server does not support quota')
Expand All @@ -252,9 +250,8 @@ def login(client, user, password):
print('Getting destination quota : ', end='', flush=True)
if destination.has_capability('QUOTA') and args.ignore_quota is False:
destination_quota = destination.get_quota()[0]
print('{}/{} ({:.0f}%)'.format(beautysized(destination_quota.usage*1000),
beautysized(destination_quota.limit*1000),
destination_quota.usage / destination_quota.limit * 100))
print(f'{beautysized(destination_quota.usage*1000)}/{beautysized(destination_quota.limit*1000)} '
f'({destination_quota.usage / destination_quota.limit * 100:.0f}%)')
else:
destination_quota = None
print('server does not support quota')
Expand All @@ -264,8 +261,8 @@ def login(client, user, password):
if source_quota and destination_quota:
destination_quota_free = destination_quota.limit - destination_quota.usage
if destination_quota_free < source_quota.usage:
print('{} Insufficient quota: The source usage is {} KB but there only {} KB free on the destination server'
.format(colorize('Error:', bold=True, color='cyan'), source_quota.usage, destination_quota_free),
print(f'{colorize("Error:", bold=True, color="cyan")} Insufficient quota: The source usage is '
f'{source_quota.usage} KB but there only {destination_quota_free} KB free on the destination server',
end='', flush=True)
if args.ignore_quota:
print(' (ignoring)')
Expand All @@ -290,8 +287,8 @@ def login(client, user, password):

if args.source_folder:
if name not in args.source_folder and name.startswith(wildcards) is False:
print(colorize('Getting source folders : Progressing ({} mails) (skipping): {}'.
format(stats['source_mails'], name), clear=True), flush=True, end='')
print(colorize(f'Getting source folders : Progressing ({stats["source_mails"]} mails) (skipping): '
f'{name}', clear=True), flush=True, end='')
continue

try:
Expand Down Expand Up @@ -341,11 +338,10 @@ def login(client, user, password):

del mails[:args.buffer_size]

print(colorize('Getting source folders : {} mails in {} folders ({}) '.
format(stats['source_mails'], len(db['source']['folders']),
beautysized(sum([f['size'] for f in db['source']['folders'].values()]))), clear=True), end='')
print(colorize(f'Getting source folders : {stats["source_mails"]} mails in {len(db["source"]["folders"])} folders '
f'({beautysized(sum([f["size"] for f in db["source"]["folders"].values()]))}) ', clear=True), end='')
if any((args.source_folder, args.destination_root)):
print('({})'.format(colorize('filtered by arguments', color='yellow')), end='')
print(f'({colorize("filtered by arguments", color="yellow")})', end='')
print()

destination_idle.stop_idle()
Expand Down Expand Up @@ -394,20 +390,20 @@ def login(client, user, password):
beautysized(sum([f['size'] for f in db['destination']['folders'].values()]))),
clear=True), end='')
if any((args.source_folder, args.destination_root)):
print('({})'.format(colorize('filtered by arguments', color='yellow')), end='')
print(f'({colorize("filtered by arguments", color="yellow")})', end='')
print('\n')

#: list mode
if args.list:
print(colorize('Source:', bold=True))
for name in db['source']['folders']:
print('{} ({} mails, {})'.format(name, len(db['source']['folders'][name]['mails']),
beautysized(db['source']['folders'][name]['size'])))
print(f'{name} ({len(db["source"]["folders"][name]["mails"])} mails, '
f'{beautysized(db["source"]["folders"][name]["size"])})')

print('\n{}'.format(colorize('Destination:', bold=True)))
print(f'\n{colorize("Destination:", bold=True)}')
for name in db['destination']['folders']:
print('{} ({} mails, {})'.format(name, len(db['destination']['folders'][name]['mails']),
beautysized(db['destination']['folders'][name]['size'])))
print(f'{name} ({len(db["destination"]["folders"][name]["mails"])} mails, '
f'{beautysized(db["destination"]["folders"][name]["size"])})')

print()
print(colorize('Everything skipped! (list mode)', color='cyan'))
Expand Down Expand Up @@ -456,9 +452,9 @@ def login(client, user, password):

if args.destination_root:
if args.destination_root_merge is False or \
(df_name.startswith('{}{}'.format(args.destination_root, destination_delimiter)) is False
(df_name.startswith(f'{args.destination_root}{destination_delimiter}') is False
and df_name != args.destination_root):
df_name = '{}{}{}'.format(args.destination_root, destination_delimiter, df_name)
df_name = f'{args.destination_root}{destination_delimiter}{df_name}'

#: link special IMAP folder
if not args.ignore_folder_flags:
Expand Down Expand Up @@ -541,11 +537,11 @@ def login(client, user, password):
try:
msg_id_decoded = msg_id.decode()
except Exception as sub_exception:
msg_id_decoded = '(decode failure): {}'.format(sub_exception)
msg_id_decoded = f'(decode failure): {sub_exception}'

stats['errors'].append({'size': size,
'subject': subject,
'exception': '{}: {}'.format(type(e).__name__, e),
'exception': f'{type(e).__name__}: {e}',
'folder': df_name,
'date': date,
'id': msg_id_decoded})
Expand Down Expand Up @@ -595,25 +591,25 @@ def login(client, user, password):
if any([msg in status.lower() for msg in success_messages]):
stats['copied_mails'] += 1
else:
raise exceptions.IMAPClientError('Unknown success message: {}'.format(status.decode()))
raise exceptions.IMAPClientError(f'Unknown success message: {status.decode()}')

except exceptions.IMAPClientError as e:
e_decoded = imaperror_decode(e)

try:
msg_id_decoded = msg_id.decode()
except Exception as sub_exception:
msg_id_decoded = '(decode failure): {}'.format(sub_exception)
msg_id_decoded = f'(decode failure): {sub_exception}'

error_information = {'size': beautysized(size),
'subject': subject,
'exception': '{}: {}'.format(type(e).__name__, e),
'exception': f'{type(e).__name__}: {e}',
'folder': df_name,
'date': date,
'id': msg_id_decoded}

stats['errors'].append(error_information)
print('\n{} {}\n'.format(colorize('Error:', color='red', bold=True), e))
print(f'\n{colorize("Error:", color="red", bold=True)} {e}\n')

if args.abort_on_error:
raise KeyboardInterrupt
Expand Down Expand Up @@ -643,43 +639,43 @@ def login(client, user, password):
source.logout()
print(colorize('OK', color='green'))
except exceptions.IMAPClientError as e:
print('ERROR: {}'.format(imaperror_decode(e)))
print(f'ERROR: {imaperror_decode(e)}')

#: logout destination
try:
print('Logout destination...', end='', flush=True)
destination.logout()
print(colorize('OK', color='green'))
except exceptions.IMAPClientError as e:
print('ERROR: {}'.format(imaperror_decode(e)))
print(f'ERROR: {imaperror_decode(e)}')


#: print statistics
print('\n\nCopied {} mails and {} folders in {:.2f}s\n'.format(
colorize('{}/{}'.format(stats['copied_mails'], stats['source_mails']), bold=True),
colorize('{}/{}'.format(stats['copied_folders'], len(db['source']['folders'])), bold=True),
colorize(f'{stats["copied_mails"]}/{stats["source_mails"]}', bold=True),
colorize(f'{stats["copied_folders"]}/{len(db["source"]["folders"])}', bold=True),
time()-stats['start_time']))

if args.dry_run:
print(colorize('Everything skipped! (dry-run)', color='cyan'))
else:
print('Skipped folders : {}'.format(sum([stats['skipped_folders'][c] for c in stats['skipped_folders']])))
print('├─ Empty : {} (skip-empty-folders mode only)'.format(stats['skipped_folders']['empty']))
print('├─ No parent folder : {}'.format(stats['skipped_folders']['no_parent']))
print('└─ Already exists : {}'.format(stats['skipped_folders']['already_exists']))
print(f'Skipped folders : {sum([stats["skipped_folders"][c] for c in stats["skipped_folders"]])}')
print(f'├─ Empty : {stats["skipped_folders"]["empty"]} (skip-empty-folders mode only)')
print(f'├─ No parent folder : {stats["skipped_folders"]["no_parent"]}')
print(f'└─ Already exists : {stats["skipped_folders"]["already_exists"]}')
print()
print('Skipped mails : {}'.format(sum([stats['skipped_mails'][c] for c in stats['skipped_mails']])))
print('├─ Zero sized : {}'.format(stats['skipped_mails']['zero_size']))
print('├─ To large : {} (max-mail-size mode only)'.format(stats['skipped_mails']['max_size']))
print('├─ No envelope : {}'.format(stats['skipped_mails']['no_envelope']))
print('├─ Line length : {} (max-line-length mode only)'.format(stats['skipped_mails']['max_line_length']))
print('└─ Already exists : {} (incremental mode only)'.format(stats['skipped_mails']['already_exists']))

print('\nErrors ({}):'.format(len(stats['errors'])))
print(f'Skipped mails : {sum([stats["skipped_mails"][c] for c in stats["skipped_mails"]])}')
print(f'├─ Zero sized : {stats["skipped_mails"]["zero_size"]}')
print(f'├─ To large : {stats["skipped_mails"]["max_size"]} (max-mail-size mode only)')
print(f'├─ No envelope : {stats["skipped_mails"]["no_envelope"]}')
print(f'├─ Line length : {stats["skipped_mails"]["max_line_length"]} (max-line-length mode only)')
print(f'└─ Already exists : {stats["skipped_mails"]["already_exists"]} (incremental mode only)')

print(f'\nErrors ({len(stats["errors"])}):')
if stats['errors']:
for err in stats['errors']:
print('({}) ({}) ({}) ({}) ({}): {}'.format(err['size'], err['date'], err['folder'], err['id'],
err['subject'], err['exception']))
print(f'({err["size"]}) ({err["date"]}) ({err["folder"]}) ({err["id"]}) ({err["subject"]}): '
f'{err["exception"]}')
else:
print('(no errors)')

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
setuptools.setup(
name='pymap-copy',
version='1.0.1',
python_requires='>=3.5',
python_requires='>=3.6',
scripts=['pymap-copy.py'],
author='Lukas Schulte-Tickmann',
author_email='[email protected]',
Expand Down

0 comments on commit c01c63d

Please sign in to comment.