diff --git a/imapclient/conn.py b/imapclient/conn.py new file mode 100644 index 00000000..c9563959 --- /dev/null +++ b/imapclient/conn.py @@ -0,0 +1,213 @@ +""" +Low Level IMAP Connection +""" + + +import errno +import socket +import subprocess +import sys +from io import DEFAULT_BUFFER_SIZE +from logging import getLogger +from . import exceptions + +try: + import ssl + HAVE_SSL = True +except ImportError: + HAVE_SSL = False + +# Maximal line length when calling readline(). This is to prevent +# reading arbitrary length lines. RFC 3501 and 2060 (IMAP 4rev1) +# don't specify a line length. RFC 2683 suggests limiting client +# command lines to 1000 octets and that servers should be prepared +# to accept command lines up to 8000 octets, so we used to use 10K here. +# In the modern world (eg: gmail) the response to, for example, a +# search command can be quite large, so we now use 1M. +_MAXLINE = 1000000 + +IMAP4_PORT = 143 +IMAP4_SSL_PORT = 993 + +logger = getLogger(__name__) + +class IMAP4: + def __init__(self, host='', port=IMAP4_PORT): + self.open(host, port) + + def _create_socket(self): + # Default value of IMAP4.host is '', but socket.getaddrinfo() + # (which is used by socket.create_connection()) expects None + # as a default value for host. + host = None if not self.host else self.host + sys.audit("imaplib.open", self, self.host, self.port) + return socket.create_connection((host, self.port)) + + def open(self, host = '', port = IMAP4_PORT): + """Setup connection to remote server on "host:port" + (default: localhost:standard IMAP4 port). + This connection will be used by the routines: + read, readline, send, shutdown. + """ + self.host = host + self.port = port + self.sock = self._create_socket() + self.file = self.sock.makefile('rb') + + def read(self, size): + """Read 'size' bytes from remote.""" + return self.file.read(size) + + def readline(self): + """Read line from remote.""" + line = self.file.readline(_MAXLINE + 1) + if len(line) > _MAXLINE: + raise exceptions.IMAPClientError("got more than %d bytes" % _MAXLINE) + return line + + def get_line(self): + + line = self.readline() + if not line: + raise exceptions.IMAPClientAbortError('socket error: EOF') + + # Protocol mandates all lines terminated by CRLF + if not line.endswith(b'\r\n'): + raise exceptions.IMAPClientAbortError('socket error: unterminated line: %r' % line) + + line = line[:-2] + if __debug__: + logger.debug('< %r' % line) + return line + + def send(self, data): + """Send data to remote.""" + sys.audit("imaplib.send", self, data) + self.sock.sendall(data) + + def shutdown(self): + """Close I/O established in "open".""" + self.file.close() + try: + self.sock.shutdown(socket.SHUT_RDWR) + except OSError as exc: + # The server might already have closed the connection. + # On Windows, this may result in WSAEINVAL (error 10022): + # An invalid operation was attempted. + if (exc.errno != errno.ENOTCONN + and getattr(exc, 'winerror', 0) != 10022): + raise + finally: + self.sock.close() + + def socket(self): + """Return socket instance used to connect to IMAP4 server. + + socket = .socket() + """ + return self.sock + + +class IMAP4_SSL(IMAP4): + + """IMAP4 client class over SSL connection + + Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile[, ssl_context]]]]]) + + host - host's name (default: localhost); + port - port number (default: standard IMAP4 SSL port); + keyfile - PEM formatted file that contains your private key (default: None); + certfile - PEM formatted certificate chain file (default: None); + ssl_context - a SSLContext object that contains your certificate chain + and private key (default: None) + Note: if ssl_context is provided, then parameters keyfile or + certfile should not be set otherwise ValueError is raised. + + for more documentation see the docstring of the parent class IMAP4. + """ + def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None, + certfile=None, ssl_context=None): + + if not HAVE_SSL: + raise ValueError('SSL support missing') + + if ssl_context is not None and keyfile is not None: + raise ValueError("ssl_context and keyfile arguments are mutually " + "exclusive") + if ssl_context is not None and certfile is not None: + raise ValueError("ssl_context and certfile arguments are mutually " + "exclusive") + if keyfile is not None or certfile is not None: + import warnings + warnings.warn("keyfile and certfile are deprecated, use a " + "custom ssl_context instead", DeprecationWarning, 2) + self.keyfile = keyfile + self.certfile = certfile + if ssl_context is None: + ssl_context = ssl._create_stdlib_context(certfile=certfile, + keyfile=keyfile) + self.ssl_context = ssl_context + IMAP4.__init__(self, host, port) + + def _create_socket(self): + sock = IMAP4._create_socket(self) + return self.ssl_context.wrap_socket(sock, + server_hostname=self.host) + + def open(self, host='', port=IMAP4_SSL_PORT): + """Setup connection to remote server on "host:port". + (default: localhost:standard IMAP4 SSL port). + This connection will be used by the routines: + read, readline, send, shutdown. + """ + IMAP4.open(self, host, port) + +class IMAP4_stream(IMAP4): + + """IMAP4 client class over a stream + + Instantiate with: IMAP4_stream(command) + + "command" - a string that can be passed to subprocess.Popen() + + for more documentation see the docstring of the parent class IMAP4. + """ + def __init__(self, command): + self.command = command + IMAP4.__init__(self) + + def open(self, host = None, port = None): + """Setup a stream connection. + This connection will be used by the routines: + read, readline, send, shutdown. + """ + self.host = None # For compatibility with parent class + self.port = None + self.sock = None + self.file = None + self.process = subprocess.Popen(self.command, + bufsize=DEFAULT_BUFFER_SIZE, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + shell=True, close_fds=True) + self.writefile = self.process.stdin + self.readfile = self.process.stdout + + def read(self, size): + """Read 'size' bytes from remote.""" + return self.readfile.read(size) + + def readline(self): + """Read line from remote.""" + return self.readfile.readline() + + def send(self, data): + """Send data to remote.""" + self.writefile.write(data) + self.writefile.flush() + + def shutdown(self): + """Close I/O established in "open".""" + self.readfile.close() + self.writefile.close() + self.process.wait() + diff --git a/imapclient/exceptions.py b/imapclient/exceptions.py index 26fd51c5..d4b6eedd 100644 --- a/imapclient/exceptions.py +++ b/imapclient/exceptions.py @@ -1,12 +1,17 @@ -import imaplib # Base class allowing to catch any IMAPClient related exceptions -# To ensure backward compatibility, we "rename" the imaplib general -# exception class, so we can catch its exceptions without having to -# deal with it in IMAPClient codebase -IMAPClientError = imaplib.IMAP4.error -IMAPClientAbortError = imaplib.IMAP4.abort -IMAPClientReadOnlyError = imaplib.IMAP4.readonly +class IMAPClientError(RuntimeError): + pass + + +class IMAPClientAbortError(IMAPClientError): + # Service errors - close and retry + pass + + +class IMAPClientReadOnlyError(IMAPClientError): + # Mailbox status changed to READ-ONLY + pass class CapabilityError(IMAPClientError): diff --git a/imapclient/imap4.py b/imapclient/imap4.py index 618dc15c..acdad9fa 100644 --- a/imapclient/imap4.py +++ b/imapclient/imap4.py @@ -2,15 +2,14 @@ # Released subject to the New BSD License # Please see http://en.wikipedia.org/wiki/BSD_licenses -import imaplib import socket +from . import conn - -class IMAP4WithTimeout(imaplib.IMAP4): +class IMAP4WithTimeout(conn.IMAP4): def __init__(self, address, port, timeout): self._timeout = timeout - imaplib.IMAP4.__init__(self, address, port) + conn.IMAP4.__init__(self, address, port) def _create_socket(self): return socket.create_connection((self.host, self.port), self._timeout.connect) diff --git a/imapclient/imapclient.py b/imapclient/imapclient.py index 46e9257f..b4946a67 100644 --- a/imapclient/imapclient.py +++ b/imapclient/imapclient.py @@ -5,31 +5,40 @@ from __future__ import unicode_literals import functools -import imaplib import itertools +import random +import re import select import socket import sys -import re from collections import namedtuple from datetime import datetime, date +from logging import getLogger from operator import itemgetter -from logging import LoggerAdapter, getLogger from six import moves, iteritems, text_type, integer_types, PY3, binary_type, iterbytes +from . import conn from . import exceptions from . import imap4 from . import response_lexer from . import tls +from . import util from .datetime_util import datetime_to_INTERNALDATE, format_criteria_date from .imap_utf7 import encode as encode_utf7, decode as decode_utf7 from .response_parser import parse_response, parse_message_list, parse_fetch_response from .util import to_bytes, to_unicode, assert_imap_protocol, chunk + xrange = moves.xrange +try: + import ssl +except ImportError: + ssl = None + try: from select import poll + POLL_SUPPORT = True except: # Fallback to select() on systems that don't support poll() @@ -38,51 +47,19 @@ if PY3: long = int # long is just int in python3 - logger = getLogger(__name__) +imaplib_logger = getLogger(__name__ + '.imaplib') __all__ = ['IMAPClient', 'SocketTimeout', 'DELETED', 'SEEN', 'ANSWERED', 'FLAGGED', 'DRAFT', 'RECENT'] - -# We also offer the gmail-specific XLIST command... -if 'XLIST' not in imaplib.Commands: - imaplib.Commands['XLIST'] = ('NONAUTH', 'AUTH', 'SELECTED') - -# ...and IDLE -if 'IDLE' not in imaplib.Commands: - imaplib.Commands['IDLE'] = ('NONAUTH', 'AUTH', 'SELECTED') - -# ..and STARTTLS -if 'STARTTLS' not in imaplib.Commands: - imaplib.Commands['STARTTLS'] = ('NONAUTH',) - -# ...and ID. RFC2971 says that this command is valid in all states, -# but not that some servers (*cough* FastMail *cough*) don't seem to -# accept it in state NONAUTH. -if 'ID' not in imaplib.Commands: - imaplib.Commands['ID'] = ('NONAUTH', 'AUTH', 'SELECTED') - -# ... and UNSELECT. RFC3691 does not specify the state but there is no -# reason to use the command without AUTH state and a mailbox selected. -if 'UNSELECT' not in imaplib.Commands: - imaplib.Commands['UNSELECT'] = ('AUTH', 'SELECTED') - -# .. and ENABLE. -if 'ENABLE' not in imaplib.Commands: - imaplib.Commands['ENABLE'] = ('AUTH',) - -# .. and MOVE for RFC6851. -if 'MOVE' not in imaplib.Commands: - imaplib.Commands['MOVE'] = ('AUTH', 'SELECTED') - # System flags DELETED = br'\Deleted' SEEN = br'\Seen' ANSWERED = br'\Answered' FLAGGED = br'\Flagged' DRAFT = br'\Draft' -RECENT = br'\Recent' # This flag is read-only +RECENT = br'\Recent' # This flag is read-only # Special folders, see RFC6154 # \Flagged is omitted because it is the same as the flag defined above @@ -140,6 +117,7 @@ class MailboxQuotaRoots(namedtuple("MailboxQuotaRoots", "mailbox quota_roots")): :ivar quota_roots: list of quota roots associated with the mailbox """ + class Quota(namedtuple("Quota", "quota_root resource usage limit")): """Resource quota. @@ -156,7 +134,6 @@ def require_capability(capability): """Decorator raising CapabilityError when a capability is not available.""" def actual_decorator(func): - @functools.wraps(func) def wrapper(client, *args, **kwargs): if not client.has_capability(capability): @@ -166,9 +143,40 @@ def wrapper(client, *args, **kwargs): return func(client, *args, **kwargs) return wrapper + return actual_decorator +# Patterns to match server responses + +CRLF = b'\r\n' +AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first + +Continuation = re.compile(br'\+( (?P.*))?') + +MapCRLF = re.compile(br'\r\n|\r|\n') +# We no longer exclude the ']' character from the data portion of the response +# code, even though it violates the RFC. Popular IMAP servers such as Gmail +# allow flags with ']', and there are programs (including imaplib!) that can +# produce them. The problem with this is if the 'text' portion of the response +# includes a ']' we'll parse the response wrong (which is the point of the RFC +# restriction). However, that seems less likely to be a problem in practice +# than being unable to correctly parse flags that include ']' chars, which +# was reported as a real-world problem in issue #21815. +Response_code = re.compile(br'\[(?P[A-Z-]+)( (?P.*))?\]') +Untagged_response = re.compile(br'\* (?P[A-Z-]+)( (?P.*))?') +Literal = re.compile(br'.*{(?P\d+)}$') +Untagged_status = re.compile(br'\* (?P\d+) (?P[A-Z-]+)( (?P.*))?') + +def _check_resp(expected, command, typ, data): + """Check command responses for errors. + + Raises IMAPClient.Error if the command fails. + """ + if typ != expected: + raise exceptions.IMAPClientError("%s failed: %s" % (command, to_unicode(data[0]))) + + class IMAPClient(object): """A connection to the IMAP server specified by *host* is made when this class is instantiated. @@ -250,6 +258,7 @@ def __init__(self, host, port=None, use_uid=True, ssl=True, stream=False, self.use_uid = use_uid self.folder_encode = True self.normalise_times = True + self.welcome = None # the server greeting message # If the user gives a single timeout value, assume it is the same for # connection and read/write operations @@ -261,17 +270,68 @@ def __init__(self, host, port=None, use_uid=True, ssl=True, stream=False, self._cached_capabilities = None self._idle_tag = None - self._imap = self._create_IMAP4() + self._conn = self._create_conn() logger.debug("Connected to host %s over %s", self.host, "SSL/TLS" if ssl else "plain text") self._set_read_timeout() - # Small hack to make imaplib log everything to its own logger - imaplib_logger = IMAPlibLoggerAdapter( - getLogger('imapclient.imaplib'), dict() - ) - self._imap.debug = 5 - self._imap._mesg = imaplib_logger.debug + + # from imaplib + self.state = 'NONAUTH' + self._literal = None # A literal argument to a command + self._tagged_commands = {} # Tagged commands awaiting response + self._untagged_responses = {} # {typ: [data, ...], ...} + self._continuation_response = '' # Last continuation response + self.is_readonly = False # READ-ONLY desired state + self._tagnum = 0 + self._encoding = 'ascii' + + try: + self._connect() + except Exception: + try: + self._conn.shutdown() + except OSError: + pass + raise + + def _connect(self): + # Create unique tag for this session, + # and compile tagged response matcher. + + self.tagpre = util.Int2AP(random.randint(4096, 65535)) + self.tagre = re.compile(br'(?P' + + self.tagpre + + br'\d+) (?P[A-Z]+) (?P.*)', re.ASCII) + + # Get server welcome message, + # request and store CAPABILITY response. + + if __debug__: + self._cmd_log_len = 10 + self._cmd_log_idx = 0 + self._cmd_log = {} # Last `_cmd_log_len' interactions + imaplib_logger.debug('new IMAP4 connection, tag=%s' % self.tagpre) + + self.welcome = self._get_response() + if 'PREAUTH' in self._untagged_responses: + self.state = 'AUTH' + elif 'OK' in self._untagged_responses: + self.state = 'NONAUTH' + else: + raise exceptions.IMAPClientError(self.welcome) + + self._preauth_capabilities = self._get_preauth_capabilities() + if __debug__: + imaplib_logger.debug('CAPABILITIES: %r' % (self._preauth_capabilities,)) + + for version in AllowedVersions: + if version not in self._preauth_capabilities: + continue + self.PROTOCOL_VERSION = version + return + + raise exceptions.IMAPClientError('server not IMAP4 compliant') def __enter__(self): return self @@ -290,9 +350,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): except Exception as e: logger.info("Could not close the connection cleanly: %s", e) - def _create_IMAP4(self): + def _create_conn(self): if self.stream: - return imaplib.IMAP4_stream(self.host) + return conn.IMAP4_stream(self.host) if self.ssl: return tls.IMAP4_TLS(self.host, self.port, self.ssl_context, @@ -302,13 +362,7 @@ def _create_IMAP4(self): def _set_read_timeout(self): if self._timeout is not None: - self._sock.settimeout(self._timeout.read) - - @property - def _sock(self): - # In py2, imaplib has sslobj (for SSL connections), and sock for non-SSL. - # In the py3 version it's just sock. - return getattr(self._imap, 'sslobj', self._imap.sock) + self._conn.sock.settimeout(self._timeout.read) @require_capability('STARTTLS') def starttls(self, ssl_context=None): @@ -328,25 +382,27 @@ def starttls(self, ssl_context=None): Raises :py:exc:`AbortError` if the server does not support STARTTLS or an SSL connection is already established. """ + self.check_state('NONAUTH') if self.ssl or self._starttls_done: raise exceptions.IMAPClientAbortError('TLS session already established') - typ, data = self._imap._simple_command("STARTTLS") + typ, data = self._simple_command("STARTTLS") self._checkok('starttls', typ, data) self._starttls_done = True - self._imap.sock = tls.wrap_socket(self._imap.sock, ssl_context, self.host) - self._imap.file = self._imap.sock.makefile('rb') + self._conn.sock = tls.wrap_socket(self._conn.sock, ssl_context, self.host) + self._conn.file = self._conn.sock.makefile('rb') return data[0] def login(self, username, password): """Login using *username* and *password*, returning the server response. """ + self.check_state('NONAUTH') try: - rv = self._command_and_check( - 'login', + rv = self._subcommand_and_check( + self._cmd_login, to_unicode(username), to_unicode(password), unpack=True, @@ -357,6 +413,19 @@ def login(self, username, password): logger.info('Logged in as %s', username) return rv + def _cmd_login(self, user, password): + """Identify client using plaintext password. + + (typ, [data]) = .login(user, password) + + NB: 'password' will be quoted. + """ + typ, dat = self._simple_command('LOGIN', user, _quote(password)) + if typ != 'OK': + raise exceptions.IMAPClientError(dat[-1]) + self.state = 'AUTH' + return typ, dat + def oauth2_login(self, user, access_token, mech='XOAUTH2', vendor=None): """Authenticate using the OAUTH2 method. @@ -368,10 +437,35 @@ def oauth2_login(self, user, access_token, mech='XOAUTH2', vendor=None): auth_string += 'vendor=%s\1' % vendor auth_string += '\1' try: - return self._command_and_check('authenticate', mech, lambda x: auth_string) + return self._subcommand_and_check(self._cmd_authenticate, mech, lambda x: auth_string) except exceptions.IMAPClientError as e: raise exceptions.LoginError(str(e)) + def _cmd_authenticate(self, mechanism, authobject): + """Authenticate command - requires response processing. + + 'mechanism' specifies which authentication mechanism is to + be used - it must appear in .capabilities in the + form AUTH=. + + 'authobject' must be a callable object: + + data = authobject(response) + + It will be called to process server continuation responses; the + response argument it is passed will be a bytes. It should return bytes + data that will be base64 encoded and sent to the server. It should + return None if the client abort response '*' should be sent instead. + """ + self.check_state('NONAUTH') + mech = mechanism.upper() + self._literal = util._Authenticator(authobject).process + typ, dat = self._simple_command('AUTHENTICATE', mech) + if typ != 'OK': + raise exceptions.IMAPClientError(dat[-1].decode('utf-8', 'replace')) + self.state = 'AUTH' + return typ, dat + def plain_login(self, identity, password, authorization_identity=None): """Authenticate using the PLAIN method (requires server support). """ @@ -379,25 +473,38 @@ def plain_login(self, identity, password, authorization_identity=None): authorization_identity = "" auth_string = '%s\0%s\0%s' % (authorization_identity, identity, password) try: - return self._command_and_check('authenticate', 'PLAIN', lambda _: auth_string, unpack=True) + return self._subcommand_and_check(self._cmd_authenticate, 'PLAIN', lambda _: auth_string, unpack=True) except exceptions.IMAPClientError as e: raise exceptions.LoginError(str(e)) def logout(self): """Logout, returning the server response. """ - typ, data = self._imap.logout() - self._check_resp('BYE', 'logout', typ, data) + typ, data = self._cmd_logout() + _check_resp('BYE', 'logout', typ, data) logger.info('Logged out, connection closed') return data[0] + def _cmd_logout(self): + """Shutdown connection to server. + + (typ, [data]) = .logout() + + Returns server 'BYE' response. + """ + self.check_state('NONAUTH','AUTH','SELECTED','LOGOUT') + self.state = 'LOGOUT' + typ, dat = self._simple_command('LOGOUT') + self.shutdown() + return typ, dat + def shutdown(self): """Close the connection to the IMAP server (without logging out) In most cases, :py:meth:`.logout` should be used instead of this. The logout method also shutdown down the connection. """ - self._imap.shutdown() + self._conn.shutdown() logger.info('Connection closed') @require_capability('ENABLE') @@ -417,9 +524,9 @@ def enable(self, *capabilities): See :rfc:`5161` for more details. """ - if self._imap.state != 'AUTH': + if self.state != 'AUTH': raise exceptions.IllegalStateError( - 'ENABLE command illegal in state %s' % self._imap.state + 'ENABLE command illegal in state %s' % self.state ) resp = self._raw_command_untagged( @@ -449,9 +556,11 @@ def id_(self, parameters=None): _quote(v) for v in itertools.chain.from_iterable(parameters.items())) - typ, data = self._imap._simple_command('ID', args) + # RFC2971 says all states but workaround FastMail bug + self.check_state('NONAUTH', 'AUTH', 'SELECTED') + typ, data = self._simple_command('ID', args) self._checkok('id', typ, data) - typ, data = self._imap._untagged_response(typ, data, 'ID') + typ, data = self._untagged_response(typ, data, 'ID') return parse_response(data) def capabilities(self): @@ -468,7 +577,7 @@ def capabilities(self): """ # Ensure cached capabilities aren't used post-STARTTLS. As per # https://tools.ietf.org/html/rfc2595#section-3.1 - if self._starttls_done and self._imap.state == 'NONAUTH': + if self._starttls_done and self.state == 'NONAUTH': self._cached_capabilities = None return self._do_capabilites() @@ -478,23 +587,22 @@ def capabilities(self): # If the server returned an untagged CAPABILITY response # (during authentication), cache it and return that. - untagged = _dict_bytes_normaliser(self._imap.untagged_responses) + untagged = _dict_bytes_normaliser(self._untagged_responses) response = untagged.pop('CAPABILITY', None) if response: self._cached_capabilities = self._normalise_capabilites(response[0]) return self._cached_capabilities # If authenticated, but don't have a capability response, ask for one - if self._imap.state in ('SELECTED', 'AUTH'): + if self.state in ('SELECTED', 'AUTH'): self._cached_capabilities = self._do_capabilites() return self._cached_capabilities - # Return capabilities that imaplib requested at connection - # time (pre-auth) - return tuple(to_bytes(c) for c in self._imap.capabilities) + # Return capabilities fetched at connection time + return tuple(to_bytes(c) for c in self._preauth_capabilities) def _do_capabilites(self): - raw_response = self._command_and_check('capability', unpack=True) + raw_response = self._subcommand_and_check(self._cmd_capability, unpack=True) return self._normalise_capabilites(raw_response) def _normalise_capabilites(self, raw_response): @@ -525,7 +633,7 @@ def namespace(self): See :rfc:`2342` for more details. """ - data = self._command_and_check('namespace') + data = self._subcommand_and_check(self._cmd_namespace) parts = [] for item in parse_response(data): if item is None: @@ -539,6 +647,16 @@ def namespace(self): parts.append(tuple(converted)) return Namespace(*parts) + def _cmd_namespace(self): + """ Returns IMAP namespaces ala rfc2342 + + (typ, [data, ...]) = .namespace() + """ + self.check_state('AUTH','SELECTED') + name = 'NAMESPACE' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, dat, name) + def list_folders(self, directory="", pattern="*"): """Get a listing of folders on the server as a list of ``(flags, delimiter, name)`` tuples. @@ -560,6 +678,7 @@ def list_folders(self, directory="", pattern="*"): decoded from modified UTF-7, except if folder_decode is not set. """ + self.check_state('AUTH', 'SELECTED') return self._do_list('LIST', directory, pattern) @require_capability('XLIST') @@ -592,6 +711,7 @@ def xlist_folders(self, directory="", pattern="*"): The *directory* and *pattern* arguments are as per list_folders(). """ + self.check_state('NONAUTH', 'AUTH', 'SELECTED') return self._do_list('XLIST', directory, pattern) def list_sub_folders(self, directory="", pattern="*"): @@ -601,14 +721,15 @@ def list_sub_folders(self, directory="", pattern="*"): The default behaviour will list all subscribed folders. The *directory* and *pattern* arguments are as per list_folders(). """ + self.check_state('AUTH','SELECTED') return self._do_list('LSUB', directory, pattern) def _do_list(self, cmd, directory, pattern): directory = self._normalise_folder(directory) pattern = self._normalise_folder(pattern) - typ, dat = self._imap._simple_command(cmd, directory, pattern) + typ, dat = self._simple_command(cmd, directory, pattern) self._checkok(cmd, typ, dat) - typ, dat = self._imap._untagged_response(typ, dat, cmd) + typ, dat = self._untagged_response(typ, dat, cmd) return self._proc_folder_list(dat) def _proc_folder_list(self, folder_data): @@ -684,8 +805,39 @@ def select_folder(self, folder, readonly=False): b'UIDNEXT': 11, b'UIDVALIDITY': 1239278212} """ - self._command_and_check('select', self._normalise_folder(folder), readonly) - return self._process_select_response(self._imap.untagged_responses) + self._subcommand_and_check(self._cmd_select, self._normalise_folder(folder), readonly) + return self._process_select_response(self._untagged_responses) + + def _cmd_select(self, mailbox='INBOX', readonly=False): + """Select a mailbox. + + Flush all untagged responses. + + (typ, [data]) = .select(mailbox='INBOX', readonly=False) + + 'data' is count of messages in mailbox ('EXISTS' response). + + Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY'), so + other responses should be obtained via .response('FLAGS') etc. + """ + self.check_state('AUTH','SELECTED') + self._untagged_responses = {} # Flush old responses. + self.is_readonly = readonly + if readonly: + name = 'EXAMINE' + else: + name = 'SELECT' + typ, dat = self._simple_command(name, mailbox) + if typ != 'OK': + self.state = 'AUTH' # Might have been 'SELECTED' + return typ, dat + self.state = 'SELECTED' + if 'READ-ONLY' in self._untagged_responses \ + and not readonly: + if __debug__: + self._dump_ur(self._untagged_responses) + raise exceptions.IMAPClientReadOnlyError('%s is not writable' % mailbox) + return typ, self._untagged_responses.get('EXISTS', [None]) @require_capability('UNSELECT') def unselect_folder(self): @@ -697,8 +849,8 @@ def unselect_folder(self): Returns the UNSELECT response string returned by the server. """ logger.debug('< UNSELECT') - # IMAP4 class has no `unselect` method so we can't use `_command_and_check` there - _typ, data = self._imap._simple_command("UNSELECT") + self.check_state('AUTH', 'SELECTED') # RFC3691 does not specify any state + _typ, data = self._simple_command("UNSELECT") return data[0] def _process_select_response(self, resp): @@ -743,7 +895,7 @@ def noop(self): (6, b'FETCH', (b'FLAGS', (b'sne',)))]) """ - tag = self._imap._command('NOOP') + tag = self._command('NOOP') return self._consume_until_tagged_response(tag, 'NOOP') @require_capability('IDLE') @@ -762,8 +914,9 @@ def idle(self): See :rfc:`2177` for more information about the IDLE extension. """ - self._idle_tag = self._imap._command('IDLE') - resp = self._imap._get_response() + self.check_state('NONAUTH', 'AUTH', 'SELECTED') + self._idle_tag = self._command('IDLE') + resp = self._get_response() if resp is not None: raise exceptions.IMAPClientError('Unexpected IDLE response: %s' % resp) @@ -806,7 +959,7 @@ def idle_check(self, timeout=None): (1, b'EXISTS'), (1, b'FETCH', (b'FLAGS', (b'\\NotJunk',)))] """ - sock = self._sock + sock = self._conn.sock # make the socket non-blocking so the timeout can be # implemented for this call @@ -824,7 +977,7 @@ def idle_check(self, timeout=None): if events: while True: try: - line = self._imap._get_line() + line = self._conn.get_line() except (socket.timeout, socket.error): break except IMAPClient.AbortError: @@ -858,7 +1011,7 @@ def idle_done(self): ``idle_check()``. """ logger.debug('< DONE') - self._imap.send(b'DONE\r\n') + self._conn.send(b'DONE\r\n') return self._consume_until_tagged_response(self._idle_tag, 'IDLE') def folder_status(self, folder, what=None): @@ -878,34 +1031,85 @@ def folder_status(self, folder, what=None): what_ = '(%s)' % (' '.join(what)) fname = self._normalise_folder(folder) - data = self._command_and_check('status', fname, what_) + data = self._subcommand_and_check(self._cmd_status, fname, what_) response = parse_response(data) status_items = response[-1] return dict(as_pairs(status_items)) + def _cmd_status(self, mailbox, names): + """Request named status conditions for mailbox. + + (typ, [data]) = .status(mailbox, names) + """ + self.check_state('AUTH', 'SELECTED') + name = 'STATUS' + # if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! + # raise exceptions.IMAPClientError('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) + typ, dat = self._simple_command(name, mailbox, names) + return self._untagged_response(typ, dat, name) + def close_folder(self): """Close the currently selected folder, returning the server response string. """ - return self._command_and_check('close', unpack=True) + return self._subcommand_and_check(self._cmd_close, unpack=True) + + def _cmd_close(self): + """Close currently selected mailbox. + + Deleted messages are removed from writable mailbox. + This is the recommended command before 'LOGOUT'. + + (typ, [data]) = .close() + """ + self.check_state('SELECTED') + try: + typ, dat = self._simple_command('CLOSE') + finally: + self.state = 'AUTH' + return typ, dat def create_folder(self, folder): """Create *folder* on the server returning the server response string. """ - return self._command_and_check('create', self._normalise_folder(folder), unpack=True) + return self._subcommand_and_check(self._cmd_create, self._normalise_folder(folder), unpack=True) + + def _cmd_create(self, mailbox): + """Create new mailbox. + + (typ, [data]) = .create(mailbox) + """ + self.check_state('AUTH', 'SELECTED') + return self._simple_command('CREATE', mailbox) def rename_folder(self, old_name, new_name): """Change the name of a folder on the server. """ - return self._command_and_check('rename', + return self._subcommand_and_check(self._cmd_rename, self._normalise_folder(old_name), self._normalise_folder(new_name), unpack=True) + def _cmd_rename(self, oldmailbox, newmailbox): + """Rename old mailbox name to new. + + (typ, [data]) = .rename(oldmailbox, newmailbox) + """ + self.check_state('AUTH', 'SELECTED') + return self._simple_command('RENAME', oldmailbox, newmailbox) + def delete_folder(self, folder): """Delete *folder* on the server returning the server response string. """ - return self._command_and_check('delete', self._normalise_folder(folder), unpack=True) + return self._subcommand_and_check(self._cmd_delete, self._normalise_folder(folder), unpack=True) + + def _cmd_delete(self, mailbox): + """Delete old mailbox. + + (typ, [data]) = .delete(mailbox) + """ + self.check_state('AUTH', 'SELECTED') + return self._simple_command('DELETE', mailbox) def folder_exists(self, folder): """Return ``True`` if *folder* exists on the server. @@ -915,12 +1119,28 @@ def folder_exists(self, folder): def subscribe_folder(self, folder): """Subscribe to *folder*, returning the server response string. """ - return self._command_and_check('subscribe', self._normalise_folder(folder)) + return self._subcommand_and_check(self._cmd_subscribe, self._normalise_folder(folder)) + + def _cmd_subscribe(self, mailbox): + """Subscribe to new mailbox. + + (typ, [data]) = .subscribe(mailbox) + """ + self.check_state('AUTH', 'SELECTED') + return self._simple_command('SUBSCRIBE', mailbox) def unsubscribe_folder(self, folder): """Unsubscribe to *folder*, returning the server response string. """ - return self._command_and_check('unsubscribe', self._normalise_folder(folder)) + return self._subcommand_and_check(self._cmd_unsubscribe, self._normalise_folder(folder)) + + def _cmd_unsubscribe(self, mailbox): + """Unsubscribe from old mailbox. + + (typ, [data]) = .unsubscribe(mailbox) + """ + self.check_state('AUTH', 'SELECTED') + return self._simple_command('UNSUBSCRIBE', mailbox) def search(self, criteria='ALL', charset=None): """Return a list of messages ids from the currently selected @@ -1002,6 +1222,7 @@ def gmail_search(self, query, charset='UTF-8'): return self._search([b'X-GM-RAW', query], charset) def _search(self, criteria, charset): + self.check_state('SELECTED') args = [] if charset: args.extend([b'CHARSET', to_bytes(charset)]) @@ -1009,7 +1230,7 @@ def _search(self, criteria, charset): try: data = self._raw_command_untagged(b'SEARCH', args) - except imaplib.IMAP4.error as e: + except exceptions.IMAPClientError as e: # Make BAD IMAP responses easier to understand to the user, with a link to the docs m = re.match(r'SEARCH command error: BAD \[(.+)\]', str(e)) if m: @@ -1019,7 +1240,7 @@ def _search(self, criteria, charset): '{criteria}\nPlease refer to the documentation for more information ' 'about search criteria syntax..\n' 'https://imapclient.readthedocs.io/en/master/#imapclient.IMAPClient.search' - .format( + .format( original_msg=m.group(1), criteria='"%s"' % criteria if not isinstance(criteria, list) else criteria ) @@ -1053,6 +1274,7 @@ def sort(self, sort_criteria, criteria='ALL', charset='UTF-8'): Note that SORT is an extension to the IMAP4 standard so it may not be supported by all IMAP servers. """ + self.check_state('SELECTED') args = [ _normalise_sort_criteria(sort_criteria), to_bytes(charset), @@ -1078,6 +1300,7 @@ def thread(self, algorithm='REFERENCES', criteria='ALL', charset='UTF-8'): See :rfc:`5256` for more details. """ + self.check_state('SELECTED') algorithm = to_bytes(algorithm) if not self.has_capability(b'THREAD=' + algorithm): raise exceptions.CapabilityError( @@ -1085,7 +1308,7 @@ def thread(self, algorithm='REFERENCES', criteria='ALL', charset='UTF-8'): ) args = [algorithm, to_bytes(charset)] + \ - _normalise_search_criteria(criteria, charset) + _normalise_search_criteria(criteria, charset) data = self._raw_command_untagged(b'THREAD', args) return parse_response(data) @@ -1239,6 +1462,7 @@ def fetch(self, messages, data, modifiers=None): b'SEQ': 110}} """ + self.check_state('SELECTED') if not messages: return {} @@ -1250,10 +1474,10 @@ def fetch(self, messages, data, modifiers=None): ] if self.use_uid: args.insert(0, 'UID') - tag = self._imap._command(*args) - typ, data = self._imap._command_complete('FETCH', tag) + tag = self._command(*args) + typ, data = self._command_complete('FETCH', tag) self._checkok('fetch', typ, data) - typ, data = self._imap._untagged_response(typ, data, 'FETCH') + typ, data = self._untagged_response(typ, data, 'FETCH') return parse_fetch_response(data, self.normalise_times, self.use_uid) def append(self, folder, msg, flags=(), msg_time=None): @@ -1281,13 +1505,37 @@ def append(self, folder, msg, flags=(), msg_time=None): time_val = to_bytes(time_val) else: time_val = None - return self._command_and_check('append', + return self._subcommand_and_check(self._cmd_append, self._normalise_folder(folder), seq_to_parenstr(flags), time_val, to_bytes(msg), unpack=True) + def _cmd_append(self, mailbox, flags, date_time, message): + """Append message to named mailbox. + + (typ, [data]) = .append(mailbox, flags, date_time, message) + + All args except `message' can be None. + """ + self.check_state('AUTH', 'SELECTED') + name = 'APPEND' + if not mailbox: + mailbox = 'INBOX' + if flags: + if (flags[0], flags[-1]) != ('(', ')'): + flags = '(%s)' % flags + else: + flags = None + if date_time: + date_time = util.Time2Internaldate(date_time) + else: + date_time = None + literal = MapCRLF.sub(CRLF, message) + self._literal = literal + return self._simple_command(name, mailbox, flags, date_time) + @require_capability('MULTIAPPEND') def multiappend(self, folder, msgs): """Append messages to *folder* using the MULTIAPPEND feature from :rfc:`3502`. @@ -1310,10 +1558,11 @@ def copy(self, messages, folder): *folder*. Returns the COPY response string returned by the server. """ - return self._command_and_check('copy', - join_message_ids(messages), - self._normalise_folder(folder), - uid=True, unpack=True) + self.check_state('SELECTED') + return self._uid_command_and_check('COPY', + join_message_ids(messages), + self._normalise_folder(folder), + unpack=True) @require_capability('MOVE') def move(self, messages, folder): @@ -1324,10 +1573,11 @@ def move(self, messages, folder): :param messages: List of message UIDs to move. :param folder: The destination folder name. """ - return self._command_and_check('move', - join_message_ids(messages), - self._normalise_folder(folder), - uid=True, unpack=True) + self.check_state('AUTH','SELECTED') # RFC6851 + return self._uid_command_and_check('MOVE', + join_message_ids(messages), + self._normalise_folder(folder), + unpack=True) def expunge(self, messages=None): """When, no *messages* are specified, remove all messages @@ -1359,11 +1609,12 @@ def expunge(self, messages=None): See :rfc:`4315#section-2.1` section 2.1 for more details. """ + self.check_state('SELECTED') if messages: if not self.use_uid: raise ValueError('cannot EXPUNGE by ID when not using uids') - return self._command_and_check('EXPUNGE', join_message_ids(messages), uid=True) - tag = self._imap._command('EXPUNGE') + return self._uid_command_and_check('EXPUNGE', join_message_ids(messages)) + tag = self._command('EXPUNGE') return self._consume_until_tagged_response(tag, 'EXPUNGE') @require_capability('ACL') @@ -1371,11 +1622,20 @@ def getacl(self, folder): """Returns a list of ``(who, acl)`` tuples describing the access controls for *folder*. """ - data = self._command_and_check('getacl', self._normalise_folder(folder)) + data = self._subcommand_and_check(self._cmd_getacl, self._normalise_folder(folder)) parts = list(response_lexer.TokenSource(data)) - parts = parts[1:] # First item is folder name + parts = parts[1:] # First item is folder name return [(parts[i], parts[i + 1]) for i in xrange(0, len(parts), 2)] + def _cmd_getacl(self, mailbox): + """Get the ACLs for a mailbox. + + (typ, [data]) = .getacl(mailbox) + """ + self.check_state('AUTH', 'SELECTED') + typ, dat = self._simple_command('GETACL', mailbox) + return self._untagged_response(typ, dat, 'ACL') + @require_capability('ACL') def setacl(self, folder, who, what): """Set an ACL (*what*) for user (*who*) for a folder. @@ -1383,11 +1643,19 @@ def setacl(self, folder, who, what): Set *what* to an empty string to remove an ACL. Returns the server response string. """ - return self._command_and_check('setacl', + return self._subcommand_and_check(self._cmd_setacl, self._normalise_folder(folder), who, what, unpack=True) + def _cmd_setacl(self, mailbox, who, what): + """Set a mailbox acl. + + (typ, [data]) = .setacl(mailbox, who, what) + """ + self.check_state('AUTH', 'SELECTED') + return self._simple_command('SETACL', mailbox, who, what) + @require_capability('QUOTA') def get_quota(self, mailbox="INBOX"): """Get the quotas associated with a mailbox. @@ -1407,9 +1675,20 @@ def _get_quota(self, quota_root=""): Returns a list of Quota objects. """ return _parse_quota( - self._command_and_check('getquota', _quote(quota_root)) + self._subcommand_and_check(self._cmd_getquota, _quote(quota_root)) ) + def _cmd_getquota(self, root): + """Get the quota root's resource usage and limits. + + Part of the IMAP4 QUOTA extension defined in rfc2087. + + (typ, [data]) = .getquota(root) + """ + self.check_state('AUTH', 'SELECTED') + typ, dat = self._simple_command('GETQUOTA', root) + return self._untagged_response(typ, dat, 'QUOTA') + @require_capability('QUOTA') def get_quota_root(self, mailbox): """Get the quota roots for a mailbox. @@ -1421,11 +1700,12 @@ def get_quota_root(self, mailbox): Return a tuple of MailboxQuotaRoots and list of Quota associated """ + self.check_state('AUTH','SELECTED') quota_root_rep = self._raw_command_untagged( b'GETQUOTAROOT', to_bytes(mailbox), uid=False, response_name='QUOTAROOT' ) - quota_rep = pop_with_default(self._imap.untagged_responses, 'QUOTA', []) + quota_rep = pop_with_default(self._untagged_responses, 'QUOTA', []) quota_root_rep = parse_response(quota_root_rep) quota_root = MailboxQuotaRoots( to_unicode(quota_root_rep[0]), @@ -1439,6 +1719,7 @@ def set_quota(self, quotas): :param quotas: list of Quota objects """ + self.check_state('AUTH','SELECTED') if not quotas: return @@ -1466,19 +1747,275 @@ def set_quota(self, quotas): ) return _parse_quota(response) - def _check_resp(self, expected, command, typ, data): - """Check command responses for errors. + # Internal methods + + def _append_untagged(self, typ, dat): + if dat is None: + dat = b'' + ur = self._untagged_responses + if __debug__: + imaplib_logger.debug('untagged_responses[%s] %s += ["%r"]' % + (typ, len(ur.get(typ, '')), dat)) + if typ in ur: + ur[typ].append(dat) + else: + ur[typ] = [dat] - Raises IMAPClient.Error if the command fails. - """ - if typ != expected: - raise exceptions.IMAPClientError("%s failed: %s" % (command, to_unicode(data[0]))) + def _check_bye(self): + bye = self._untagged_responses.get('BYE') + if bye: + raise exceptions.IMAPClientAbortError(bye[-1].decode(self._encoding, 'replace')) + + def check_state(self, *states): + if self.state in states: + return + + self._literal = None + raise exceptions.IMAPClientError("command illegal in state %s, " + "only allowed in states %s" % + (self.state, ', '.join(states))) + + def _command(self, name, *args): + for typ in ('OK', 'NO', 'BAD'): + if typ in self._untagged_responses: + del self._untagged_responses[typ] + + if 'READ-ONLY' in self._untagged_responses \ + and not self.is_readonly: + raise exceptions.IMAPClientReadOnlyError('mailbox status changed to READ-ONLY') + + tag = self._new_tag() + name = bytes(name, self._encoding) + data = tag + b' ' + name + for arg in args: + if arg is None: continue + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + data = data + b' ' + arg + + literal = self._literal + if literal is not None: + self._literal = None + if type(literal) is type(self._command): + literator = literal + else: + literator = None + data = data + bytes(' {%s}' % len(literal), self._encoding) + + if __debug__: + msg = util.redact_password('> %r' % data) + imaplib_logger.debug(msg) + + try: + self._conn.send(data + CRLF) + except OSError as val: + raise exceptions.IMAPClientAbortError('socket error: %s' % val) + + if literal is None: + return tag + + while 1: + # Wait for continuation response + + while self._get_response(): + if self._tagged_commands[tag]: # BAD/NO? + return tag + + # Send literal + + if literator: + literal = literator(self._continuation_response) + + if __debug__: + imaplib_logger.debug('write literal size %s' % len(literal)) + + try: + self._conn.send(literal) + self._conn.send(CRLF) + except OSError as val: + raise exceptions.IMAPClientAbortError('socket error: %s' % val) + + if not literator: + break + + return tag + + def _command_complete(self, name, tag): + logout = (name == 'LOGOUT') + # BYE is expected after LOGOUT + if not logout: + self._check_bye() + try: + typ, data = self._get_tagged_response(tag, expect_bye=logout) + except exceptions.IMAPClientAbortError as val: + raise exceptions.IMAPClientAbortError('command: %s => %s' % (name, val)) + except exceptions.IMAPClientError as val: + raise exceptions.IMAPClientError('command: %s => %s' % (name, val)) + if not logout: + self._check_bye() + if typ == 'BAD': + raise exceptions.IMAPClientError('%s command error: %s %s' % (name, typ, data)) + return typ, data + + def _get_preauth_capabilities(self): + typ, dat = self._cmd_capability() + if dat == [None]: + raise exceptions.IMAPClientError('no CAPABILITY response from server') + dat = str(dat[-1], self._encoding) + dat = dat.upper() + return tuple(dat.split()) + + def _cmd_capability(self): + """(typ, [data]) = .capability() + Fetch capabilities list from server.""" + name = 'CAPABILITY' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, dat, name) + + def _get_response(self): + # Read response and store. + # + # Returns None for continuation responses, + # otherwise first response line received. + + resp = self._conn.get_line() + + # Command completion response? + + if self._match(self.tagre, resp): + tag = self.mo.group('tag') + if not tag in self._tagged_commands: + raise exceptions.IMAPClientAbortError('unexpected tagged response: %r' % resp) + + typ = self.mo.group('type') + typ = str(typ, self._encoding) + dat = self.mo.group('data') + self._tagged_commands[tag] = (typ, [dat]) + else: + dat2 = None + + # '*' (untagged) responses? + + if not self._match(Untagged_response, resp): + if self._match(Untagged_status, resp): + dat2 = self.mo.group('data2') + + if self.mo is None: + # Only other possibility is '+' (continuation) response... + + if self._match(Continuation, resp): + self._continuation_response = self.mo.group('data') + return None # NB: indicates continuation + + raise exceptions.IMAPClientAbortError("unexpected response: %r" % resp) + + typ = self.mo.group('type') + typ = str(typ, self._encoding) + dat = self.mo.group('data') + if dat is None: dat = b'' # Null untagged response + if dat2: dat = dat + b' ' + dat2 + + # Is there a literal to come? + + while self._match(Literal, dat): + + # Read literal direct from connection. + + size = int(self.mo.group('size')) + if __debug__: + imaplib_logger.debug('read literal size %s' % size) + data = self._conn.read(size) + + # Store response with literal as tuple + + self._append_untagged(typ, (dat, data)) + + # Read trailer - possibly containing another literal + + dat = self._conn.get_line() + + self._append_untagged(typ, dat) + + # Bracketed response information? + + if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat): + typ = self.mo.group('type') + typ = str(typ, self._encoding) + self._append_untagged(typ, self.mo.group('data')) + + if __debug__: + if typ in ('NO', 'BAD', 'BYE'): + imaplib_logger.debug('%s response: %r' % (typ, dat)) + + return resp + + def _get_tagged_response(self, tag, expect_bye=False): + + while 1: + result = self._tagged_commands[tag] + if result is not None: + del self._tagged_commands[tag] + return result + + if expect_bye: + typ = 'BYE' + bye = self._untagged_responses.pop(typ, None) + if bye is not None: + # Server replies to the "LOGOUT" command with "BYE" + return (typ, bye) + + # If we've seen a BYE at this point, the socket will be + # closed, so report the BYE now. + self._check_bye() + + # Some have reported "unexpected response" exceptions. + # Note that ignoring them here causes loops. + # Instead, send me details of the unexpected response and + # I'll update the code in `_get_response()'. + self._get_response() + + def _match(self, cre, s): + # Run compiled regular expression match method on 's'. + # Save result, return success. + + self.mo = cre.match(s) + if __debug__: + if self.mo is not None: + imaplib_logger.debug("\tmatched %r => %r" % (cre.pattern, self.mo.groups())) + return self.mo is not None + + def _new_tag(self): + tag = self.tagpre + bytes(str(self._tagnum), self._encoding) + self._tagnum = self._tagnum + 1 + self._tagged_commands[tag] = None + return tag + + def _simple_command(self, name, *args): + return self._command_complete(name, self._command(name, *args)) + + def _untagged_response(self, typ, dat, name): + if typ == 'NO': + return typ, dat + if not name in self._untagged_responses: + return typ, [None] + data = self._untagged_responses.pop(name) + if __debug__: + imaplib_logger.debug('untagged_responses[%s] => %s' % (name, data)) + return typ, data + + def _dump_ur(self, dict): + # Dump untagged responses (in `dict'). + l = dict.items() + if not l: return + t = '\n\t\t' + l = map(lambda x: '%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or ''), l) + imaplib_logger.debug('untagged responses dump:%s%s' % (t, t.join(l))) def _consume_until_tagged_response(self, tag, command): - tagged_commands = self._imap.tagged_commands + tagged_commands = self._tagged_commands resps = [] while True: - line = self._imap._get_response() + line = self._get_response() if tagged_commands[tag]: break resps.append(_parse_untagged_response(line)) @@ -1487,11 +2024,11 @@ def _consume_until_tagged_response(self, tag, command): return data[0], resps def _raw_command_untagged(self, command, args, response_name=None, unpack=False, uid=True): - # TODO: eventually this should replace _command_and_check (call it _command) + # TODO: eventually this should replace _subcommand_and_check (call it _command) typ, data = self._raw_command(command, args, uid=uid) if response_name is None: response_name = command - typ, data = self._imap._untagged_response(typ, data, to_unicode(response_name)) + typ, data = self._untagged_response(typ, data, to_unicode(response_name)) self._checkok(to_unicode(command), typ, data) if unpack: return data[0] @@ -1515,7 +2052,7 @@ def _raw_command(self, command, args, uid=True): if not isinstance(args, list): args = [args] - tag = self._imap._new_tag() + tag = self._new_tag() prefix = [to_bytes(tag)] if uid and self.use_uid: prefix.append(b'UID') @@ -1531,7 +2068,7 @@ def _raw_command(self, command, args, uid=True): if line: out = b' '.join(line) logger.debug('> %s', out) - self._imap.send(out) + self._conn.send(out) line = [] # Now send the (unquoted) literal @@ -1539,18 +2076,18 @@ def _raw_command(self, command, args, uid=True): item = item.original self._send_literal(tag, item) if not is_last: - self._imap.send(b' ') + self._conn.send(b' ') else: line.append(item) if line: out = b' '.join(line) logger.debug('> %s', out) - self._imap.send(out) + self._conn.send(out) - self._imap.send(b'\r\n') + self._conn.send(b'\r\n') - return self._imap._command_complete(to_unicode(command), tag) + return self._command_complete(to_unicode(command), tag) def _send_literal(self, tag, item): """Send a single literal for the command with *tag*. @@ -1558,47 +2095,66 @@ def _send_literal(self, tag, item): if b'LITERAL+' in self._cached_capabilities: out = b' {' + str(len(item)).encode('ascii') + b'+}\r\n' + item logger.debug('> %s', debug_trunc(out, 64)) - self._imap.send(out) + self._conn.send(out) return out = b' {' + str(len(item)).encode('ascii') + b'}\r\n' logger.debug('> %s', out) - self._imap.send(out) + self._conn.send(out) # Wait for continuation response - while self._imap._get_response(): - tagged_resp = self._imap.tagged_commands.get(tag) + while self._get_response(): + tagged_resp = self._tagged_commands.get(tag) if tagged_resp: raise exceptions.IMAPClientAbortError( "unexpected response while waiting for continuation response: " + repr(tagged_resp)) logger.debug(" (literal) > %s", debug_trunc(item, 256)) - self._imap.send(item) + self._conn.send(item) - def _command_and_check(self, command, *args, **kwargs): + def _uid_command_and_check(self, command, *args, **kwargs): + """Execute a simple command "command arg ...". + + If self.use_uid is True, identify messages by UID rather than message number. + + Returns response appropriate to 'command'. + """ unpack = pop_with_default(kwargs, 'unpack', False) - uid = pop_with_default(kwargs, 'uid', False) assert not kwargs, "unexpected keyword args: " + ', '.join(kwargs) - if uid and self.use_uid: - if PY3: - command = to_unicode(command) # imaplib must die - typ, data = self._imap.uid(command, *args) + if self.use_uid: + typ, data = self._simple_command('UID', command, *args) else: - meth = getattr(self._imap, to_unicode(command)) - typ, data = meth(*args) + typ, data = self._simple_command(command, *args) + + # I don't know why this is always fetch + typ, data = self._untagged_response(typ, data, 'FETCH') self._checkok(command, typ, data) if unpack: return data[0] return data + def _subcommand_and_check(self, meth, *args, **kwargs): + """Call another method and check the result code. + + Return the data from the command. If unpack=True, return data[0]. + """ + unpack = pop_with_default(kwargs, 'unpack', False) + assert not kwargs, "unexpected keyword args: " + ', '.join(kwargs) + + typ, data = meth(*args) + self._checkok(meth.__func__.__name__.upper(), typ, data) + if unpack: + return data[0] + return data + def _checkok(self, command, typ, data): - self._check_resp('OK', command, typ, data) + _check_resp('OK', command, typ, data) def _gm_label_store(self, cmd, messages, labels, silent): response = self._store(cmd, messages, self._normalise_labels(labels), - b'X-GM-LABELS', silent=silent) + b'X-GM-LABELS', silent=silent) return {msg: utf7_decode_sequence(labels) for msg, labels in iteritems(response)} if response else None @@ -1612,11 +2168,11 @@ def _store(self, cmd, messages, flags, fetch_key, silent): if silent: cmd += b".SILENT" - data = self._command_and_check('store', - join_message_ids(messages), - cmd, - seq_to_parenstr(flags), - uid=True) + self.check_state('SELECTED') + data = self._uid_command_and_check('STORE', + join_message_ids(messages), + cmd, + seq_to_parenstr(flags)) if silent: return None return self._filter_fetch_dict(parse_fetch_response(data), @@ -1638,14 +2194,118 @@ def _normalise_labels(self, labels): labels = (labels,) return [_quote(encode_utf7(l)) for l in labels] - @property - def welcome(self): - """access the server greeting message""" - try: - return self._imap.welcome - except AttributeError: - pass + # More stuff from imaplib + + def _cmd_recent(self): + """Return most recent 'RECENT' responses if any exist, + else prompt server for an update using the 'NOOP' command. + + (typ, [data]) = .recent() + + 'data' is None if no new messages, + else list of RECENT responses, most recent last. + """ + name = 'RECENT' + typ, dat = self._untagged_response('OK', [None], name) + if dat[-1]: + return typ, dat + typ, dat = self.noop() # Prod server for response + return self._untagged_response(typ, dat, name) + + def _cmd_partial(self, message_num, message_part, start, length): + """Fetch truncated part of a message. + + (typ, [data, ...]) = .partial(message_num, message_part, start, length) + + 'data' is tuple of message part envelope and data. + """ + self.check_state('SELECTED') # NB: obsolete + name = 'PARTIAL' + typ, dat = self._simple_command(name, message_num, message_part, start, length) + return self._untagged_response(typ, dat, 'FETCH') + + def _cmd_proxyauth(self, user): + """Assume authentication as "user". + + Allows an authorised administrator to proxy into any user's + mailbox. + + (typ, [data]) = .proxyauth(user) + """ + self.check_state('AUTH') + + name = 'PROXYAUTH' + return self._simple_command('PROXYAUTH', user) + + def _cmd_setannotation(self, *args): + """(typ, [data]) = .setannotation(mailbox[, entry, attribute]+) + Set ANNOTATIONs.""" + + typ, dat = self._simple_command('SETANNOTATION', *args) + return self._untagged_response(typ, dat, 'ANNOTATION') + + def _cmd_xatom(self, name, *args): + """Allow simple extension commands + notified by server in CAPABILITY response. + + Assumes command is legal in current state. + (typ, [data]) = .xatom(name, arg, ...) + + Returns response appropriate to extension command `name'. + """ + name = name.upper() + if not name in self._capabilities: + raise exceptions.IMAPClientError('unknown extension command: %s' % name) + return self._simple_command(name, *args) + + def _cmd_myrights(self, mailbox): + """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox). + + (typ, [data]) = .myrights(mailbox) + """ + self.check_state('AUTH','SELECTED') + typ,dat = self._simple_command('MYRIGHTS', mailbox) + return self._untagged_response(typ, dat, 'MYRIGHTS') + + def _cmd_login_cram_md5(self, user, password): + """ Force use of CRAM-MD5 authentication. + + (typ, [data]) = .login_cram_md5(user, password) + """ + self.user, self.password = user, password + return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH) + + def _CRAM_MD5_AUTH(self, challenge): + """ Authobject to use with CRAM-MD5 authentication. """ + import hmac + pwd = (self.password.encode('utf-8') if isinstance(self.password, str) + else self.password) + return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest() + + + def _cmd_check(self): + """Checkpoint mailbox on server. + + (typ, [data]) = .check() + """ + self.check_state('SELECTED') + return self._simple_command('CHECK') + + def _cmd_deleteacl(self, mailbox, who): + """Delete the ACLs (remove any rights) set for who on mailbox. + + (typ, [data]) = .deleteacl(mailbox, who) + """ + self.check_state('AUTH','SELECTED') + return self._simple_command('DELETEACL', mailbox, who) + + def _cmd_getannotation(self, mailbox, entry, attribute): + """(typ, [data]) = .getannotation(mailbox, entry, attribute) + Retrieve ANNOTATIONs.""" + + typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute) + return self._untagged_response(typ, dat, 'ANNOTATION') def _quote(arg): if isinstance(arg, text_type): @@ -1658,6 +2318,7 @@ def _quote(arg): q = b'"' return q + arg + q + def _normalise_search_criteria(criteria, charset=None): if not criteria: raise exceptions.InvalidCriteriaError('no criteria specified') @@ -1678,7 +2339,7 @@ def _normalise_search_criteria(criteria, charset=None): inner = _normalise_search_criteria(item) inner[0] = b'(' + inner[0] inner[-1] = inner[-1] + b')' - out.extend(inner) # flatten + out.extend(inner) # flatten else: out.append(_quoted.maybe(to_bytes(item, charset))) return out @@ -1689,10 +2350,12 @@ def _normalise_sort_criteria(criteria, charset=None): criteria = [criteria] return b'(' + b' '.join(to_bytes(item).upper() for item in criteria) + b')' + class _literal(bytes): """Hold message data that should always be sent as a literal.""" pass + class _quoted(binary_type): """ This class holds a quoted bytes value which provides access to the @@ -1744,6 +2407,7 @@ def _normalise_text_list(items): items = (items,) return (to_unicode(c) for c in items) + def join_message_ids(messages): """Convert a sequence of messages ids or a single integer message id into an id byte string for use with IMAP commands @@ -1855,7 +2519,7 @@ def debug_trunc(v, maxlen): if len(v) < maxlen: return repr(v) hl = maxlen // 2 - return repr(v[:hl]) + "..." + repr(v[-hl:]) + return repr(v[:hl]) + "..." + repr(v[-hl:]) def utf7_decode_sequence(seq): @@ -1874,17 +2538,3 @@ def _parse_quota(quota_rep): limit=quota_resource_info[2] )) return rv - - -class IMAPlibLoggerAdapter(LoggerAdapter): - """Adapter preventing IMAP secrets from going to the logging facility.""" - - def process(self, msg, kwargs): - for command in ("LOGIN", "AUTHENTICATE"): - if msg.startswith(">") and command in msg: - msg_start = msg.split(command)[0] - msg = "{}{} **REDACTED**".format(msg_start, command) - break - return super(IMAPlibLoggerAdapter, self).process( - msg, kwargs - ) diff --git a/imapclient/testable_imapclient.py b/imapclient/testable_imapclient.py index 64d64112..71ba851e 100644 --- a/imapclient/testable_imapclient.py +++ b/imapclient/testable_imapclient.py @@ -30,21 +30,25 @@ class TestableIMAPClient(IMAPClient): def __init__(self): super(TestableIMAPClient, self).__init__('somehost') - def _create_IMAP4(self): - return MockIMAP4() + def _new_tag(self): + return 'tag' + + def _connect(self): + pass + def _create_conn(self): + return MockConn() -class MockIMAP4(Mock): + +class MockConn(Mock): def __init__(self, *args, **kwargs): super(Mock, self).__init__(*args, **kwargs) - self.use_uid = True self.sent = b'' # Accumulates what was given to send() - self.tagged_commands = {} - self._starttls_done = False def send(self, data): + print(data) self.sent += data - def _new_tag(self): - return 'tag' + def get_line(self): + return b'* OK [CAPABILITY IMAP4rev1 SASL-IR LOGIN-REFERRALS ID ENABLE IDLE LITERAL+ STARTTLS AUTH=PLAIN] Dovecot ready.' \ No newline at end of file diff --git a/imapclient/tls.py b/imapclient/tls.py index be211e4c..c596fd95 100644 --- a/imapclient/tls.py +++ b/imapclient/tls.py @@ -7,7 +7,7 @@ Layer Security (TLS a.k.a. SSL). """ -import imaplib +from . import conn import socket import ssl @@ -32,7 +32,7 @@ def wrap_socket(sock, ssl_context, host): return ssl_context.wrap_socket(sock, server_hostname=host) -class IMAP4_TLS(imaplib.IMAP4): +class IMAP4_TLS(conn.IMAP4): """IMAP4 client class for TLS/SSL connections. Adapted from imaplib.IMAP4_SSL. @@ -41,7 +41,7 @@ class IMAP4_TLS(imaplib.IMAP4): def __init__(self, host, port, ssl_context, timeout): self.ssl_context = ssl_context self._timeout = timeout - imaplib.IMAP4.__init__(self, host, port) + conn.IMAP4.__init__(self, host, port) def open(self, host, port): self.host = host @@ -60,4 +60,4 @@ def send(self, data): self.sock.sendall(data) def shutdown(self): - imaplib.IMAP4.shutdown(self) + conn.IMAP4.shutdown(self) diff --git a/imapclient/util.py b/imapclient/util.py index b2410746..df3c9275 100644 --- a/imapclient/util.py +++ b/imapclient/util.py @@ -4,9 +4,12 @@ from __future__ import unicode_literals +import re import six import logging from six import binary_type, text_type +from datetime import datetime, timezone, timedelta +import binascii, time, calendar from . import exceptions @@ -43,3 +46,147 @@ def assert_imap_protocol(condition, message=None): def chunk(lst, size): for i in six.moves.range(0, len(lst), size): yield lst[i:i + size] + +def redact_password(msg): + """Preventing IMAP secrets from going to the logging facility.""" + for command in ("LOGIN", "AUTHENTICATE"): + if command in msg: + msg_start = msg.split(command)[0] + return "{}{} **REDACTED**".format(msg_start, command) + return msg + +class _Authenticator: + + """Private class to provide en/decoding + for base64-based authentication conversation. + """ + + def __init__(self, mechinst): + self.mech = mechinst # Callable object to provide/process data + + def process(self, data): + ret = self.mech(self.decode(data)) + if ret is None: + return b'*' # Abort conversation + return self.encode(ret) + + def encode(self, inp): + # + # Invoke binascii.b2a_base64 iteratively with + # short even length buffers, strip the trailing + # line feed from the result and append. "Even" + # means a number that factors to both 6 and 8, + # so when it gets to the end of the 8-bit input + # there's no partial 6-bit output. + # + oup = b'' + if isinstance(inp, str): + inp = inp.encode('utf-8') + while inp: + if len(inp) > 48: + t = inp[:48] + inp = inp[48:] + else: + t = inp + inp = b'' + e = binascii.b2a_base64(t) + if e: + oup = oup + e[:-1] + return oup + + def decode(self, inp): + if not inp: + return b'' + return binascii.a2b_base64(inp) + +Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ') +Mon2num = {s.encode():n+1 for n, s in enumerate(Months[1:])} +InternalDate = re.compile(br'.*INTERNALDATE "' + br'(?P[ 0123][0-9])-(?P[A-Z][a-z][a-z])-(?P[0-9][0-9][0-9][0-9])' + br' (?P[0-9][0-9]):(?P[0-9][0-9]):(?P[0-9][0-9])' + br' (?P[-+])(?P[0-9][0-9])(?P[0-9][0-9])' + br'"') + +def Internaldate2tuple(resp): + """Parse an IMAP4 INTERNALDATE string. + + Return corresponding local time. The return value is a + time.struct_time tuple or None if the string has wrong format. + """ + + mo = InternalDate.match(resp) + if not mo: + return None + + mon = Mon2num[mo.group('mon')] + zonen = mo.group('zonen') + + day = int(mo.group('day')) + year = int(mo.group('year')) + hour = int(mo.group('hour')) + min = int(mo.group('min')) + sec = int(mo.group('sec')) + zoneh = int(mo.group('zoneh')) + zonem = int(mo.group('zonem')) + + # INTERNALDATE timezone must be subtracted to get UT + + zone = (zoneh*60 + zonem)*60 + if zonen == b'-': + zone = -zone + + tt = (year, mon, day, hour, min, sec, -1, -1, -1) + utc = calendar.timegm(tt) - zone + + return time.localtime(utc) + +def Int2AP(num): + """Convert integer to A-P string representation.""" + + val = b''; AP = b'ABCDEFGHIJKLMNOP' + num = int(abs(num)) + while num: + num, mod = divmod(num, 16) + val = AP[mod:mod+1] + val + return val + + +def Time2Internaldate(date_time): + + """Convert date_time to IMAP4 INTERNALDATE representation. + + Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'. The + date_time argument can be a number (int or float) representing + seconds since epoch (as returned by time.time()), a 9-tuple + representing local time, an instance of time.struct_time (as + returned by time.localtime()), an aware datetime instance or a + double-quoted string. In the last case, it is assumed to already + be in the correct format. + """ + if isinstance(date_time, (int, float)): + dt = datetime.fromtimestamp(date_time, + timezone.utc).astimezone() + elif isinstance(date_time, tuple): + try: + gmtoff = date_time.tm_gmtoff + except AttributeError: + if time.daylight: + dst = date_time[8] + if dst == -1: + dst = time.localtime(time.mktime(date_time))[8] + gmtoff = -(time.timezone, time.altzone)[dst] + else: + gmtoff = -time.timezone + delta = timedelta(seconds=gmtoff) + dt = datetime(*date_time[:6], tzinfo=timezone(delta)) + elif isinstance(date_time, datetime): + if date_time.tzinfo is None: + raise ValueError("date_time must be aware") + dt = date_time + elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'): + return date_time # Assume in correct format + else: + raise ValueError("date_time not of a known type") + fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month]) + return dt.strftime(fmt) + diff --git a/livetest.py b/livetest.py index b2576386..1543ad28 100644 --- a/livetest.py +++ b/livetest.py @@ -16,6 +16,8 @@ import string import sys import time +import logging + from datetime import datetime from email.utils import make_msgid @@ -33,6 +35,7 @@ # TODO cleaner verbose output: avoid "__main__" and separator between classes +logger = logging.getLogger('livetest') SIMPLE_MESSAGE = 'Subject: something\r\n\r\nFoo\r\n' @@ -82,6 +85,7 @@ class _TestBase(unittest.TestCase): @classmethod def setUpClass(cls): + logger.debug('-- setupClass ') client = create_client_from_config(cls.conf) cls.client = client client.use_uid = cls.use_uid @@ -91,16 +95,21 @@ def setUpClass(cls): cls.condstore_enabled = True cls.base_folder = cls.conf.namespace[0] + '__imapclient' cls.folder_delimiter = cls.conf.namespace[1] + logger.debug('-- /setupClass') def setUp(self): + logger.debug('-- setup ') self.clear_test_folders() self.unsub_all_test_folders() self.client.create_folder(self.base_folder) self.client.select_folder(self.base_folder) + logger.debug('-- /setup ') def tearDown(self): + logger.debug('-- tearDown ') self.clear_test_folders() self.unsub_all_test_folders() + logger.debug('-- /tearDown ') @classmethod def tearDownClass(cls): @@ -176,11 +185,11 @@ def unsub_all_test_folders(self): self.client.unsubscribe_folder(folder) def is_gmail(self): - return self.client._imap.host == 'imap.gmail.com' + return self.client.host == 'imap.gmail.com' def is_fastmail(self): - return (self.client._imap.host == 'mail.messagingengine.com' or - self.client._imap.host == 'imap.fastmail.com') + return (self.client.host == 'mail.messagingengine.com' or + self.client.host == 'imap.fastmail.com') def is_exchange(self): # Assume that these capabilities mean we're talking to MS @@ -272,12 +281,12 @@ def test_select_and_close(self): def test_select_read_only(self): self.append_msg(SIMPLE_MESSAGE) - untagged = _dict_bytes_normaliser(self.client._imap.untagged_responses) + untagged = _dict_bytes_normaliser(self.client._untagged_responses) self.assertNotIn(b'READ-ONLY', untagged) resp = self.client.select_folder(self.base_folder, readonly=True) - untagged = _dict_bytes_normaliser(self.client._imap.untagged_responses) + untagged = _dict_bytes_normaliser(self.client._untagged_responses) self.assertIn(b'READ-ONLY', untagged) self.assertEqual(resp[b'EXISTS'], 1) self.assertIsInstance(resp[b'RECENT'], int) @@ -541,7 +550,7 @@ def tearDown(self): quiet_logout(self.client) def test_small_connection_timeout_fail(self): - self.conf.timeout = SocketTimeout(connect=0.001, read=10) + self.conf.timeout = SocketTimeout(connect=0.00000001, read=10) with self.assertRaises(socket.timeout): self.client = create_client_from_config(self.conf) @@ -551,7 +560,7 @@ def test_small_read_timeout_fail(self): test pass, we don't login once connected but simply try a 'noop', that should not be able to complete in under a such a small time. """ - self.conf.timeout = SocketTimeout(connect=30, read=0.00001) + self.conf.timeout = SocketTimeout(connect=30, read=0.00000001) self.client = create_client_from_config(self.conf, login=False) with self.assertRaises(socket.timeout): self.client.noop() @@ -1046,6 +1055,11 @@ def argv_error(msg): def parse_argv(): + if '-v' in sys.argv: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.ERROR) + args = sys.argv[1:] if not args: argv_error('Please specify a host configuration file. See livetest-sample.ini for an example.') diff --git a/livetest.sh b/livetest.sh new file mode 100755 index 00000000..c109ab40 --- /dev/null +++ b/livetest.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +podman run --name=dovecot --rm -d -p 14301:143 dovecot/dovecot + +cat >/tmp/livetest.ini < b'ICHH1 LOGIN foo@bar.org "secret"'""") - if sys.version_info >= (3, 6, 4): - # LoggerAdapter in Python 3.6.4+ calls logger.log() - logger_mock.log.assert_called_once_with( - logging.INFO, - "> b'ICHH1 LOGIN **REDACTED**", - extra={} - ) - else: - # LoggerAdapter in Python 3.4 to 3.6 calls logger._log() - logger_mock._log.assert_called_once_with( - logging.INFO, - "> b'ICHH1 LOGIN **REDACTED**", - (), - extra={} - ) - else: - # LoggerAdapter in Python 2.7 calls logger.info() - adapter.info('> ICHH1 LOGIN foo@bar.org "secret"') - logger_mock.info.assert_called_once_with( - "> ICHH1 LOGIN **REDACTED**", - extra={} - ) + self.assertEqual("> b'ICHH1 LOGIN **REDACTED**", redact_password("""> b'ICHH1 LOGIN foo@bar.org "secret"'""")) class TestTimeNormalisation(IMAPClientTest): + initial_state = 'SELECTED' def test_default(self): self.assertTrue(self.client.normalise_times) @patch('imapclient.imapclient.parse_fetch_response') def test_pass_through(self, parse_fetch_response): - self.client._imap._command_complete.return_value = ('OK', sentinel.data) - self.client._imap._untagged_response.return_value = ('OK', sentinel.fetch_data) + self.client._command_complete.return_value = ('OK', sentinel.data) + self.client._untagged_response.return_value = ('OK', sentinel.fetch_data) self.client.use_uid = sentinel.use_uid def check(expected): @@ -667,7 +642,8 @@ def setUp(self): self.client._cached_capabilities = [b'NAMESPACE'] def set_return(self, value): - self.client._imap.namespace.return_value = ('OK', [value]) + self.patch_method('_cmd_namespace') + self.client._cmd_namespace.return_value = ('OK', [value]) def test_simple(self): self.set_return(b'(("FOO." "/")) NIL NIL') @@ -699,52 +675,56 @@ def test_complex(self): class TestCapabilities(IMAPClientTest): + def setUp(self): + IMAPClientTest.setUp(self) + self.patch_method('_cmd_capability') + def test_preauth(self): - self.client._imap.capabilities = ('FOO', 'BAR') - self.client._imap.untagged_responses = {} + self.client._preauth_capabilities = ('FOO', 'BAR') + self.client._untagged_responses = {} self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR')) def test_server_returned_capability_after_auth(self): - self.client._imap.capabilities = (b'FOO',) - self.client._imap.untagged_responses = {'CAPABILITY': [b'FOO MORE']} + self.client._preauth_capabilities = (b'FOO',) + self.client._untagged_responses = {'CAPABILITY': [b'FOO MORE']} self.assertEqual(self.client._cached_capabilities, None) self.assertEqual(self.client.capabilities(), (b'FOO', b'MORE')) self.assertEqual(self.client._cached_capabilities, (b'FOO', b'MORE')) - self.assertEqual(self.client._imap.untagged_responses, {}) + self.assertEqual(self.client._untagged_responses, {}) def test_caching(self): - self.client._imap.capabilities = ('FOO',) - self.client._imap.untagged_responses = {} + self.client._preauth_capabilities = ('FOO',) + self.client._untagged_responses = {} self.client._cached_capabilities = (b'FOO', b'MORE') self.assertEqual(self.client.capabilities(), (b'FOO', b'MORE')) def test_post_auth_request(self): - self.client._imap.capabilities = ('FOO',) - self.client._imap.untagged_responses = {} - self.client._imap.state = 'SELECTED' - self.client._imap.capability.return_value = ('OK', [b'FOO BAR']) + self.client._preauth_capabilities = ('FOO',) + self.client._untagged_responses = {} + self.client.state = 'SELECTED' + self.client._cmd_capability.return_value = ('OK', [b'FOO BAR']) self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR')) self.assertEqual(self.client._cached_capabilities, (b'FOO', b'BAR')) def test_with_starttls(self): # Initial connection - self.client._imap.capabilities = ('FOO',) - self.client._imap.untagged_responses = {} - self.client._imap.state = 'NONAUTH' + self.client._preauth_capabilities = ('FOO',) + self.client._untagged_responses = {} + self.client.state = 'NONAUTH' self.assertEqual(self.client.capabilities(), (b'FOO',)) # Now do STARTTLS; capabilities change and should be reported. self.client._starttls_done = True - self.client._imap.capability.return_value = ('OK', [b'FOO BAR']) + self.client._cmd_capability.return_value = ('OK', [b'FOO BAR']) self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR')) # Login done; capabilities change again. - self.client._imap.state = 'AUTH' - self.client._imap.capability.return_value = ('OK', [b'FOO BAR QUX']) + self.client.state = 'AUTH' + self.client._cmd_capability.return_value = ('OK', [b'FOO BAR QUX']) self.assertEqual(self.client.capabilities(), (b'FOO', b'BAR', b'QUX')) def test_has_capability(self): @@ -787,12 +767,12 @@ def setUp(self): self.client._cached_capabilities = [b'ID'] def test_id(self): - self.client._imap._simple_command.return_value = ('OK', [b'Success']) - self.client._imap._untagged_response.return_value = ( + self.client._simple_command.return_value = ('OK', [b'Success']) + self.client._untagged_response.return_value = ( b'OK', [b'("name" "GImap" "vendor" "Google, Inc.")']) id_response = self.client.id_({'name': 'IMAPClient'}) - self.client._imap._simple_command.assert_called_with( + self.client._simple_command.assert_called_with( 'ID', '("name" "IMAPClient")') self.assertSequenceEqual( @@ -811,15 +791,15 @@ class TestRawCommand(IMAPClientTest): def setUp(self): super(TestRawCommand, self).setUp() - self.client._imap._get_response.return_value = None - self.client._imap._command_complete.return_value = ('OK', ['done']) + self.client._get_response.return_value = None + self.client._command_complete.return_value = ('OK', ['done']) self.client._cached_capabilities = () def check(self, command, args, expected): typ, data = self.client._raw_command(command, args) self.assertEqual(typ, 'OK') self.assertEqual(data, ['done']) - self.assertEqual(self.client._imap.sent, expected) + self.assertEqual(self.client._conn.sent, expected) def test_plain(self): self.check(b'search', [b'ALL'], @@ -857,7 +837,7 @@ def test_literal_plus(self): typ, data = self.client._raw_command(b'APPEND', [b'\xff', _literal(b'hello')], uid=False) self.assertEqual(typ, 'OK') self.assertEqual(data, ['done']) - self.assertEqual(self.client._imap.sent, + self.assertEqual(self.client._conn.sent, b'tag APPEND {1+}\r\n' b'\xff {5+}\r\n' b'hello\r\n') @@ -874,19 +854,20 @@ def test_invalid_input_type(self): self.assertRaises(ValueError, self.client._raw_command, u'foo', ['foo']) def test_failed_continuation_wait(self): - self.client._imap._get_response.return_value = b'blah' - self.client._imap.tagged_commands['tag'] = ('NO', ['go away']) + self.client._get_response.return_value = b'blah' + self.client._tagged_commands['tag'] = ('NO', ['go away']) expected_error = "unexpected response while waiting for continuation response: \(u?'NO', \[u?'go away'\]\)" with self.assertRaisesRegex(IMAPClient.AbortError, expected_error): self.client._raw_command(b'FOO', [b'\xff']) class TestExpunge(IMAPClientTest): + initial_state = 'SELECTED' def test_expunge(self): mockCommand = Mock(return_value=sentinel.tag) mockConsume = Mock(return_value=sentinel.out) - self.client._imap._command = mockCommand + self.client._command = mockCommand self.client._consume_until_tagged_response = mockConsume result = self.client.expunge() mockCommand.assert_called_with('EXPUNGE') @@ -894,37 +875,42 @@ def test_expunge(self): self.assertEqual(sentinel.out, result) def test_id_expunge(self): - self.client._imap.uid.return_value = ('OK', [None]) + self.client._uid_command_and_check = Mock(return_value = [None]) self.assertEqual([None], self.client.expunge(['4','5', '6'])) class TestShutdown(IMAPClientTest): def test_shutdown(self): self.client.shutdown() - self.client._imap.shutdown.assert_called_once_with() + self.client._conn.shutdown.assert_called_once_with() class TestContextManager(IMAPClientTest): + def setUp(self): + IMAPClientTest.setUp(self) + self.client.logout = Mock() + self.client.shutdown = Mock() + def test_context_manager(self): with self.client as client: self.assertIsInstance(client, IMAPClient) - self.client._imap.logout.assert_called_once_with() + self.client.logout.assert_called_once_with() @patch('imapclient.imapclient.logger') def test_context_manager_fail_closing(self, mock_logger): - self.client._imap.logout.side_effect = RuntimeError("Error logout") - self.client._imap.shutdown.side_effect = RuntimeError("Error shutdown") + self.client.logout.side_effect = RuntimeError("Error logout") + self.client.shutdown.side_effect = RuntimeError("Error shutdown") with self.client as client: self.assertIsInstance(client, IMAPClient) - self.client._imap.logout.assert_called_once_with() - self.client._imap.shutdown.assert_called_once_with() + self.client.logout.assert_called_once_with() + self.client.shutdown.assert_called_once_with() mock_logger.info.assert_called_once_with( 'Could not close the connection cleanly: %s', - self.client._imap.shutdown.side_effect + self.client.shutdown.side_effect ) def test_exception_inside_context_manager(self): @@ -937,8 +923,8 @@ class TestProtocolError(IMAPClientTest): def test_tagged_response_with_parse_error(self): client = self.client - client._imap.tagged_commands = {sentinel.tag: None} - client._imap._get_response = lambda: b'NOT-A-STAR 99 EXISTS' + client._tagged_commands = {sentinel.tag: None} + client._get_response = lambda: b'NOT-A-STAR 99 EXISTS' with self.assertRaises(ProtocolError): client._consume_until_tagged_response(sentinel.tag, b'IDLE') diff --git a/tests/test_init.py b/tests/test_init.py index ca21848e..bef6986e 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -17,8 +17,12 @@ def setUp(self): self.tls = patcher.start() self.addCleanup(patcher.stop) - patcher = patch('imapclient.imapclient.imaplib') - self.imaplib = patcher.start() + patcher = patch('imapclient.imapclient.conn') + self.conn = patcher.start() + self.addCleanup(patcher.stop) + + patcher = patch('imapclient.imapclient.IMAPClient._connect') + patcher.start() self.addCleanup(patcher.stop) def test_plain(self): @@ -26,8 +30,9 @@ def test_plain(self): self.imap4.IMAP4WithTimeout.return_value = fakeIMAP4 imap = IMAPClient('1.2.3.4', ssl=False, timeout=sentinel.timeout) + imap._connect = Mock() - self.assertEqual(imap._imap, fakeIMAP4) + self.assertEqual(imap._conn, fakeIMAP4) self.imap4.IMAP4WithTimeout.assert_called_with( '1.2.3.4', 143, SocketTimeout(sentinel.timeout, sentinel.timeout) @@ -44,8 +49,9 @@ def test_SSL(self): imap = IMAPClient('1.2.3.4', ssl_context=sentinel.context, timeout=sentinel.timeout) + imap._connect = Mock() - self.assertEqual(imap._imap, fakeIMAP4_TLS) + self.assertEqual(imap._conn, fakeIMAP4_TLS) self.tls.IMAP4_TLS.assert_called_with( '1.2.3.4', 993, sentinel.context, @@ -59,12 +65,12 @@ def test_SSL(self): def test_stream(self): fakeIMAP4_stream = Mock() - self.imaplib.IMAP4_stream.return_value = fakeIMAP4_stream + self.conn.IMAP4_stream.return_value = fakeIMAP4_stream imap = IMAPClient('command', stream=True, ssl=False) - self.assertEqual(imap._imap, fakeIMAP4_stream) - self.imaplib.IMAP4_stream.assert_called_with('command') + self.assertEqual(imap._conn, fakeIMAP4_stream) + self.conn.IMAP4_stream.assert_called_with('command') self.assertEqual(imap.host, 'command') self.assertEqual(imap.port, None) diff --git a/tests/test_search.py b/tests/test_search.py index 4b0efa63..f11be5d1 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -6,15 +6,14 @@ from datetime import date, datetime -import imaplib - -from imapclient.exceptions import InvalidCriteriaError +from imapclient.exceptions import InvalidCriteriaError, IMAPClientError from imapclient.imapclient import _quoted from .imapclient_test import IMAPClientTest from .util import Mock class TestSearchBase(IMAPClientTest): + initial_state = 'SELECTED' def setUp(self): super(TestSearchBase, self).setUp() @@ -106,10 +105,10 @@ def test_nested_tuple(self): def test_search_custom_exception_with_invalid_list(self): def search_bad_command_exp(*args, **kwargs): - raise imaplib.IMAP4.error('SEARCH command error: BAD ["Unknown argument NOT DELETED"]') + raise IMAPClientError('SEARCH command error: BAD ["Unknown argument NOT DELETED"]') self.client._raw_command_untagged.side_effect = search_bad_command_exp - with self.assertRaises(imaplib.IMAP4.error) as cm: + with self.assertRaises(IMAPClientError) as cm: self.client.search(['NOT DELETED']) self.assertIn( # Python 2.x will add a `u` prefix in the list representation, so let it handle the @@ -123,10 +122,10 @@ def search_bad_command_exp(*args, **kwargs): def test_search_custom_exception_with_invalid_text(self): # Check the criteria is surrounding with quotes if the user is using a plain text criteria def search_bad_command_exp2(*args, **kwargs): - raise imaplib.IMAP4.error('SEARCH command error: BAD ["Unknown argument TOO"]') + raise IMAPClientError('SEARCH command error: BAD ["Unknown argument TOO"]') self.client._raw_command_untagged.side_effect = search_bad_command_exp2 - with self.assertRaises(imaplib.IMAP4.error) as cm: + with self.assertRaises(IMAPClientError) as cm: self.client.search('TOO some@email.com') self.assertIn('may have been caused by a syntax error in the criteria: "TOO some@email.com"', str(cm.exception)) diff --git a/tests/test_sort.py b/tests/test_sort.py index 0c1d8c2a..c82a5c18 100644 --- a/tests/test_sort.py +++ b/tests/test_sort.py @@ -11,6 +11,7 @@ class TestSort(IMAPClientTest): + initial_state = 'SELECTED' def setUp(self): super(TestSort, self).setUp() diff --git a/tests/test_starttls.py b/tests/test_starttls.py index edd3a590..7628cc81 100644 --- a/tests/test_starttls.py +++ b/tests/test_starttls.py @@ -8,7 +8,7 @@ from imapclient.exceptions import IMAPClientError from .imapclient_test import IMAPClientTest -from .util import Mock, patch, sentinel +from .util import Mock, patch, sentinel, ANY class TestStarttls(IMAPClientTest): @@ -20,7 +20,7 @@ def setUp(self): self.tls = patcher.start() self.addCleanup(patcher.stop) - self.client._imap.sock = sentinel.old_sock + self.client.sock = sentinel.old_sock self.new_sock = Mock() self.new_sock.makefile.return_value = sentinel.file @@ -29,23 +29,24 @@ def setUp(self): self.client.host = sentinel.host self.client.ssl = False self.client._starttls_done = False - self.client._imap._simple_command.return_value = "OK", [b'start TLS negotiation'] + self.client._simple_command.return_value = "OK", [b'start TLS negotiation'] self.client._cached_capabilities = [b'STARTTLS'] + self.client._do_capabilites = Mock(return_value=[b'STARTTLS']) def test_works(self): resp = self.client.starttls(sentinel.ssl_context) self.tls.wrap_socket.assert_called_once_with( - sentinel.old_sock, + ANY, sentinel.ssl_context, sentinel.host, ) self.new_sock.makefile.assert_called_once_with('rb') - self.assertEqual(self.client._imap.file, sentinel.file) + self.assertEqual(self.client._conn.file, sentinel.file) self.assertEqual(resp, b'start TLS negotiation') def test_command_fails(self): - self.client._imap._simple_command.return_value = "NO", [b'sorry'] + self.client._simple_command.return_value = "NO", [b'sorry'] with self.assertRaises(IMAPClientError) as raised: self.client.starttls(sentinel.ssl_context) diff --git a/tests/test_store.py b/tests/test_store.py index 2f0911fa..da56ef8b 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -13,6 +13,7 @@ class TestFlagsConsts(IMAPClientTest): + initial_state = 'SELECTED' def test_flags_are_bytes(self): for flag in DELETED, SEEN, ANSWERED, FLAGGED, DRAFT, RECENT: @@ -21,10 +22,11 @@ def test_flags_are_bytes(self): class TestFlags(IMAPClientTest): + initial_state = 'SELECTED' def setUp(self): super(TestFlags, self).setUp() - self.client._command_and_check = Mock() + self.client._uid_command_and_check = Mock() def test_get(self): with patch.object(self.client, 'fetch', autospec=True, @@ -52,7 +54,7 @@ def _check(self, meth, expected_command, silent=False): if silent: expected_command += b".SILENT" - cc = self.client._command_and_check + cc = self.client._uid_command_and_check cc.return_value = [ b'11 (FLAGS (blah foo) UID 1)', b'11 (UID 1 OTHER (dont))', @@ -61,10 +63,9 @@ def _check(self, meth, expected_command, silent=False): ] resp = meth([1, 2], 'foo', silent=silent) cc.assert_called_once_with( - 'store', b"1,2", + 'STORE', b"1,2", expected_command, - '(foo)', - uid=True) + '(foo)') if silent: self.assertIsNone(resp) else: @@ -76,10 +77,11 @@ def _check(self, meth, expected_command, silent=False): cc.reset_mock() class TestGmailLabels(IMAPClientTest): + initial_state = 'SELECTED' def setUp(self): super(TestGmailLabels, self).setUp() - self.client._command_and_check = Mock() + self.client._uid_command_and_check = Mock() def test_get(self): with patch.object(self.client, 'fetch', autospec=True, @@ -107,7 +109,7 @@ def _check(self, meth, expected_command, silent=False): if silent: expected_command += b".SILENT" - cc = self.client._command_and_check + cc = self.client._uid_command_and_check cc.return_value = [ b'11 (X-GM-LABELS (&AUE-abel "f\\"o\\"o") UID 1)', b'22 (X-GM-LABELS ("f\\"o\\"o") UID 2)', @@ -116,10 +118,9 @@ def _check(self, meth, expected_command, silent=False): ] resp = meth([1, 2], 'f"o"o', silent=silent) cc.assert_called_once_with( - 'store', b"1,2", + 'STORE', b"1,2", expected_command, - '("f\\"o\\"o")', - uid=True) + '("f\\"o\\"o")') if silent: self.assertIsNone(resp) else: diff --git a/tests/test_thread.py b/tests/test_thread.py index 694ca2e0..8bd9bb7c 100644 --- a/tests/test_thread.py +++ b/tests/test_thread.py @@ -11,6 +11,7 @@ class TestThread(IMAPClientTest): + initial_state = 'SELECTED' def setUp(self): super(TestThread, self).setUp() diff --git a/tests/util.py b/tests/util.py index 39c81b40..5f14dc96 100644 --- a/tests/util.py +++ b/tests/util.py @@ -5,9 +5,9 @@ from __future__ import unicode_literals try: - from unittest.mock import Mock, patch, sentinel, DEFAULT + from unittest.mock import Mock, patch, sentinel, DEFAULT, ANY except ImportError: - from mock import Mock, patch, sentinel, DEFAULT + from mock import Mock, patch, sentinel, DEFAULT, ANY def find_unittest2():