Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiport MemoryBank #736

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 102 additions & 59 deletions test/transactron/lib/test_transaction_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,97 +142,140 @@ class TestMemoryBank(TestCaseWithSimulator):

@pytest.mark.parametrize("max_addr, writer_rand, reader_req_rand, reader_resp_rand, seed", test_conf)
@pytest.mark.parametrize("transparent", [False, True])
@pytest.mark.parametrize("read_ports", [1, 2])
@pytest.mark.parametrize("write_ports", [1, 2])
def test_mem(
self, max_addr: int, writer_rand: int, reader_req_rand: int, reader_resp_rand: int, seed: int, transparent: bool
self,
max_addr: int,
writer_rand: int,
reader_req_rand: int,
reader_resp_rand: int,
seed: int,
transparent: bool,
read_ports: int,
write_ports: int,
):
test_count = 200

data_width = 6
m = SimpleTestCircuit(
MemoryBank(data_layout=[("data", data_width)], elem_count=max_addr, transparent=transparent)
MemoryBank(
data_layout=[("data", data_width)],
elem_count=max_addr,
transparent=transparent,
read_ports=read_ports,
write_ports=write_ports,
)
)

data: list[int] = list(0 for _ in range(max_addr))
read_req_queue = deque()
data: list[int] = [0 for _ in range(max_addr)]
read_req_queues = [deque() for _ in range(read_ports)]

random.seed(seed)

def writer():
for cycle in range(test_count):
d = random.randrange(2**data_width)
a = random.randrange(max_addr)
yield from m.write.call(data=d, addr=a)
for _ in range(2 if not transparent else 0):
yield Settle()
data[a] = d
yield from self.random_wait(writer_rand)
def writer(i):
def process():
for cycle in range(test_count):
d = random.randrange(2**data_width)
a = random.randrange(max_addr)
yield from m.writes[i].call(data=d, addr=a)
for _ in range(i + 2 if not transparent else i):
yield Settle()
data[a] = d
yield from self.random_wait(writer_rand)

def reader_req():
for cycle in range(test_count):
a = random.randrange(max_addr)
yield from m.read_req.call(addr=a)
for _ in range(1):
yield Settle()
d = data[a]
read_req_queue.append(d)
yield from self.random_wait(reader_req_rand)

def reader_resp():
for cycle in range(test_count):
for _ in range(3):
yield Settle()
while not read_req_queue:
yield from self.random_wait(reader_resp_rand or 1, min_cycle_cnt=1)
for _ in range(3):
return process

def reader_req(i):
def process():
for cycle in range(test_count):
a = random.randrange(max_addr)
yield from m.read_reqs[i].call(addr=a)
for _ in range(1 if not transparent else write_ports + 2):
yield Settle()
d = read_req_queue.popleft()
assert (yield from m.read_resp.call()) == {"data": d}
yield from self.random_wait(reader_resp_rand)
d = data[a]
read_req_queues[i].append(d)
yield from self.random_wait(reader_req_rand)

return process

def reader_resp(i):
def process():
for cycle in range(test_count):
for _ in range(write_ports + 3):
yield Settle()
while not read_req_queues[i]:
yield from self.random_wait(reader_resp_rand or 1, min_cycle_cnt=1)
for _ in range(write_ports + 3):
yield Settle()
d = read_req_queues[i].popleft()
assert (yield from m.read_resps[i].call()) == {"data": d}
yield from self.random_wait(reader_resp_rand)

return process

pipeline_test = writer_rand == 0 and reader_req_rand == 0 and reader_resp_rand == 0
max_cycles = test_count + 2 if pipeline_test else 100000

with self.run_simulation(m, max_cycles=max_cycles) as sim:
sim.add_process(reader_req)
sim.add_process(reader_resp)
sim.add_process(writer)
for i in range(read_ports):
sim.add_process(reader_req(i))
sim.add_process(reader_resp(i))
for i in range(write_ports):
sim.add_process(writer(i))


class TestAsyncMemoryBank(TestCaseWithSimulator):
@parameterized.expand([(9, 3, 3, 14), (16, 1, 1, 15), (16, 1, 1, 16), (12, 3, 1, 17)])
def test_mem(self, max_addr, writer_rand, reader_rand, seed):
@pytest.mark.parametrize(
"max_addr, writer_rand, reader_rand, seed", [(9, 3, 3, 14), (16, 1, 1, 15), (16, 1, 1, 16), (12, 3, 1, 17)]
)
@pytest.mark.parametrize("read_ports", [1, 2])
@pytest.mark.parametrize("write_ports", [1, 2])
def test_mem(self, max_addr: int, writer_rand: int, reader_rand: int, seed: int, read_ports: int, write_ports: int):
test_count = 200

data_width = 6
m = SimpleTestCircuit(AsyncMemoryBank(data_layout=[("data", data_width)], elem_count=max_addr))
m = SimpleTestCircuit(
AsyncMemoryBank(
data_layout=[("data", data_width)], elem_count=max_addr, read_ports=read_ports, write_ports=write_ports
)
)

data: list[int] = list(0 for i in range(max_addr))

random.seed(seed)

def writer():
for cycle in range(test_count):
d = random.randrange(2**data_width)
a = random.randrange(max_addr)
yield from m.write.call(data=d, addr=a)
for _ in range(2):
yield Settle()
data[a] = d
yield from self.random_wait(writer_rand, min_cycle_cnt=1)
def writer(i):
def process():
for cycle in range(test_count):
d = random.randrange(2**data_width)
a = random.randrange(max_addr)
yield from m.writes[i].call(data=d, addr=a)
for _ in range(i + 2):
yield Settle()
data[a] = d
yield from self.random_wait(writer_rand, min_cycle_cnt=1)

def reader():
for cycle in range(test_count):
a = random.randrange(max_addr)
d = yield from m.read.call(addr=a)
for _ in range(1):
yield Settle()
expected_d = data[a]
assert d["data"] == expected_d
yield from self.random_wait(reader_rand, min_cycle_cnt=1)
return process

def reader(i):
def process():
for cycle in range(test_count):
a = random.randrange(max_addr)
d = yield from m.reads[i].call(addr=a)
for _ in range(1):
yield Settle()
expected_d = data[a]
assert d["data"] == expected_d
yield from self.random_wait(reader_rand, min_cycle_cnt=1)

return process

with self.run_simulation(m) as sim:
sim.add_process(reader)
sim.add_process(writer)
for i in range(read_ports):
sim.add_process(reader(i))
for i in range(write_ports):
sim.add_process(writer(i))


class ManyToOneConnectTransTestCircuit(Elaboratable):
Expand Down
Loading