Skip to content

Commit

Permalink
Merge branch 'clz-fixes' of github.com:kuznia-rdzeni/transactron into…
Browse files Browse the repository at this point in the history
… clz-fixes
  • Loading branch information
Hazardu committed Dec 17, 2024
2 parents 126fad0 + be58b6b commit c254ba7
Show file tree
Hide file tree
Showing 6 changed files with 844 additions and 3 deletions.
60 changes: 60 additions & 0 deletions test/lib/test_fifo.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from amaranth_types.types import ShapeLike
import pytest
from amaranth import *
from amaranth.lib import data

from transactron.lib import AdapterTrans, BasicFifo
from transactron.lib.fifo import WideFifo
from transactron.utils.amaranth_ext import const_of

from transactron.testing import TestCaseWithSimulator, TestbenchIO, data_layout, TestbenchContext
from collections import deque
import random

from transactron.testing.infrastructure import SimpleTestCircuit


class BasicFifoTestCircuit(Elaboratable):
def __init__(self, depth):
Expand Down Expand Up @@ -62,3 +68,57 @@ async def target(sim: TestbenchContext):
with self.run_simulation(fifoc) as sim:
sim.add_testbench(source)
sim.add_testbench(target)


class TestWideFifo(TestCaseWithSimulator):
async def source(self, sim: TestbenchContext):
cycles = 100

for _ in range(cycles):
await self.random_wait_geom(sim, 0.5)
count = random.randint(1, self.write_width)
data = [const_of(random.randrange(2**self.bits), self.shape) for _ in range(self.write_width)]
await self.circ.write.call(sim, count=count, data=data)
await sim.delay(2e-9) # Ensures following code runs after peek_verifier and target
self.expq.extend(data[:count])

self.done = True

async def target(self, sim: TestbenchContext):
while not self.done or self.expq:
await self.random_wait_geom(sim, 0.5)
count = random.randint(1, self.read_width)
v = await self.circ.read.call_try(sim, count=count)
await sim.delay(1e-9) # Ensures following code runs after peek_verifier
if v is not None:
assert v.count == min(count, len(self.expq))
assert v.data[: v.count] == [self.expq.popleft() for _ in range(v.count)]

async def peek_verifier(self, sim: TestbenchContext):
while not self.done or self.expq:
v = await self.circ.peek.call_try(sim)
if v is not None:
assert v.count == min(self.read_width, len(self.expq))
assert v.data[: v.count] == [self.expq[i] for i in range(v.count)]
else:
assert not self.expq

@pytest.mark.parametrize("shape", [4, data.ArrayLayout(2, 2)])
@pytest.mark.parametrize("depth", [2, 5])
@pytest.mark.parametrize("read_width, write_width", [(1, 1), (2, 2), (1, 3), (3, 1)])
def test_randomized(self, shape: ShapeLike, depth: int, read_width: int, write_width: int):
random.seed(42)

self.shape = shape
self.bits = Shape.cast(shape).width
self.circ = SimpleTestCircuit(WideFifo(shape, depth, read_width, write_width))
self.read_width = read_width
self.write_width = write_width

self.expq = deque()
self.done = False

with self.run_simulation(self.circ) as sim:
sim.add_testbench(self.source)
sim.add_testbench(self.target)
sim.add_testbench(self.peek_verifier)
144 changes: 144 additions & 0 deletions test/utils/test_shifter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import random
import pytest
from collections.abc import Callable, Iterable, Sequence
from typing import Any, cast
from amaranth import *
from amaranth.lib import data
from amaranth_types.types import ShapeLike, ValueLike
from transactron.utils import assign
from transactron.utils.amaranth_ext.functions import const_of
from transactron.utils.amaranth_ext.shifter import *
from transactron.testing import TestCaseWithSimulator, TestbenchContext


class ShifterCircuit(Elaboratable):
def __init__(
self,
shift_fun: Callable[[ValueLike, ValueLike], Value],
width: int,
shift_kwargs: Iterable[tuple[str, Any]] = (),
):
self.input = Signal(width)
self.output = Signal(width)
self.offset = Signal(range(width + 1))
self.shift_fun = shift_fun
self.kwargs = dict(shift_kwargs)

def elaborate(self, platform):
m = Module()

m.d.comb += self.output.eq(self.shift_fun(self.input, self.offset, **self.kwargs))

return m


class TestShifter(TestCaseWithSimulator):
@pytest.mark.parametrize(
"shift_fun, shift_kwargs, test_fun",
[
(shift_left, [], lambda val, offset, width: (val << offset) % 2**width),
(shift_right, [], lambda val, offset, width: (val >> offset)),
(
shift_left,
[("placeholder", 1)],
lambda val, offset, width: ((val << offset) | (2**width - 1 >> (width - offset))) % 2**width,
),
(
shift_right,
[("placeholder", 1)],
lambda val, offset, width: ((val >> offset) | (2**width - 1 << (width - offset))) % 2**width,
),
(rotate_left, [], lambda val, offset, width: ((val << offset) | (val >> (width - offset))) % 2**width),
(rotate_right, [], lambda val, offset, width: ((val >> offset) | (val << (width - offset))) % 2**width),
],
)
def test_shifter(self, shift_fun, shift_kwargs, test_fun):
width = 8
tests = 50
dut = ShifterCircuit(shift_fun, width, shift_kwargs)

async def test_process(sim: TestbenchContext):
for _ in range(tests):
val = random.randrange(2**width)
offset = random.randrange(width + 1)
sim.set(dut.input, val)
sim.set(dut.offset, offset)
_, result = await sim.delay(1e-9).sample(dut.output)
assert result == test_fun(val, offset, width)

with self.run_simulation(dut, add_transaction_module=False) as sim:
sim.add_testbench(test_process)


class VecShifterCircuit(Elaboratable):
def __init__(
self,
shift_fun: Callable[[Sequence, ValueLike], Sequence],
shape: ShapeLike,
width: int,
shift_kwargs: Iterable[tuple[str, Any]] = (),
):
self.input = Signal(data.ArrayLayout(shape, width))
self.output = Signal(data.ArrayLayout(shape, width))
self.offset = Signal(range(width + 1))
self.shift_fun = shift_fun
self.kwargs = dict(shift_kwargs)

def elaborate(self, platform):
m = Module()

m.d.comb += assign(self.output, self.shift_fun(cast(Sequence, self.input), self.offset, **self.kwargs))

return m


class TestVecShifter(TestCaseWithSimulator):
@pytest.mark.parametrize(
"shape",
[
4,
data.ArrayLayout(2, 2),
],
)
@pytest.mark.parametrize(
"shift_fun, shift_kwargs, test_fun",
[
(shift_vec_left, lambda mkc: [], lambda val, offset, mkc: [mkc(0)] * offset + val[: len(val) - offset]),
(shift_vec_right, lambda mkc: [], lambda val, offset, mkc: val[offset:] + [mkc(0)] * offset),
(
shift_vec_left,
lambda mkc: [("placeholder", mkc(1))],
lambda val, offset, mkc: [mkc(1)] * offset + val[: len(val) - offset],
),
(
shift_vec_right,
lambda mkc: [("placeholder", mkc(1))],
lambda val, offset, mkc: val[offset:] + [mkc(1)] * offset,
),
(
rotate_vec_left,
lambda mkc: [],
lambda val, offset, mkc: val[len(val) - offset :] + val[: len(val) - offset],
),
(rotate_vec_right, lambda mkc: [], lambda val, offset, mkc: val[offset:] + val[:offset]),
],
)
def test_vec_shifter(self, shape, shift_fun, shift_kwargs, test_fun):
def mk_const(x):
return const_of(x, shape)

width = 8
tests = 50
dut = VecShifterCircuit(shift_fun, shape, width, shift_kwargs(mk_const))

async def test_process(sim: TestbenchContext):
for _ in range(tests):
val = [mk_const(random.randrange(2 ** Shape.cast(shape).width)) for _ in range(width)]
offset = random.randrange(width + 1)
sim.set(dut.input, val)
sim.set(dut.offset, offset)
_, result = await sim.delay(1e-9).sample(dut.output)
assert result == test_fun(val, offset, mk_const)

with self.run_simulation(dut, add_transaction_module=False) as sim:
sim.add_testbench(test_process)
Loading

0 comments on commit c254ba7

Please sign in to comment.