Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: introducing a fixture for proper teardown #242

Merged
merged 1 commit into from
Feb 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 171 additions & 158 deletions tests/test_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,23 @@
import pytest
from streamsync.ss_types import EventRequest, InitSessionRequest, InitSessionRequestPayload, StreamsyncEvent
import asyncio
import contextlib

from tests import test_app_dir


@pytest.fixture
def setup_app_runner():
Copy link
Collaborator

@FabienArcellier FabienArcellier Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advocate to remove the dependence to pytest here, it's not required because we use it as a simple context manager. The boilerplate for pytest make that heavy and explicit dependency is better when we can.

with setup_app_runner("./not_an_app", "run") as ar:
  pass

Copy link
Collaborator

@FabienArcellier FabienArcellier Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and add the typing if you can

@contextlib.contextmanager
def setup_app_runner(app_dir: str, app_command: str):
    ar = AppRunner(app_dir, app_command)
    try:
        yield ar
    finally:
        ar.shut_down()

Copy link
Collaborator

@FabienArcellier FabienArcellier Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from conftest import setup_app_runner

def test_init_wrong_path(self) -> None:
    with setup_app_runner("./not_an_app", "run") as ar:
        with pytest.raises(SystemExit) as wrapped_e:
            ar.load()
        assert wrapped_e.type == SystemExit

@contextlib.contextmanager
def _manage_launch_args(app_dir, app_command):
ar = AppRunner(app_dir, app_command)
try:
yield ar
finally:
ar.shut_down()
return _manage_launch_args


class TestAppRunner:

numberinput_instance_path = [
Expand All @@ -28,161 +42,161 @@ class TestAppRunner:
]
proposed_session_id = "c13a280fe17ec663047ec14de15cd93ad686fecf5f9a4dbf262d3a86de8cb577"

def test_init_wrong_path(self) -> None:
ar = AppRunner("./not_an_app", "run")
with pytest.raises(SystemExit) as wrapped_e:
ar.load()
assert wrapped_e.type == SystemExit
ar.shut_down()
@pytest.mark.usefixtures("setup_app_runner")
def test_init_wrong_path(self, setup_app_runner) -> None:
with setup_app_runner("./not_an_app", "run") as ar:
with pytest.raises(SystemExit) as wrapped_e:
ar.load()
assert wrapped_e.type == SystemExit

def test_init_wrong_mode(self) -> None:
with pytest.raises(ValueError):
AppRunner(test_app_dir, "virus")

@pytest.mark.asyncio
async def test_pre_session(self) -> None:
ar = AppRunner(test_app_dir, "run")
er = EventRequest(
type="event",
payload=StreamsyncEvent(
@pytest.mark.usefixtures("setup_app_runner")
async def test_pre_session(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
er = EventRequest(
type="event",
payload=StreamsyncEvent(
type="virus",
instancePath=self.numberinput_instance_path,
payload={
"virus": "yes"
}
)
)
ar.load()
r = await ar.dispatch_message(None, er)
assert r.status == "error"

@pytest.mark.asyncio
@pytest.mark.usefixtures("setup_app_runner")
async def test_valid_session_invalid_event(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id
er = EventRequest(type="event", payload=StreamsyncEvent(
type="virus",
instancePath=self.numberinput_instance_path,
payload={
"virus": "yes"
}
)
)
ar.load()
r = await ar.dispatch_message(None, er)
assert r.status == "error"
ar.shut_down()
))
rev = await ar.dispatch_message(None, er)
assert rev.status == "error"

@pytest.mark.asyncio
async def test_valid_session_invalid_event(self) -> None:
ar = AppRunner(test_app_dir, "run")
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
@pytest.mark.usefixtures("setup_app_runner")
async def test_valid_event(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
)
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id
er = EventRequest(type="event", payload=StreamsyncEvent(
type="virus",
instancePath=self.numberinput_instance_path,
payload={
"virus": "yes"
}
))
rev = await ar.dispatch_message(None, er)
assert rev.status == "error"
ar.shut_down()
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-number-change",
instancePath=self.numberinput_instance_path,
payload="129673"
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == True
assert ev_res.payload.mutations.get(
"+inspected_payload") == "129673.0"
assert ev_res.payload.mutations.get(
"+b.pet_count") == 129673

@pytest.mark.asyncio
async def test_valid_event(self) -> None:
ar = AppRunner(test_app_dir, "run")
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
@pytest.mark.usefixtures("setup_app_runner")
async def test_async_handler(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
)
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-number-change",
instancePath=self.numberinput_instance_path,
payload="129673"
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == True
assert ev_res.payload.mutations.get(
"+inspected_payload") == "129673.0"
assert ev_res.payload.mutations.get(
"+b.pet_count") == 129673
ar.shut_down()
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id

@pytest.mark.asyncio
async def test_async_handler(self) -> None:
ar = AppRunner(test_app_dir, "run")
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
sres = await ar.dispatch_message(None, si)
assert sres.status == "ok"
assert sres.payload.model_dump().get("sessionId") == self.proposed_session_id

# Firing an event to bypass "initial" state mutations
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-number-change",
instancePath=self.numberinput_instance_path,
payload="129673"
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)

ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-click",
instancePath=self.async_handler_click_path
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == True
assert "+counter" in ev_res.payload.mutations
ar.shut_down()
# Firing an event to bypass "initial" state mutations
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-number-change",
instancePath=self.numberinput_instance_path,
payload="129673"
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)

ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="ss-click",
instancePath=self.async_handler_click_path
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == True
assert "+counter" in ev_res.payload.mutations

@pytest.mark.asyncio
async def test_bad_event_handler(self) -> None:
ar = AppRunner(test_app_dir, "run")
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
@pytest.mark.usefixtures("setup_app_runner")
async def test_bad_event_handler(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
ar.load()
si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
)
await ar.dispatch_message(None, si)
bad_button_instance_path = [
{"componentId": "root", "instanceNumber": 0},
{"componentId": "28a2212b-bc58-4398-8a72-2554e5296490", "instanceNumber": 0},
{"componentId": "232d749a-5e0c-4802-bbe1-f8cae06db112", "instanceNumber": 0}
]
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="click",
instancePath=bad_button_instance_path,
payload={}
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
print(repr(ev_res))
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == False
ar.shut_down()

def test_run_code_edit(self) -> None:
ar = AppRunner(test_app_dir, "run")
with pytest.raises(PermissionError):
ar.update_code(None, "exec(virus)")
with pytest.raises(PermissionError):
ar.save_code(None, "exec(virus)")
ar.shut_down()

await ar.dispatch_message(None, si)
bad_button_instance_path = [
{"componentId": "root", "instanceNumber": 0},
{"componentId": "28a2212b-bc58-4398-8a72-2554e5296490", "instanceNumber": 0},
{"componentId": "232d749a-5e0c-4802-bbe1-f8cae06db112", "instanceNumber": 0}
]
ev_req = EventRequest(type="event", payload=StreamsyncEvent(
type="click",
instancePath=bad_button_instance_path,
payload={}
))
ev_res = await ar.dispatch_message(self.proposed_session_id, ev_req)
print(repr(ev_res))
assert ev_res.status == "ok"
assert ev_res.payload.result.get("ok") == False

@pytest.mark.usefixtures("setup_app_runner")
def test_run_code_edit(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "run") as ar:
with pytest.raises(PermissionError):
ar.update_code(None, "exec(virus)")
with pytest.raises(PermissionError):
ar.save_code(None, "exec(virus)")

def run_loader_thread(self, app_runner: AppRunner) -> None:
app_runner.update_code(None, "print('188542')")

Expand All @@ -195,29 +209,28 @@ async def wait_for_code_update(self, app_runner: AppRunner) -> None:
return True

@pytest.mark.asyncio
async def test_code_update(self) -> None:
ar = AppRunner(test_app_dir, "edit")
ar.hook_to_running_event_loop()
ar.load()
wait_update_task = asyncio.create_task(self.wait_for_code_update(ar))
loader_thread = threading.Thread(target=self.run_loader_thread, args=(ar,))
loader_thread.start()
code_update_result = await wait_update_task
loader_thread.join()

assert code_update_result == True

si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
@pytest.mark.usefixtures("setup_app_runner")
async def test_code_update(self, setup_app_runner) -> None:
with setup_app_runner(test_app_dir, "edit") as ar:
ar.hook_to_running_event_loop()
ar.load()
wait_update_task = asyncio.create_task(self.wait_for_code_update(ar))
loader_thread = threading.Thread(target=self.run_loader_thread, args=(ar,))
loader_thread.start()
code_update_result = await wait_update_task
loader_thread.join()

assert code_update_result == True

si = InitSessionRequest(
type="sessionInit",
payload=InitSessionRequestPayload(
cookies={},
headers={},
proposedSessionId=self.proposed_session_id
)
)
)
si_res = await ar.dispatch_message(None, si)
mail = list(si_res.payload.model_dump().get("mail"))

assert mail[0].get("payload").get("message") == "188542\n"
si_res = await ar.dispatch_message(None, si)
mail = list(si_res.payload.model_dump().get("mail"))

ar.shut_down()
assert mail[0].get("payload").get("message") == "188542\n"
Loading