diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index b57de6d..0ed02ac 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -33,5 +33,5 @@ jobs: run: | python -m pytest --cov=tastytrade --cov-report=term-missing tests/ --cov-fail-under=95 env: - TT_USERNAME: ${{ secrets.TT_USERNAME }} - TT_PASSWORD: ${{ secrets.TT_PASSWORD }} + TT_USERNAME: tastyware + TT_PASSWORD: :4s-S9/9L&Q~C]@v diff --git a/README.rst b/README.rst index a639e18..dbc9090 100644 --- a/README.rst +++ b/README.rst @@ -40,10 +40,10 @@ The streamer is a websocket connection to dxfeed (the Tastytrade data provider) .. code-block:: python - from tastytrade import DataStreamer + from tastytrade import DXFeedStreamer from tastytrade.dxfeed import EventType - streamer = await DataStreamer.create(session) + streamer = await DXFeedStreamer.create(session) subs_list = ['SPY', 'SPX'] await streamer.subscribe(EventType.QUOTE, subs_list) diff --git a/docs/data-streamer.rst b/docs/data-streamer.rst index acfa8a8..5b75efb 100644 --- a/docs/data-streamer.rst +++ b/docs/data-streamer.rst @@ -9,8 +9,8 @@ You can create a streamer using an active production session: .. code-block:: python - from tastytrade import DataStreamer - streamer = await DataStreamer.create(session) + from tastytrade import DXFeedStreamer + streamer = await DXFeedStreamer.create(session) Once you've created the streamer, you can subscribe/unsubscribe to events, like ``Quote``: @@ -67,7 +67,7 @@ For example, we can use the streamer to create an option chain that will continu class LivePrices: quotes: dict[str, Quote] greeks: dict[str, Greeks] - streamer: DataStreamer + streamer: DXFeedStreamer puts: list[Option] calls: list[Option] @@ -83,7 +83,7 @@ For example, we can use the streamer to create an option chain that will continu # the `streamer_symbol` property is the symbol used by the streamer streamer_symbols = [o.streamer_symbol for o in options] - streamer = await DataStreamer.create(session) + streamer = await DXFeedStreamer.create(session) # subscribe to quotes and greeks for all options on that date await streamer.subscribe(EventType.QUOTE, [symbol] + streamer_symbols) await streamer.subscribe(EventType.GREEKS, streamer_symbols) diff --git a/docs/sessions.rst b/docs/sessions.rst index c04929c..fc93ab0 100644 --- a/docs/sessions.rst +++ b/docs/sessions.rst @@ -32,7 +32,7 @@ Events ------ A ``ProductionSession`` can be used to make simple requests to the dxfeed REST API and pull quotes, greeks and more. -These requests are slower than ``DataStreamer`` and a separate request is required for each event fetched, so they're really more appropriate for a task that just needs to grab some information once. For recurring data feeds/streams or more time-sensitive tasks, the streamer is more appropriate. +These requests are slower than ``DXFeedStreamer`` and a separate request is required for each event fetched, so they're really more appropriate for a task that just needs to grab some information once. For recurring data feeds/streams or more time-sensitive tasks, the streamer is more appropriate. Events are simply market data at a specific timestamp. There's a variety of different kinds of events supported, including: diff --git a/tastytrade/__init__.py b/tastytrade/__init__.py index 0999234..24a2d6e 100644 --- a/tastytrade/__init__.py +++ b/tastytrade/__init__.py @@ -10,14 +10,16 @@ from .account import Account # noqa: E402 from .search import symbol_search # noqa: E402 from .session import CertificationSession, ProductionSession # noqa: E402 -from .streamer import AlertStreamer, DataStreamer # noqa: E402 +from .streamer import (AccountStreamer, DXFeedStreamer, # noqa: E402 + DXLinkStreamer) from .watchlists import PairsWatchlist, Watchlist # noqa: E402 __all__ = [ 'Account', - 'AlertStreamer', + 'AccountStreamer', 'CertificationSession', - 'DataStreamer', + 'DXFeedStreamer', + 'DXLinkStreamer', 'PairsWatchlist', 'ProductionSession', 'Watchlist', diff --git a/tastytrade/dxfeed/event.py b/tastytrade/dxfeed/event.py index 2cb25d8..4358b41 100644 --- a/tastytrade/dxfeed/event.py +++ b/tastytrade/dxfeed/event.py @@ -9,9 +9,10 @@ class EventType(str, Enum): for the data streamer. Information on different types of events, their uses and their properties - can be found at the `dxfeed Knowledge Base + can be found at the `dxfeed Knowledge Base. `_. """ + CANDLE = 'Candle' GREEKS = 'Greeks' PROFILE = 'Profile' @@ -28,7 +29,7 @@ class Event(ABC): def from_stream(cls, data: list) -> List['Event']: # pragma: no cover """ Makes a list of event objects from a list of raw trade data fetched by - a :class:`~tastyworks.streamer.DataStreamer`. + a :class:`~tastyworks.streamer.DXFeedStreamer`. :param data: list of raw quote data from streamer diff --git a/tastytrade/session.py b/tastytrade/session.py index 7ee9471..db0ec1b 100644 --- a/tastytrade/session.py +++ b/tastytrade/session.py @@ -183,7 +183,8 @@ def __init__( data = response.json()['data'] self.streamer_token = data['token'] url = data['websocket-url'] + '/cometd' - self.streamer_url = url.replace('https', 'wss') + self.dxfeed_url = url.replace('https', 'wss') + self.dxlink_url = data['dxlink-url'] self.rest_url = data['websocket-url'] + '/rest/events.json' self.streamer_headers = { 'Authorization': f'Bearer {self.streamer_token}' diff --git a/tastytrade/streamer.py b/tastytrade/streamer.py index d387c46..8630a18 100644 --- a/tastytrade/streamer.py +++ b/tastytrade/streamer.py @@ -1,6 +1,7 @@ import asyncio import json from asyncio import Lock, Queue +from collections import defaultdict from datetime import datetime from decimal import Decimal from enum import Enum @@ -73,7 +74,7 @@ class SubscriptionType(str, Enum): USER_MESSAGE = 'user-message-subscribe' -class AlertStreamer: +class AccountStreamer: """ Used to subscribe to account-level updates (balances, orders, positions), public watchlist updates, quote alerts, and user-level messages. It should @@ -82,9 +83,9 @@ class AlertStreamer: Example usage:: - from tastytrade import Account, AlertStreamer + from tastytrade import Account, AccountStreamer - streamer = await AlertStreamer.create(session) + streamer = await AccountStreamer.create(session) accounts = Account.get_accounts(session) await streamer.account_subscribe(accounts) @@ -95,6 +96,7 @@ class AlertStreamer: print(data) """ + def __init__(self, session: Session): #: The active session used to initiate the streamer or make requests self.token: str = session.session_token @@ -109,19 +111,24 @@ def __init__(self, session: Session): self._connect_task = asyncio.create_task(self._connect()) @classmethod - async def create(cls, session: Session) -> 'AlertStreamer': + async def create(cls, session: Session) -> 'AccountStreamer': """ - Factory method for the :class:`DataStreamer` object. Simply calls the - constructor and performs the asynchronous setup tasks. This should be - used instead of the constructor. + Factory method for the :class:`AccountStreamer` object. Simply calls + the constructor and performs the asynchronous setup tasks. This should + be used instead of the constructor. :param session: active user session to use """ self = cls(session) + await self._wait_for_authentication() + return self + + async def _wait_for_authentication(self, time_out=100): while not self._websocket: await asyncio.sleep(0.1) - - return self + time_out -= 1 + if time_out < 0: + raise TastytradeError('Connection timed out') async def _connect(self) -> None: """ @@ -130,8 +137,8 @@ async def _connect(self) -> None: """ headers = {'Authorization': f'Bearer {self.token}'} async with websockets.connect( # type: ignore - self.base_url, - extra_headers=headers + self.base_url, + extra_headers=headers ) as websocket: self._websocket = websocket self._heartbeat_task = asyncio.create_task(self._heartbeat()) @@ -155,9 +162,9 @@ async def listen(self) -> AsyncIterator[TastytradeJsonDataclass]: logger.debug('subscription message: %s', data) def _map_message( - self, - type_str: str, - data: dict + self, + type_str: str, + data: dict ) -> TastytradeJsonDataclass: """ I'm not sure what the user-status messages look like, @@ -230,9 +237,9 @@ async def _heartbeat(self) -> None: await asyncio.sleep(10) async def _subscribe( - self, - subscription: SubscriptionType, - value: Union[Optional[str], List[str]] = '' + self, + subscription: SubscriptionType, + value: Union[Optional[str], List[str]] = '' ) -> None: """ Subscribes to a :class:`SubscriptionType`. Depending on the kind of @@ -248,20 +255,20 @@ async def _subscribe( await self._websocket.send(json.dumps(message)) # type: ignore -class DataStreamer: # pragma: no cover +class DXFeedStreamer: # pragma: no cover """ - A :class:`DataStreamer` object is used to fetch quotes or greeks + A :class:`DXFeedStreamer` object is used to fetch quotes or greeks for a given symbol or list of symbols. It should always be initialized using the :meth:`create` function, since the object cannot be fully instantiated without using async. Example usage:: - from tastytrade import DataStreamer + from tastytrade import DXFeedStreamer from tastytrade.dxfeed import EventType # must be a production session - streamer = await DataStreamer.create(session) + streamer = await DXFeedStreamer.create(session) subs = ['SPY', 'GLD'] # list of quotes to fetch await streamer.subscribe(EventType.QUOTE, subs) @@ -275,39 +282,34 @@ class DataStreamer: # pragma: no cover def __init__(self, session: ProductionSession): self._counter = 0 self._lock: Lock = Lock() - self._queues: Dict[str, Queue] = { - EventType.CANDLE: Queue(), - EventType.GREEKS: Queue(), - EventType.PROFILE: Queue(), - EventType.QUOTE: Queue(), - EventType.SUMMARY: Queue(), - EventType.THEO_PRICE: Queue(), - EventType.TIME_AND_SALE: Queue(), - EventType.TRADE: Queue(), - EventType.UNDERLYING: Queue() - } + self._queues: Dict[EventType, Queue] = defaultdict(Queue) #: The unique client identifier received from the server self.client_id: Optional[str] = None self._auth_token = session.streamer_token - self._wss_url = session.streamer_url + self._wss_url = session.dxfeed_url self._connect_task = asyncio.create_task(self._connect()) @classmethod - async def create(cls, session: ProductionSession) -> 'DataStreamer': + async def create(cls, session: ProductionSession) -> 'DXFeedStreamer': """ - Factory method for the :class:`DataStreamer` object. + Factory method for the :class:`DXFeedStreamer` object. Simply calls the constructor and performs the asynchronous setup tasks. This should be used instead of the constructor. :param session: active user session to use """ self = cls(session) + await self._wait_for_authentication() + return self + + async def _wait_for_authentication(self, time_out=100): while not self.client_id: await asyncio.sleep(0.1) - - return self + time_out -= 1 + if time_out < 0: + raise TastytradeError('Connection timed out') async def _next_id(self): async with self._lock: @@ -322,8 +324,8 @@ async def _connect(self) -> None: headers = {'Authorization': f'Bearer {self._auth_token}'} async with websockets.connect( # type: ignore - self._wss_url, - extra_headers=headers + self._wss_url, + extra_headers=headers ) as websocket: self._websocket = websocket await self._handshake() @@ -379,10 +381,7 @@ async def _handshake(self) -> None: } await self._websocket.send(json.dumps([message])) - async def listen( - self, - event_type: EventType - ) -> AsyncIterator[Event]: + async def listen(self, event_type: EventType) -> AsyncIterator[Event]: """ Using the existing subscriptions, pulls events of the given type and yield returns them. Never exits unless there's an error or the channel @@ -600,3 +599,325 @@ async def _map_message(self, message) -> None: await self._queues[EventType.UNDERLYING].put(underlying) else: raise TastytradeError(f'Unknown message type received: {message}') + + +class DXLinkStreamer: # pragma: no cover + """ + A :class:`DXLinkStreamer` object is used to fetch quotes or greeks + for a given symbol or list of symbols. It should always be + initialized using the :meth:`create` function, since the object + cannot be fully instantiated without using async. + + Example usage:: + + from tastytrade import DXLinkStreamer + from tastytrade.dxfeed import EventType + + # must be a production session + streamer = await DXLinkStreamer.create(session) + + subs = ['SPY', 'GLD'] # list of quotes to fetch + await streamer.subscribe(EventType.QUOTE, subs) + quotes = [] + async for quote in streamer.listen(EventType.QUOTE): + quotes.append(quote) + if len(quotes) >= len(subs): + break + + """ + + def __init__(self, session: ProductionSession): + self._counter = 0 + self._lock: Lock = Lock() + self._queues: Dict[EventType, Queue] = defaultdict(Queue) + self._channels: Dict[EventType, int] = { + EventType.CANDLE: 1, + EventType.GREEKS: 3, + EventType.PROFILE: 5, + EventType.QUOTE: 7, + EventType.SUMMARY: 9, + EventType.THEO_PRICE: 11, + EventType.TIME_AND_SALE: 13, + EventType.TRADE: 15, + EventType.UNDERLYING: 17 + } + self._subscription_state: Dict[EventType, str] = \ + defaultdict(lambda: 'CHANNEL_CLOSED') + + #: The unique client identifier received from the server + self._session = session + self._authenticated = False + self._wss_url = session.dxlink_url + self._auth_token = session.streamer_token + + self._connect_task = asyncio.create_task(self._connect()) + + @classmethod + async def create(cls, session: ProductionSession) -> 'DXLinkStreamer': + """ + Factory method for the :class:`DXLinkStreamer` object. + Simply calls the constructor and performs the asynchronous + setup tasks. This should be used instead of the constructor. + + :param session: active user session to use + """ + self = cls(session) + await self._wait_for_authentication() + return self + + async def _connect(self) -> None: + """ + Connect to the websocket server using the URL and + authorization token provided during initialization. + """ + + async with websockets.connect( # type: ignore + self._wss_url + ) as websocket: + self._websocket = websocket + await self._setup_connection() + + # main loop + while True: + raw_message = await self._websocket.recv() + message = json.loads(raw_message) + + logger.debug('received: %s', message) + if message['type'] == 'SETUP': + await self._authenticate_connection() + elif message['type'] == 'AUTH_STATE': + if message['state'] == 'AUTHORIZED': + self._authenticated = True + self._heartbeat_task = \ + asyncio.create_task(self._heartbeat()) + elif message['type'] == 'CHANNEL_OPENED': + self._subscription_state[message['channel']] \ + = message['type'] + elif message['type'] == 'FEED_CONFIG': + pass + elif message['type'] == 'FEED_DATA': + await self._map_message(message['data']) + elif message['type'] == 'KEEPALIVE': + pass + else: + raise TastytradeError(message) + + async def _setup_connection(self): + message = { + 'type': 'SETUP', + 'channel': 0, + 'keepaliveTimeout': 60, + 'acceptKeepaliveTimeout': 60, + 'version': '0.1-js/1.0.0' + } + await self._websocket.send(json.dumps(message)) + + async def _authenticate_connection(self): + message = { + 'type': 'AUTH', + 'channel': 0, + 'token': self._auth_token, + } + await self._websocket.send(json.dumps(message)) + + async def _wait_for_authentication(self, time_out=100): + while not self._authenticated: + await asyncio.sleep(0.1) + time_out -= 1 + if time_out < 0: + raise TastytradeError('Connection timed out') + + async def listen(self, event_type: EventType) -> AsyncIterator[Event]: + """ + Using the existing subscriptions, pulls events of the given type and + yield returns them. Never exits unless there's an error or the channel + is closed. + + :param event_type: the type of event to listen for + """ + while True: + yield await self._queues[event_type].get() + + def close(self) -> None: + """ + Closes the websocket connection and cancels the keepalive task. + """ + self._heartbeat_task.cancel() + self._connect_task.cancel() + + async def _heartbeat(self) -> None: + """ + Sends a keepalive message every 30 seconds to keep the connection + alive. + """ + message = { + 'type': 'KEEPALIVE', + 'channel': 0 + } + + while True: + logger.debug('sending keepalive message: %s', message) + await self._websocket.send(json.dumps(message)) + # send the heartbeat every 30 seconds + await asyncio.sleep(30) + + async def subscribe( + self, + event_type: EventType, + symbols: List[str] + ) -> None: + """ + Subscribes to quotes for given list of symbols. Used for recurring data + feeds. + For candles, use :meth:`subscribe_candle` instead. + + :param event_type: type of subscription to add + :param symbols: list of symbols to subscribe for + """ + await self._channel_request(event_type) + event_type_str = str(event_type).split('.')[1].capitalize() + message = { + 'type': 'FEED_SUBSCRIPTION', + 'channel': self._channels[event_type], + 'add': [{'symbol': symbol, "type": event_type_str} + for symbol in symbols] + } + logger.debug('sending subscription: %s', message) + await self._websocket.send(json.dumps(message)) + + async def _channel_request(self, event_type: EventType) -> None: + message = { + 'type': 'CHANNEL_REQUEST', + 'channel': self._channels[event_type], + 'service': 'FEED', + 'parameters': { + 'contract': 'AUTO', + }, + } + logger.debug('sending subscription: %s', message) + await self._websocket.send(json.dumps(message)) + time_out = 100 + while not self._subscription_state[event_type] == 'CHANNEL_OPENED': + await asyncio.sleep(0.1) + time_out -= 1 + if time_out <= 0: + raise TastytradeError('Subscription channel not opened') + + async def unsubscribe( + self, + event_type: EventType, + symbols: List[str] + ) -> None: + """ + Removes existing subscription for given list of symbols. + For candles, use :meth:`unsubscribe_candle` instead. + + :param event_type: type of subscription to remove + :param symbols: list of symbols to unsubscribe from + """ + if not self._authenticated: + raise TastytradeError('Stream not authenticated') + event_type_str = str(event_type).split('.')[1].capitalize() + message = { + 'type': 'FEED_SUBSCRIPTION', + 'channel': self._channels[event_type], + 'remove': [{'symbol': symbol, "type": event_type_str} for symbol in + symbols] + } + logger.debug('sending subscription: %s', message) + await self._websocket.send(json.dumps(message)) + + async def subscribe_candle( + self, + symbols: List[str], + interval: str, + start_time: datetime, + end_time: Optional[datetime] = None, + extended_trading_hours: bool = False + ) -> None: + """ + Subscribes to time series data for the given symbol. + + :param symbols: list of symbols to get data for + :param interval: + the width of each candle in time, e.g. '15s', '5m', '1h', '3d', + '1w', '1mo' + :param start_time: starting time for the data range + :param end_time: ending time for the data range + :param extended_trading_hours: whether to include extended trading + """ + await self._channel_request(EventType.CANDLE) + message = { + 'type': 'FEED_SUBSCRIPTION', + 'channel': self._channels[EventType.CANDLE], + 'add': [{ + 'symbol': f'{ticker}{{={interval},tho=true}}' + if extended_trading_hours + else f'{ticker}{{={interval}}}', + 'type': 'Candle', + 'fromTime': int(start_time.timestamp() * 1000) + } for ticker in symbols] + } + if end_time is not None: + raise TastytradeError('End time no longer supported') + await self._websocket.send(json.dumps(message)) + + async def unsubscribe_candle( + self, + ticker: str, + interval: Optional[str] = None, + extended_trading_hours: bool = False + ) -> None: + """ + Removes existing subscription for a candle. + + :param ticker: symbol to unsubscribe from + :param interval: candle width to unsubscribe from + :param extended_trading_hours: + whether candle to unsubscribe from contains extended trading hours + """ + await self._channel_request(EventType.CANDLE) + message = { + 'type': 'FEED_SUBSCRIPTION', + 'channel': self._channels[EventType.CANDLE], + 'remove': [{ + 'symbol': f'{ticker}{{={interval},tho=true}}' + if extended_trading_hours + else f'{ticker}{{={interval}}}', + 'type': 'Candle', + }] + } + await self._websocket.send(json.dumps(message)) + + async def _map_message(self, message) -> None: + """ + Takes the raw JSON data, parses the events and places them into their + respective queues. + + :param message: raw JSON data from the websocket + """ + for item in message: + msg_type = item.pop('eventType') + # parse type or warn for unknown type + if msg_type == EventType.CANDLE: + await self._queues[EventType.CANDLE].put(Candle(**item)) + elif msg_type == EventType.GREEKS: + await self._queues[EventType.GREEKS].put(Greeks(**item)) + elif msg_type == EventType.PROFILE: + await self._queues[EventType.PROFILE].put(Profile(**item)) + elif msg_type == EventType.QUOTE: + await self._queues[EventType.QUOTE].put(Quote(**item)) + elif msg_type == EventType.SUMMARY: + await self._queues[EventType.SUMMARY].put(Summary(**item)) + elif msg_type == EventType.THEO_PRICE: + await self._queues[EventType.THEO_PRICE].put(TheoPrice(**item)) + elif msg_type == EventType.TIME_AND_SALE: + tas = TimeAndSale(**item) + await self._queues[EventType.TIME_AND_SALE].put(tas) + elif msg_type == EventType.TRADE: + await self._queues[EventType.TRADE].put(Trade(**item)) + elif msg_type == EventType.UNDERLYING: + undl = Underlying(**item) + await self._queues[EventType.UNDERLYING].put(undl) + else: + raise TastytradeError(f'Unknown message type: {message}') diff --git a/tests/test_streamer.py b/tests/test_streamer.py index aed0111..ba79342 100644 --- a/tests/test_streamer.py +++ b/tests/test_streamer.py @@ -1,14 +1,14 @@ import pytest import pytest_asyncio -from tastytrade import Account, AlertStreamer +from tastytrade import Account, AccountStreamer pytest_plugins = ('pytest_asyncio',) @pytest_asyncio.fixture async def streamer(session): - streamer = await AlertStreamer.create(session) + streamer = await AccountStreamer.create(session) yield streamer streamer.close()