Parallel File Uploader by Dcsena #56

Open: wants to merge 7 commits into base: dev
Changes from 5 commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -1 +1,4 @@
.idea*
*/resources/*
*/__pycache__/*
*.pyc
34 changes: 34 additions & 0 deletions dcsena/AsyncDirectoryUploader.py
@@ -0,0 +1,34 @@
import os
import glob
import asyncio
import functools
from concurrent.futures import Executor

from dcsena.BaseFileUploader import BaseFileUploader
from dcsena.BaseDirectoryUploader import BaseDirectoryUploader


# Class for uploading a directory using the file uploader's implementation for storage
class AsyncDirectoryUploader(BaseDirectoryUploader):
def __init__(self, file_uploader: BaseFileUploader, executor: Executor):
self.file_uploader = file_uploader
self.executor = executor

@staticmethod
def get_files_to_upload(root_dir: str):
files = glob.glob("**/*", root_dir=root_dir, recursive=True)
# We can't upload directories so first need to filter them out
return [f for f in files if os.path.isfile(os.path.join(root_dir, f))]

# Using asyncio and thread pool executor to make async file upload calls
async def upload_directory_async(self, root_dir: str):
loop = asyncio.get_running_loop()
tasks = []
for f in self.get_files_to_upload(root_dir):
file_name = os.path.join(root_dir, f)
key = f.replace("\\", "/")
tasks.append(loop.run_in_executor(self.executor, functools.partial(self.file_uploader.upload_file, key=key,
file_name=file_name)))
completed, pending = await asyncio.wait(tasks)
results = [t.result() for t in completed]
return results
7 changes: 7 additions & 0 deletions dcsena/BaseDirectoryUploader.py
@@ -0,0 +1,7 @@
from abc import ABC


# Abstract class for Directory Uploading
class BaseDirectoryUploader(ABC):
def upload_directory_async(self, root_dir: str):
pass
7 changes: 7 additions & 0 deletions dcsena/BaseFileUploader.py
@@ -0,0 +1,7 @@
from abc import ABC


# Abstract class for File Uploading
class BaseFileUploader(ABC):
def upload_file(self, key: str, file_name: str):
pass
66 changes: 66 additions & 0 deletions dcsena/README.md
@@ -0,0 +1,66 @@
# Multi-threaded file uploader (Backend)

> Ideal candidate: skilled python developer with solid knowledge of cloud and distributed systems.

# Overview

Create a Python application that uploads a set of given files to cloud object storage in parallel through the cloud provider's or a third-party API.

# Requirements

1. Support up to 100,000s of files of arbitrary sizes, all inside one directory. The root directory may contain subdirectories.
2. The object storage container holding the objects is private; only credential-based access is allowed.
3. Each object in object storage should have associated metadata containing file size, last modification time, and file permissions.

# Expectations

- Fast (utilizes full network bandwidth), low-CPU (does not block other processes), and low-memory (<25%, tentatively) file uploader
- Support for AWS S3
- Modular, object-oriented implementation (so other cloud providers can be added)
- Clean and documented code
- Tests

# Dev Design

## Multithreading vs Multiprocessing vs Asyncio
Probably the most important decision for this implementation is how to handle the parallelism the file uploader requires.
Given the constraints (low CPU, low memory, heavy IO, and IO time that varies with file size), asyncio is the ideal choice here, as it is designed for concurrent, IO-bound work.
However, boto3 and S3's upload_file are not async. The aiobotocore library attempts to provide an async boto3, but its support is fairly limited (it does not support S3Transfer, only put), and it hard-pins botocore dependencies, which is a pain from a dependency-management standpoint.
Luckily, aiobotocore is not really needed: we can leverage asyncio's running loop and run_in_executor with a thread pool to get around these limitations, as sketched below.
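
As a rough illustration of the pattern (not the PR's own classes), a blocking upload call can be dispatched onto a thread pool from a single event loop; `upload_one` below is a placeholder standing in for a real boto3 call:

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


def upload_one(key: str, file_name: str) -> str:
    # Placeholder for a blocking upload such as boto3's upload_file.
    return key


async def upload_all(files: list[tuple[str, str]]) -> list[str]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Each blocking call runs on a worker thread; the event loop awaits them all.
        tasks = [
            loop.run_in_executor(executor, functools.partial(upload_one, key, file_name))
            for key, file_name in files
        ]
        return list(await asyncio.gather(*tasks))


if __name__ == "__main__":
    print(asyncio.run(upload_all([("a.txt", "./a.txt"), ("b.txt", "./b.txt")])))
```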



# Setup

Set up a virtualenv and install dependencies:
```commandline
python3 -m venv venv/
source venv/bin/activate
pip install -r requirements.txt
```

Set up AWS credentials if you have not already. Boto credentials are best handled through environment variables.

Create the test resources:
```commandline
python3 create_resources_script.py
```

Run file uploader:
```commandline
S3_BUCKET="foo" python3 main.py
```
or, if you don't have a default AWS profile set up:
```commandline
S3_BUCKET="foo" AWS_ACCESS_KEY_ID="123" AWS_SECRET_ACCESS_KEY="abc" python3 main.py
```

# Things not done & Future Improvements
- I only tested with 1k small files here. Proper benchmarks to confirm the desired memory and CPU profile at 100k files would be a good next step; a rough sketch is shown below.
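
A minimal benchmarking sketch, assuming the `upload()` coroutine in `main.py` and a populated `./resources` directory (`S3_BUCKET` must be set before importing `main`); note that `tracemalloc` only tracks Python heap allocations, so a fuller check would also watch process RSS:

```python
import asyncio
import os
import time
import tracemalloc

os.environ.setdefault("S3_BUCKET", "my-test-bucket")  # hypothetical bucket name

from dcsena.main import upload  # main.py reads S3_BUCKET at import time


def benchmark(dir_name: str = "./resources") -> None:
    tracemalloc.start()
    start = time.perf_counter()
    results = asyncio.run(upload(dir_name))
    elapsed = time.perf_counter() - start
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    print(f"uploaded {len(results)} files in {elapsed:.1f}s, "
          f"peak Python heap {peak / 2**20:.1f} MiB")


if __name__ == "__main__":
    benchmark()
```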

## Requirements Check-In
1. By leveraging asyncio and a thread pool, we can support uploading 100k files. Requirement met.
2. By leveraging a cloud object store such as S3, we delegate authentication/authorization to the cloud provider; S3, GCP, etc. all support private buckets with credential-based access. Requirement met.
3. By leveraging a cloud object store such as S3, we also get associated metadata for free: S3 stores file size and last modification time for each object (file permissions are not captured automatically).
With regard to last modification time and file permissions, I'm assuming the cloud provider's metadata is sufficient here. If we wanted to preserve the local modification time and local file permissions, a metadata file (or per-object metadata, sketched below) would be needed. For example, if I upload a file at 12 PM, modify it at 1 PM, and upload it again at 2 PM, the cloud provider will only report a lastModificationTime of 2 PM.
Local file permissions (RWX) are a can of worms given different operating systems and users, but if they were needed, a metadata file would similarly work. Marking the requirement met under the listed assumptions.
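
If preserving local metadata explicitly ever became a requirement, one possible approach (not implemented in this PR) is to attach it as user-defined S3 object metadata at upload time via `ExtraArgs`; the helper below is a hypothetical sketch:

```python
import os
import stat

import boto3


def upload_with_local_metadata(s3_client, bucket: str, key: str, file_name: str) -> None:
    st = os.stat(file_name)
    # User-defined metadata is stored by S3 under the "x-amz-meta-" prefix.
    metadata = {
        "local-size": str(st.st_size),
        "local-mtime": str(int(st.st_mtime)),
        "local-mode": oct(stat.S_IMODE(st.st_mode)),
    }
    s3_client.upload_file(file_name, bucket, key, ExtraArgs={"Metadata": metadata})


if __name__ == "__main__":
    client = boto3.Session().client("s3")
    upload_with_local_metadata(client, os.environ["S3_BUCKET"], "example.txt", "./example.txt")
```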
18 changes: 18 additions & 0 deletions dcsena/S3FileUploader.py
@@ -0,0 +1,18 @@
from botocore.client import BaseClient

from dcsena.BaseFileUploader import BaseFileUploader


# S3 Implementation
class S3FileUploader(BaseFileUploader):

def __init__(self, bucket_name: str, s3_client: BaseClient):
self.bucket_name = bucket_name
self.s3_client = s3_client

def upload_file(self, key: str, file_name: str):
        # This call is the same as S3Transfer's upload_file and, under the hood, automatically
        # performs a multipart upload when the file is over a certain size.
        # This means we don't need special handling for large files that would otherwise need chunking.
        # A TransferConfig can be passed in if config changes are needed, such as concurrency and size limits.
self.s3_client.upload_file(file_name, self.bucket_name, key)
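
As an aside, a hedged sketch (not part of this PR) of what passing a custom `TransferConfig` could look like if concurrency or size thresholds ever need tuning; the values shown are hypothetical:

```python
import boto3
from boto3.s3.transfer import TransferConfig

# Hypothetical tuning values; the boto3 defaults are usually fine.
transfer_config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,  # switch to multipart uploads above 64 MiB
    multipart_chunksize=16 * 1024 * 1024,
    max_concurrency=4,
    use_threads=True,
)

s3_client = boto3.Session().client("s3")
s3_client.upload_file("./example.txt", "my-bucket", "example.txt", Config=transfer_config)
```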
28 changes: 28 additions & 0 deletions dcsena/create_resources_script.py
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
import os
import random

RESOURCES_DIR = "./resources"

FILE_CONTENT = "hi1234"


# Creating a directory full of files and subdirectories full of files
def create_files(target_dir, num_files, max_depth):
depth = 0
current_dir = target_dir
for num in range(0, num_files):
random_num = random.randint(0, 100)
if random_num < 50 and depth < max_depth:
current_dir += "/child-dir-{}".format(depth)
if not os.path.exists(current_dir):
os.mkdir(current_dir)
depth += 1
target_file_name = "{}/{}.txt".format(current_dir, num)
with open(target_file_name, 'w') as f:
f.write(FILE_CONTENT)


# Run script to create files for testing purposes
if __name__ == "__main__":
create_files(RESOURCES_DIR, num_files=1000, max_depth=5)
26 changes: 26 additions & 0 deletions dcsena/main.py
@@ -0,0 +1,26 @@
#!/usr/bin/env python3
import concurrent.futures
import os
import boto3
import asyncio

from dcsena.AsyncDirectoryUploader import AsyncDirectoryUploader
from dcsena.S3FileUploader import S3FileUploader

BUCKET_NAME = os.environ["S3_BUCKET"]

RESOURCES_DIR_PATH = "./resources"


async def upload(dir_name):
boto3_session = boto3.Session()
s3_client = boto3_session.client('s3')
s3_file_uploader = S3FileUploader(BUCKET_NAME, s3_client)
executor = concurrent.futures.ThreadPoolExecutor()
dir_uploader = AsyncDirectoryUploader(s3_file_uploader, executor)
return await dir_uploader.upload_directory_async(dir_name)


if __name__ == "__main__":
    asyncio.run(upload(RESOURCES_DIR_PATH))
2 changes: 2 additions & 0 deletions dcsena/requirements.txt
@@ -0,0 +1,2 @@
boto3==1.34.35
botocore==1.34.35
Member commented:
We've been using pyproject.toml for packaging lately, it's concise and does all in one place. See https://github.com/Exabyte-io/esse/blob/dev/pyproject.toml, for example.

69 changes: 69 additions & 0 deletions dcsena/tests/AsyncDirectoryUploaderTest.py
@@ -0,0 +1,69 @@
import unittest
from asyncio import Future
from concurrent.futures import Executor
from threading import Lock
from unittest.mock import patch
from unittest import IsolatedAsyncioTestCase

from dcsena.AsyncDirectoryUploader import AsyncDirectoryUploader

TEST_BUCKET = "foo123"


class AsyncDirectoryUploaderTest(unittest.TestCase):
@patch("dcsena.BaseFileUploader")
@patch("concurrent.futures.Executor")
    def test_get_files(self, mock_executor, mock_file_uploader):  # patch decorators apply bottom-up
sut = AsyncDirectoryUploader(mock_file_uploader, mock_executor)
files = sut.get_files_to_upload("../resources")
# files exist and they don't include directories
self.assertTrue(len(files) != 0)
self.assertTrue("child-dir-0" not in files)


# DummyExecutor as it's easier to use than mocking
class DummyExecutor(Executor):

def __init__(self):
self._shutdown = False
self._shutdownLock = Lock()

def submit(self, fn, *args, **kwargs):
with self._shutdownLock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')

f = Future()
try:
result = fn(*args, **kwargs)
except BaseException as e:
f.set_exception(e)
else:
f.set_result(result)

return f

def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdownLock:
self._shutdown = True


class AsyncDirectoryUploaderTestAsync(IsolatedAsyncioTestCase):
@patch("dcsena.BaseFileUploader")
async def test_upload_directory_async_happy(self, mock_file_uploader):
mock_executor = DummyExecutor()
sut = AsyncDirectoryUploader(mock_file_uploader, mock_executor)
r = await sut.upload_directory_async("../resources")
self.assertTrue(r)

@patch("dcsena.BaseFileUploader")
async def test_upload_directory_async_unhappy(self, mock_file_uploader):
mock_file_uploader.upload_file.side_effect = Exception("Error")
mock_executor = DummyExecutor()
sut = AsyncDirectoryUploader(mock_file_uploader, mock_executor)
with self.assertRaises(Exception):
await sut.upload_directory_async("../resources")


if __name__ == '__main__':
unittest.main()
24 changes: 24 additions & 0 deletions dcsena/tests/S3FileUploaderTest.py
@@ -0,0 +1,24 @@
import unittest
from unittest.mock import Mock, patch

from dcsena.S3FileUploader import S3FileUploader

TEST_BUCKET = "foo123"


class S3FileUploaderTest(unittest.TestCase):
@patch("botocore.client.BaseClient")
def test_upload_happy(self, mock_s3_client):
s3_file_uploader = S3FileUploader(TEST_BUCKET, mock_s3_client)
s3_file_uploader.upload_file("my_key", "my_file")

@patch("botocore.client.BaseClient")
def test_upload_throws_error(self, mock_s3_client):
mock_s3_client.upload_file.side_effect = Exception("Error")
s3_file_uploader = S3FileUploader(TEST_BUCKET, mock_s3_client)
with self.assertRaises(Exception):
s3_file_uploader.upload_file("my_key", "my_file")


if __name__ == '__main__':
unittest.main()
Empty file added dcsena/tests/__init__.py
26 changes: 26 additions & 0 deletions dcsena/tests/verify_upload_test_script.py
@@ -0,0 +1,26 @@
#!/usr/bin/env python3
import os
import boto3

BUCKET_NAME = os.environ["S3_BUCKET"]

RESOURCES_DIR_PATH = "./resources"


def validate_results(s3_client):
paginator = s3_client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket=BUCKET_NAME)
num_files = 0
for page in page_iterator:
for file in page['Contents']:
print(file['Key'])
print(file['LastModified'])
print(file['Size'])
num_files += 1
print(num_files)


if __name__ == "__main__":
boto3_session = boto3.Session()
s3_client = boto3_session.client('s3')
validate_results(s3_client)
12 changes: 6 additions & 6 deletions example-github-username/README.md
@@ -1,6 +1,6 @@
# ToDos

- [x] Create a subfolder according to my GitHub username
- [ ] Re-use the content from `../` for `README.md` and modify as needed.
- [ ] Contribute code in this folder
- [ ] Put a pull request when done
# ToDos
- [x] Create a subfolder according to my GitHub username
- [ ] Re-use the content from `../` for `README.md` and modify as needed.
- [ ] Contribute code in this folder
- [ ] Put a pull request when done