-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Port ContentAddressableMemory from kuznia-rdzeni/coreblocks#395 (kuzn…
- Loading branch information
1 parent
4168375
commit 1483b7f
Showing
8 changed files
with
611 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
from datetime import timedelta | ||
from hypothesis import given, settings, Phase | ||
from transactron.testing import * | ||
from transactron.lib.storage import ContentAddressableMemory | ||
|
||
|
||
class TestContentAddressableMemory(TestCaseWithSimulator): | ||
addr_width = 4 | ||
content_width = 5 | ||
test_number = 30 | ||
nop_number = 3 | ||
addr_layout = data_layout(addr_width) | ||
content_layout = data_layout(content_width) | ||
|
||
def setUp(self): | ||
self.entries_count = 8 | ||
|
||
self.circ = SimpleTestCircuit( | ||
ContentAddressableMemory(self.addr_layout, self.content_layout, self.entries_count) | ||
) | ||
|
||
self.memory = {} | ||
|
||
def generic_process( | ||
self, | ||
method, | ||
input_lst, | ||
behaviour_check=None, | ||
state_change=None, | ||
input_verification=None, | ||
settle_count=0, | ||
name="", | ||
): | ||
def f(): | ||
while input_lst: | ||
# wait till all processes will end the previous cycle | ||
yield from self.multi_settle(4) | ||
elem = input_lst.pop() | ||
if isinstance(elem, OpNOP): | ||
yield | ||
continue | ||
if input_verification is not None and not input_verification(elem): | ||
yield | ||
continue | ||
response = yield from method.call(**elem) | ||
yield from self.multi_settle(settle_count) | ||
if behaviour_check is not None: | ||
# Here accesses to circuit are allowed | ||
ret = behaviour_check(elem, response) | ||
if isinstance(ret, Generator): | ||
yield from ret | ||
if state_change is not None: | ||
# It is standard python function by purpose to don't allow accessing circuit | ||
state_change(elem, response) | ||
yield | ||
|
||
return f | ||
|
||
def push_process(self, in_push): | ||
def verify_in(elem): | ||
return not (frozenset(elem["addr"].items()) in self.memory) | ||
|
||
def modify_state(elem, response): | ||
self.memory[frozenset(elem["addr"].items())] = elem["data"] | ||
|
||
return self.generic_process( | ||
self.circ.push, | ||
in_push, | ||
state_change=modify_state, | ||
input_verification=verify_in, | ||
settle_count=3, | ||
name="push", | ||
) | ||
|
||
def read_process(self, in_read): | ||
def check(elem, response): | ||
addr = elem["addr"] | ||
frozen_addr = frozenset(addr.items()) | ||
if frozen_addr in self.memory: | ||
assert response["not_found"] == 0 | ||
assert response["data"] == self.memory[frozen_addr] | ||
else: | ||
assert response["not_found"] == 1 | ||
|
||
return self.generic_process(self.circ.read, in_read, behaviour_check=check, settle_count=0, name="read") | ||
|
||
def remove_process(self, in_remove): | ||
def modify_state(elem, response): | ||
if frozenset(elem["addr"].items()) in self.memory: | ||
del self.memory[frozenset(elem["addr"].items())] | ||
|
||
return self.generic_process(self.circ.remove, in_remove, state_change=modify_state, settle_count=2, name="remv") | ||
|
||
def write_process(self, in_write): | ||
def verify_in(elem): | ||
ret = frozenset(elem["addr"].items()) in self.memory | ||
return ret | ||
|
||
def check(elem, response): | ||
assert response["not_found"] == int(frozenset(elem["addr"].items()) not in self.memory) | ||
|
||
def modify_state(elem, response): | ||
if frozenset(elem["addr"].items()) in self.memory: | ||
self.memory[frozenset(elem["addr"].items())] = elem["data"] | ||
|
||
return self.generic_process( | ||
self.circ.write, | ||
in_write, | ||
behaviour_check=check, | ||
state_change=modify_state, | ||
input_verification=None, | ||
settle_count=1, | ||
name="writ", | ||
) | ||
|
||
@settings( | ||
max_examples=10, | ||
phases=(Phase.explicit, Phase.reuse, Phase.generate, Phase.shrink), | ||
derandomize=True, | ||
deadline=timedelta(milliseconds=500), | ||
) | ||
@given( | ||
generate_process_input(test_number, nop_number, [("addr", addr_layout), ("data", content_layout)]), | ||
generate_process_input(test_number, nop_number, [("addr", addr_layout), ("data", content_layout)]), | ||
generate_process_input(test_number, nop_number, [("addr", addr_layout)]), | ||
generate_process_input(test_number, nop_number, [("addr", addr_layout)]), | ||
) | ||
def test_random(self, in_push, in_write, in_read, in_remove): | ||
with self.reinitialize_fixtures(): | ||
self.setUp() | ||
with self.run_simulation(self.circ, max_cycles=500) as sim: | ||
sim.add_sync_process(self.push_process(in_push)) | ||
sim.add_sync_process(self.read_process(in_read)) | ||
sim.add_sync_process(self.write_process(in_write)) | ||
sim.add_sync_process(self.remove_process(in_remove)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
from transactron.testing import * | ||
import random | ||
from transactron.utils.amaranth_ext import MultiPriorityEncoder | ||
|
||
|
||
class TestMultiPriorityEncoder(TestCaseWithSimulator): | ||
def get_expected(self, input): | ||
places = [] | ||
for i in range(self.input_width): | ||
if input % 2: | ||
places.append(i) | ||
input //= 2 | ||
places += [None] * self.output_count | ||
return places | ||
|
||
def process(self): | ||
for _ in range(self.test_number): | ||
input = random.randrange(2**self.input_width) | ||
yield self.circ.input.eq(input) | ||
yield Settle() | ||
expected_output = self.get_expected(input) | ||
for ex, real, valid in zip(expected_output, self.circ.outputs, self.circ.valids): | ||
if ex is None: | ||
assert (yield valid) == 0 | ||
else: | ||
assert (yield valid) == 1 | ||
assert (yield real) == ex | ||
yield Delay(1e-7) | ||
|
||
@pytest.mark.parametrize("input_width", [1, 5, 16, 23, 24]) | ||
@pytest.mark.parametrize("output_count", [1, 3, 4]) | ||
def test_random(self, input_width, output_count): | ||
random.seed(input_width + output_count) | ||
self.test_number = 50 | ||
self.input_width = input_width | ||
self.output_count = output_count | ||
self.circ = MultiPriorityEncoder(self.input_width, self.output_count) | ||
|
||
with self.run_simulation(self.circ) as sim: | ||
sim.add_process(self.process) | ||
|
||
@pytest.mark.parametrize("name", ["prio_encoder", None]) | ||
def test_static_create_simple(self, name): | ||
random.seed(14) | ||
self.test_number = 50 | ||
self.input_width = 7 | ||
self.output_count = 1 | ||
|
||
class DUT(Elaboratable): | ||
def __init__(self, input_width, output_count, name): | ||
self.input = Signal(input_width) | ||
self.output_count = output_count | ||
self.input_width = input_width | ||
self.name = name | ||
|
||
def elaborate(self, platform): | ||
m = Module() | ||
out, val = MultiPriorityEncoder.create_simple(m, self.input_width, self.input, name=self.name) | ||
# Save as a list to use common interface in testing | ||
self.outputs = [out] | ||
self.valids = [val] | ||
return m | ||
|
||
self.circ = DUT(self.input_width, self.output_count, name) | ||
|
||
with self.run_simulation(self.circ) as sim: | ||
sim.add_process(self.process) | ||
|
||
@pytest.mark.parametrize("name", ["prio_encoder", None]) | ||
def test_static_create(self, name): | ||
random.seed(14) | ||
self.test_number = 50 | ||
self.input_width = 7 | ||
self.output_count = 2 | ||
|
||
class DUT(Elaboratable): | ||
def __init__(self, input_width, output_count, name): | ||
self.input = Signal(input_width) | ||
self.output_count = output_count | ||
self.input_width = input_width | ||
self.name = name | ||
|
||
def elaborate(self, platform): | ||
m = Module() | ||
out = MultiPriorityEncoder.create(m, self.input_width, self.input, self.output_count, name=self.name) | ||
self.outputs, self.valids = list(zip(*out)) | ||
return m | ||
|
||
self.circ = DUT(self.input_width, self.output_count, name) | ||
|
||
with self.run_simulation(self.circ) as sim: | ||
sim.add_process(self.process) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.