Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add forking to test framework #517

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added test/common/_test/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions test/common/_test/test_infrastructure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from amaranth import *
from amaranth.sim import Settle
from test.common import *


class EmptyCircuit(Elaboratable):
def __init__(self):
pass

def elaborate(self, platform):
m = Module()
return m


class TestNow(TestCaseWithSimulator):
def setUp(self):
self.test_cycles = 10
self.m = SimpleTestCircuit(EmptyCircuit())

def process(self):
for k in range(self.test_cycles):
now = yield Now()
assert k == now
# check if second call don't change the returned value
now = yield Now()
assert k == now

yield

def test_random(self):
with self.run_simulation(self.m, 50) as sim:
sim.add_sync_process(self.process)


class TestFork(TestCaseWithSimulator):
def setUp(self):
self.m = SimpleTestCircuit(EmptyCircuit())
self.data = 0

def subproces(self):
self.assertEqual(self.data, 0)
self.data = 1
yield
yield
self.assertEqual(self.data, 2)

def process(self):
yield Fork(self.subproces)
yield
yield Settle()
self.assertEqual(self.data, 1)
self.data = 2

def test(self):
with self.run_simulation(self.m, 20) as sim:
sim.add_sync_process(self.process)
9 changes: 7 additions & 2 deletions test/common/functions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from amaranth import *
from amaranth.hdl.ast import Statement
from amaranth.sim.core import Command
from typing import TypeVar, Any, Generator, TypeAlias
from typing import TypeVar, Any, Generator, TypeAlias, TYPE_CHECKING, Union
from transactron.utils._typing import RecordValueDict, RecordIntDict


if TYPE_CHECKING:
from .infrastructure import CoreblocksCommand


T = TypeVar("T")
TestGen: TypeAlias = Generator[Command | Value | Statement | None, Any, T]
TestGen: TypeAlias = Generator[Union[Command, Value, Statement, "CoreblocksCommand", None], Any, T]


def set_inputs(values: RecordValueDict, field: Record) -> TestGen[None]:
Expand Down
75 changes: 72 additions & 3 deletions test/common/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
import unittest
import functools
from contextlib import contextmanager, nullcontext
from typing import TypeVar, Generic, Type, TypeGuard, Any, Union, Callable, cast
from typing import TypeVar, Generic, Type, TypeGuard, Any, Union, Callable, cast, TypeAlias
from abc import ABC
from amaranth import *
from amaranth.sim import *
from .testbenchio import TestbenchIO
from .functions import TestGen
from ..gtkw_extension import write_vcd_ext
from transactron import Method
from transactron.lib import AdapterTrans
from transactron.core import TransactionModule
from transactron.utils import ModuleConnector, HasElaborate, auto_debug_signals, HasDebugSignals

T = TypeVar("T")
_T_nested_collection = T | list["_T_nested_collection[T]"] | dict[str, "_T_nested_collection[T]"]
_T_nested_collection: TypeAlias = T | list["_T_nested_collection[T]"] | dict[str, "_T_nested_collection[T]"]


def guard_nested_collection(cont: Any, t: Type[T]) -> TypeGuard[_T_nested_collection[T]]:
Expand Down Expand Up @@ -99,6 +101,53 @@ def elaborate(self, platform) -> HasElaborate:
return m


class CoreblocksCommand(ABC):
pass


class Now(CoreblocksCommand):
pass


class Fork(CoreblocksCommand):
def __init__(self, f: Callable[[], TestGen[None]]):
self.f = f


class SyncProcessWrapper:
def __init__(self, sim: "PysimSimulator", f: Callable[[], TestGen[None]]):
self.org_process = f
self.sim = sim
self.current_cycle = 0

def _wrapping_function(self):
response = None
org_coroutine = self.org_process()
try:
while True:
# call orginal test process and catch data yielded by it in `command` variable
command = org_coroutine.send(response)
# If process wait for new cycle
if command is None:
self.current_cycle += 1
# forward to amaranth
yield
# Do early forward to amaranth
elif not isinstance(command, CoreblocksCommand):
# Pass everything else to amaranth simulator without modifications
response = yield command
elif isinstance(command, Now):
response = self.current_cycle
elif isinstance(command, Fork):
f = command.f
self.sim.one_shot_callbacks.append(lambda: self.sim.add_sync_process(f))
response = None
else:
raise RuntimeError(f"Unrecognized command: {command}")
except StopIteration:
pass


class PysimSimulator(Simulator):
def __init__(self, module: HasElaborate, max_cycles: float = 10e4, add_transaction_module=True, traces_file=None):
test_module = TestModule(module, add_transaction_module)
Expand Down Expand Up @@ -132,10 +181,30 @@ def __init__(self, module: HasElaborate, max_cycles: float = 10e4, add_transacti
self.ctx = nullcontext()

self.deadline = clk_period * max_cycles
self.one_shot_callbacks = []

def add_sync_process(self, f: Callable[[], TestGen]):
f_wrapped = SyncProcessWrapper(self, f)
super().add_sync_process(f_wrapped._wrapping_function)

def run_until_with_callbacks(self, deadline, *, run_passive=False):
"""Run the simulation until it advances to `deadline` executing callbacks after each iteration.

This function is based on `run_until` from amaranth Simulator class. After each `advance` step
it calls all registred one shot callbacks. After execution of all one shot callbacks there are
removed from the list before starting the next iteration.
"""
# Convert deadline in seconds into internal amaranth 1 ps units
deadline = deadline * 1e12
assert cast(Any, self)._engine.now <= deadline
while (self.advance() or run_passive) and cast(Any, self)._engine.now < deadline:
for callback in self.one_shot_callbacks:
callback()
self.one_shot_callbacks.clear()

def run(self) -> bool:
with self.ctx:
self.run_until(self.deadline)
self.run_until_with_callbacks(self.deadline)

return not self.advance()

Expand Down