Skip to content

Commit

Permalink
Port ContentAddressableMemory from #395 (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
lekcyjna123 authored May 14, 2024
1 parent 6c872cd commit edf58b6
Show file tree
Hide file tree
Showing 13 changed files with 619 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ test/__profiles__/*.json
pytestdebug.log
_coreblocks_regression.lock
_coreblocks_regression.counter
.hypothesis

# cocotb build
/test/regression/cocotb/build
Expand Down
5 changes: 3 additions & 2 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-r requirements.txt
black==24.3.0
black==24.4.2
docutils==0.15.2
flake8==6.0.0
flake8==7.0.0
pep8-naming==0.13.3
markupsafe==2.0.1
myst-parser==0.18.0
Expand All @@ -19,3 +19,4 @@ pytest-xdist==3.5.0
pyelftools==0.29
tabulate==0.9.0
filelock==3.13.1
hypothesis==6.99.6
2 changes: 1 addition & 1 deletion test/frontend/test_decode_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class TestDecode(TestCaseWithSimulator):
@pytest.fixture(autouse=True)
def setup(self, configure_dependency_context):
def setup(self, fixture_initialize_testing_env):
self.gen_params = GenParams(test_core_config.replace(start_pc=24))

fifo_in = FIFO(self.gen_params.get(FetchLayouts).raw_instr, depth=2)
Expand Down
4 changes: 2 additions & 2 deletions test/frontend/test_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TestFetchUnit(TestCaseWithSimulator):
with_rvc: bool

@pytest.fixture(autouse=True)
def setup(self, configure_dependency_context):
def setup(self, fixture_initialize_testing_env):
self.pc = 0
self.gen_params = GenParams(
test_core_config.replace(
Expand Down Expand Up @@ -416,7 +416,7 @@ class TestPredictionChecker(TestCaseWithSimulator):
with_rvc: bool

@pytest.fixture(autouse=True)
def setup(self, configure_dependency_context):
def setup(self, fixture_initialize_testing_env):
self.gen_params = GenParams(
test_core_config.replace(compressed=self.with_rvc, fetch_block_bytes_log=self.fetch_block_log)
)
Expand Down
2 changes: 1 addition & 1 deletion test/func_blocks/fu/functional_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def compute_result(i1: int, i2: int, i_imm: int, pc: int, fn: _T, xlen: int) ->
raise NotImplementedError

@pytest.fixture(autouse=True)
def setup(self, configure_dependency_context):
def setup(self, fixture_initialize_testing_env):
self.gen_params = GenParams(test_core_config)

self.report_mock = TestbenchIO(Adapter(i=self.gen_params.get(ExceptionRegisterLayouts).report))
Expand Down
135 changes: 135 additions & 0 deletions test/transactron/test_transactron_lib_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from datetime import timedelta
from hypothesis import given, settings, Phase
from transactron.testing import *
from transactron.lib.storage import ContentAddressableMemory


class TestContentAddressableMemory(TestCaseWithSimulator):
addr_width = 4
content_width = 5
test_number = 30
nop_number = 3
addr_layout = data_layout(addr_width)
content_layout = data_layout(content_width)

def setUp(self):
self.entries_count = 8

self.circ = SimpleTestCircuit(
ContentAddressableMemory(self.addr_layout, self.content_layout, self.entries_count)
)

self.memory = {}

def generic_process(
self,
method,
input_lst,
behaviour_check=None,
state_change=None,
input_verification=None,
settle_count=0,
name="",
):
def f():
while input_lst:
# wait till all processes will end the previous cycle
yield from self.multi_settle(4)
elem = input_lst.pop()
if isinstance(elem, OpNOP):
yield
continue
if input_verification is not None and not input_verification(elem):
yield
continue
response = yield from method.call(**elem)
yield from self.multi_settle(settle_count)
if behaviour_check is not None:
# Here accesses to circuit are allowed
ret = behaviour_check(elem, response)
if isinstance(ret, Generator):
yield from ret
if state_change is not None:
# It is standard python function by purpose to don't allow accessing circuit
state_change(elem, response)
yield

return f

def push_process(self, in_push):
def verify_in(elem):
return not (frozenset(elem["addr"].items()) in self.memory)

def modify_state(elem, response):
self.memory[frozenset(elem["addr"].items())] = elem["data"]

return self.generic_process(
self.circ.push,
in_push,
state_change=modify_state,
input_verification=verify_in,
settle_count=3,
name="push",
)

def read_process(self, in_read):
def check(elem, response):
addr = elem["addr"]
frozen_addr = frozenset(addr.items())
if frozen_addr in self.memory:
assert response["not_found"] == 0
assert response["data"] == self.memory[frozen_addr]
else:
assert response["not_found"] == 1

return self.generic_process(self.circ.read, in_read, behaviour_check=check, settle_count=0, name="read")

def remove_process(self, in_remove):
def modify_state(elem, response):
if frozenset(elem["addr"].items()) in self.memory:
del self.memory[frozenset(elem["addr"].items())]

return self.generic_process(self.circ.remove, in_remove, state_change=modify_state, settle_count=2, name="remv")

def write_process(self, in_write):
def verify_in(elem):
ret = frozenset(elem["addr"].items()) in self.memory
return ret

def check(elem, response):
assert response["not_found"] == int(frozenset(elem["addr"].items()) not in self.memory)

def modify_state(elem, response):
if frozenset(elem["addr"].items()) in self.memory:
self.memory[frozenset(elem["addr"].items())] = elem["data"]

return self.generic_process(
self.circ.write,
in_write,
behaviour_check=check,
state_change=modify_state,
input_verification=None,
settle_count=1,
name="writ",
)

@settings(
max_examples=10,
phases=(Phase.explicit, Phase.reuse, Phase.generate, Phase.shrink),
derandomize=True,
deadline=timedelta(milliseconds=500),
)
@given(
generate_process_input(test_number, nop_number, [("addr", addr_layout), ("data", content_layout)]),
generate_process_input(test_number, nop_number, [("addr", addr_layout), ("data", content_layout)]),
generate_process_input(test_number, nop_number, [("addr", addr_layout)]),
generate_process_input(test_number, nop_number, [("addr", addr_layout)]),
)
def test_random(self, in_push, in_write, in_read, in_remove):
with self.reinitialize_fixtures():
self.setUp()
with self.run_simulation(self.circ, max_cycles=500) as sim:
sim.add_sync_process(self.push_process(in_push))
sim.add_sync_process(self.read_process(in_read))
sim.add_sync_process(self.write_process(in_write))
sim.add_sync_process(self.remove_process(in_remove))
92 changes: 92 additions & 0 deletions test/transactron/utils/test_amaranth_ext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from transactron.testing import *
import random
from transactron.utils.amaranth_ext import MultiPriorityEncoder


class TestMultiPriorityEncoder(TestCaseWithSimulator):
def get_expected(self, input):
places = []
for i in range(self.input_width):
if input % 2:
places.append(i)
input //= 2
places += [None] * self.output_count
return places

def process(self):
for _ in range(self.test_number):
input = random.randrange(2**self.input_width)
yield self.circ.input.eq(input)
yield Settle()
expected_output = self.get_expected(input)
for ex, real, valid in zip(expected_output, self.circ.outputs, self.circ.valids):
if ex is None:
assert (yield valid) == 0
else:
assert (yield valid) == 1
assert (yield real) == ex
yield Delay(1e-7)

@pytest.mark.parametrize("input_width", [1, 5, 16, 23, 24])
@pytest.mark.parametrize("output_count", [1, 3, 4])
def test_random(self, input_width, output_count):
random.seed(input_width + output_count)
self.test_number = 50
self.input_width = input_width
self.output_count = output_count
self.circ = MultiPriorityEncoder(self.input_width, self.output_count)

with self.run_simulation(self.circ) as sim:
sim.add_process(self.process)

@pytest.mark.parametrize("name", ["prio_encoder", None])
def test_static_create_simple(self, name):
random.seed(14)
self.test_number = 50
self.input_width = 7
self.output_count = 1

class DUT(Elaboratable):
def __init__(self, input_width, output_count, name):
self.input = Signal(input_width)
self.output_count = output_count
self.input_width = input_width
self.name = name

def elaborate(self, platform):
m = Module()
out, val = MultiPriorityEncoder.create_simple(m, self.input_width, self.input, name=self.name)
# Save as a list to use common interface in testing
self.outputs = [out]
self.valids = [val]
return m

self.circ = DUT(self.input_width, self.output_count, name)

with self.run_simulation(self.circ) as sim:
sim.add_process(self.process)

@pytest.mark.parametrize("name", ["prio_encoder", None])
def test_static_create(self, name):
random.seed(14)
self.test_number = 50
self.input_width = 7
self.output_count = 2

class DUT(Elaboratable):
def __init__(self, input_width, output_count, name):
self.input = Signal(input_width)
self.output_count = output_count
self.input_width = input_width
self.name = name

def elaborate(self, platform):
m = Module()
out = MultiPriorityEncoder.create(m, self.input_width, self.input, self.output_count, name=self.name)
self.outputs, self.valids = list(zip(*out))
return m

self.circ = DUT(self.input_width, self.output_count, name)

with self.run_simulation(self.circ) as sim:
sim.add_process(self.process)
Loading

0 comments on commit edf58b6

Please sign in to comment.