Skip to content

Commit

Permalink
Lib testing refactor (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
tilk authored Nov 29, 2024
1 parent 4dc0d7f commit 7399cbf
Show file tree
Hide file tree
Showing 8 changed files with 966 additions and 917 deletions.
220 changes: 220 additions & 0 deletions test/lib/test_connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import pytest
import random
from operator import and_
from functools import reduce
from typing import TypeAlias
from parameterized import parameterized

from amaranth import *
from transactron import *
from transactron.utils._typing import MethodLayout
from transactron.lib.adapters import Adapter
from transactron.lib.connectors import *
from transactron.testing.testbenchio import CallTrigger
from transactron.testing import (
SimpleTestCircuit,
TestCaseWithSimulator,
data_layout,
TestbenchIO,
TestbenchContext,
)


class RevConnect(Elaboratable):
def __init__(self, layout: MethodLayout):
self.connect = Connect(rev_layout=layout)
self.read = self.connect.write
self.write = self.connect.read

def elaborate(self, platform):
return self.connect


FIFO_Like: TypeAlias = FIFO | Forwarder | Connect | RevConnect | Pipe


class TestFifoBase(TestCaseWithSimulator):
def do_test_fifo(
self, fifo_class: type[FIFO_Like], writer_rand: int = 0, reader_rand: int = 0, fifo_kwargs: dict = {}
):
iosize = 8

m = SimpleTestCircuit(fifo_class(data_layout(iosize), **fifo_kwargs))

random.seed(1337)

async def writer(sim: TestbenchContext):
for i in range(2**iosize):
await m.write.call(sim, data=i)
await self.random_wait(sim, writer_rand)

async def reader(sim: TestbenchContext):
for i in range(2**iosize):
assert (await m.read.call(sim)).data == i
await self.random_wait(sim, reader_rand)

with self.run_simulation(m) as sim:
sim.add_testbench(reader)
sim.add_testbench(writer)


class TestFIFO(TestFifoBase):
@parameterized.expand([(0, 0), (2, 0), (0, 2), (1, 1)])
def test_fifo(self, writer_rand, reader_rand):
self.do_test_fifo(FIFO, writer_rand=writer_rand, reader_rand=reader_rand, fifo_kwargs=dict(depth=4))


class TestConnect(TestFifoBase):
@parameterized.expand([(0, 0), (2, 0), (0, 2), (1, 1)])
def test_fifo(self, writer_rand, reader_rand):
self.do_test_fifo(Connect, writer_rand=writer_rand, reader_rand=reader_rand)

@parameterized.expand([(0, 0), (2, 0), (0, 2), (1, 1)])
def test_rev_fifo(self, writer_rand, reader_rand):
self.do_test_fifo(RevConnect, writer_rand=writer_rand, reader_rand=reader_rand)


class TestForwarder(TestFifoBase):
@parameterized.expand([(0, 0), (2, 0), (0, 2), (1, 1)])
def test_fifo(self, writer_rand, reader_rand):
self.do_test_fifo(Forwarder, writer_rand=writer_rand, reader_rand=reader_rand)

def test_forwarding(self):
iosize = 8

m = SimpleTestCircuit(Forwarder(data_layout(iosize)))

async def forward_check(sim: TestbenchContext, x: int):
read_res, write_res = await CallTrigger(sim).call(m.read).call(m.write, data=x)
assert read_res is not None and read_res.data == x
assert write_res is not None

async def process(sim: TestbenchContext):
# test forwarding behavior
for x in range(4):
await forward_check(sim, x)

# load the overflow buffer
res = await m.write.call_try(sim, data=42)
assert res is not None

# writes are not possible now
res = await m.write.call_try(sim, data=42)
assert res is None

# read from the overflow buffer, writes still blocked
read_res, write_res = await CallTrigger(sim).call(m.read).call(m.write, data=111)
assert read_res is not None and read_res.data == 42
assert write_res is None

# forwarding now works again
for x in range(4):
await forward_check(sim, x)

with self.run_simulation(m) as sim:
sim.add_testbench(process)


class TestPipe(TestFifoBase):
@parameterized.expand([(0, 0), (2, 0), (0, 2), (1, 1)])
def test_fifo(self, writer_rand, reader_rand):
self.do_test_fifo(Pipe, writer_rand=writer_rand, reader_rand=reader_rand)

def test_pipelining(self):
self.do_test_fifo(Pipe, writer_rand=0, reader_rand=0)


class ManyToOneConnectTransTestCircuit(Elaboratable):
def __init__(self, count: int, lay: MethodLayout):
self.count = count
self.lay = lay
self.inputs: list[TestbenchIO] = []

def elaborate(self, platform):
m = TModule()

get_results = []
for i in range(self.count):
input = TestbenchIO(Adapter(o=self.lay))
get_results.append(input.adapter.iface)
m.submodules[f"input_{i}"] = input
self.inputs.append(input)

# Create ManyToOneConnectTrans, which will serialize results from different inputs
output = TestbenchIO(Adapter(i=self.lay))
m.submodules.output = self.output = output
m.submodules.fu_arbitration = ManyToOneConnectTrans(get_results=get_results, put_result=output.adapter.iface)

return m


class TestManyToOneConnectTrans(TestCaseWithSimulator):
def initialize(self):
f1_size = 14
f2_size = 3
self.lay = [("field1", f1_size), ("field2", f2_size)]

self.m = ManyToOneConnectTransTestCircuit(self.count, self.lay)
random.seed(14)

self.inputs = []
# Create list with info if we processed all data from inputs
self.producer_end = [False for i in range(self.count)]
self.expected_output = {}
self.max_wait = 4

# Prepare random results for inputs
for i in range(self.count):
data = []
input_size = random.randint(20, 30)
for j in range(input_size):
t = (
random.randint(0, 2**f1_size),
random.randint(0, 2**f2_size),
)
data.append(t)
if t in self.expected_output:
self.expected_output[t] += 1
else:
self.expected_output[t] = 1
self.inputs.append(data)

def generate_producer(self, i: int):
"""
This is an helper function, which generates a producer process,
which will simulate an FU. Producer will insert in random intervals new
results to its output FIFO. This records will be next serialized by FUArbiter.
"""

async def producer(sim: TestbenchContext):
inputs = self.inputs[i]
for field1, field2 in inputs:
self.m.inputs[i].call_init(sim, field1=field1, field2=field2)
await self.random_wait(sim, self.max_wait)
self.producer_end[i] = True

return producer

async def consumer(self, sim: TestbenchContext):
# TODO: this test doesn't test anything, needs to be fixed!
while reduce(and_, self.producer_end, True):
result = await self.m.output.call_do(sim)

assert result is not None

t = (result["field1"], result["field2"])
assert t in self.expected_output
if self.expected_output[t] == 1:
del self.expected_output[t]
else:
self.expected_output[t] -= 1
await self.random_wait(sim, self.max_wait)

@pytest.mark.parametrize("count", [1, 4])
def test(self, count: int):
self.count = count
self.initialize()
with self.run_simulation(self.m) as sim:
sim.add_testbench(self.consumer)
for i in range(self.count):
sim.add_testbench(self.generate_producer(i))
File renamed without changes.
90 changes: 90 additions & 0 deletions test/lib/test_reqres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import random
from collections import deque

from amaranth import *
from transactron import *
from transactron.lib.adapters import Adapter
from transactron.lib.reqres import *
from transactron.testing.method_mock import MethodMock
from transactron.utils import ModuleConnector
from transactron.testing import (
SimpleTestCircuit,
TestCaseWithSimulator,
def_method_mock,
TestbenchIO,
TestbenchContext,
)


class TestSerializer(TestCaseWithSimulator):
def setup_method(self):
self.test_count = 100

self.port_count = 2
self.data_width = 5

self.requestor_rand = 4

layout = [("field", self.data_width)]

self.req_method = TestbenchIO(Adapter(i=layout))
self.resp_method = TestbenchIO(Adapter(o=layout))

self.test_circuit = SimpleTestCircuit(
Serializer(
port_count=self.port_count,
serialized_req_method=self.req_method.adapter.iface,
serialized_resp_method=self.resp_method.adapter.iface,
),
)
self.m = ModuleConnector(
test_circuit=self.test_circuit, req_method=self.req_method, resp_method=self.resp_method
)

random.seed(14)

self.serialized_data = deque()
self.port_data = [deque() for _ in range(self.port_count)]

self.got_request = False

@def_method_mock(lambda self: self.req_method, enable=lambda self: not self.got_request)
def serial_req_mock(self, field):
@MethodMock.effect
def eff():
self.serialized_data.append(field)
self.got_request = True

@def_method_mock(lambda self: self.resp_method, enable=lambda self: self.got_request)
def serial_resp_mock(self):
@MethodMock.effect
def eff():
self.got_request = False

if self.serialized_data:
return {"field": self.serialized_data[-1]}

def requestor(self, i: int):
async def f(sim: TestbenchContext):
for _ in range(self.test_count):
d = random.randrange(2**self.data_width)
await self.test_circuit.serialize_in[i].call(sim, field=d)
self.port_data[i].append(d)
await self.random_wait(sim, self.requestor_rand, min_cycle_cnt=1)

return f

def responder(self, i: int):
async def f(sim: TestbenchContext):
for _ in range(self.test_count):
data_out = await self.test_circuit.serialize_out[i].call(sim)
assert self.port_data[i].popleft() == data_out.field
await self.random_wait(sim, self.requestor_rand, min_cycle_cnt=1)

return f

def test_serial(self):
with self.run_simulation(self.m) as sim:
for i in range(self.port_count):
sim.add_testbench(self.requestor(i))
sim.add_testbench(self.responder(i))
Loading

0 comments on commit 7399cbf

Please sign in to comment.