Skip to content

Commit

Permalink
Merge pull request #89 from NabilMostafa/stream-downloaded-files
Browse files Browse the repository at this point in the history
Stream downloaded files
  • Loading branch information
omarryhan authored Mar 1, 2022
2 parents 2e900ff + 0a27589 commit d56c7e5
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 6 deletions.
9 changes: 8 additions & 1 deletion aiogoogle/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ class MediaDownload:
chunksize (int): Size of a chunk of bytes that a session should write at a time when downloading.
pipe_to (object): class object to stream file content to
"""

def __init__(self, file_path, chunk_size=None):
def __init__(self, file_path=None, chunk_size=None, pipe_to=None):
self.file_path = file_path
self.pipe_to = pipe_to
self.chunk_size = chunk_size or DEFAULT_DOWNLOAD_CHUNK_SIZE


Expand Down Expand Up @@ -258,6 +261,8 @@ class Response:
download_file (str): path of the download file specified in the request
pipe_to (object): class object to stream file content to specified in the request.
upload_file (str): path of the upload file specified in the request
session_factory (aiogoogle.sessions.abc.AbstractSession): A callable implementation of aiogoogle's session interface
Expand All @@ -273,6 +278,7 @@ def __init__(
reason=None,
req=None,
download_file=None,
pipe_to=None,
upload_file=None,
session_factory=None,
):
Expand All @@ -287,6 +293,7 @@ def __init__(
self.reason = reason
self.req = req
self.download_file = download_file
self.pipe_to = pipe_to
self.upload_file = upload_file
self.session_factory = session_factory

Expand Down
26 changes: 23 additions & 3 deletions aiogoogle/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"json",
"upload_file",
"download_file",
"pipe_to",
"timeout",
]

Expand Down Expand Up @@ -60,6 +61,7 @@ def wrapper(
json=None,
upload_file=None,
download_file=None,
pipe_to=None,
timeout=None,
**uri_params,
):
Expand All @@ -84,6 +86,7 @@ def wrapper(
json,
upload_file,
download_file,
pipe_to,
timeout,
**uri_params,
)
Expand Down Expand Up @@ -387,6 +390,7 @@ def __call__(
json=None,
upload_file=None,
download_file=None,
pipe_to=None,
timeout=None,
**uri_params,
) -> Request:
Expand Down Expand Up @@ -548,16 +552,32 @@ def __call__(
if body is not None:
self._validate_body(body)

# Validate selected options
if download_file and pipe_to:
raise ValueError(
"can't have both (download_file) and (pipe_to) options"
)

if not download_file and not pipe_to:
media_download = None

# Process download_file
if download_file:
if validate is True:
if self.__getitem__("supportsMediaDownload") is not True:
raise ValidationError(
"download_file was provided while method doesn't support media download"
)
media_download = MediaDownload(download_file)
else:
media_download = None
media_download = MediaDownload(file_path=download_file)

# Process pipe_to
if pipe_to:
if validate is True:
if self.__getitem__("supportsMediaDownload") is not True:
raise ValidationError(
"pipe_to was provided while method doesn't support media download"
)
media_download = MediaDownload(pipe_to=pipe_to)

# Process upload_file
if upload_file:
Expand Down
13 changes: 11 additions & 2 deletions aiogoogle/sessions/aiohttp_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,22 @@ async def resolve_response(request, response):
json = None
download_file = None
upload_file = None
pipe_to = None

# If downloading file:
if request.media_download:
chunk_size = request.media_download.chunk_size
download_file = request.media_download.file_path
async with aiofiles.open(download_file, "wb+") as f:
pipe_to = request.media_download.pipe_to

if pipe_to:
async for line in response.content.iter_chunked(chunk_size):
await f.write(line)
await pipe_to.write(line)

if download_file:
async with aiofiles.open(download_file, "wb+") as f:
async for line in response.content.iter_chunked(chunk_size):
await f.write(line)
else:
if response.status != 204: # If no (no content)
try:
Expand All @@ -79,6 +87,7 @@ async def resolve_response(request, response):
reason=response.reason if getattr(response, "reason") else None,
req=request,
download_file=download_file,
pipe_to=pipe_to,
upload_file=upload_file,
session_factory=session_factory,
)
Expand Down
63 changes: 63 additions & 0 deletions examples/stream_drive_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/python3.7

'''
Scopes Required:
* https://www.googleapis.com/auth/drive
* https://www.googleapis.com/auth/drive.file
API explorer link:
* https://developers.google.com/apis-explorer/#p/drive/v3/drive.files.get
'''

import asyncio
import sys

from helpers import Aiogoogle, user_creds, client_creds

usage = """
Usage:
argv1: ID of the file you want to stream (see hint)
Hint:
run list_drive_files.py to list the names of the files you own alongside their IDs
Example:
List the id of "my_old_archive.tar.gz" by running:
./list_drive_files | grep my_old_archive.tar.gz
0Bw9MwYF2OXbSc0d0ZmNlRjVhMmM: myold_archive.tar.gz
Now run:
./stream_drive_file.py 0Bw9MwYF2OXbSc0d0ZmNlRjVhMmM
"""


class MyFile:
@staticmethod
async def write(data: bytes):
print(data)


async def stream_file(file_id):
async with Aiogoogle(user_creds=user_creds, client_creds=client_creds) as aiogoogle:
drive_v3 = await aiogoogle.discover("drive", "v3")

# Stream the file
await aiogoogle.as_user(
drive_v3.files.get(fileId=file_id, pipe_to=MyFile(), alt="media")
)


if __name__ == "__main__":
try:
file_id = sys.argv[1]
except IndexError:
print(usage)
sys.exit(1)
asyncio.run(stream_file(file_id))

0 comments on commit d56c7e5

Please sign in to comment.