From 40d560162377319f65538d59ce7e2dc6b347fc72 Mon Sep 17 00:00:00 2001 From: Quenos <34926461+Quenos@users.noreply.github.com> Date: Sun, 26 May 2024 22:48:20 +0200 Subject: [PATCH] Added DXLinkStreamer.get_event_nowait() (#150) * MarginReportEntry bug fix * MarginReportEntry rework * Added DXLinkStreamer.get_event_nowait * Added tests for DXLinkStreamer.get_event_nowait * Added tests for DXLinkStreamer.get_event_nowait --- tastytrade/streamer.py | 12 ++++++++++++ tests/test_streamer.py | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/tastytrade/streamer.py b/tastytrade/streamer.py index 033d5b1..e68456e 100644 --- a/tastytrade/streamer.py +++ b/tastytrade/streamer.py @@ -392,6 +392,18 @@ async def listen(self, event_type: EventType) -> AsyncIterator[Event]: while True: yield await self._queues[event_type].get() + def get_event_nowait(self, event_type: EventType) -> Optional[Event]: + """ + Using the existing subscriptions, pulls an event of the given type and + returns it. if the queue is empty None is returned. + + :param event_type: the type of event to get + """ + if not self._queues[event_type].empty(): + return self._queues[event_type].get_nowait() + else: + return None + async def get_event(self, event_type: EventType) -> Event: """ Using the existing subscription, pulls an event of the given type and diff --git a/tests/test_streamer.py b/tests/test_streamer.py index 5d55871..7969ab4 100644 --- a/tests/test_streamer.py +++ b/tests/test_streamer.py @@ -30,3 +30,16 @@ async def test_dxlink_streamer(session): break await streamer.unsubscribe_candle(subs[0], '1d') await streamer.unsubscribe(EventType.QUOTE, subs[1]) + + +@pytest.mark.asyncio +async def test_dxlink_streamer_nowait(session): + async with DXLinkStreamer(session) as streamer: + subs_not_none = ['SPY'] + subs_none = ['QQQQ'] + await streamer.subscribe(EventType.TRADE, subs_not_none) + await streamer.subscribe(EventType.QUOTE, subs_none) + assert streamer.get_event_nowait(EventType.TRADE) is not None + assert streamer.get_event_nowait(EventType.QUOTE) is None + await streamer.unsubscribe(EventType.TRADE, subs_not_none) + await streamer.unsubscribe(EventType.QUOTE, subs_none)