Merge pull request #10 from mediacatch/feature/52-multipart-upload
Multipart Upload
FredHaa authored May 3, 2023
2 parents 2e3e35f + c3d9f2e commit ba9a73e
Showing 4 changed files with 574 additions and 18 deletions.
15 changes: 14 additions & 1 deletion src/mediacatch_s2t/__init__.py
@@ -12,10 +12,22 @@
'https://s2t.mediacatch.io'
)

PRESIGNED_ENDPOINT = (
SINGLE_UPLOAD_ENDPOINT = (
os.environ.get('MEDIACATCH_PRESIGN_ENDPOINT') or
'/presigned-post-url'
)
MULTIPART_UPLOAD_CREATE_ENDPOINT = (
os.environ.get('MEDIACATCH_MULTIPART_UPLOAD_CREATE_ENDPOINT') or
'/multipart-upload/id'
)
MULTIPART_UPLOAD_URL_ENDPOINT = (
os.environ.get('MEDIACATCH_MULTIPART_UPLOAD_URL_ENDPOINT') or
'/multipart-upload/url'
)
MULTIPART_UPLOAD_COMPLETE_ENDPOINT = (
os.environ.get('MEDIACATCH_MULTIPART_UPLOAD_COMPLETE_ENDPOINT') or
'/multipart-upload/complete'
)
UPDATE_STATUS_ENDPOINT = (
os.environ.get('MEDIACATCH_UPDATE_STATUS_ENDPOINT') or
'/upload-completed'
@@ -25,3 +37,4 @@
'/result'
)
PROCESSING_TIME_RATIO = 0.1
CHUNK_SIZE_MIN = 50 * 1024 * 1024
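
The new constants follow the same pattern as the existing endpoints: an environment-variable override with a hard-coded default, and CHUNK_SIZE_MIN sets the 50 MiB threshold above which the multipart path is used. A minimal sketch of how an override behaves (the '/v2/...' value is purely illustrative and not part of this commit):

import os

# Overrides must be set before mediacatch_s2t is imported, because the
# constants are resolved once at import time.
os.environ['MEDIACATCH_MULTIPART_UPLOAD_CREATE_ENDPOINT'] = '/v2/multipart-upload/id'  # hypothetical value

from mediacatch_s2t import MULTIPART_UPLOAD_CREATE_ENDPOINT, CHUNK_SIZE_MIN

print(MULTIPART_UPLOAD_CREATE_ENDPOINT)  # '/v2/multipart-upload/id'
print(CHUNK_SIZE_MIN)                    # 52428800 (50 MiB)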
268 changes: 253 additions & 15 deletions src/mediacatch_s2t/uploader.py
@@ -1,27 +1,43 @@
import json
import os
import pathlib
import shutil
import tempfile

import requests
import subprocess
import json
from typing import NamedTuple


from mediacatch_s2t import (
URL, PRESIGNED_ENDPOINT, TRANSCRIPT_ENDPOINT, UPDATE_STATUS_ENDPOINT, PROCESSING_TIME_RATIO
URL,
SINGLE_UPLOAD_ENDPOINT, TRANSCRIPT_ENDPOINT, UPDATE_STATUS_ENDPOINT,
MULTIPART_UPLOAD_CREATE_ENDPOINT, MULTIPART_UPLOAD_URL_ENDPOINT,
MULTIPART_UPLOAD_COMPLETE_ENDPOINT,
PROCESSING_TIME_RATIO, CHUNK_SIZE_MIN
)


class FFProbeResult(NamedTuple):
return_code: int
json: str
error: str


class UploaderException(Exception):
pass
message = "Error from uploader module"

def __init__(self, cause=None):
self.cause = cause

def __str__(self):
if self.cause:
return "{}: {}".format(self.message, str(self.cause))
else:
return self.message


class Uploader:
class UploaderBase:
def __init__(self, file, api_key, language='da'):
self.file = file
self.api_key = api_key
@@ -31,6 +47,13 @@ def __init__(self, file, api_key, language='da'):
def _is_file_exist(self):
return pathlib.Path(self.file).is_file()

def is_multipart_upload(self) -> bool:
if self._is_file_exist():
filesize = os.path.getsize(self.file)
if filesize > CHUNK_SIZE_MIN:
return True
return False

def _is_response_error(self, response):
if response.status_code >= 400:
if response.status_code == 401:
@@ -46,7 +69,7 @@ def _make_post_request(self, *args, **kwargs):
raise Exception(msg)
return response
except Exception as e:
raise UploaderException(str(e))
raise UploaderException("Error during post request") from e

@property
def _transcript_link(self):
@@ -55,15 +78,17 @@ def _transcript_link(self):
@staticmethod
def _ffprobe(file_path) -> FFProbeResult:
command_array = ["ffprobe",
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
file_path]
result = subprocess.run(command_array, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
file_path]
result = subprocess.run(command_array, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
return FFProbeResult(return_code=result.returncode,
json=json.loads(result.stdout),
error=result.stderr)
json=json.loads(result.stdout),
error=result.stderr)

def get_duration(self):
"""Get audio track duration of a file.
@@ -100,7 +125,7 @@ def estimated_result_time(self, audio_length=0):

def _get_upload_url(self, mime_file):
response = self._make_post_request(
url=f'{URL}{PRESIGNED_ENDPOINT}',
url=f'{URL}{SINGLE_UPLOAD_ENDPOINT}',
json=mime_file,
headers={
"Content-type": 'application/json',
@@ -137,6 +162,15 @@ def _get_transcript_link(self):
)
return self._transcript_link


class Uploader(UploaderBase):
"""Uploader Class
This class sends a file to the API server.
The API server currently only allows files smaller than 4 GB
to be sent with this upload class.
"""

def upload_file(self):
result = {
"url": "",
@@ -178,11 +212,215 @@ def upload_file(self):
result = {
"url": transcript_link,
"status": "uploaded",
"estimated_processing_time": self.estimated_result_time(file_duration),
"estimated_processing_time": self.estimated_result_time(
file_duration),
"message": "The file has been uploaded."
}
return result


class ChunkedFileUploader(UploaderBase):
"""Multipart Uploader Class
This class splits a big file into chunks and uploads them
with the multipart upload method.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.filename = pathlib.Path(self.file).name
self.file_ext = pathlib.Path(self.file).suffix
self.filesize = os.path.getsize(self.file)

self.file_id: str = ""
self.chunk_maxsize: int = 0
self.total_chunks: int = 0
self.upload_id: str = ""

self.endpoint_create: str = f"{URL}{MULTIPART_UPLOAD_CREATE_ENDPOINT}"
self.endpoint_signed_url: str = f"{URL}{MULTIPART_UPLOAD_URL_ENDPOINT}"
self.endpoint_complete: str = f"{URL}{MULTIPART_UPLOAD_COMPLETE_ENDPOINT}"
self.headers: dict = self._get_headers()

self.temp_dir: str = ""
self.etags: list = []

self.result = {
"url": "",
"status": "",
"estimated_processing_time": 0,
"message": ""
}

def _get_headers(self) -> dict:
return {
"Content-type": "application/json",
"X-API-KEY": self.api_key
}

def _create_temp_dir_path(self) -> str:
prefix = 'mc_s2t_'
if self.file_id:
prefix += f"{self.file_id}_"
temporary_directory = tempfile.mkdtemp(prefix=prefix)
return temporary_directory

def _get_file_path(self, filename: str) -> str:
temp_dir = pathlib.Path(self.temp_dir)
filepath = temp_dir / filename
return str(filepath)

def _get_latest_chunk_size(self) -> int:
is_odd = self.filesize % self.chunk_maxsize
if is_odd:
total_chunks_filesize_before_the_last = (
self.chunk_maxsize * (self.total_chunks - 1)
)
last_chunk_filesize = (
self.filesize - total_chunks_filesize_before_the_last
)
else:
last_chunk_filesize = self.chunk_maxsize
return last_chunk_filesize

def _write_chunk_to_temp_file(self, chunk: bytes, filepath: str) -> None:
with open(filepath, 'wb') as f:
f.write(chunk)
return None

def _set_result_error_message(self, msg) -> None:
self.result["status"] = "error"
self.result["message"] = msg

def _set_metadata(self, file_id: str, chunk_maxsize: int,
total_chunks: int, upload_id: str) -> None:
self.file_id = file_id
self.chunk_maxsize = chunk_maxsize
self.total_chunks = total_chunks
self.upload_id = upload_id
return None

def _tear_down(self):
shutil.rmtree(self.temp_dir)

def create_multipart_upload(self, mime_file: dict) -> dict:
response = self._make_post_request(
url=self.endpoint_create,
headers=self.headers,
json=mime_file
)
data: dict = response.json()
return {
"chunk_maxsize": data["chunk_maxsize"],
"file_id": data["file_id"],
"total_chunks": data["total_chunks"],
"upload_id": data["upload_id"]
}

def split_file_into_chunks(self) -> None:
self.temp_dir = self._create_temp_dir_path()
with open(self.file, 'rb') as f:
part_number = 0
latest_chunk_size = self._get_latest_chunk_size()
while True:
part_number += 1
chunk_size = self.chunk_maxsize
if part_number == self.total_chunks:
chunk_size = latest_chunk_size
chunk = f.read(chunk_size)
if not chunk:
break

self._write_chunk_to_temp_file(
chunk=chunk,
filepath=self._get_file_path(str(part_number))
)
return None

def get_signed_url(self, part_number: int) -> str:
response = self._make_post_request(
url=self.endpoint_signed_url,
headers=self.headers,
json={
"file_id": self.file_id,
"upload_id": self.upload_id,
"part_number": part_number
}
)
data: dict = response.json()
return data["url"]

def upload_chunks(self, part_number: int, url: str) -> str:
filepath: str = self._get_file_path(str(part_number))
with open(filepath, 'rb') as f:
file_data = f.read()
response: requests.Response = requests.put(url=url, data=file_data)
etag: str = response.headers['ETag']
return etag

def complete_the_upload(self) -> bool:
response: requests.Response = self._make_post_request(
url=self.endpoint_complete,
headers=self.headers,
json={
"file_id": self.file_id,
"parts": self.etags
}
)
if response.status_code != 201:
return False
return True

def upload_file(self):
if not self._is_file_exist():
self._set_result_error_message("The file doesn't exist")
return self.result

file_duration, msg = self.get_duration()
if not file_duration:
self._set_result_error_message(msg)
return self.result

mime_file = {
"duration": file_duration,
"filename": self.filename,
"file_ext": self.file_ext,
"filesize": self.filesize,
"language": self.language,
}
try:
meta = self.create_multipart_upload(mime_file)
self._set_metadata(
file_id=meta["file_id"],
chunk_maxsize=meta["chunk_maxsize"],
total_chunks=meta["total_chunks"],
upload_id=meta["upload_id"]
)
self.split_file_into_chunks()
for part in range(1, self.total_chunks + 1):
url = self.get_signed_url(part)
etag = self.upload_chunks(part, url)
self.etags.append({'ETag': etag, 'PartNumber': part})
self.complete_the_upload()
transcript_link = self._get_transcript_link()
self._tear_down()
except Exception as e:
self._set_result_error_message(str(e))
return self.result

self.result = {
"url": transcript_link,
"status": "uploaded",
"estimated_processing_time": self.estimated_result_time(
file_duration),
"message": "The file has been uploaded."
}
return self.result


def upload_and_get_transcription(file, api_key, language):
is_multipart_upload = UploaderBase(
file, api_key, language).is_multipart_upload()
if is_multipart_upload:
return ChunkedFileUploader(file, api_key, language).upload_file()
return Uploader(file, api_key, language).upload_file()
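
For reference, a minimal usage sketch of the updated entry point; the file path and API key below are placeholders, and the choice between the single-request and multipart uploader is made by the 50 MiB check in is_multipart_upload():

from mediacatch_s2t.uploader import upload_and_get_transcription

# Placeholder path and key; files larger than 50 MiB go through
# ChunkedFileUploader, smaller files through Uploader.
result = upload_and_get_transcription(
    file='/path/to/recording.mp4',
    api_key='YOUR_API_KEY',
    language='da',
)

if result['status'] == 'uploaded':
    # 'url' is the transcript link returned by _get_transcript_link().
    print(result['url'], result['estimated_processing_time'])
else:
    print('Upload failed:', result['message'])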