diff --git a/coreblocks/frontend/fetch.py b/coreblocks/frontend/fetch.py index 1fdad5091..33a1a2129 100644 --- a/coreblocks/frontend/fetch.py +++ b/coreblocks/frontend/fetch.py @@ -38,9 +38,6 @@ def __init__(self, gen_params: GenParams, icache: CacheInterface, cont: Method) # ExceptionCauseRegister uses separate Transaction for it, so performace is not affected. self.stall_exception.add_conflict(self.resume, Priority.LEFT) - # PC of the last fetched instruction. For now only used in tests. - self.pc = Signal(self.gen_params.isa.xlen) - def elaborate(self, platform): m = TModule() @@ -91,7 +88,6 @@ def stall(exception=False): with m.If(unsafe_instr): stall() - m.d.sync += self.pc.eq(target.addr) m.d.comb += instr.eq(res.instr) self.cont(m, instr=instr, pc=target.addr, access_fault=fetch_error, rvc=0) @@ -138,9 +134,6 @@ def __init__(self, gen_params: GenParams, icache: CacheInterface, cont: Method) self.perf_rvc = HwCounter("frontend.ifu.rvc", "Number of decompressed RVC instructions") - # PC of the last fetched instruction. For now only used in tests. - self.pc = Signal(self.gen_params.isa.xlen) - def elaborate(self, platform) -> TModule: m = TModule() @@ -231,7 +224,6 @@ def elaborate(self, platform) -> TModule: m.d.sync += stalled_unsafe.eq(1) m.d.sync += flushing.eq(1) - m.d.sync += self.pc.eq(current_pc) with m.If(~cache_resp.error): m.d.sync += current_pc.eq(current_pc + Mux(is_rvc, C(2, 3), C(4, 3))) diff --git a/stubs/amaranth/_toolchain/yosys.pyi b/stubs/amaranth/_toolchain/yosys.pyi new file mode 100644 index 000000000..46cff1055 --- /dev/null +++ b/stubs/amaranth/_toolchain/yosys.pyi @@ -0,0 +1,144 @@ +""" +This type stub file was generated by pyright. +""" + +__all__ = ["YosysError", "YosysBinary", "find_yosys"] +from typing import Optional +from pathlib import Path + + +class YosysError(Exception): + ... + + +class YosysWarning(Warning): + ... + + +class YosysBinary: + @classmethod + def available(cls) -> bool: + """Check for Yosys availability. + + Returns + ------- + available : bool + ``True`` if Yosys is installed, ``False`` otherwise. Installed binary may still not + be runnable, or might be too old to be useful. + """ + ... + + @classmethod + def version(cls) -> Optional[tuple[int, int, int]]: + """Get Yosys version. + + Returns + ------- + ``None`` if version number could not be determined, or a 3-tuple ``(major, minor, distance)`` if it could. + + major : int + Major version. + minor : int + Minor version. + distance : int + Distance to last tag per ``git describe``. May not be exact for system Yosys. + """ + ... + + @classmethod + def data_dir(cls) -> pathlib.Path: + """Get Yosys data directory. + + Returns + ------- + data_dir : pathlib.Path + Yosys data directory (also known as "datdir"). + """ + ... + + @classmethod + def run(cls, args: list[str], stdin: str=...) -> str: + """Run Yosys process. + + Parameters + ---------- + args : list of str + Arguments, not including the program name. + stdin : str + Standard input. + + Returns + ------- + stdout : str + Standard output. + + Exceptions + ---------- + YosysError + Raised if Yosys returns a non-zero code. The exception message is the standard error + output. + """ + ... + + + +class _BuiltinYosys(YosysBinary): + YOSYS_PACKAGE = ... + @classmethod + def available(cls): # -> bool: + ... + + @classmethod + def version(cls): # -> tuple[int, int, int]: + ... + + @classmethod + def data_dir(cls): # -> Traversable: + ... + + @classmethod + def run(cls, args, stdin=..., *, ignore_warnings=..., src_loc_at=...): + ... + + + +class _SystemYosys(YosysBinary): + YOSYS_BINARY = ... + @classmethod + def available(cls): # -> bool: + ... + + @classmethod + def version(cls): # -> tuple[int, int, int] | None: + ... + + @classmethod + def data_dir(cls): # -> Path: + ... + + @classmethod + def run(cls, args, stdin=..., *, ignore_warnings=..., src_loc_at=...): + ... + + + +def find_yosys(requirement): + """Find an available Yosys executable of required version. + + Parameters + ---------- + requirement : function + Version check. Should return ``True`` if the version is acceptable, ``False`` otherwise. + + Returns + ------- + yosys_binary : subclass of YosysBinary + Proxy for running the requested version of Yosys. + + Exceptions + ---------- + YosysError + Raised if required Yosys version is not found. + """ + ... + diff --git a/stubs/amaranth/back/__init__.pyi b/stubs/amaranth/back/__init__.pyi new file mode 100644 index 000000000..006bc2749 --- /dev/null +++ b/stubs/amaranth/back/__init__.pyi @@ -0,0 +1,4 @@ +""" +This type stub file was generated by pyright. +""" + diff --git a/stubs/amaranth/back/verilog.pyi b/stubs/amaranth/back/verilog.pyi new file mode 100644 index 000000000..2850050a3 --- /dev/null +++ b/stubs/amaranth/back/verilog.pyi @@ -0,0 +1,14 @@ +""" +This type stub file was generated by pyright. +""" + +from .._toolchain.yosys import * +from ..hdl.ast import SignalDict + +__all__ = ["YosysError", "convert", "convert_fragment"] +def convert_fragment(*args, strip_internal_attrs=..., **kwargs) -> tuple[str, SignalDict]: + ... + +def convert(elaboratable, name=..., platform=..., *, ports=..., emit_src=..., strip_internal_attrs=..., **kwargs) -> str: + ... + diff --git a/test/regression/cocotb.py b/test/regression/cocotb.py index 1c818196e..faae07b9e 100644 --- a/test/regression/cocotb.py +++ b/test/regression/cocotb.py @@ -157,6 +157,17 @@ def get_cocotb_handle(self, path_components: list[str]) -> ModifiableObject: return obj + async def assert_handler(self, clock): + clock_edge_event = FallingEdge(clock) + + while True: + for assert_info in self.gen_info.asserts: + assert_val = self.get_cocotb_handle(assert_info.location) + n, i = assert_info.src_loc + assert assert_val.value, f"Assertion at {n}:{i}" + + await clock_edge_event # type: ignore + async def run(self, mem_model: CoreMemoryModel, timeout_cycles: int = 5000) -> SimulationExecutionResult: clk = Clock(self.dut.clk, 1, "ns") cocotb.start_soon(clk.start()) @@ -171,6 +182,8 @@ async def run(self, mem_model: CoreMemoryModel, timeout_cycles: int = 5000) -> S data_wb = WishboneSlave(self.dut, "wb_data", self.dut.clk, mem_model, is_instr_bus=False) cocotb.start_soon(data_wb.start()) + cocotb.start_soon(self.assert_handler(self.dut.clk)) + success = True try: await with_timeout(self.finish_event.wait(), timeout_cycles, "ns") diff --git a/test/test_core.py b/test/test_core.py index 1522fa3a6..8bf5c8f1b 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -10,27 +10,15 @@ from coreblocks.params.configurations import CoreConfiguration, basic_core_config, full_core_config from coreblocks.peripherals.wishbone import WishboneBus, WishboneMemorySlave -from typing import Optional, cast +from typing import Optional import random import subprocess import tempfile from parameterized import parameterized_class from riscvmodel.insn import ( InstructionADDI, - InstructionSLTI, - InstructionSLTIU, - InstructionXORI, - InstructionORI, - InstructionANDI, - InstructionSLLI, - InstructionSRLI, - InstructionSRAI, InstructionLUI, - InstructionJAL, ) -from riscvmodel.model import Model -from riscvmodel.isa import Instruction, InstructionRType, get_insns -from riscvmodel.variant import RV32I class CoreTestElaboratable(Elaboratable): @@ -58,14 +46,12 @@ def elaborate(self, platform): ) self.core = Core(gen_params=self.gen_params, wb_instr_bus=wb_instr_bus, wb_data_bus=wb_data_bus) self.io_in = TestbenchIO(AdapterTrans(self.core.fetch_continue.method)) - self.rf_write = TestbenchIO(AdapterTrans(self.core.RF.write)) self.interrupt = TestbenchIO(AdapterTrans(self.core.interrupt_controller.report_interrupt)) m.submodules.wb_mem_slave = self.wb_mem_slave m.submodules.wb_mem_slave_data = self.wb_mem_slave_data m.submodules.c = self.core m.submodules.io_in = self.io_in - m.submodules.rf_write = self.rf_write m.submodules.interrupt = self.interrupt m.d.comb += wb_instr_bus.connect(self.wb_mem_slave.bus) @@ -74,53 +60,19 @@ def elaborate(self, platform): return m -def gen_riscv_add_instr(dst, src1, src2): - return 0b0110011 | dst << 7 | src1 << 15 | src2 << 20 - - -def gen_riscv_lui_instr(dst, imm): - return 0b0110111 | dst << 7 | imm << 12 - - class TestCoreBase(TestCaseWithSimulator): gen_params: GenParams m: CoreTestElaboratable - def check_RAT_alloc(self, rat, expected_alloc_count=None): # noqa: N802 - allocated = [] - for i in range(self.m.gen_params.isa.reg_cnt): - allocated.append((yield rat.entries[i])) - filtered_zeros = list(filter(lambda x: x != 0, allocated)) - - # check if 0th register is set to 0 - self.assertEqual(allocated[0], 0) - # check if there are no duplicate physical registers allocated for two different architectural registers - self.assertEqual(len(filtered_zeros), len(set(filtered_zeros))) - # check if the expected number of allocated registers matches reality - if expected_alloc_count: - self.assertEqual(len(filtered_zeros), expected_alloc_count) - def get_phys_reg_rrat(self, reg_id): return (yield self.m.core.RRAT.entries[reg_id]) - def get_phys_reg_frat(self, reg_id): - return (yield self.m.core.FRAT.entries[reg_id]) - def get_arch_reg_val(self, reg_id): return (yield self.m.core.RF.entries[(yield from self.get_phys_reg_rrat(reg_id))].reg_val) - def get_phys_reg_val(self, reg_id): - return (yield self.m.core.RF.entries[reg_id].reg_val) - def push_instr(self, opcode): yield from self.m.io_in.call(instr=opcode) - def compare_core_states(self, sw_core): - for i in range(self.gen_params.isa.reg_cnt): - reg_val = sw_core.state.intreg.regs[i].value - unsigned_val = reg_val & 0xFFFFFFFF - self.assertEqual((yield from self.get_arch_reg_val(i)), unsigned_val) - def push_register_load_imm(self, reg_id, val): addi_imm = signed_to_int(val & 0xFFF, 12) lui_imm = (val & 0xFFFFF000) >> 12 @@ -132,123 +84,6 @@ def push_register_load_imm(self, reg_id, val): yield from self.push_instr(InstructionADDI(reg_id, reg_id, addi_imm).encode()) -class TestCoreSimple(TestCoreBase): - def simple_test(self): - # this test first provokes allocation of physical registers, - # then sets the values in those registers, and finally runs - # an actual computation. - - # The test sets values in the reg file by hand - - # provoking allocation of physical register - for i in range(self.m.gen_params.isa.reg_cnt - 1): - yield from self.push_instr(gen_riscv_add_instr(i + 1, 0, 0)) - - # waiting for the retirement rat to be set - for i in range(100): - yield - - # checking if all registers have been allocated - yield from self.check_RAT_alloc(self.m.core.FRAT, 31) - yield from self.check_RAT_alloc(self.m.core.RRAT, 31) - - # writing values to physical registers - yield from self.m.rf_write.call(reg_id=(yield from self.get_phys_reg_rrat(1)), reg_val=1) - yield from self.m.rf_write.call(reg_id=(yield from self.get_phys_reg_rrat(2)), reg_val=2) - yield from self.m.rf_write.call(reg_id=(yield from self.get_phys_reg_rrat(3)), reg_val=3) - - # waiting for potential conflicts on rf_write - for i in range(10): - yield - - self.assertEqual((yield from self.get_arch_reg_val(1)), 1) - self.assertEqual((yield from self.get_arch_reg_val(2)), 2) - self.assertEqual((yield from self.get_arch_reg_val(3)), 3) - - # issuing actual instructions for the test - yield from self.push_instr(gen_riscv_add_instr(4, 1, 2)) - yield from self.push_instr(gen_riscv_add_instr(4, 3, 4)) - yield from self.push_instr(gen_riscv_lui_instr(5, 1)) - - # waiting for the instructions to be processed - for i in range(50): - yield - - self.assertEqual((yield from self.get_arch_reg_val(1)), 1) - self.assertEqual((yield from self.get_arch_reg_val(2)), 2) - self.assertEqual((yield from self.get_arch_reg_val(3)), 3) - # 1 + 2 + 3 = 6 - self.assertEqual((yield from self.get_arch_reg_val(4)), 6) - self.assertEqual((yield from self.get_arch_reg_val(5)), 1 << 12) - - def test_simple(self): - self.gen_params = GenParams(basic_core_config) - m = CoreTestElaboratable(self.gen_params) - self.m = m - - with self.run_simulation(m) as sim: - sim.add_sync_process(self.simple_test) - - -class TestCoreRandomized(TestCoreBase): - def randomized_input(self): - infloop_addr = (len(self.instr_mem) - 1) * 4 - # wait for PC to go past all instruction - while (yield self.m.core.fetch.pc) != infloop_addr: - yield - - # finish calculations - yield from self.tick(50) - - yield from self.compare_core_states(self.software_core) - - def test_randomized(self): - self.gen_params = GenParams(basic_core_config) - self.instr_count = 300 - random.seed(42) - - # cast is there to avoid stubbing riscvmodel - instructions = cast(list[type[Instruction]], get_insns(cls=InstructionRType, variant=RV32I)) - instructions += [ - InstructionADDI, - InstructionSLTI, - InstructionSLTIU, - InstructionXORI, - InstructionORI, - InstructionANDI, - InstructionSLLI, - InstructionSRLI, - InstructionSRAI, - InstructionLUI, - ] - - # allocate some random values for registers - init_instr_list = list( - InstructionADDI(rd=i, rs1=0, imm=random.randint(-(2**11), 2**11 - 1)) - for i in range(self.gen_params.isa.reg_cnt) - ) - - # generate random instruction stream - instr_list = list(random.choice(instructions)() for _ in range(self.instr_count)) - for instr in instr_list: - instr.randomize(RV32I) - - self.software_core = Model(RV32I) - self.software_core.execute(init_instr_list) - self.software_core.execute(instr_list) - - # We add JAL instruction at the end to effectively create a infinite loop at the end of the program. - all_instr = init_instr_list + instr_list + [InstructionJAL(rd=0, imm=0)] - - self.instr_mem = list(map(lambda x: x.encode(), all_instr)) - - m = CoreTestElaboratable(self.gen_params, instr_mem=self.instr_mem) - self.m = m - - with self.run_simulation(m) as sim: - sim.add_sync_process(self.randomized_input) - - class TestCoreAsmSourceBase(TestCoreBase): base_dir: str = "test/asm/" diff --git a/test/transactron/testing/test_assertion.py b/test/transactron/testing/test_assertion.py index c5bc1284b..4becf3062 100644 --- a/test/transactron/testing/test_assertion.py +++ b/test/transactron/testing/test_assertion.py @@ -14,7 +14,7 @@ def elaborate(self, platform): m.d.comb += self.output.eq(self.input & ~self.input) - assertion(self.input == self.output) + assertion(m, self.input == self.output) return m diff --git a/transactron/testing/assertion.py b/transactron/testing/assertion.py index 8ae9bdf0d..19c5a4149 100644 --- a/transactron/testing/assertion.py +++ b/transactron/testing/assertion.py @@ -2,21 +2,18 @@ from typing import Any from amaranth.sim import Passive, Tick from transactron.utils import assert_bit, assert_bits -from transactron.utils.dependencies import DependencyContext __all__ = ["make_assert_handler"] def make_assert_handler(my_assert: Callable[[int, str], Any]): - dependency_manager = DependencyContext.get() - def assert_handler(): yield Passive() while True: yield Tick("sync_neg") - if not (yield assert_bit(dependency_manager)): - for v, (n, i) in assert_bits(dependency_manager): + if not (yield assert_bit()): + for v, (n, i) in assert_bits(): my_assert((yield v), f"Assertion at {n}:{i}") yield diff --git a/transactron/utils/assertion.py b/transactron/utils/assertion.py index 537fbd0b5..b79a74fef 100644 --- a/transactron/utils/assertion.py +++ b/transactron/utils/assertion.py @@ -4,17 +4,18 @@ import operator from dataclasses import dataclass from transactron.utils import SrcLoc -from transactron.utils.dependencies import DependencyContext, DependencyManager, ListKey +from transactron.utils._typing import ModuleLike, ValueLike +from transactron.utils.dependencies import DependencyContext, ListKey __all__ = ["AssertKey", "assertion", "assert_bit", "assert_bits"] @dataclass(frozen=True) -class AssertKey(ListKey[tuple[Value, SrcLoc]]): +class AssertKey(ListKey[tuple[Signal, SrcLoc]]): pass -def assertion(value: Value, *, src_loc_at: int = 0): +def assertion(m: ModuleLike, value: ValueLike, *, src_loc_at: int = 0): """Define an assertion. This function might help find some hardware bugs which might otherwise be @@ -24,6 +25,8 @@ def assertion(value: Value, *, src_loc_at: int = 0): Parameters ---------- + m: Module + Module in which the assertion is defined. value : Value If the value of this Amaranth expression is false, the assertion will fail. @@ -32,35 +35,26 @@ def assertion(value: Value, *, src_loc_at: int = 0): identify the failing assertion. """ src_loc = get_src_loc(src_loc_at) + sig = Signal() + m.d.comb += sig.eq(value) dependencies = DependencyContext.get() - dependencies.add_dependency(AssertKey(), (value, src_loc)) + dependencies.add_dependency(AssertKey(), (sig, src_loc)) -def assert_bits(dependencies: DependencyManager) -> list[tuple[Value, SrcLoc]]: +def assert_bits() -> list[tuple[Signal, SrcLoc]]: """Gets assertion bits. This function returns all the assertion signals created by `assertion`, together with their source locations. - - Parameters - ---------- - dependencies : DependencyManager - The assertion feature uses the `DependencyManager` to store - assertions. """ + dependencies = DependencyContext.get() return dependencies.get_dependency(AssertKey()) -def assert_bit(dependencies: DependencyManager) -> Value: +def assert_bit() -> Signal: """Gets assertion bit. The signal returned by this function is false if and only if there exists a false signal among assertion bits created by `assertion`. - - Parameters - ---------- - dependencies : DependencyManager - The assertion feature uses the `DependencyManager` to store - assertions. """ - return reduce(operator.and_, [a[0] for a in assert_bits(dependencies)], C(1)) + return reduce(operator.and_, [a[0] for a in assert_bits()], C(1)) diff --git a/transactron/utils/gen.py b/transactron/utils/gen.py index d5aa53d89..b2cbf6da9 100644 --- a/transactron/utils/gen.py +++ b/transactron/utils/gen.py @@ -6,6 +6,9 @@ from amaranth.hdl import Fragment from transactron.lib.metrics import HardwareMetricsManager +from transactron.utils._typing import SrcLoc +from transactron.utils.assertion import assert_bits + from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -14,6 +17,7 @@ __all__ = [ "MetricLocation", + "AssertLocation", "GenerationInfo", "generate_verilog", ] @@ -35,6 +39,25 @@ class MetricLocation: regs: dict[str, list[str]] = field(default_factory=dict) +@dataclass_json +@dataclass +class AssertLocation: + """Information about an assert signal in the generated Verilog code. + + Attributes + ---------- + location : list[str] + The location of the assert signal. The location is a list of Verilog + identifiers that denote a path consisting of module names (and the + signal name at the end) leading to the signal wire. + src_loc : SrcLoc + Source location of the assertion. + """ + + location: list[str] + src_loc: SrcLoc + + @dataclass_json @dataclass class GenerationInfo: @@ -45,9 +68,12 @@ class GenerationInfo: metrics_location : dict[str, MetricInfo] Mapping from a metric name to an object storing Verilog locations of its registers. + asserts : list[AssertLocation] + Locations and metadata for assertion signals. """ metrics_location: dict[str, MetricLocation] = field(default_factory=dict) + asserts: list[AssertLocation] = field(default_factory=list) def encode(self, file_name: str): """ @@ -116,12 +142,21 @@ def collect_metric_locations(name_map: "SignalDict") -> dict[str, MetricLocation return metrics_location +def collect_asserts(name_map: "SignalDict") -> list[AssertLocation]: + asserts: list[AssertLocation] = [] + + for v, src_loc in assert_bits(): + asserts.append(AssertLocation(get_signal_location(v, name_map), src_loc)) + + return asserts + + def generate_verilog( top_module: Elaboratable, ports: list[Signal], top_name: str = "top" ) -> tuple[str, GenerationInfo]: fragment = Fragment.get(top_module, platform=None).prepare(ports=ports) verilog_text, name_map = verilog.convert_fragment(fragment, name=top_name, emit_src=True, strip_internal_attrs=True) - gen_info = GenerationInfo(metrics_location=collect_metric_locations(name_map)) # type: ignore + gen_info = GenerationInfo(metrics_location=collect_metric_locations(name_map), asserts=collect_asserts(name_map)) return verilog_text, gen_info