diff --git a/src/pcloud/api.py b/src/pcloud/api.py index 2b1f398..412009a 100644 --- a/src/pcloud/api.py +++ b/src/pcloud/api.py @@ -1,28 +1,19 @@ +import os +import requests +import zipfile + from hashlib import sha1 from io import BytesIO + +from pcloud.jsonprotocol import PCloudJSONConnection from pcloud.oauth2 import TokenHandler +from pcloud.utils import log from pcloud.validate import MODE_AND from pcloud.validate import RequiredParameterCheck -from requests_toolbelt.multipart.encoder import MultipartEncoder + from urllib.parse import urlparse from urllib.parse import urlunsplit -import datetime -import logging -import os.path -import requests -import sys -import zipfile - - -log = logging.getLogger("pcloud") -log.setLevel(logging.INFO) - -handler = logging.StreamHandler(sys.stderr) -handler.setLevel(logging.INFO) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -log.addHandler(handler) # File open flags https://docs.pcloud.com/methods/fileops/file_open.html O_WRITE = int("0x0002", 16) @@ -47,40 +38,42 @@ class InvalidFileModeError(Exception): """File mode not supported""" -# Helpers -def to_api_datetime(dt): - """Converter to a datetime structure the pCloud API understands - - See https://docs.pcloud.com/structures/datetime.html - """ - if isinstance(dt, datetime.datetime): - return dt.isoformat() - return dt - - class PyCloud(object): endpoints = { "api": "https://api.pcloud.com/", "eapi": "https://eapi.pcloud.com/", - "test": "http://localhost:5023/", + "test": "localhost:5023", + "binapi": "https://binapi.pcloud.com", + "bineapi": "https://bineapi.pcloud.com", "nearest": "", } def __init__( - self, username, password, endpoint="api", token_expire=31536000, oauth2=False + self, username, password, endpoint="api", token_expire=31536000, oauth2=False, + connection=PCloudJSONConnection ): - self.session = requests.Session() + conn = connection(self) + self.connection = conn.connect() if 
endpoint not in self.endpoints: log.error( "Endpoint (%s) not found. Use one of: %s", endpoint, - ",".join(self.endpoints.keys()), + ", ".join(self.endpoints.keys()), ) return elif endpoint == "nearest": self.endpoint = self.getnearestendpoint() + elif endpoint not in connection.allowed_endpoints: + log.error( + "Endpoint (%s) not in allowed list of '%s'. Use one of: %s", + endpoint, + connection.__name__, + ", ".join(connection.allowed_endpoints), + ) + return else: self.endpoint = self.endpoints.get(endpoint) + log.info(f"Using pCloud API endpoint: {self.endpoint}") self.username = username.lower().encode("utf-8") self.password = password.encode("utf-8") @@ -122,25 +115,8 @@ def oauth2_authorize( return cls("", access_token, endpoint, token_expire, oauth2=True) def _do_request(self, method, authenticate=True, json=True, endpoint=None, **kw): - if authenticate and self.auth_token: # Password authentication - params = {"auth": self.auth_token} - elif authenticate and self.access_token: # OAuth2 authentication - params = {"access_token": self.access_token} - else: - params = {} - if endpoint is None: - endpoint = self.endpoint - params.update(kw) - log.debug("Doing request to %s%s", endpoint, method) - log.debug("Params: %s", params) - resp = self.session.get(endpoint + method, params=params) - if json: - result = resp.json() - else: - result = resp.content - log.debug("Response: %s", result) - return result - + return self.connection.do_get_request(method, authenticate, json, endpoint, **kw) + # Authentication def getdigest(self): resp = self._do_request("getdigest", authenticate=False) @@ -174,8 +150,8 @@ def supportedlanguages(self, **kwargs): def getnearestendpoint(self): default_api = self.endpoints.get("api") resp = self._do_request( - "getapiserver", authenticate=False, endpoint=default_api - ) + "getapiserver", authenticate=False, endpoint=default_api) + api = resp.get("api") if len(api): return urlunsplit(["https", api[0], "/", "", ""]) @@ -234,24 +210,11 
@@ def copyfolder(self, **kwargs): raise NotImplementedError # File - def _upload(self, method, files, **kwargs): - if self.auth_token: # Password authentication - kwargs["auth"] = self.auth_token - elif self.access_token: # OAuth2 authentication - kwargs["access_token"] = self.access_token - fields = list(kwargs.items()) - fields.extend(files) - m = MultipartEncoder(fields=fields) - resp = requests.post( - self.endpoint + method, data=m, headers={"Content-Type": m.content_type} - ) - return resp.json() - @RequiredParameterCheck(("files", "data")) def uploadfile(self, **kwargs): """upload a file to pCloud - 1) You can specify a list of filenames to read + 1) You can specify a list of filenames to upload files=['/home/pcloud/foo.txt', '/home/pcloud/bar.txt'] 2) you can specify binary data via the data parameter and @@ -276,7 +239,7 @@ def uploadfile(self, **kwargs): if "folderid" in kwargs: # cast folderid to string, since API allows this but requests not kwargs["folderid"] = str(kwargs["folderid"]) - return self._upload("uploadfile", files, **kwargs) + return self.connection.upload("uploadfile", files, **kwargs) @RequiredParameterCheck(("progresshash",)) def uploadprogress(self, **kwargs): @@ -365,54 +328,53 @@ def gettextfile(self, **kwargs): # File API methods @RequiredParameterCheck(("flags",)) def file_open(self, **kwargs): - return self._do_request("file_open", **kwargs) + return self._do_request("file_open", use_session=True, **kwargs) @RequiredParameterCheck(("fd", "count")) def file_read(self, **kwargs): - return self._do_request("file_read", json=False, **kwargs) + return self._do_request("file_read", json=False, use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_pread(self, **kwargs): - return self._do_request("file_pread", json=False, **kwargs) + return self._do_request("file_pread", json=False, use_session=True, **kwargs) @RequiredParameterCheck(("fd", "data")) def file_pread_ifmod(self, **kwargs): - return 
self._do_request("file_pread_ifmod", json=False, **kwargs) + return self._do_request("file_pread_ifmod", json=False, use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_size(self, **kwargs): - return self._do_request("file_size", **kwargs) + return self._do_request("file_size", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_truncate(self, **kwargs): - return self._do_request("file_truncate", **kwargs) + return self._do_request("file_truncate", use_session=True, **kwargs) @RequiredParameterCheck(("fd", "data")) def file_write(self, **kwargs): files = [("file", ("upload-file.io", BytesIO(kwargs.pop("data"))))] kwargs["fd"] = str(kwargs["fd"]) - return self._upload("file_write", files, **kwargs) - # return self._do_request("file_write", **kwargs) + return self.connection.upload("file_write", files, **kwargs) @RequiredParameterCheck(("fd",)) def file_pwrite(self, **kwargs): - return self._do_request("file_pwrite", **kwargs) + return self._do_request("file_pwrite", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_checksum(self, **kwargs): - return self._do_request("file_checksum", **kwargs) + return self._do_request("file_checksum", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_seek(self, **kwargs): - return self._do_request("file_seek", **kwargs) + return self._do_request("file_seek", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_close(self, **kwargs): - return self._do_request("file_close", **kwargs) + return self._do_request("file_close", use_session=True, **kwargs) @RequiredParameterCheck(("fd",)) def file_lock(self, **kwargs): - return self._do_request("file_lock", **kwargs) + return self._do_request("file_lock", use_session=True, **kwargs) # Archiving @RequiredParameterCheck(("path", "fileid")) diff --git a/src/pcloud/binaryprotocol.py b/src/pcloud/binaryprotocol.py new file mode 100644 index 0000000..3fb6194 --- /dev/null +++ 
b/src/pcloud/binaryprotocol.py @@ -0,0 +1,282 @@ +import io +import socket +import ssl + +from urllib.parse import urlparse + + +class PCloudBuffer(io.BufferedRWPair): + """ Buffer that raises IOError on insufficient bytes for read. """ + + def read(self, size=-1): + result = super().read(size) + if size != -1 and len(result) != size: + raise IOError(f"Requested {size} bytes, got {len(result)}") + return result + + +class PCloudBinaryConnection(object): + """ Connection to pcloud.com based on their binary protocol. + + NOTE: .connect() must be called to establish network communication. + """ + + allowed_endpoints = frozenset(["binapi", "bineapi", "test", "nearest"]) + + def __init__(self, api, persistent_params=None): + """ Initializes the binary API. + NOTE: .connect() must be called to establish network communication. + """ + self.api = api + self.server = urlparse(api.endpoint).netloc + self.timeout = 30 + self.socket = None + self.fp = None + if persistent_params is None: + self.persistent_params = {} + else: + self.persistent_params = persistent_params + + def do_get_request(self, method, authenticate=True, json=True, endpoint=None, **kw): + """Sends command and returns result. Blocks if result is needed. 
+ + If '_data' is in params it is the file data + :param method: the pcloud method to call + :param **params: parameters to be passed to the api, except: + - _data is the file data + - _data_progress_callback is the upload callback + - _noresult - if no result should be returned (you must call + .get_result manually) + :returns dictionary returned by the api or None if _noresult is set + """ + data = kw.pop('_data', None) + data_progress_callback = kw.pop('_data_progress_callback', None) + noresult = kw.pop('_noresult', None) + self.send_command_nb(method, + kw, + data=data, + data_progress_callback=data_progress_callback) + if not noresult: + return self.get_result() + + def upload(self, method, files, **kwargs): + if self.api.auth_token: # Password authentication + kwargs["auth"] = self.api.auth_token + elif self.api.access_token: # OAuth2 authentication + kwargs["access_token"] = self.api.access_token + + progress_callback = kwargs.pop("progress_callback", None) + for entry in files: + filename, fd = entry[1] + response = self.do_get_request(method, + _data=fd, + filename=filename, + _data_progress_callback=progress_callback, + **kwargs) + return response + + def connect(self): + """Establish connection and return self.""" + if self.socket: + raise ValueError("maybe connect called twice?") + context = ssl.create_default_context() + sock = socket.create_connection((self.server, 443), self.timeout) + self.socket = context.wrap_socket(sock, server_hostname=self.server) + raw = socket.SocketIO(self.socket, 'rwb') + self.socket._io_refs += 1 + self.fp = PCloudBuffer(raw, raw, 8192) + return self + + def _prepare_send_request(self, method, params, data_len): + req = bytearray() + # actually preallocating would be more efficient but... 
+ + method_name = method.encode('utf-8') + method_len = len(method_name) + assert method_len < 128 + + if data_len is not None: + method_len |= 0x80 + + req.extend(method_len.to_bytes(1, 'little')) + if data_len is not None: + req.extend(data_len.to_bytes(8, 'little')) + + req.extend(method_name) + req.extend(len(params).to_bytes(1, 'little')) + + for key,value in params.items(): + key = key.encode('utf-8') + key_len = len(key) + assert key_len < 64, "Parameter name too long" + + if isinstance(value, int) and value < 0: + # negative numbers are converted to string + value = str(value) + + if isinstance(value, list): + # lists (usually ints) are joined with , + value = ','.join(map(str, value)) + + if isinstance(value, str): + value = value.encode('utf-8') + + if isinstance(value, bool): + req.extend((key_len | 0x80).to_bytes(1, 'little')) + req.extend(key) + req.extend(value.to_bytes(1, 'little')) + elif isinstance(value, bytes): + req.extend(key_len.to_bytes(1, 'little')) + req.extend(key) + req.extend(len(value).to_bytes(4, 'little')) + req.extend(value) + elif isinstance(value, int): + req.extend((key_len | 0x40).to_bytes(1, 'little')) + req.extend(key) + req.extend(value.to_bytes(8, 'little')) + else: + raise ValueError("Unknown value type {0}".format(type(value))) + + return req + + def _send_raw_data(self, data, data_len, progress_callback): + """Sends data at the end of send_command.""" + if isinstance(data, io.IOBase): + written_bytes, to_write = 0, data_len + while data_len > 0: + to_write = min(data_len, 8192) + if to_write != self.fp.write(data.read(to_write)): + raise IOError("Mismatch between bytes written and supplied data length") + data_len -= to_write + if progress_callback: + progress_callback(to_write) + else: + written_bytes = self.fp.write(data) + if written_bytes != data_len: + raise IOError("Mismatch between bytes written and supplied data length") + + def _determine_data_len(self, data, data_len=None): + if data is None: + data_len = None + 
elif data_len is None: # and data is not None + data_len = getattr(data, '__len__', lambda : None)() + if data_len is None: + if isinstance(data, io.IOBase) and data.seekable(): + pos = data.tell() + data_len = data.seek(0, io.SEEK_END) - pos + data.seek(pos, io.SEEK_SET) + if data_len is None: + raise ValueError("Unable to determine data length") + return data_len + + def send_command_nb(self, + method, params, + data=None, data_len=None, + data_progress_callback=None): + """Send command without blocking. + + NOTE: params is updated with self.persistent_params + + :param data_len: if not None should be consistent with data. + :param data_progress_callback: called only for data which is io.IOBase + """ + data_len = self._determine_data_len(data, data_len) + + params.update(self.persistent_params) + req = self._prepare_send_request(method, params, data_len) + assert len(req) < 65536, "Request too long {0}".format(len(req)) + self.fp.write(len(req).to_bytes(2, 'little')) + self.fp.write(req) + + if data is not None: + self._send_raw_data(data, data_len, data_progress_callback) + + self.fp.flush() + + def get_result(self): + """Return the result from a call to the pcloud API.""" + self.fp.read(4) # FIXME: ignores length, seems it is not needed? 
ASK + return self._read_object(strings=dict()) + + def _read_object(self, strings): + obj_type = self.fp.read(1)[0] + # TODO: optimize checks based on actual usage + + if (obj_type <= 3) or (100 <= obj_type <= 149): + # new string + if 100 <= obj_type: + str_len = obj_type - 100 + else: + str_len = int.from_bytes(self.fp.read(obj_type + 1), 'little') + string = self.fp.read(str_len).decode('utf-8') + strings[len(strings)] = string + return string + if 4 <= obj_type <= 7: + # existing string, long index + return strings[int.from_bytes(self.fp.read(obj_type - 3), 'little')] + if 8 <= obj_type <= 15: + # int + return int.from_bytes(self.fp.read(obj_type - 7), 'little') + if obj_type == 16: + # hash + result = {} + while self.fp.peek(1)[0] != 255: + key = self._read_object(strings) + result[key] = self._read_object(strings) + self.fp.read(1) # consume byte 255 + if "data" in result: + return self.read_data(result.get("data")) + return result + if obj_type == 17: + # list + # FIXME: potential stack overflow + # FIXME: with the current api, only listfolder(recursive=1) + result = [] + while self.fp.peek(1)[0] != 255: + result.append(self._read_object(strings)) + self.fp.read(1) # consume byte 255 + return result + if obj_type == 18: + return False + if obj_type == 19: + return True + if obj_type == 20: + # data, return data_length + # be sure to consume the data + return int.from_bytes(self.fp.read(8), 'little') + if 150 <= obj_type <= 199: + # existing string, short index + return strings[obj_type - 150] + if 200 <= obj_type <= 219: + # int, inline + return obj_type - 200 + #if obj_type == 255: raise StopIteration + + # nothing matched + raise ValueError("Unknown value returned: {0}".format(obj_type)) + + def read_data(self, data_len): + return self.fp.read(data_len) + + def get_data_stream(self): + """Returns raw stream, from the socket. + + NOTE: Be careful with this file + NOTE: Be sure to consume exactly data_len bytes. 
+ """ + return self.fp + + def write_data(self, writer, data_len, progress_callback=None): + """Write data from response. + + NOTE: Be sure to consume all of it. + """ + while data_len > 0: + to_write = min(8192, data_len) + assert to_write == writer.write(self.fp.read(to_write)) + data_len -= to_write + if progress_callback: + progress_callback(to_write) + + def close(self): + self.socket.close() diff --git a/src/pcloud/jsonprotocol.py b/src/pcloud/jsonprotocol.py new file mode 100644 index 0000000..7f278da --- /dev/null +++ b/src/pcloud/jsonprotocol.py @@ -0,0 +1,78 @@ +import requests + +from pcloud.utils import log +from requests_toolbelt.multipart.encoder import MultipartEncoder + + +class PCloudJSONConnection(object): + """ Connection to pcloud.com based on their JSON protocol. + """ + + allowed_endpoints = frozenset(["api", "eapi", "test", "nearest"]) + + def __init__(self, api): + """ Connect to pcloud API based on their JSON protocol. + """ + self.session = requests.Session() + self.api = api + + def connect(self): + return self + + def do_get_request(self, method, authenticate=True, json=True, endpoint=None, **kw): + if authenticate and self.api.auth_token: # Password authentication + params = {"auth": self.api.auth_token} + elif authenticate and self.api.access_token: # OAuth2 authentication + params = {"access_token": self.api.access_token} + else: + params = {} + if endpoint is None: + endpoint = self.api.endpoint + params.update(kw) + log.debug("Doing request to %s%s", endpoint, method) + log.debug("Params: %s", params) + if "use_session" in kw: + get_method = self.session.get + else: + get_method = requests.get + resp = get_method(endpoint + method, params=params, allow_redirects=False) + resp.raise_for_status() + # data = dump_all(resp) + # print(data.decode('utf-8')) + if json: + result = resp.json() + else: + result = resp.content + log.debug("Response: %s", result) + return result + + def upload(self, method, files, **kwargs): + if 
self.api.auth_token: # Password authentication + kwargs["auth"] = self.api.auth_token + elif self.api.access_token: # OAuth2 authentication + kwargs["access_token"] = self.api.access_token + fields = list(kwargs.items()) + fields.extend(files) + + # from requests import Request, Session + + # s = Session() + + # for entry in files: + # filename, fd = entry[1] + # fields["filename"] = filename + # req = Request('PUT', self.api.endpoint + method, data=fields) + # prepped = req.prepare() + # prepped.body = fd + # resp = s.send(prepped) + + # resp = self.session.post(self.api.endpoint + method, files=files, data=kwargs) + m = MultipartEncoder(fields=fields) + resp = requests.post( + self.api.endpoint + method, data=m, headers={"Content-Type": m.content_type} + ) + # data = dump_all(resp) + # print(data.decode('utf-8')) + return resp.json() + + diff --git a/src/pcloud/pcloudfs.py b/src/pcloud/pcloudfs.py index 835c392..03f6a73 100644 --- a/src/pcloud/pcloudfs.py +++ b/src/pcloud/pcloudfs.py @@ -182,7 +182,7 @@ def _info_from_metadata(self, metadata, namespaces): info = { "basic": { "is_dir": metadata.get("isfolder", False), - "name": metadata.get("name"), + "name": metadata.get("name") } } if "details" in namespaces: @@ -193,6 +193,8 @@ def _info_from_metadata(self, metadata, namespaces): "created": self._to_datetime(metadata.get("created")), "metadata_changed": self._to_datetime(metadata.get("modified")), "size": metadata.get("size", 0), + "folderid": metadata.get("folderid"), + "fileid": metadata.get("fileid") } if "link" in namespaces: pass @@ -305,6 +307,7 @@ def on_close(pcloudfile): return pcloudfile.raw.close() + # import ipdb; ipdb.set_trace() if _mode.create: dir_path = dirname(_path) if dir_path != "/": @@ -324,10 +327,6 @@ def on_close(pcloudfile): if _mode.appending: resp = self.pcloud.file_open(path=_path, flags=flags) fd = resp.get("fd") - if fd is None: - # try a second time, if file could not be opened - resp = self.pcloud.file_open(path=_path, 
flags=api.O_WRITE) - fd = resp.get("fd") if fd is not None: data = self.pcloud.file_read(fd=fd, count=info.size) if resp.get('result') != 0: diff --git a/src/pcloud/tests/test_bin_integration.py b/src/pcloud/tests/test_bin_integration.py new file mode 100644 index 0000000..a475297 --- /dev/null +++ b/src/pcloud/tests/test_bin_integration.py @@ -0,0 +1,96 @@ +import os +import pytest +import time +import zipfile + +from fs import opener +from io import BytesIO +from pathlib import Path +from pcloud.api import PyCloud +from pcloud.api import O_CREAT +from pcloud.binaryprotocol import PCloudBinaryConnection +from urllib.parse import quote + + +@pytest.fixture +def pycloud(): + username = os.environ.get("PCLOUD_USERNAME") + password = os.environ.get("PCLOUD_PASSWORD") + return PyCloud(username, password, endpoint="bineapi", connection=PCloudBinaryConnection) + + +folder_for_tests = "integration-bin-test" +# upload `data/upload.txt` to integration test instance, +# generate a public link (code) and insert the code below. +# Generating public links with the API is currently not possible. 
+public_code = "XZ0UCJZ5o9LaCgvhDQq9LD7GXrx40pSsRoV" + + +@pytest.fixture +def testfolder(pycloud): + pycloud.createfolder(folderid=0, name=folder_for_tests) + yield folder_for_tests + pycloud.deletefolderrecursive(path=f"/{folder_for_tests}") + + +def test_login(pycloud): + ui = pycloud.userinfo() + assert ui["email"] == os.environ.get("PCLOUD_USERNAME") + + +def test_upload_download_roundrobin(pycloud, testfolder): + testfile = testfile = Path(__file__).parent / "data" / "upload.txt" + result = pycloud.uploadfile(path=f"/{testfolder}", files=[testfile]) + size = result["metadata"][0]["size"] + assert result["result"] == 0 + assert size == 14 + fd = pycloud.file_open(path=f"/{folder_for_tests}/upload.txt", flags=O_CREAT)["fd"] + result = pycloud.file_read(fd=fd, count=size) + with open(testfile) as f: + assert result == bytes(f.read(), "utf-8") + result = pycloud.file_close(fd=fd) + assert result["result"] == 0 + + +def x_test_publink_zip_with_unzip(pycloud): + result = pycloud.getpubzip(code=public_code, unzip=True) + assert result == b"Hello pCloud!\n" + + +def x_test_publink_zip(pycloud): + zipresponse = pycloud.getpubzip(code=public_code) + # I'm not sure, if zipping is deterministic, + # so let's only check, if we find a valid ZIP file + zipfmem = BytesIO(zipresponse) + zf = zipfile.ZipFile(zipfmem) + result_code = zf.testzip() + assert result_code is None + + +def x_test_copyfile(pycloud, testfolder): + testfilename = "Getting started with pCloud.pdf" + tofilename = f"/{folder_for_tests}/{testfilename}" + resp = pycloud.copyfile(path=f"/{testfilename}", topath=tofilename) + assert resp["result"] == 0 + time.sleep(1) + resp = pycloud.checksumfile(path=tofilename) + assert ( + resp.get("sha256") + == "df745d42f69266c49141ea7270c45240cf883b9cdb6a14fffcdff33c04c5304c" + ), f"Failure with checksum in {resp}" + + +def test_listtokens(pycloud): + result = pycloud.listtokens() + assert result["result"] == 0 + assert len(result["tokens"]) > 1 + + +# def 
testpyfsopener(pycloud): +# username = quote(os.environ.get("PCLOUD_USERNAME")) +# password = quote(os.environ.get("PCLOUD_PASSWORD")) +# pcloud_url = f'pcloud://{username}:{password}/' +# pcloud_url = 'pcloud://itconsense+pytest%40gmail.com:eXOtICf4TH3r/' +# # import pdb; pdb.set_trace() +# with opener.open_fs(pcloud_url) as pcloud_fs: +# assert pcloud_fs.listdir('/') == {} diff --git a/src/pcloud/tests/test_helpers.py b/src/pcloud/tests/test_helpers.py index 625bc43..4ecb3c8 100644 --- a/src/pcloud/tests/test_helpers.py +++ b/src/pcloud/tests/test_helpers.py @@ -1,4 +1,4 @@ -from pcloud.api import to_api_datetime +from pcloud.utils import to_api_datetime import datetime diff --git a/src/pcloud/tests/test_pyfs.py b/src/pcloud/tests/test_pyfs.py index f95a406..cdef173 100644 --- a/src/pcloud/tests/test_pyfs.py +++ b/src/pcloud/tests/test_pyfs.py @@ -7,14 +7,15 @@ from fs.path import abspath from fs.test import FSTestCases from pcloud.pcloudfs import PCloudFS - +from pcloud.binaryprotocol import PCloudBinaryConnection class TestpCloudFS(FSTestCases, unittest.TestCase): @classmethod def setUpClass(cls): username = os.environ.get("PCLOUD_USERNAME") password = os.environ.get("PCLOUD_PASSWORD") - cls.pcloudfs = PCloudFS(username, password, endpoint="eapi") + cls.pcloudfs = PCloudFS( + username, password, endpoint="eapi") def make_fs(self): # Return an instance of your FS object here @@ -42,4 +43,4 @@ def tearDown(self): # The pCloud API tends to get unstable under load # Put some latency in the tests with this hack # to stabilize tests - time.sleep(3) + # time.sleep(5) diff --git a/src/pcloud/utils.py b/src/pcloud/utils.py new file mode 100644 index 0000000..78c5347 --- /dev/null +++ b/src/pcloud/utils.py @@ -0,0 +1,22 @@ +import datetime +import logging +import sys + +log = logging.getLogger("pcloud") +log.setLevel(logging.INFO) + +handler = logging.StreamHandler(sys.stderr) +handler.setLevel(logging.INFO) +formatter = logging.Formatter("%(asctime)s - %(name)s - 
%(levelname)s - %(message)s") +handler.setFormatter(formatter) +log.addHandler(handler) + +# Helpers +def to_api_datetime(dt): + """Converter to a datetime structure the pCloud API understands + + See https://docs.pcloud.com/structures/datetime.html + """ + if isinstance(dt, datetime.datetime): + return dt.isoformat() + return dt \ No newline at end of file