Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: add initial support for streaming torrent files #3657

Draft
wants to merge 33 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6378462
locate stream for streaming API by identifier
shyba Sep 1, 2022
8212e73
stream torrent from file
shyba Sep 1, 2022
7828041
stream type independent stream_url
shyba Sep 1, 2022
8ee5cee
update flags, set sequential as a flag
shyba Sep 5, 2022
6efd4dd
fix save path, fix prio, update deprecated calls
shyba Sep 5, 2022
b3bff39
stream from torrent pieces, holding the response until the piece is c…
shyba Sep 5, 2022
b2f8207
piece prioritization and deadlines
shyba Sep 7, 2022
df680e7
fix tests and off by one error
shyba Sep 9, 2022
7410991
save resume data on stop, remove/replace deprecated calls
shyba Sep 9, 2022
dd103d0
save file-torrent association for file list
shyba Sep 16, 2022
7746ded
add test case for restart, fix torrent file update
shyba Sep 16, 2022
37adc59
add tests for streaming, fix bugs
shyba Sep 23, 2022
7c7e185
refactor add_torrent, lints
shyba Sep 23, 2022
f650e8f
test and bugfixes for streaming multifile in a subfolder case
shyba Sep 29, 2022
e862c99
generate 3 files, check that streamed is the largest, add method to l…
shyba Sep 29, 2022
c8f2502
start the stream after adding
shyba Sep 30, 2022
af0ad41
generalize DownloadSDTimeout to DownloadMetadata timeout + fix usages
shyba Oct 12, 2022
b39971b
fix tests for changed error msg
shyba Oct 12, 2022
77d2c81
fix missing added_on for torrent files
shyba Oct 27, 2022
2bf0ca6
fix mime_type for torrent on json encoder
shyba Oct 27, 2022
732b7e7
fix suggested_file_name for torrent on json encoder
shyba Oct 27, 2022
31c6e0e
fix stream_name for torrent on json encoder
shyba Oct 27, 2022
651348f
fix status for completed torrents
shyba Oct 28, 2022
1041a19
deserialize torrent fields properly
shyba Oct 28, 2022
39da718
remove dead code
shyba Oct 28, 2022
8ce5306
fix filtering for fields missing on torrents
shyba Oct 28, 2022
2bea8f5
fix duplicated file entry on startup
shyba Oct 28, 2022
9dc617f
use a non-default port for streaming test so it can run with a live i…
shyba Oct 28, 2022
5cf63fa
restore torrent rowid on restart
shyba Nov 1, 2022
9d86982
test picking file from claim file name
shyba Nov 5, 2022
64aad14
pick file from file name, fallback to largest
shyba Nov 5, 2022
636b7ed
tests: enable logging lbry.torrent when verbosity changes
shyba Nov 5, 2022
dbe3ace
pylint
shyba Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lbry/error/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ Code | Name | Message
511 | CorruptBlob | Blobs is corrupted.
520 | BlobFailedEncryption | Failed to encrypt blob.
531 | DownloadCancelled | Download was canceled.
532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout.
532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout.
534 | InvalidStreamDescriptor | {message}
535 | InvalidData | {message}
536 | InvalidBlobHash | {message}
Expand Down
6 changes: 3 additions & 3 deletions lbry/error/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,18 +411,18 @@ def __init__(self):
super().__init__("Download was canceled.")


class DownloadSDTimeoutError(BlobError):
class DownloadMetadataTimeoutError(BlobError):

def __init__(self, download):
self.download = download
super().__init__(f"Failed to download sd blob {download} within timeout.")
super().__init__(f"Failed to download metadata for {download} within timeout.")


class DownloadDataTimeoutError(BlobError):

def __init__(self, download):
self.download = download
super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.")
super().__init__(f"Failed to download data blobs for {download} within timeout.")


class InvalidStreamDescriptorError(BlobError):
Expand Down
13 changes: 7 additions & 6 deletions lbry/extras/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from lbry.blob_exchange.downloader import download_blob
from lbry.dht.peer import make_kademlia_peer
from lbry.error import (
DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError,
ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError,
InputValueError
Expand Down Expand Up @@ -639,7 +639,7 @@ async def handle_stream_get_request(self, request: web.Request):
stream = await self.jsonrpc_get(uri)
if isinstance(stream, dict):
raise web.HTTPServerError(text=stream['error'])
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
raise web.HTTPFound(f"/stream/{stream.identifier}")

async def handle_stream_range_request(self, request: web.Request):
try:
Expand All @@ -658,12 +658,13 @@ async def handle_stream_range_request(self, request: web.Request):
log.debug("finished handling /stream range request")

async def _handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1]
identifier = request.path.split("/stream/")[1]
if not self.file_manager.started.is_set():
await self.file_manager.started.wait()
if sd_hash not in self.file_manager.streams:
stream = self.file_manager.get_filtered(identifier=identifier)
if not stream:
return web.HTTPNotFound()
return await self.file_manager.stream_partial_content(request, sd_hash)
return await self.file_manager.stream_partial_content(request, identifier)

async def _process_rpc_call(self, data):
args = data.get('params', {})
Expand Down Expand Up @@ -1139,7 +1140,7 @@ async def jsonrpc_get(
save_file=save_file, wallet=wallet
)
if not stream:
raise DownloadSDTimeoutError(uri)
raise DownloadMetadataTimeoutError(uri)
except Exception as e:
# TODO: use error from lbry.error
log.warning("Error downloading %s: %s", uri, str(e))
Expand Down
16 changes: 4 additions & 12 deletions lbry/extras/daemon/json_response_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,18 @@ def encode_file(self, managed_stream):
else:
total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
result = {
'streaming_url': None,
'streaming_url': managed_stream.stream_url,
'completed': managed_stream.completed,
'file_name': None,
'download_directory': None,
'download_path': None,
'points_paid': 0.0,
'stopped': not managed_stream.running,
'stream_hash': None,
'stream_name': None,
'suggested_file_name': None,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': None,
'mime_type': None,
'mime_type': managed_stream.mime_type,
'key': None,
'total_bytes_lower_bound': total_bytes_lower_bound,
'total_bytes': total_bytes,
Expand Down Expand Up @@ -326,12 +326,8 @@ def encode_file(self, managed_stream):
}
if is_stream:
result.update({
'streaming_url': managed_stream.stream_url,
'stream_hash': managed_stream.stream_hash,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': managed_stream.descriptor.sd_hash,
'mime_type': managed_stream.mime_type,
'key': managed_stream.descriptor.key,
'blobs_completed': managed_stream.blobs_completed,
'blobs_in_stream': managed_stream.blobs_in_stream,
Expand All @@ -340,10 +336,6 @@ def encode_file(self, managed_stream):
'reflector_progress': managed_stream.reflector_progress,
'uploading_to_reflector': managed_stream.uploading_to_reflector
})
else:
result.update({
'streaming_url': f'file://{managed_stream.full_path}',
})
if output_exists:
result.update({
'file_name': managed_stream.file_name,
Expand Down
32 changes: 24 additions & 8 deletions lbry/extras/daemon/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
import binascii
import time
from operator import itemgetter
from typing import Optional
from lbry.wallet import SQLiteMixin
from lbry.conf import Config
Expand Down Expand Up @@ -211,23 +212,26 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()


def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
if not file_name and not download_directory:
encoded_file_name, encoded_download_dir = None, None
else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
is_torrent = len(identifier_value) == 40
time_added = added_on or int(time.time())
transaction.execute(
"insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)",
(identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
).fetchall()

return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
return transaction.execute(
f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?",
(identifier_value, )).fetchone()[0]


class SQLiteStorage(SQLiteMixin):
Expand Down Expand Up @@ -632,6 +636,13 @@ def update_db_removed(transaction: sqlite3.Connection, removed):
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
return self.db.run(get_all_lbry_files)

async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
def _get_all_torrent_files(transaction):
cursor = transaction.execute(
"select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash")
return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall())
return list(await self.db.run(_get_all_torrent_files))

def change_file_status(self, stream_hash: str, new_status: str):
log.debug("update file status %s -> %s", stream_hash, new_status)
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
Expand Down Expand Up @@ -872,15 +883,20 @@ async def save_content_claim(self, stream_hash, claim_outpoint):
if stream_hash in self.content_claim_callbacks:
await self.content_claim_callbacks[stream_hash]()

async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent(transaction):
async def add_torrent(self, bt_infohash, length, name):
def _save_torrent(transaction, bt_infohash, length, name):
transaction.execute(
"insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
).fetchall()
return await self.db.run(_save_torrent, bt_infohash, length, name)

async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent_claim(transaction):
transaction.execute(
"insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
).fetchall()
await self.db.run(_save_torrent)
await self.add_torrent(bt_infohash, length, name)
await self.db.run(_save_torrent_claim)
# update corresponding ManagedEncryptedFileDownloader object
if bt_infohash in self.content_claim_callbacks:
await self.content_claim_callbacks[bt_infohash]()
Expand All @@ -898,7 +914,7 @@ async def get_content_claim(self, stream_hash: str, include_supports: typing.Opt

async def get_content_claim_for_torrent(self, bt_infohash):
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
return claims[bt_infohash].as_dict() if claims else None
return claims[bt_infohash] if claims else None

# # # # # # # # # reflector functions # # # # # # # # #

Expand Down
33 changes: 19 additions & 14 deletions lbry/file/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.error import InvalidStreamURLError
from lbry.stream.managed_stream import ManagedStream
Expand Down Expand Up @@ -139,7 +139,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
existing[0].set_claim(claim_info, claim)
existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim)
else:
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
Expand Down Expand Up @@ -242,15 +242,15 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info, claim)
stream.set_claim(claim_info.as_dict() if claim_info else None, claim)
if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
error = DownloadDataTimeoutError(stream.identifier)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
Expand Down Expand Up @@ -290,19 +290,24 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
)
)

async def stream_partial_content(self, request: Request, sd_hash: str):
return await self.source_managers['stream'].stream_partial_content(request, sd_hash)
async def stream_partial_content(self, request: Request, identifier: str):
for source_manager in self.source_managers.values():
if source_manager.get_filtered(identifier=identifier):
return await source_manager.stream_partial_content(request, identifier)

def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
"""
Get a list of filtered and sorted ManagedStream objects

:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers
"""
return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
result = last_error = None
for manager in self.source_managers.values():
try:
result = (result or []) + manager.get_filtered(*args, **kwargs)
except ValueError as error:
last_error = error
if result is not None:
return result
raise last_error

async def delete(self, source: ManagedDownloadSource, delete_file=False):
for manager in self.source_managers.values():
Expand Down
17 changes: 13 additions & 4 deletions lbry/file/source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import asyncio
import time
import typing
import logging
import binascii
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
self.rowid = rowid
self.content_fee = content_fee
self.purchase_receipt = None
self._added_on = added_on
self._added_on = added_on or int(time.time())
self.analytics_manager = analytics_manager
self.downloader = None

Expand Down Expand Up @@ -91,6 +92,14 @@ def file_name(self) -> Optional[str]:
def added_on(self) -> Optional[int]:
return self._added_on

@property
def suggested_file_name(self):
return self._file_name

@property
def stream_name(self):
return self.suggested_file_name

@property
def status(self) -> str:
return self._status
Expand All @@ -99,9 +108,9 @@ def status(self) -> str:
def completed(self):
raise NotImplementedError()

# @property
# def stream_url(self):
# return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}
@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}"

@property
def finished(self) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions lbry/file/source_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

class SourceManager:
filter_fields = {
'identifier',
'rowid',
'status',
'file_name',
Expand Down Expand Up @@ -83,6 +84,7 @@ async def create(self, file_path: str, key: Optional[bytes] = None,
raise NotImplementedError()

async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self.storage.delete_torrent(source.identifier)
self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)
Expand Down
4 changes: 2 additions & 2 deletions lbry/stream/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import binascii

from lbry.dht.node import get_kademlia_peers_from_hosts
from lbry.error import DownloadSDTimeoutError
from lbry.error import DownloadMetadataTimeoutError
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
Expand Down Expand Up @@ -77,7 +77,7 @@ async def load_descriptor(self, connection_id: int = 0):
log.info("downloaded sd blob %s", self.sd_hash)
self.time_to_descriptor = self.loop.time() - now
except asyncio.TimeoutError:
raise DownloadSDTimeoutError(self.sd_hash)
raise DownloadMetadataTimeoutError(self.sd_hash)

# parse the descriptor
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
Expand Down
8 changes: 2 additions & 6 deletions lbry/stream/managed_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.error import DownloadSDTimeoutError
from lbry.error import DownloadMetadataTimeoutError
from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
Expand Down Expand Up @@ -104,10 +104,6 @@ def written_bytes(self) -> int:
def completed(self):
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()

@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"

async def update_status(self, status: str):
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
self._status = status
Expand Down Expand Up @@ -164,7 +160,7 @@ async def start(self, timeout: Optional[float] = None,
await asyncio.wait_for(self.downloader.start(), timeout)
except asyncio.TimeoutError:
self._running.clear()
raise DownloadSDTimeoutError(self.sd_hash)
raise DownloadMetadataTimeoutError(self.identifier)

if self.delayed_stop_task and not self.delayed_stop_task.done():
self.delayed_stop_task.cancel()
Expand Down
2 changes: 1 addition & 1 deletion lbry/stream/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
class StreamManager(SourceManager):
_sources: typing.Dict[str, ManagedStream]

filter_fields = SourceManager.filter_fields
filter_fields = set(SourceManager.filter_fields)
filter_fields.update({
'sd_hash',
'stream_hash',
Expand Down
Loading