Skip to content

Commit

Permalink
Merge pull request #83 from simonsobs/dev
Browse files Browse the repository at this point in the history
Replace `nextline.subscribe_stdout()` in `CacheStdout`
  • Loading branch information
TaiSakuma authored Jun 25, 2024
2 parents 604face + d830ba2 commit 4828c93
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
4 changes: 2 additions & 2 deletions nextlinegraphql/plugins/ctrl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ async def update_lifespan_context(self, context: MutableMapping) -> None:
@asynccontextmanager
async def lifespan(self) -> AsyncIterator[None]:
'''Yield within the nextline context.'''
self._cache_stdout = CacheStdout(self._nextline)
self._cache_stdout = CacheStdout()
self._nextline.register(self._cache_stdout)
async with self._nextline:
async with self._cache_stdout, self._nextline:
yield

@spec.hookimpl
Expand Down
26 changes: 20 additions & 6 deletions nextlinegraphql/plugins/ctrl/cache.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
from collections.abc import AsyncIterator
from typing import Any

from nextline import Nextline
from nextline.events import OnWriteStdout
from nextline.plugin.spec import hookimpl
from nextline.utils.pubsub import PubSubItem


class CacheStdout:
def __init__(self, nextline: Nextline) -> None:
self._nextline = nextline
def __init__(self) -> None:
self._cache = list[str]()
self._pubsub = PubSubItem[str]()

async def subscribe(self) -> AsyncIterator[str]:
# TODO: Make sure no missing or duplicated items are yielded. If the
# `last` option is `True`, duplicate items can be yielded and shown on
# the web client. However, the tests still pass.
# NOTE: The cache can be implemented in `PubSubItem` itself.
yield ''.join(self._cache)
async for i in self._nextline.subscribe_stdout():
assert i.text is not None
yield i.text
async for text in self._pubsub.subscribe(last=False):
yield text

async def aclose(self) -> None:
await self._pubsub.aclose()

async def __aenter__(self) -> 'CacheStdout':
return self

async def __aexit__(self, *_: Any, **__: Any) -> None:
await self.aclose()

@hookimpl
async def on_initialize_run(self) -> None:
Expand All @@ -23,3 +36,4 @@ async def on_initialize_run(self) -> None:
@hookimpl
async def on_write_stdout(self, event: OnWriteStdout) -> None:
self._cache.append(event.text)
await self._pubsub.publish(event.text)
6 changes: 4 additions & 2 deletions tests/plugins/ctrl/schema/subscriptions/test_stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@
from tests.plugins.ctrl.schema.conftest import Schema

SOURCE = '''
import time
for i in range(10):
print(i)
time.sleep(0.005)
'''.strip()


async def test_schema(schema: Schema) -> None:
nextline = Nextline(SOURCE, trace_modules=True, trace_threads=True)
cache_stdout = CacheStdout(nextline)
cache_stdout = CacheStdout()
nextline.register(cache_stdout)
started = asyncio.Event()
context = {'nextline': nextline, 'ctrl': {'cache_stdout': cache_stdout}}
async with nextline:
async with cache_stdout, nextline:
task = asyncio.create_task(nextline.run_continue_and_wait(started=started))
await started.wait()
sub = asyncio.create_task(_subscribe_stdout(schema, context=context))
Expand Down

0 comments on commit 4828c93

Please sign in to comment.