diff --git a/.gitignore b/.gitignore
index 2d4daa40..8b684b87 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,4 @@
 .idea*
+*/resources/*
+*/__pycache__/*
+*.pyc
\ No newline at end of file
diff --git a/dcsena/AsyncDirectoryUploader.py b/dcsena/AsyncDirectoryUploader.py
new file mode 100644
index 00000000..664519f9
--- /dev/null
+++ b/dcsena/AsyncDirectoryUploader.py
@@ -0,0 +1,54 @@
+import os
+import glob
+import asyncio
+import functools
+from concurrent.futures import Executor
+
+from dcsena.BaseFileUploader import BaseFileUploader
+from dcsena.BaseDirectoryUploader import BaseDirectoryUploader
+
+
+class AsyncDirectoryUploader(BaseDirectoryUploader):
+    """Class for uploading a directory using the given file uploader implementation for storage
+
+    Args:
+        file_uploader: Client for uploading files to the underlying storage provider
+        executor: Executor used to run the blocking upload calls in parallel
+    """
+    def __init__(self, file_uploader: BaseFileUploader, executor: Executor):
+        self.file_uploader = file_uploader
+        self.executor = executor
+
+    @staticmethod
+    def get_files_to_upload(root_dir: str):
+        """Gets all files recursively in the specified directory
+
+        Args:
+            root_dir: root directory to begin the file search
+
+        Returns:
+            Relative paths of all files in the directory
+        """
+        files = glob.glob("**/*", root_dir=root_dir, recursive=True)
+        # We can't upload directories so first need to filter them out
+        return [f for f in files if os.path.isfile(os.path.join(root_dir, f))]
+
+    async def upload_directory_async(self, root_dir: str):
+        """Uses asyncio and the executor to make async file upload calls for all files in the directory
+
+        Args:
+            root_dir: root directory to upload files from
+
+        Returns:
+            completed task results
+        """
+        loop = asyncio.get_running_loop()
+        tasks = []
+        for f in self.get_files_to_upload(root_dir):
+            file_path = os.path.join(root_dir, f)
+            key = f.replace("\\", "/")  # normalize Windows path separators so keys use "/"
+            tasks.append(loop.run_in_executor(self.executor, functools.partial(self.file_uploader.upload_file, key=key,
+                                                                               file_path=file_path)))
+        completed, pending = await asyncio.wait(tasks)
+        results = [t.result() for t in completed]  # result() re-raises any upload exception
+        return results
diff --git a/dcsena/BaseDirectoryUploader.py b/dcsena/BaseDirectoryUploader.py
new file mode 100644
index 00000000..d81a6884
--- /dev/null
+++ b/dcsena/BaseDirectoryUploader.py
@@ -0,0 +1,11 @@
+from abc import ABC
+
+
+class BaseDirectoryUploader(ABC):
+    def upload_directory_async(self, root_dir: str):
+        """Abstract method for directory upload
+
+        Args:
+            root_dir: Directory to upload
+        """
+        pass
diff --git a/dcsena/BaseFileUploader.py b/dcsena/BaseFileUploader.py
new file mode 100644
index 00000000..3886c06a
--- /dev/null
+++ b/dcsena/BaseFileUploader.py
@@ -0,0 +1,13 @@
+from abc import ABC
+
+
+class BaseFileUploader(ABC):
+    """Abstract class for File Uploading"""
+    def upload_file(self, key: str, file_path: str):
+        """Abstract method for file upload
+
+        Args:
+            key: Key name of file to upload
+            file_path: Path to the file to upload
+        """
+        pass
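To illustrate the pluggable design behind the BaseFileUploader abstraction above, here is a minimal sketch of what a Google Cloud Storage implementation could look like. It is not part of this patch and assumes the google-cloud-storage client library:

```python
# Hypothetical sketch, not part of this patch: a GCS-backed BaseFileUploader.
# Assumes the google-cloud-storage package and credentials configured via
# GOOGLE_APPLICATION_CREDENTIALS.
from google.cloud import storage

from dcsena.BaseFileUploader import BaseFileUploader


class GcsFileUploader(BaseFileUploader):
    """Illustrative GCS implementation of the FileUploader interface."""

    def __init__(self, bucket_name: str, gcs_client: storage.Client):
        self.bucket = gcs_client.bucket(bucket_name)

    def upload_file(self, key: str, file_path: str):
        # A blocking call, just like s3_client.upload_file, so it can be dispatched
        # through AsyncDirectoryUploader's executor unchanged.
        self.bucket.blob(key).upload_from_filename(file_path)
```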
diff --git a/dcsena/README.md b/dcsena/README.md
new file mode 100644
index 00000000..647d04af
--- /dev/null
+++ b/dcsena/README.md
@@ -0,0 +1,66 @@
+# Multi-threaded file uploader (Backend)
+
+> Ideal candidate: skilled python developer with solid knowledge of cloud and distributed systems.
+
+# Overview
+
+Create a Python application that uploads a set of given files to a cloud object storage in parallel through the cloud provider's or a third-party API.
+
+# Requirements
+
+1. Support up to 100,000s of files, all inside one directory with arbitrary sizes.
+The root directory may contain subdirectories.
+2. The object storage container which holds the objects is private and only credential-based access is allowed.
+3. Each object inside object storage should have associated metadata which contains file size, last modification time and file permissions.
+
+# Expectations
+
+- Fast (utilize full network bandwidth), low CPU (do not block all other processes) and low memory (<25% tentatively) file uploader
+- Support for AWS S3
+- Modular and object-oriented implementation (to add other cloud providers)
+- Clean and documented code
+- Tests
+
+# Dev Design
+
+## Multithreading vs Multiprocessing vs Asyncio
+Probably the most important decision for this implementation is how to handle the parallelization required for the parallel file uploader.
+Given the constraints (low CPU, low memory, high IO operations, variable IO speed depending on file size), asyncio is the ideal choice here as it's meant for parallel, heavy IO ops.
+However, boto3 and S3's upload_file are not async. There's a library called aiobotocore that attempts to provide an async boto3, but it has pretty limited support (it does not support S3Transfer, only put). It also has hard-pinned botocore dependencies, which will be a PITA from a dependency management perspective.
+Luckily, aiobotocore is not really needed, and we can leverage the asyncio running loop and run_in_executor with a thread pool to get around these limitations.
+
+
+# Setup
+
+Set up a virtualenv and install dependencies:
+```commandline
+python3 -m venv venv/
+source venv/bin/activate
+pip install -r requirements.txt
+```
+
+Set up AWS credentials if you have not already. Boto credentials are best handled through environment variables.
+
+Create the test resources:
+```commandline
+python3 create_resources_script.py
+```
+
+Run the file uploader:
+```commandline
+S3_BUCKET="foo" python3 main.py
+```
+or if you don't have a default AWS profile set up:
+```commandline
+S3_BUCKET="foo" AWS_ACCESS_KEY_ID="123" AWS_SECRET_ACCESS_KEY="abc" python3 main.py
+```
+
+# Things not done & Future Improvements
+- I only tested with 1k small files here. Proper benchmarks to ensure the desired memory and CPU profile at 100k files would be a nice next step (a rough measurement harness is sketched below, after the tests).
+
+## Requirements Check-In
+1. By leveraging asyncio and a thread pool, we can support uploads of 100k files. Requirement met.
+2. By leveraging a cloud file uploader such as S3, we are abstracting authentication/authorization away to the cloud provider. S3, GCP, etc. all support private containers with credential-based access. Requirement met.
+3. By leveraging a cloud file uploader such as S3, we also get associated metadata for free: S3 stores each object's size and last modification time.
+With regards to last modification time and file permissions, I'm assuming the cloud provider's metadata is sufficient here. If we wanted to store the local modification time and local file permissions, a metadata file or per-object metadata (sketched below) would be needed. For example, if I upload a file at 12 PM, modify it at 1 PM, and upload it again at 2 PM, the cloud provider would only report a last modification time of 2 PM.
+Local file permissions (RWX) are a can of worms given different operating systems and users, but if they were needed, a metadata file would similarly work. Marking the requirement met under the listed assumptions.
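Regarding the metadata discussion in the Requirements Check-In above: if the local modification time and permission bits ever need to be preserved explicitly, one option (not implemented in this patch) is to attach them as user-defined object metadata through upload_file's ExtraArgs. A minimal sketch, with made-up metadata key names:

```python
# Illustrative sketch only: attach local file metadata as S3 object metadata.
# The "mtime"/"mode"/"size" keys are hypothetical names, not an AWS convention.
import os


def upload_file_with_metadata(s3_client, bucket_name: str, key: str, file_path: str):
    stat = os.stat(file_path)
    extra_args = {
        "Metadata": {
            "mtime": str(stat.st_mtime),        # local last modification time
            "mode": oct(stat.st_mode & 0o777),  # local permission bits
            "size": str(stat.st_size),          # local size in bytes
        }
    }
    # "Metadata" is one of the ExtraArgs accepted by boto3's managed upload_file transfer.
    s3_client.upload_file(file_path, bucket_name, key, ExtraArgs=extra_args)
```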
diff --git a/dcsena/S3FileUploader.py b/dcsena/S3FileUploader.py
new file mode 100644
index 00000000..fcbdff94
--- /dev/null
+++ b/dcsena/S3FileUploader.py
@@ -0,0 +1,28 @@
+from botocore.client import BaseClient
+
+from dcsena.BaseFileUploader import BaseFileUploader
+
+
+class S3FileUploader(BaseFileUploader):
+    """S3 Implementation for FileUploader
+
+    Args:
+        bucket_name: S3 Bucket to upload files to
+        s3_client: S3 Client
+    """
+    def __init__(self, bucket_name: str, s3_client: BaseClient):
+        self.bucket_name = bucket_name
+        self.s3_client = s3_client
+
+    def upload_file(self, key: str, file_path: str):
+        """Method for file upload
+
+        Args:
+            key: Key name of file to upload
+            file_path: Path to the file to upload
+        """
+        # This call is the same as S3Transfer's upload_file and under the hood it automatically
+        # switches to a multipart upload when the file is over a specific size.
+        # This means we don't need to handle separate logic for large files that would need to be chunked.
+        # A TransferConfig can be passed in if config changes are needed, such as concurrency and size limits (sketched below).
+        self.s3_client.upload_file(file_path, self.bucket_name, key)
diff --git a/dcsena/create_resources_script.py b/dcsena/create_resources_script.py
new file mode 100644
index 00000000..87224aa8
--- /dev/null
+++ b/dcsena/create_resources_script.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python3
+import os
+import random
+
+RESOURCES_DIR = "./resources"
+
+FILE_CONTENT = "hi1234"
+
+
+def create_files(target_dir, num_files, max_depth):
+    """Creates a directory full of files and subdirectories full of files
+
+    Args:
+        target_dir: Directory to create files in
+        num_files: Number of files to create
+        max_depth: Maximum depth of nested subdirectories to create
+    """
+    os.makedirs(target_dir, exist_ok=True)  # make sure the root resources directory exists
+    depth = 0
+    current_dir = target_dir
+    for num in range(0, num_files):
+        random_num = random.randint(0, 100)
+        if random_num < 50 and depth < max_depth:
+            current_dir += "/child-dir-{}".format(depth)
+            if not os.path.exists(current_dir):
+                os.mkdir(current_dir)
+            depth += 1
+        target_file_name = "{}/{}.txt".format(current_dir, num)
+        with open(target_file_name, 'w') as f:
+            f.write(FILE_CONTENT)
+
+
+# Run this script to create files for testing purposes
+if __name__ == "__main__":
+    create_files(RESOURCES_DIR, num_files=100000, max_depth=5)
diff --git a/dcsena/main.py b/dcsena/main.py
new file mode 100644
index 00000000..d53486d8
--- /dev/null
+++ b/dcsena/main.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python3
+import concurrent.futures
+import os
+import boto3
+import asyncio
+
+from dcsena.AsyncDirectoryUploader import AsyncDirectoryUploader
+from dcsena.S3FileUploader import S3FileUploader
+
+BUCKET_NAME = os.environ["S3_BUCKET"]
+
+RESOURCES_DIR_PATH = "./resources"
+
+
+async def upload(dir_name):
+    boto3_session = boto3.Session()
+    s3_client = boto3_session.client('s3')
+    s3_file_uploader = S3FileUploader(BUCKET_NAME, s3_client)
+    executor = concurrent.futures.ThreadPoolExecutor()
+    dir_uploader = AsyncDirectoryUploader(s3_file_uploader, executor)
+    return await dir_uploader.upload_directory_async(dir_name)
+
+
+if __name__ == "__main__":
+    # asyncio.run creates and manages the event loop for us
+    asyncio.run(upload(RESOURCES_DIR_PATH))
diff --git a/dcsena/requirements.txt b/dcsena/requirements.txt
new file mode 100644
index 00000000..a9eae3c4
--- /dev/null
+++ b/dcsena/requirements.txt
@@ -0,0 +1,2 @@
+boto3==1.34.35
+botocore==1.34.35
\ No newline at end of file
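As the comment in S3FileUploader.upload_file above notes, boto3's managed transfer can be tuned by passing a TransferConfig. A minimal sketch of that tuning; the bucket, paths, and numbers are placeholders, not recommendations:

```python
# Illustrative sketch only: tuning the managed transfer used by upload_file.
import boto3
from boto3.s3.transfer import TransferConfig

transfer_config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,  # switch to multipart uploads above 64 MB
    multipart_chunksize=16 * 1024 * 1024,  # 16 MB parts
    max_concurrency=4,                     # threads used per individual large file
)

s3_client = boto3.Session().client("s3")
# S3FileUploader.upload_file could accept such a config and forward it via Config=
s3_client.upload_file("local/file.bin", "my-bucket", "remote/file.bin", Config=transfer_config)
```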
diff --git a/dcsena/tests/AsyncDirectoryUploaderTest.py b/dcsena/tests/AsyncDirectoryUploaderTest.py
new file mode 100644
index 00000000..1fdf579b
--- /dev/null
+++ b/dcsena/tests/AsyncDirectoryUploaderTest.py
@@ -0,0 +1,69 @@
+import unittest
+from asyncio import Future
+from concurrent.futures import Executor
+from threading import Lock
+from unittest.mock import patch
+from unittest import IsolatedAsyncioTestCase
+
+from dcsena.AsyncDirectoryUploader import AsyncDirectoryUploader
+
+TEST_BUCKET = "foo123"
+
+
+class AsyncDirectoryUploaderTest(unittest.TestCase):
+    @patch("concurrent.futures.Executor")
+    @patch("dcsena.BaseFileUploader")
+    def test_get_files(self, mock_file_uploader, mock_executor):
+        sut = AsyncDirectoryUploader(mock_file_uploader, mock_executor)
+        files = sut.get_files_to_upload("../resources")
+        # files exist and they don't include directories
+        self.assertTrue(len(files) != 0)
+        self.assertTrue("child-dir-0" not in files)
+
+
+# DummyExecutor as it's easier to use than mocking
+class DummyExecutor(Executor):
+
+    def __init__(self):
+        self._shutdown = False
+        self._shutdownLock = Lock()
+
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdownLock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = Future()
+            try:
+                result = fn(*args, **kwargs)
+            except BaseException as e:
+                f.set_exception(e)
+            else:
+                f.set_result(result)
+
+            return f
+
+    def shutdown(self, wait=True, *, cancel_futures=False):
+        with self._shutdownLock:
+            self._shutdown = True
+
+
+class AsyncDirectoryUploaderTestAsync(IsolatedAsyncioTestCase):
+    @patch("dcsena.BaseFileUploader")
+    async def test_upload_directory_async_happy(self, mock_file_uploader):
+        mock_executor = DummyExecutor()
+        sut = AsyncDirectoryUploader(mock_file_uploader, mock_executor)
+        r = await sut.upload_directory_async("../resources")
+        self.assertTrue(r)
+
+    @patch("dcsena.BaseFileUploader")
+    async def test_upload_directory_async_unhappy(self, mock_file_uploader):
+        mock_file_uploader.upload_file.side_effect = Exception("Error")
+        mock_executor = DummyExecutor()
+        sut = AsyncDirectoryUploader(mock_file_uploader, mock_executor)
+        with self.assertRaises(Exception):
+            await sut.upload_directory_async("../resources")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/dcsena/tests/S3FileUploaderTest.py b/dcsena/tests/S3FileUploaderTest.py
new file mode 100644
index 00000000..1829a5a7
--- /dev/null
+++ b/dcsena/tests/S3FileUploaderTest.py
@@ -0,0 +1,24 @@
+import unittest
+from unittest.mock import patch
+
+from dcsena.S3FileUploader import S3FileUploader
+
+TEST_BUCKET = "foo123"
+
+
+class S3FileUploaderTest(unittest.TestCase):
+    @patch("botocore.client.BaseClient")
+    def test_upload_happy(self, mock_s3_client):
+        s3_file_uploader = S3FileUploader(TEST_BUCKET, mock_s3_client)
+        s3_file_uploader.upload_file("my_key", "my_file")
+
+    @patch("botocore.client.BaseClient")
+    def test_upload_throws_error(self, mock_s3_client):
+        mock_s3_client.upload_file.side_effect = Exception("Error")
+        s3_file_uploader = S3FileUploader(TEST_BUCKET, mock_s3_client)
+        with self.assertRaises(Exception):
+            s3_file_uploader.upload_file("my_key", "my_file")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/dcsena/tests/__init__.py b/dcsena/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
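The README's future-improvements section mentions that proper benchmarks at 100k files are still to be done. A rough, standard-library-only harness along these lines could capture wall time and peak Python heap usage; it assumes main.py's upload coroutine and requires S3_BUCKET to be set:

```python
# Rough benchmarking sketch, not part of this patch.
# S3_BUCKET must be set before importing dcsena.main, which reads it at import time.
import asyncio
import time
import tracemalloc

from dcsena.main import upload

RESOURCES_DIR_PATH = "./resources"

if __name__ == "__main__":
    tracemalloc.start()
    start = time.perf_counter()
    asyncio.run(upload(RESOURCES_DIR_PATH))
    elapsed = time.perf_counter() - start
    _, peak = tracemalloc.get_traced_memory()  # peak Python-allocated memory only
    tracemalloc.stop()
    print(f"uploaded in {elapsed:.1f}s, peak Python heap {peak / 1024 / 1024:.1f} MiB")
```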
os.environ["S3_BUCKET"] + +RESOURCES_DIR_PATH = "./resources" + + +def validate_results(s3_client): + paginator = s3_client.get_paginator('list_objects_v2') + page_iterator = paginator.paginate(Bucket=BUCKET_NAME) + num_files = 0 + for page in page_iterator: + for file in page['Contents']: + print(file['Key']) + print(file['LastModified']) + print(file['Size']) + num_files += 1 + print(num_files) + + +if __name__ == "__main__": + boto3_session = boto3.Session() + s3_client = boto3_session.client('s3') + validate_results(s3_client) diff --git a/example-github-username/README.md b/example-github-username/README.md index f562fef0..b06466df 100644 --- a/example-github-username/README.md +++ b/example-github-username/README.md @@ -1,6 +1,6 @@ -# ToDos - -- [x] Create a subfolder according to my GitHub username -- [ ] Re-use the content from `../` for `README.md` and modify as needed. -- [ ] Contribute code in this folder -- [ ] Put a pull request when done +# ToDos + +- [x] Create a subfolder according to my GitHub username +- [ ] Re-use the content from `../` for `README.md` and modify as needed. +- [ ] Contribute code in this folder +- [ ] Put a pull request when done \ No newline at end of file