Skip to content

Commit

Permalink
Merge pull request #207 from bento-platform/feat/streaming/range
Browse files Browse the repository at this point in the history
feat: add streaming module with range header parsing + exc defs
  • Loading branch information
davidlougheed authored May 31, 2024
2 parents 8e8c440 + d17f498 commit 2dbcaed
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 145 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ methods for creating and processing ASTs.
for common structures and operations related to GA4GH's `/service-info`
specification.

### `streaming`

`streaming` contains helper code for streaming bytes via HTTP, both from files
and from proxied HTTP resources. It includes exception definitions and HTTP
`Range` header parsing.

### `workflows`

`workflows` contains common code used for handling workflow metadata processing
Expand Down
5 changes: 4 additions & 1 deletion bento_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from . import schemas
from . import search
from . import service_info
from . import streaming
from . import workflows

# Package version, looked up under this package's own name via `metadata.version`.
# NOTE(review): `metadata` is imported outside the visible lines (presumably importlib.metadata) - confirm.
__version__ = metadata.version(__name__)
# Names exported by `from bento_lib import *`.
# NOTE(review): `__all__` is assigned twice in a row here; the later assignment (the one listing "streaming") is the
# one that takes effect, so the first looks like a leftover and can likely be dropped - confirm.
__all__ = ["__version__", "apps", "auth", "drs", "events", "schemas", "search", "service_info", "workflows"]
__all__ = [
"__version__", "apps", "auth", "drs", "events", "schemas", "search", "service_info", "streaming", "workflows"
]
Empty file added bento_lib/streaming/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions bento_lib/streaming/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Public API of this module: the streaming-related exception classes defined below.
__all__ = [
"StreamingRangeNotSatisfiable",
"StreamingBadRange",
"StreamingProxyingError",
"StreamingResponseExceededLimit",
"StreamingBadURI",
"StreamingUnsupportedURIScheme",
]


class StreamingRangeNotSatisfiable(Exception):
def __init__(self, message: str, n_bytes: int | None):
self._n_bytes: int | None = n_bytes
super().__init__(message)

@property
def n_bytes(self) -> int:
return self._n_bytes


class StreamingBadRange(Exception):
    """
    Signals a `Range` header value that cannot be accepted as given, e.g. a unit other than `bytes` or an interval
    that matches none of the supported byte-range patterns.
    """


class StreamingProxyingError(Exception):
    """
    Error type for failures while proxying a streamed resource.

    NOTE(review): the exact conditions under which this is raised are defined by callers - confirm there.
    """


class StreamingResponseExceededLimit(Exception):
    """
    Error type for a streamed response that went past an allowed limit.

    NOTE(review): which limit applies (presumably a size cap) is decided by callers - confirm there.
    """


class StreamingBadURI(Exception):
    """
    Error type for a URI that cannot be used as a streaming source.

    NOTE(review): what makes a URI "bad" (presumably malformed input) is decided by callers - confirm there.
    """


class StreamingUnsupportedURIScheme(Exception):
    """
    Error type for a URI whose scheme the streaming code does not handle.

    NOTE(review): the set of supported schemes is defined by callers - confirm there.
    """
80 changes: 80 additions & 0 deletions bento_lib/streaming/range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import re

from .exceptions import StreamingRangeNotSatisfiable, StreamingBadRange

# Only the parser is public; the compiled patterns below are implementation details.
__all__ = ["parse_range_header"]


# Separator between intervals in a multi-range value: a comma followed by optional whitespace.
BYTE_RANGE_INTERVAL_SPLIT = re.compile(r",\s*")
# "<start>-": open-ended interval from <start> to the end of the content.
BYTE_RANGE_START_ONLY = re.compile(r"^(\d+)-$")
# "<start>-<end>": explicit interval; both positions are captured as digit strings.
BYTE_RANGE_START_END = re.compile(r"^(\d+)-(\d+)$")
# "-<n>": suffix interval, i.e. the last <n> bytes of the content.
BYTE_RANGE_SUFFIX = re.compile(r"^-(\d+)$")


def parse_range_header(
    range_header: str | None, content_length: int, refget_mode: bool = False
) -> tuple[tuple[int, int], ...]:
    """
    Parse a range header (given a particular content length) into a validated series of sorted, non-overlapping
    start/end-inclusive intervals.

    :param range_header: Raw value of the `Range` header, or None if the request had no such header.
    :param content_length: Total length, in bytes, of the content being ranged over.
    :param refget_mode: If True, a start or end position at/after the content length raises StreamingBadRange
                        instead of StreamingRangeNotSatisfiable, and inverted intervals (start > end) are not
                        rejected.
    :return: Tuple of (start, end) pairs, 0-indexed and inclusive on both ends, sorted ascending. With no header,
             a single interval (0, content_length - 1) is returned.
    :raises StreamingBadRange: If the header does not start with "bytes=", if an interval matches none of the
                               supported forms, if a position has too many digits to be converted to an integer,
                               or (refget mode only) if a start/end position is at/after the content length.
    :raises StreamingRangeNotSatisfiable: If (outside refget mode) a start/end position is at/after the content
                                          length or an interval is inverted, or (in either mode) if two intervals
                                          overlap.
    """

    # Index of the final byte; both interval ends are 0-indexed and inclusive.
    last_byte_index = content_length - 1

    if range_header is None:
        # No range requested: the whole content is one interval.
        return ((0, last_byte_index),)

    if not range_header.startswith("bytes="):
        raise StreamingBadRange("only bytes range headers are supported")

    intervals: list[tuple[int, int]] = []

    # Cases: start- | start-end | -suffix, [start- | start-end | -suffix], ...
    for iv in BYTE_RANGE_INTERVAL_SPLIT.split(range_header.removeprefix("bytes=")):
        try:
            if m := BYTE_RANGE_START_ONLY.match(iv):
                intervals.append((int(m.group(1)), last_byte_index))
            elif m := BYTE_RANGE_START_END.match(iv):
                intervals.append((int(m.group(1)), int(m.group(2))))
            elif m := BYTE_RANGE_SUFFIX.match(iv):
                suffix_length = int(m.group(1))  # suffix: -500 === last 500:
                # Clamp at 0 so a suffix longer than the content yields the whole content.
                intervals.append((max(last_byte_index - suffix_length + 1, 0), last_byte_index))
            else:
                raise StreamingBadRange("byte range did not match any pattern")
        except ValueError as e:
            # int() refuses digit strings beyond the interpreter's integer string conversion limit. The header is
            # client-controlled, so report this as a bad range rather than letting a bare ValueError escape.
            raise StreamingBadRange("byte range position has too many digits") from e

    intervals.sort()
    n_intervals: int = len(intervals)

    # validate intervals are not inverted and do not overlap each other:
    for i, int1 in enumerate(intervals):
        int1_start, int1_end = int1

        # Order of these checks is important - we want to give a 416 if start/end is beyond content length (which also
        # results in an inverted interval)

        if int1_start >= content_length:
            # both ends of the range are 0-indexed, inclusive - so it starts at 0 and ends at content_length - 1
            if refget_mode:  # sigh... GA4GH moment
                raise StreamingBadRange(f"start is beyond content length: {int1_start} >= {content_length}")
            raise StreamingRangeNotSatisfiable(f"not satisfiable: {int1_start} >= {content_length}", content_length)

        # NOTE(review): RFC 9110 (section 14.1.2) has servers clamp a last position past the end of the content
        # rather than reject it; rejecting here looks deliberate - confirm before changing.
        if int1_end >= content_length:
            # both ends of the range are 0-indexed, inclusive - so it starts at 0 and ends at content_length - 1
            if refget_mode:  # sigh... GA4GH moment
                raise StreamingBadRange(f"end is beyond content length: {int1_end} >= {content_length}")
            raise StreamingRangeNotSatisfiable(f"not satisfiable: {int1_end} >= {content_length}", content_length)

        if not refget_mode and int1_start > int1_end:
            raise StreamingRangeNotSatisfiable(f"inverted interval: {int1}", content_length)

        # Intervals are sorted, so only the immediate successor needs checking for overlap.
        if i < n_intervals - 1:
            int2 = intervals[i + 1]
            int2_start, int2_end = int2

            if int1_end >= int2_start:
                raise StreamingRangeNotSatisfiable(f"intervals overlap: {int1}, {int2}", content_length)

    return tuple(intervals)
Loading

0 comments on commit 2dbcaed

Please sign in to comment.