diff --git a/coreblocks/core.py b/coreblocks/core.py index 46f0f6b91..e52ad9f2c 100644 --- a/coreblocks/core.py +++ b/coreblocks/core.py @@ -1,4 +1,5 @@ from amaranth import * +from amaranth.lib.wiring import flipped, connect from transactron.utils.dependencies import DependencyManager, DependencyContext from coreblocks.stages.func_blocks_unifier import FuncBlocksUnifier @@ -27,7 +28,7 @@ from coreblocks.stages.retirement import Retirement from coreblocks.cache.icache import ICache, ICacheBypass from coreblocks.peripherals.bus_adapter import WishboneMasterAdapter -from coreblocks.peripherals.wishbone import WishboneMaster, WishboneBus +from coreblocks.peripherals.wishbone import WishboneMaster, WishboneInterface from coreblocks.cache.refiller import SimpleCommonBusCacheRefiller from coreblocks.frontend.fetch import Fetch, UnalignedFetch from transactron.lib.transformers import MethodMap, MethodProduct @@ -38,7 +39,7 @@ class Core(Elaboratable): - def __init__(self, *, gen_params: GenParams, wb_instr_bus: WishboneBus, wb_data_bus: WishboneBus): + def __init__(self, *, gen_params: GenParams, wb_instr_bus: WishboneInterface, wb_data_bus: WishboneInterface): self.gen_params = gen_params dep_manager = DependencyContext.get() @@ -117,8 +118,8 @@ def __init__(self, *, gen_params: GenParams, wb_instr_bus: WishboneBus, wb_data_ def elaborate(self, platform): m = TModule() - m.d.comb += self.wb_master_instr.wb_master.connect(self.wb_instr_bus) - m.d.comb += self.wb_master_data.wb_master.connect(self.wb_data_bus) + connect(m, flipped(self.wb_instr_bus), self.wb_master_instr.wb_master) + connect(m, flipped(self.wb_data_bus), self.wb_master_data.wb_master) m.submodules.wb_master_instr = self.wb_master_instr m.submodules.wb_master_data = self.wb_master_data diff --git a/coreblocks/peripherals/axi_lite.py b/coreblocks/peripherals/axi_lite.py index 4059e30cd..268c396ab 100644 --- a/coreblocks/peripherals/axi_lite.py +++ b/coreblocks/peripherals/axi_lite.py @@ -1,10 +1,12 @@ +from typing import Protocol, TypeAlias, runtime_checkable from amaranth import * -from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT +from amaranth.lib.wiring import Component, Signature, In, Out from transactron import Method, def_method, TModule from transactron.core import Transaction from transactron.lib.connectors import Forwarder +from transactron.utils._typing import AbstractInterface, AbstractSignature -__all__ = ["AXILiteParameters", "AXILiteMaster"] +__all__ = ["AXILiteParameters", "AXILiteSignature", "AXILiteInterface", "AXILiteMaster"] class AXILiteParameters: @@ -24,65 +26,116 @@ def __init__(self, *, data_width: int = 64, addr_width: int = 64): self.granularity = 8 -class AXILiteLayout: - """AXI-Lite bus layout generator +class AXILiteSignature(Signature): + """AXI-Lite bus signature Parameters ---------- axil_params: AXILiteParameters - Patameters used to generate AXI-Lite layout - master: Boolean - Whether the layout should be generated for master side - (if false it's generatd for the slave side) - - Attributes - ---------- - axil_layout: Record - Record of a AXI-Lite bus. + Patameters used to generate AXI-Lite signature """ - def __init__(self, axil_params: AXILiteParameters, *, master: bool = True): - write_address = [ - ("valid", 1, DIR_FANOUT if master else DIR_FANIN), - ("rdy", 1, DIR_FANIN if master else DIR_FANOUT), - ("addr", axil_params.addr_width, DIR_FANOUT if master else DIR_FANIN), - ("prot", 3, DIR_FANOUT if master else DIR_FANIN), - ] + def __init__(self, axil_params: AXILiteParameters): + write_address = Signature( + { + "valid": Out(1), + "rdy": In(1), + "addr": Out(axil_params.addr_width), + "prot": Out(3), + } + ) - write_data = [ - ("valid", 1, DIR_FANOUT if master else DIR_FANIN), - ("rdy", 1, DIR_FANIN if master else DIR_FANOUT), - ("data", axil_params.data_width, DIR_FANOUT if master else DIR_FANIN), - ("strb", axil_params.data_width // 8, DIR_FANOUT if master else DIR_FANIN), - ] + write_data = Signature( + { + "valid": Out(1), + "rdy": In(1), + "data": Out(axil_params.data_width), + "strb": Out(axil_params.data_width // 8), + } + ) - write_response = [ - ("valid", 1, DIR_FANIN if master else DIR_FANOUT), - ("rdy", 1, DIR_FANOUT if master else DIR_FANIN), - ("resp", 2, DIR_FANIN if master else DIR_FANOUT), - ] + write_response = Signature( + { + "valid": In(1), + "rdy": Out(1), + "resp": In(2), + } + ) - read_address = [ - ("valid", 1, DIR_FANOUT if master else DIR_FANIN), - ("rdy", 1, DIR_FANIN if master else DIR_FANOUT), - ("addr", axil_params.addr_width, DIR_FANOUT if master else DIR_FANIN), - ("prot", 3, DIR_FANOUT if master else DIR_FANIN), - ] + read_address = Signature( + { + "valid": Out(1), + "rdy": In(1), + "addr": Out(axil_params.addr_width), + "prot": Out(3), + } + ) - read_data = [ - ("valid", 1, DIR_FANIN if master else DIR_FANOUT), - ("rdy", 1, DIR_FANOUT if master else DIR_FANIN), - ("data", axil_params.data_width, DIR_FANIN if master else DIR_FANOUT), - ("resp", 2, DIR_FANIN if master else DIR_FANOUT), - ] + read_data = Signature( + { + "valid": In(1), + "rdy": Out(1), + "data": In(axil_params.data_width), + "resp": In(2), + } + ) - self.axil_layout = [ - ("write_address", write_address), - ("write_data", write_data), - ("write_response", write_response), - ("read_address", read_address), - ("read_data", read_data), - ] + super().__init__( + { + "write_address": Out(write_address), + "write_data": Out(write_data), + "write_response": Out(write_response), + "read_address": Out(read_address), + "read_data": Out(read_data), + } + ) + + +class AXILiteWriteAddressInterface(AbstractInterface[AbstractSignature], Protocol): + valid: Signal + rdy: Signal + addr: Signal + prot: Signal + + +class AXILiteWriteDataInterface(AbstractInterface[AbstractSignature], Protocol): + valid: Signal + rdy: Signal + data: Signal + strb: Signal + + +class AXILiteWriteResponseInterface(AbstractInterface[AbstractSignature], Protocol): + valid: Signal + rdy: Signal + resp: Signal + + +class AXILiteReadAddressInterface(AbstractInterface[AbstractSignature], Protocol): + valid: Signal + rdy: Signal + addr: Signal + prot: Signal + + +@runtime_checkable +class AXILiteReadDataInterface(AbstractInterface[AbstractSignature], Protocol): + valid: Signal + rdy: Signal + data: Signal + resp: Signal + + +class AXILiteInterface(AbstractInterface[AbstractSignature], Protocol): + write_address: AXILiteWriteAddressInterface + write_data: AXILiteWriteDataInterface + write_response: AXILiteWriteResponseInterface + read_address: AXILiteReadAddressInterface + read_data: AXILiteReadDataInterface + + +AXILiteOutChannel: TypeAlias = AXILiteWriteAddressInterface | AXILiteWriteDataInterface | AXILiteReadAddressInterface +AXILiteInChannel: TypeAlias = AXILiteWriteResponseInterface | AXILiteReadDataInterface class AXILiteMasterMethodLayouts: @@ -137,7 +190,7 @@ def __init__(self, axil_params: AXILiteParameters): ] -class AXILiteMaster(Elaboratable): +class AXILiteMaster(Component): """AXI-Lite master interface. Parameters @@ -173,10 +226,11 @@ class AXILiteMaster(Elaboratable): Returns response state as 'wr_response_layout'. """ + axil_master: AXILiteInterface + def __init__(self, axil_params: AXILiteParameters): + super().__init__({"axil_master": Out(AXILiteSignature(axil_params))}) self.axil_params = axil_params - self.axil_layout = AXILiteLayout(self.axil_params).axil_layout - self.axil_master = Record(self.axil_layout) self.method_layouts = AXILiteMasterMethodLayouts(self.axil_params) @@ -195,7 +249,7 @@ def start_request_transaction(self, m, arg, *, channel, is_address_channel): m.d.sync += channel.strb.eq(arg.strb) m.d.sync += channel.valid.eq(1) - def state_machine_request(self, m: TModule, method: Method, *, channel: Record, request_signal: Signal): + def state_machine_request(self, m: TModule, method: Method, *, channel: AXILiteOutChannel, request_signal: Signal): with m.FSM("Idle"): with m.State("Idle"): m.d.sync += channel.valid.eq(0) @@ -212,11 +266,11 @@ def state_machine_request(self, m: TModule, method: Method, *, channel: Record, with m.Else(): m.d.comb += request_signal.eq(0) - def result_handler(self, m: TModule, forwarder: Forwarder, *, data: bool, channel: Record): + def result_handler(self, m: TModule, forwarder: Forwarder, *, channel: AXILiteInChannel): with m.If(channel.rdy & channel.valid): m.d.sync += channel.rdy.eq(forwarder.read.run) with Transaction().body(m): - if data: + if isinstance(channel, AXILiteReadDataInterface): forwarder.write(m, data=channel.data, resp=channel.resp) else: forwarder.write(m, resp=channel.resp) @@ -245,7 +299,7 @@ def _(arg): self.start_request_transaction(m, arg, channel=self.axil_master.read_address, is_address_channel=True) # read_data - self.result_handler(m, rd_forwarder, data=True, channel=self.axil_master.read_data) + self.result_handler(m, rd_forwarder, channel=self.axil_master.read_data) @def_method(m, self.rd_response) def _(): @@ -276,7 +330,7 @@ def _(arg): self.start_request_transaction(m, arg, channel=self.axil_master.write_data, is_address_channel=False) # write_response - self.result_handler(m, wr_forwarder, data=False, channel=self.axil_master.write_response) + self.result_handler(m, wr_forwarder, channel=self.axil_master.write_response) @def_method(m, self.wr_response) def _(): diff --git a/coreblocks/peripherals/wishbone.py b/coreblocks/peripherals/wishbone.py index 75f71522a..e71617682 100644 --- a/coreblocks/peripherals/wishbone.py +++ b/coreblocks/peripherals/wishbone.py @@ -1,13 +1,14 @@ from amaranth import * -from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT +from amaranth.lib.wiring import PureInterface, Signature, In, Out, Component from functools import reduce -from typing import List +from typing import Protocol, cast import operator from transactron import Method, def_method, TModule from transactron.core import Transaction from transactron.lib import AdapterTrans, BasicFifo from transactron.utils import OneHotSwitchDynamic, assign, RoundRobin +from transactron.utils._typing import AbstractInterface, AbstractSignature from transactron.lib.connectors import Forwarder from transactron.utils.transactron_helpers import make_layout @@ -31,52 +32,45 @@ def __init__(self, *, data_width: int = 64, addr_width: int = 64, granularity: i self.granularity = granularity -class WishboneLayout: - """Wishbone bus Layout generator. - - Parameters - ---------- - wb_params: WishboneParameters - Parameters used to generate Wishbone layout - master: Boolean - Whether the layout should be generated for the master side - (otherwise it's generated for the slave side) - - Attributes - ---------- - wb_layout: Record - Record of a Wishbone bus. - """ - - def __init__(self, wb_params: WishboneParameters, master=True): - self.wb_layout = [ - ("dat_r", wb_params.data_width, DIR_FANIN if master else DIR_FANOUT), - ("dat_w", wb_params.data_width, DIR_FANOUT if master else DIR_FANIN), - ("rst", 1, DIR_FANOUT if master else DIR_FANIN), - ("ack", 1, DIR_FANIN if master else DIR_FANOUT), - ("adr", wb_params.addr_width, DIR_FANOUT if master else DIR_FANIN), - ("cyc", 1, DIR_FANOUT if master else DIR_FANIN), - ("stall", 1, DIR_FANIN if master else DIR_FANOUT), - ("err", 1, DIR_FANIN if master else DIR_FANOUT), - ("lock", 1, DIR_FANOUT if master else DIR_FANIN), - ("rty", 1, DIR_FANIN if master else DIR_FANOUT), - ("sel", wb_params.data_width // wb_params.granularity, DIR_FANOUT if master else DIR_FANIN), - ("stb", 1, DIR_FANOUT if master else DIR_FANIN), - ("we", 1, DIR_FANOUT if master else DIR_FANIN), - ] +class WishboneSignature(Signature): + def __init__(self, wb_params: WishboneParameters): + super().__init__( + { + "dat_r": In(wb_params.data_width), + "dat_w": Out(wb_params.data_width), + "rst": Out(1), + "ack": In(1), + "adr": Out(wb_params.addr_width), + "cyc": Out(1), + "stall": In(1), + "err": In(1), + "lock": Out(1), + "rty": In(1), + "sel": Out(wb_params.data_width // wb_params.granularity), + "stb": Out(1), + "we": Out(1), + } + ) + def create(self, *, path: tuple[str | int, ...] = (), src_loc_at: int = 0): + """Create a WishboneInterface.""" # workaround for Sphinx problem with Amaranth docstring + return cast(WishboneInterface, PureInterface(self, path=path, src_loc_at=src_loc_at + 1)) -class WishboneBus(Record): - """Wishbone bus. - Parameters - ---------- - wb_params: WishboneParameters - Parameters for bus generation. - """ - - def __init__(self, wb_params: WishboneParameters, **kwargs): - super().__init__(WishboneLayout(wb_params).wb_layout, **kwargs) +class WishboneInterface(AbstractInterface[AbstractSignature], Protocol): + dat_r: Signal + dat_w: Signal + rst: Signal + ack: Signal + adr: Signal + cyc: Signal + stall: Signal + err: Signal + lock: Signal + rty: Signal + sel: Signal + stb: Signal + we: Signal class WishboneMasterMethodLayout: @@ -107,7 +101,7 @@ def __init__(self, wb_params: WishboneParameters): self.result_layout = make_layout(("data", wb_params.data_width), ("err", 1)) -class WishboneMaster(Elaboratable): +class WishboneMaster(Component): """Wishbone bus master interface. Parameters @@ -117,7 +111,7 @@ class WishboneMaster(Elaboratable): Attributes ---------- - wb_master: Record (like WishboneLayout) + wb_master: WishboneInterface Wishbone bus output. request: Method Transactional method to start a new Wishbone request. @@ -129,10 +123,11 @@ class WishboneMaster(Elaboratable): Returns state of request (error or success) and data (in case of read request) as `result_layout`. """ + wb_master: WishboneInterface + def __init__(self, wb_params: WishboneParameters): + super().__init__({"wb_master": Out(WishboneSignature(wb_params))}) self.wb_params = wb_params - self.wb_layout = WishboneLayout(wb_params).wb_layout - self.wb_master = Record(self.wb_layout) self.method_layouts = WishboneMasterMethodLayout(wb_params) @@ -208,7 +203,7 @@ def _(arg): return m -class PipelinedWishboneMaster(Elaboratable): +class PipelinedWishboneMaster(Component): """Pipelined Wishbone bus master interface. Parameters @@ -220,7 +215,7 @@ class PipelinedWishboneMaster(Elaboratable): Attributes ---------- - wb: Record (like WishboneLayout) + wb: WishboneInterface Wishbone bus output. request: Method Transactional method to start a new Wishbone request. @@ -234,7 +229,10 @@ class PipelinedWishboneMaster(Elaboratable): True, if there are no requests waiting for response """ + wb: WishboneInterface + def __init__(self, wb_params: WishboneParameters, *, max_req: int = 8): + super().__init__({"wb": Out(WishboneSignature(wb_params))}) self.wb_params = wb_params self.max_req = max_req @@ -244,9 +242,6 @@ def __init__(self, wb_params: WishboneParameters, *, max_req: int = 8): self.requests_finished = Signal() - self.wb_layout = WishboneLayout(wb_params).wb_layout - self.wb = Record(self.wb_layout) - def generate_method_layouts(self, wb_params: WishboneParameters): # generate method layouts locally self.request_in_layout = [ @@ -307,17 +302,17 @@ def _(arg) -> None: return m -class WishboneMuxer(Elaboratable): +class WishboneMuxer(Component): """Wishbone Muxer. Connects one master to multiple slaves. Parameters ---------- - master_wb: Record (like WishboneLayout) - Record of master inteface. - slaves: List[Record] - List of connected slaves' Wishbone Records (like WishboneLayout). + wb_params: WishboneParameters + Parameters for bus generation. + num_slaves: int + Number of slave devices to multiplex. ssel_tga: Signal Signal that selects the slave to connect. Signal width is the number of slaves and each bit coresponds to a slave. This signal is a Wishbone TGA (address tag), so it needs to be valid every time Wishbone STB @@ -326,15 +321,29 @@ class WishboneMuxer(Elaboratable): different `ssel_tga` value, all pending request have to be finished (and `stall` cleared) and there have to be one cycle delay from previouse request (to deassert the STB signal). Holding new requests should be implemented in block that controlls `ssel_tga` signal, before the Wishbone Master. + + Attributes + ---------- + master_wb: WishboneInterface + Master inteface. + slaves: list of WishboneInterface + List of connected slaves' Wishbone interfaces. """ - def __init__(self, master_wb: Record, slaves: List[Record], ssel_tga: Signal): - self.master_wb = master_wb - self.slaves = slaves + master_wb: WishboneInterface + slaves: list[WishboneInterface] + + def __init__(self, wb_params: WishboneParameters, num_slaves: int, ssel_tga: Signal): + super().__init__( + { + "master_wb": Out(WishboneSignature(wb_params)), + "slaves": In(WishboneSignature(wb_params)).array(num_slaves), + } + ) self.sselTGA = ssel_tga select_bits = ssel_tga.shape().width - assert select_bits == len(slaves) + assert select_bits == num_slaves self.txn_sel = Signal(select_bits) self.txn_sel_r = Signal(select_bits) @@ -354,10 +363,9 @@ def elaborate(self, platform): for i in range(len(self.slaves)): # connect all M->S signals except stb - m.d.comb += self.master_wb.connect( - self.slaves[i], - include=["dat_w", "rst", "cyc", "lock", "adr", "we", "sel"], - ) + # workaround for the lack of selective connecting in wiring + for n in ["dat_w", "cyc", "lock", "adr", "we", "sel", "stb"]: + m.d.comb += getattr(self.slaves[i], n).eq(getattr(self.master_wb, n)) # use stb as select m.d.comb += self.slaves[i].stb.eq(self.txn_sel[i] & self.master_wb.stb) @@ -367,12 +375,14 @@ def elaborate(self, platform): m.d.comb += self.master_wb.rty.eq(reduce(operator.or_, [self.slaves[i].rty for i in range(len(self.slaves))])) for i in OneHotSwitchDynamic(m, self.txn_sel): # mux S->M data - m.d.comb += self.master_wb.connect(self.slaves[i], include=["dat_r", "stall"]) + # workaround for the lack of selective connecting in wiring + for n in ["dat_r", "stall"]: + m.d.comb += getattr(self.master_wb, n).eq(getattr(self.slaves[i], n)) return m # connects multiple masters to one slave -class WishboneArbiter(Elaboratable): +class WishboneArbiter(Component): """Wishbone Arbiter. Connects multiple masters to one slave. @@ -380,20 +390,34 @@ class WishboneArbiter(Elaboratable): Parameters ---------- - slave_wb: Record (like WishboneLayout) - Record of slave inteface. - masters: List[Record] - List of master interface Records. + wb_params: WishboneParameters + Parameters for bus generation. + num_slaves: int + Number of master devices. + + Attributes + ---------- + slave_wb: WishboneInterface + Slave inteface. + masters: list of WishboneInterface + List of master interfaces. """ - def __init__(self, slave_wb: Record, masters: List[Record]): - self.slave_wb = slave_wb - self.masters = masters + slave_wb: WishboneInterface + masters: list[WishboneInterface] + + def __init__(self, wb_params: WishboneParameters, num_masters: int): + super().__init__( + { + "slave_wb": In(WishboneSignature(wb_params)), + "masters": Out(WishboneSignature(wb_params)).array(num_masters), + } + ) self.prev_cyc = Signal() # Amaranth round robin singals self.arb_enable = Signal() - self.req_signal = Signal(len(masters)) + self.req_signal = Signal(num_masters) def elaborate(self, platform): m = TModule() @@ -417,7 +441,9 @@ def elaborate(self, platform): m.d.comb += self.masters[i].err.eq((m.submodules.rr.grant == i) & self.slave_wb.err) m.d.comb += self.masters[i].rty.eq((m.submodules.rr.grant == i) & self.slave_wb.rty) # remaining S->M signals are shared, master will only accept response if bus termination signal is present - m.d.comb += self.masters[i].connect(self.slave_wb, include=["dat_r", "stall"]) + # workaround for the lack of selective connecting in wiring + for n in ["dat_r", "stall"]: + m.d.comb += getattr(self.masters[i], n).eq(getattr(self.slave_wb, n)) # combine reset singnal m.d.comb += self.slave_wb.rst.eq(reduce(operator.or_, [self.masters[i].rst for i in range(len(self.masters))])) @@ -426,10 +452,9 @@ def elaborate(self, platform): with m.Switch(m.submodules.rr.grant): for i in range(len(self.masters)): with m.Case(i): - m.d.comb += self.masters[i].connect( - self.slave_wb, - include=["dat_w", "cyc", "lock", "adr", "we", "sel", "stb"], - ) + # workaround for the lack of selective connecting in wiring + for n in ["dat_w", "cyc", "lock", "adr", "we", "sel", "stb"]: + m.d.comb += getattr(self.slave_wb, n).eq(getattr(self.masters[i], n)) # Disable slave when round robin is not valid at start of new request # This prevents chaning grant and muxes during Wishbone cycle @@ -439,7 +464,7 @@ def elaborate(self, platform): return m -class WishboneMemorySlave(Elaboratable): +class WishboneMemorySlave(Component): """Wishbone slave with memory Wishbone slave interface with addressable memory underneath. @@ -454,11 +479,14 @@ class WishboneMemorySlave(Elaboratable): Attributes ---------- - bus: Record (like WishboneLayout) - Wishbone bus record. + bus: WishboneInterface + Wishbone bus interface. """ + bus: WishboneInterface + def __init__(self, wb_params: WishboneParameters, **kwargs): + super().__init__({"bus": In(WishboneSignature(wb_params))}) if "width" not in kwargs: kwargs["width"] = wb_params.data_width if kwargs["width"] not in (8, 16, 32, 64): @@ -470,7 +498,6 @@ def __init__(self, wb_params: WishboneParameters, **kwargs): raise RuntimeError("Granularity has to be one of: 8, 16, 32, 64") self.mem = Memory(**kwargs) - self.bus = Record(WishboneLayout(wb_params, master=False).wb_layout) def elaborate(self, platform): m = TModule() diff --git a/scripts/gen_verilog.py b/scripts/gen_verilog.py index 5696a15cf..e9c5b8707 100755 --- a/scripts/gen_verilog.py +++ b/scripts/gen_verilog.py @@ -14,10 +14,10 @@ sys.path.insert(0, parent) from coreblocks.params.genparams import GenParams -from coreblocks.peripherals.wishbone import WishboneBus +from coreblocks.peripherals.wishbone import WishboneSignature from coreblocks.core import Core from transactron import TransactionModule -from transactron.utils import flatten_signals, DependencyManager, DependencyContext +from transactron.utils import DependencyManager, DependencyContext from transactron.utils.gen import generate_verilog from coreblocks.params.configurations import * @@ -33,8 +33,8 @@ class Top(Elaboratable): def __init__(self, gen_params): self.gp: GenParams = gen_params - self.wb_instr = WishboneBus(self.gp.wb_params, name="wb_instr") - self.wb_data = WishboneBus(self.gp.wb_params, name="wb_data") + self.wb_instr = WishboneSignature(self.gp.wb_params).create() + self.wb_data = WishboneSignature(self.gp.wb_params).create() def elaborate(self, platform: Platform): m = Module() @@ -49,9 +49,14 @@ def gen_verilog(core_config: CoreConfiguration, output_path: str): with DependencyContext(DependencyManager()): gp = GenParams(core_config) top = Top(gp) - ports = list(flatten_signals(top.wb_instr)) + list(flatten_signals(top.wb_data)) - - verilog_text, gen_info = generate_verilog(top, ports) + instr_ports: list[Signal] = [getattr(top.wb_instr, name) for name in top.wb_instr.signature.members] + data_ports: list[Signal] = [getattr(top.wb_data, name) for name in top.wb_data.signature.members] + for sig in instr_ports: + sig.name = "wb_instr__" + sig.name + for sig in data_ports: + sig.name = "wb_data__" + sig.name + + verilog_text, gen_info = generate_verilog(top, instr_ports + data_ports) gen_info.encode(f"{output_path}.json") with open(output_path, "w") as f: diff --git a/scripts/synthesize.py b/scripts/synthesize.py index 73e317507..6c5c2f7eb 100755 --- a/scripts/synthesize.py +++ b/scripts/synthesize.py @@ -7,6 +7,7 @@ from amaranth.build import Platform from amaranth import * +from amaranth.lib.wiring import Flow if __name__ == "__main__": parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -26,7 +27,7 @@ from coreblocks.fu.zbs import ZbsComponent from transactron import TransactionModule from transactron.lib import AdapterBase, AdapterTrans -from coreblocks.peripherals.wishbone import WishboneArbiter, WishboneBus +from coreblocks.peripherals.wishbone import WishboneArbiter, WishboneInterface from constants.ecp5_platforms import ( ResourceBuilder, adapter_resources, @@ -45,7 +46,7 @@ class WishboneConnector(Elaboratable): - def __init__(self, wb: WishboneBus, number: int): + def __init__(self, wb: WishboneInterface, number: int): self.wb = wb self.number = number @@ -55,7 +56,12 @@ def elaborate(self, platform: Platform): pins = platform.request("wishbone", self.number) assert isinstance(pins, Record) - m.d.comb += self.wb.connect(pins) + for name in self.wb.signature.members: + member = self.wb.signature.members[name] + if member.flow == Flow.In: + m.d.comb += getattr(pins, name).o.eq(getattr(self.wb, name)) + else: + m.d.comb += getattr(self.wb, name).eq(getattr(pins, name).i) return m @@ -93,14 +99,13 @@ def elaborate(self, platform: Platform): def unit_core(gen_params: GenParams): resources = wishbone_resources(gen_params.wb_params) - wb_instr = WishboneBus(gen_params.wb_params) - wb_data = WishboneBus(gen_params.wb_params) + wb_arbiter = WishboneArbiter(gen_params.wb_params, 2) + wb_instr = wb_arbiter.masters[0] + wb_data = wb_arbiter.masters[1] - core = Core(gen_params=gen_params, wb_instr_bus=wb_instr, wb_data_bus=wb_data) + wb_connector = WishboneConnector(wb_arbiter.slave_wb, 0) - wb = WishboneBus(gen_params.wb_params) - wb_arbiter = WishboneArbiter(wb, [wb_instr, wb_data]) - wb_connector = WishboneConnector(wb, 0) + core = Core(gen_params=gen_params, wb_instr_bus=wb_instr, wb_data_bus=wb_data) module = ModuleConnector(core=core, wb_arbiter=wb_arbiter, wb_connector=wb_connector) diff --git a/stubs/amaranth/lib/wiring.pyi b/stubs/amaranth/lib/wiring.pyi new file mode 100644 index 000000000..9565301f9 --- /dev/null +++ b/stubs/amaranth/lib/wiring.pyi @@ -0,0 +1,1143 @@ +""" +This type stub file was generated by pyright. +""" + +import enum +from collections.abc import Mapping, Iterator +from typing import NoReturn, Literal, TypeVar, Generic, Any, Self, Optional, overload +from ..hdl._ir import Elaboratable +from .._utils import final +from transactron.utils._typing import ShapeLike, ValueLike, AbstractInterface, AbstractSignature, ModuleLike + +__all__ = ["In", "Out", "Signature", "PureInterface", "connect", "flipped", "Component"] + +_T_Signature = TypeVar("_T_Signature", bound=AbstractSignature) +_T_SignatureMembers = TypeVar("_T_SignatureMembers", bound=SignatureMembers) +_T_Interface = TypeVar("_T_Interface", bound=AbstractInterface) +_T = TypeVar("_T") + +class Flow(enum.Enum): + """Direction of data flow. This enumeration has two values, :attr:`Out` and :attr:`In`, + the meaning of which depends on the context in which they are used. + """ + Out = "out" + In = "in" + def flip(self) -> Flow: + """Flip the direction of data flow. + + Returns + ------- + :class:`Flow` + :attr:`In` if called as :pc:`Out.flip()`; :attr:`Out` if called as :pc:`In.flip()`. + """ + ... + + def __call__(self, description: Signature | ShapeLike, *, reset=...) -> Member: + """Create a :class:`Member` with this data flow and the provided description and + reset value. + + Returns + ------- + :class:`Member` + :pc:`Member(self, description, reset=reset)` + """ + ... + + def __repr__(self) -> Literal['Out', 'In']: + ... + + def __str__(self) -> str: + ... + + + +Out = Flow.Out +In = Flow.In + +@final +class Member: + """Description of a signature member. + + This class is a discriminated union: its instances describe either a `port member` or + a `signature member`, and accessing properties for the wrong kind of member raises + an :exc:`AttributeError`. + + The class is created from a `description`: a :class:`Signature` instance (in which case + the :class:`Member` is created as a signature member), or a :ref:`shape-like ` + object (in which case it is created as a port member). After creation the :class:`Member` + instance cannot be modified. + + When a :class:`Signal` is created from a description of a port member, the signal's reset value + is taken from the member description. If this signal is never explicitly assigned a value, it + will equal ``reset``. + + Although instances can be created directly, most often they will be created through + :data:`In` and :data:`Out`, e.g. :pc:`In(unsigned(1))` or :pc:`Out(stream.Signature(RGBPixel))`. + """ + def __init__(self, flow: Flow, description: Signature | ShapeLike, *, reset=..., _dimensions=...) -> None: + ... + + def flip(self) -> Member: + """Flip the data flow of this member. + + Returns + ------- + :class:`Member` + A new :pc:`member` with :pc:`member.flow` equal to :pc:`self.flow.flip()`, and identical + to :pc:`self` other than that. + """ + ... + + def array(self, *dimensions) -> Member: + """Add array dimensions to this member. + + The dimensions passed to this method are `prepended` to the existing dimensions. + For example, :pc:`Out(1).array(2)` describes an array of 2 elements, whereas both + :pc:`Out(1).array(2, 3)` and :pc:`Out(1).array(3).array(2)` both describe a two dimensional + array of 2 by 3 elements. + + Dimensions are passed to :meth:`array` in the order in which they would be indexed. + That is, :pc:`.array(x, y)` creates a member that can be indexed up to :pc:`[x-1][y-1]`. + + The :meth:`array` method is composable: calling :pc:`member.array(x)` describes an array of + :pc:`x` members even if :pc:`member` was already an array. + + Returns + ------- + :class:`Member` + A new :pc:`member` with :pc:`member.dimensions` extended by :pc:`dimensions`, and + identical to :pc:`self` other than that. + """ + ... + + @property + def flow(self) -> Flow: + """Data flow of this member. + + Returns + ------- + :class:`Flow` + """ + ... + + @property + def is_port(self) -> bool: + """Whether this is a description of a port member. + + Returns + ------- + :class:`bool` + :pc:`True` if this is a description of a port member, + :pc:`False` if this is a description of a signature member. + """ + ... + + @property + def is_signature(self) -> bool: + """Whether this is a description of a signature member. + + Returns + ------- + :class:`bool` + :pc:`True` if this is a description of a signature member, + :pc:`False` if this is a description of a port member. + """ + ... + + @property + def shape(self) -> ShapeLike: + """Shape of a port member. + + Returns + ------- + :ref:`shape-like object ` + The shape that was provided when constructing this :class:`Member`. + + Raises + ------ + :exc:`AttributeError` + If :pc:`self` describes a signature member. + """ + ... + + @property + def reset(self): # -> None: + """Reset value of a port member. + + Returns + ------- + :ref:`const-castable object ` + The reset value that was provided when constructing this :class:`Member`. + + Raises + ------ + :exc:`AttributeError` + If :pc:`self` describes a signature member. + """ + ... + + @property + def signature(self) -> Signature: + """Signature of a signature member. + + Returns + ------- + :class:`Signature` + The signature that was provided when constructing this :class:`Member`. + + Raises + ------ + :exc:`AttributeError` + If :pc:`self` describes a port member. + """ + ... + + @property + def dimensions(self) -> tuple[int, ...]: + """Array dimensions. + + A member will usually have no dimensions; in this case it does not describe an array. + A single dimension describes one-dimensional array, and so on. + + Returns + ------- + :class:`tuple` of :class:`int` + Dimensions, if any, of this member, from most to least major. + """ + ... + + def __eq__(self, other) -> bool: + ... + + def __repr__(self) -> str: + ... + + + +@final +class SignatureError(Exception): + """ + This exception is raised when an invalid operation specific to signature manipulation is + performed with :class:`SignatureMembers`, such as adding a member to a frozen signature. + Other exceptions, such as :exc:`TypeError` or :exc:`NameError`, will still be raised where + appropriate. + """ + ... + + +@final +class SignatureMembers(Mapping[str, Member]): + """Mapping of signature member names to their descriptions. + + This container, a :class:`collections.abc.Mapping`, is used to implement the :pc:`members` + attribute of signature objects. + + The keys in this container must be valid Python attribute names that are public (do not begin + with an underscore. The values must be instances of :class:`Member`. The container is mutable + in a restricted manner: new keys may be added, but existing keys may not be modified or removed. + In addition, the container can be `frozen`, which disallows addition of new keys. Freezing + a container recursively freezes the members of any signatures inside. + + In addition to the use of the superscript operator, multiple members can be added at once with + the :pc:`+=` opreator. + + The :meth:`create` method converts this mapping into a mapping of names to signature members + (signals and interface objects) by creating them from their descriptions. The created mapping + can be used to populate an interface object. + """ + def __init__(self, members: Mapping[str, Member]=...) -> None: + ... + + def flip(self) -> FlippedSignatureMembers[Self]: + """Flip the data flow of the members in this mapping. + + Returns + ------- + :class:`FlippedSignatureMembers` + Proxy collection :pc:`FlippedSignatureMembers(self)` that flips the data flow of + the members that are accessed using it. + """ + ... + + def __eq__(self, other) -> bool: + """Compare the members in this and another mapping. + + Returns + ------- + :class:`bool` + :pc:`True` if the mappings contain the same key-value pairs, :pc:`False` otherwise. + """ + ... + + def __contains__(self, name: str) -> bool: + """Check whether a member with a given name exists. + + Returns + ------- + :class:`bool` + """ + ... + + def __getitem__(self, name: str) -> Member: + """Retrieves the description of a member with a given name. + + Returns + ------- + :class:`Member` + + Raises + ------ + :exc:`TypeError` + If :pc:`name` is not a string. + :exc:`NameError` + If :pc:`name` is not a valid, public Python attribute name. + :exc:`SignatureError` + If a member called :pc:`name` does not exist in the collection. + """ + ... + + def __setitem__(self, name: str, member: Member) -> NoReturn: + """Stub that forbids addition of members to the collection. + + Raises + ------ + :exc:`SignatureError` + Always. + """ + ... + + def __delitem__(self, name: str) -> NoReturn: + """Stub that forbids removal of members from the collection. + + Raises + ------ + :exc:`SignatureError` + Always. + """ + ... + + def __iter__(self) -> Iterator[str]: + """Iterate through the names of members in the collection. + + Returns + ------- + iterator of :class:`str` + Names of members, in the order of insertion. + """ + ... + + def __len__(self) -> int: + ... + + def flatten(self, *, path: tuple[str | int, ...]=...) -> Iterator[tuple[tuple[str | int, ...], Member]]: + """Recursively iterate through this collection. + + .. note:: + + The :ref:`paths ` returned by this method and by :meth:`Signature.flatten` + differ. This method yields a single result for each :class:`Member` in the collection, + disregarding their dimensions: + + .. doctest:: + + >>> sig = wiring.Signature({ + ... "items": In(1).array(2) + ... }) + >>> list(sig.members.flatten()) + [(('items',), In(1).array(2))] + + The :meth:`Signature.flatten` method yields multiple results for such a member; see + the documentation for that method for an example. + + Returns + ------- + iterator of (:class:`tuple` of :class:`str`, :class:`Member`) + Pairs of :ref:`paths ` and the corresponding members. A path yielded by + this method is a tuple of strings where each item is a key through which the item may + be reached. + """ + ... + + def create(self, *, path: tuple[str | int, ...] =..., src_loc_at: int =...) -> dict[str, Any]: + """Create members from their descriptions. + + For each port member, this function creates a :class:`Signal` with the shape and reset + value taken from the member description, and the name constructed from + the :ref:`paths ` to the member (by concatenating path items with a double + underscore, ``__``). + + For each signature member, this function calls :meth:`Signature.create` for that signature. + The resulting object can have any type if a :class:`Signature` subclass overrides + the :class:`create` method. + + If the member description includes dimensions, in each case, instead of a single member, + a :class:`list` of members is created for each dimension. (That is, for a single dimension + a list of members is returned, for two dimensions a list of lists is returned, and so on.) + + Returns + ------- + dict of :class:`str` to :ref:`value-like ` or interface object or a potentially nested list of these + Mapping of names to actual signature members. + """ + ... + + def __repr__(self) -> str: + ... + + + +@final +class FlippedSignatureMembers(Mapping[str, Member], Generic[_T_SignatureMembers]): + """Mapping of signature member names to their descriptions, with the directions flipped. + + Although an instance of :class:`FlippedSignatureMembers` could be created directly, it will + be usually created by a call to :meth:`SignatureMembers.flip`. + + This container is a wrapper around :class:`SignatureMembers` that contains the same members + as the inner mapping, but flips their data flow when they are accessed. For example: + + .. testcode:: + + members = wiring.SignatureMembers({"foo": Out(1)}) + + flipped_members = members.flip() + assert flipped_members["foo"].flow == In + + This class implements the same methods, with the same functionality (other than the flipping of + the data flow), as the :class:`SignatureMembers` class; see the documentation for that class + for details. + """ + def __init__(self, unflipped: _T_SignatureMembers) -> None: + ... + + def flip(self) -> _T_SignatureMembers: + """ + Flips this mapping back to the original one. + + Returns + ------- + :class:`SignatureMembers` + :pc:`unflipped` + """ + ... + + def __eq__(self, other) -> bool: + """Compare the members in this and another mapping. + + Returns + ------- + :class:`bool` + :pc:`True` if the mappings contain the same key-value pairs, :pc:`False` otherwise. + """ + ... + + def __contains__(self, name: str) -> bool: + ... + + def __getitem__(self, name: str) -> Member: + ... + + def __setitem__(self, name: str, member: Member) -> NoReturn: + ... + + def __delitem__(self, name: str) -> NoReturn: + ... + + def __iter__(self) -> Iterator[str]: + ... + + def __len__(self) -> int: + ... + + def flatten(self, *, path: tuple[str | int, ...] = ...) -> Iterator[tuple[tuple[str | int, ...], Member]]: + """Recursively iterate through this collection. + + .. note:: + + The :ref:`paths ` returned by this method and by :meth:`Signature.flatten` + differ. This method yields a single result for each :class:`Member` in the collection, + disregarding their dimensions: + + .. doctest:: + + >>> sig = wiring.Signature({ + ... "items": In(1).array(2) + ... }) + >>> list(sig.members.flatten()) + [(('items',), In(1).array(2))] + + The :meth:`Signature.flatten` method yields multiple results for such a member; see + the documentation for that method for an example. + + Returns + ------- + iterator of (:class:`tuple` of :class:`str`, :class:`Member`) + Pairs of :ref:`paths ` and the corresponding members. A path yielded by + this method is a tuple of strings where each item is a key through which the item may + be reached. + """ + ... + + def create(self, *, path: tuple[str | int, ...] =..., src_loc_at: int =...) -> dict[str, Any]: + """Create members from their descriptions. + + For each port member, this function creates a :class:`Signal` with the shape and reset + value taken from the member description, and the name constructed from + the :ref:`paths ` to the member (by concatenating path items with a double + underscore, ``__``). + + For each signature member, this function calls :meth:`Signature.create` for that signature. + The resulting object can have any type if a :class:`Signature` subclass overrides + the :class:`create` method. + + If the member description includes dimensions, in each case, instead of a single member, + a :class:`list` of members is created for each dimension. (That is, for a single dimension + a list of members is returned, for two dimensions a list of lists is returned, and so on.) + + Returns + ------- + dict of :class:`str` to :ref:`value-like ` or interface object or a potentially nested list of these + Mapping of names to actual signature members. + """ + ... + + def __repr__(self) -> str: + ... + + + +class SignatureMeta(type): + """Metaclass for :class:`Signature` that makes :class:`FlippedSignature` its + 'virtual subclass'. + + The object returned by :meth:`Signature.flip` is an instance of :class:`FlippedSignature`. + It implements all of the methods :class:`Signature` has, and for subclasses of + :class:`Signature`, it implements all of the methods defined on the subclass as well. + This makes it effectively a subtype of :class:`Signature` (or a derived class of it), but this + relationship is not captured by the Python type system: :class:`FlippedSignature` only has + :class:`object` as its base class. + + This metaclass extends :func:`issubclass` and :func:`isinstance` so that they take into + account the subtyping relationship between :class:`Signature` and :class:`FlippedSignature`, + described below. + """ + def __subclasscheck__(cls, subclass) -> bool: + """ + Override of :pc:`issubclass(cls, Signature)`. + + In addition to the standard behavior of :func:`issubclass`, this override makes + :class:`FlippedSignature` a subclass of :class:`Signature` or any of its subclasses. + """ + ... + + def __instancecheck__(cls, instance) -> bool: + """ + Override of :pc:`isinstance(obj, Signature)`. + + In addition to the standard behavior of :func:`isinstance`, this override makes + :pc:`isinstance(obj, cls)` act as :pc:`isinstance(obj.flip(), cls)` where + :pc:`obj` is an instance of :class:`FlippedSignature`. + """ + ... + + + +class Signature(metaclass=SignatureMeta): + """Description of an interface object. + + An interface object is a Python object that has a :pc:`signature` attribute containing + a :class:`Signature` object, as well as an attribute for every member of its signature. + Signatures and interface objects are tightly linked: an interface object can be created out + of a signature, and the signature is used when :func:`connect` ing two interface objects + together. See the :ref:`introduction to interfaces ` for a more detailed + explanation of why this is useful. + + :class:`Signature` can be used as a base class to define :ref:`customized ` + signatures and interface objects. + + .. important:: + + :class:`Signature` objects are immutable. Classes inheriting from :class:`Signature` must + ensure this remains the case when additional functionality is added. + """ + def __init__(self, members: Mapping[str, Member]) -> None: + ... + + def flip(self) -> FlippedSignature[Self]: + """Flip the data flow of the members in this signature. + + Returns + ------- + :class:`FlippedSignature` + Proxy object :pc:`FlippedSignature(self)` that flips the data flow of the attributes + corresponding to the members that are accessed using it. + + See the documentation for the :class:`FlippedSignature` class for a detailed discussion + of how this proxy object works. + """ + ... + + @property + def members(self) -> SignatureMembers: + """Members in this signature. + + Returns + ------- + :class:`SignatureMembers` + """ + ... + + def __eq__(self, other) -> bool: + """Compare this signature with another. + + The behavior of this operator depends on the types of the arguments. If both :pc:`self` + and :pc:`other` are instances of the base :class:`Signature` class, they are compared + structurally (the result is :pc:`self.members == other.members`); otherwise they are + compared by identity (the result is :pc:`self is other`). + + Subclasses of :class:`Signature` are expected to override this method to take into account + the specifics of the domain. If the subclass has additional properties that do not influence + the :attr:`members` dictionary but nevertheless make its instance incompatible with other + instances (for example, whether the feedback is combinational or registered), + the overridden method must take that into account. + + Returns + ------- + :class:`bool` + """ + ... + + def flatten(self, obj) -> Iterator[tuple[tuple[str | int, ...], Flow, ValueLike]]: + """Recursively iterate through this signature, retrieving member values from an interface + object. + + .. note:: + + The :ref:`paths ` returned by this method and by + :meth:`SignatureMembers.flatten` differ. This method yield several results for each + :class:`Member` in the collection that has a dimension: + + .. doctest:: + :options: +NORMALIZE_WHITESPACE + + >>> sig = wiring.Signature({ + ... "items": In(1).array(2) + ... }) + >>> obj = sig.create() + >>> list(sig.flatten(obj)) + [(('items', 0), In(1), (sig obj__items__0)), + (('items', 1), In(1), (sig obj__items__1))] + + The :meth:`SignatureMembers.flatten` method yields one result for such a member; see + the documentation for that method for an example. + + Returns + ------- + iterator of (:class:`tuple` of :class:`str` or :class:`int`, :class:`Flow`, :ref:`value-like `) + Tuples of :ref:`paths `, flow, and the corresponding member values. A path + yielded by this method is a tuple of strings or integers where each item is an attribute + name or index (correspondingly) using which the member value was retrieved. + """ + ... + + def is_compliant(self, obj, *, reasons: Optional[list[str]] =..., path: tuple[str, ...] =...) -> bool: + """Check whether an object matches the description in this signature. + + This module places few restrictions on what an interface object may be; it does not + prescribe a specific base class or a specific way of constructing the object, only + the values that its attributes should have. This method ensures consistency between + the signature and the interface object, checking every aspect of the provided interface + object for compliance with the signature. + + It verifies that: + + * :pc:`obj` has a :pc:`signature` attribute whose value a :class:`Signature` instance + such that ``self == obj.signature``; + * for each member, :pc:`obj` has an attribute with the same name, whose value: + + * for members with :meth:`dimensions ` specified, contains a list or + a tuple (or several levels of nested lists or tuples, for multiple dimensions) + satisfying the requirements below; + * for port members, is a :ref:`value-like ` object casting to + a :class:`Signal` or a :class:`Const` whose width and signedness is the same as that + of the member, and (in case of a :class:`Signal`) which is not reset-less and whose + reset value is that of the member; + * for signature members, matches the description in the signature as verified by + :meth:`Signature.is_compliant`. + + If the verification fails, this method reports the reason(s) by filling the :pc:`reasons` + container. These reasons are intended to be human-readable: more than one reason may be + reported but only in cases where this is helpful (e.g. the same error message will not + repeat 10 times for each of the 10 ports in a list). + + Arguments + --------- + reasons : :class:`list` or :pc:`None` + If provided, a container that receives diagnostic messages. + path : :class:`tuple` of :class:`str` + The :ref:`path ` to :pc:`obj`. Could be set to improve diagnostic + messages if :pc:`obj` is nested within another object, or for clarity. + + Returns + ------- + :class:`bool` + :pc:`True` if :pc:`obj` matches the description in this signature, :pc:`False` + otherwise. If :pc:`False` and :pc:`reasons` was not :pc:`None`, it will contain + a detailed explanation why. + """ + ... + + def create(self, *, path: tuple[str | int, ...]=..., src_loc_at: int =...) -> AbstractInterface[Self]: + """Create an interface object from this signature. + + The default :meth:`Signature.create` implementation consists of one line: + + .. code:: + + def create(self, *, path=None, src_loc_at=0): + return PureInterface(self, path=path, src_loc_at=1 + src_loc_at) + + This implementation creates an interface object from this signature that serves purely + as a container for the attributes corresponding to the signature members, and implements + no behavior. Such an implementation is sufficient for signatures created ad-hoc using + the :pc:`Signature({ ... })` constructor as well as simple signature subclasses. + + When defining a :class:`Signature` subclass that needs to customize the behavior of + the created interface objects, override this method with a similar implementation + that references the class of your custom interface object: + + .. testcode:: + + class CustomSignature(wiring.Signature): + def create(self, *, path=None, src_loc_at=0): + return CustomInterface(self, path=path, src_loc_at=1 + src_loc_at) + + class CustomInterface(wiring.PureInterface): + @property + def my_property(self): + ... + + The :pc:`path` and :pc:`src_loc_at` arguments are necessary to ensure the generated signals + have informative names and accurate source location information. + + The custom :meth:`create` method may take positional or keyword arguments in addition to + the two listed above. Such arguments must have a default value, because + the :meth:`SignatureMembers.create` method will call the :meth:`Signature.create` member + without these additional arguments when this signature is a member of another signature. + """ + ... + + def __repr__(self) -> str: + ... + + + +@final +class FlippedSignature(Generic[_T_Signature]): + """Description of an interface object, with the members' directions flipped. + + Although an instance of :class:`FlippedSignature` could be created directly, it will be usually + created by a call to :meth:`Signature.flip`. + + This proxy is a wrapper around :class:`Signature` that contains the same description as + the inner mapping, but flips the members' data flow when they are accessed. It is useful + because :class:`Signature` objects are mutable and may include custom behavior, and if one was + copied (rather than wrapped) by :meth:`Signature.flip`, the wrong object would be mutated, and + custom behavior would be unavailable. + + For example: + + .. testcode:: + + sig = wiring.Signature({"foo": Out(1)}) + + flipped_sig = sig.flip() + assert flipped_sig.members["foo"].flow == In + + sig.attr = 1 + assert flipped_sig.attr == 1 + flipped_sig.attr += 1 + assert sig.attr == flipped_sig.attr == 2 + + This class implements the same methods, with the same functionality (other than the flipping of + the members' data flow), as the :class:`Signature` class; see the documentation for that class + for details. + + It is not possible to inherit from :class:`FlippedSignature` and :meth:`Signature.flip` must not + be overridden. If a :class:`Signature` subclass defines a method and this method is called on + a flipped instance of the subclass, it receives the flipped instance as its :pc:`self` argument. + To distinguish being called on the flipped instance from being called on the unflipped one, use + :pc:`isinstance(self, FlippedSignature)`: + + .. testcode:: + + class SignatureKnowsWhenFlipped(wiring.Signature): + @property + def is_flipped(self): + return isinstance(self, wiring.FlippedSignature) + + sig = SignatureKnowsWhenFlipped({}) + assert sig.is_flipped == False + assert sig.flip().is_flipped == True + """ + def __init__(self, signature: _T_Signature) -> None: + ... + + def flip(self) -> _T_Signature: + """ + Flips this signature back to the original one. + + Returns + ------- + :class:`Signature` + :pc:`unflipped` + """ + ... + + @property + def members(self) -> FlippedSignatureMembers: + ... + + def __eq__(self, other) -> bool: + ... + + def flatten(self, obj) -> Iterator[tuple[tuple[str | int, ...], Flow, ValueLike]]: + ... + + def is_compliant(self, obj, *, reasons: Optional[list[str]] =..., path: tuple[str, ...] =...) -> bool: + ... + + def __getattr__(self, name) -> Any: + """Retrieves attribute or method :pc:`name` of the unflipped signature. + + Performs :pc:`getattr(unflipped, name)`, ensuring that, if :pc:`name` refers to a property + getter or a method, its :pc:`self` argument receives the *flipped* signature. A class + method's :pc:`cls` argument receives the class of the *unflipped* signature, as usual. + """ + ... + + def __setattr__(self, name, value) -> None: + """Assigns attribute :pc:`name` of the unflipped signature to ``value``. + + Performs :pc:`setattr(unflipped, name, value)`, ensuring that, if :pc:`name` refers to + a property setter, its :pc:`self` argument receives the flipped signature. + """ + ... + + def __delattr__(self, name) -> None: + """Removes attribute :pc:`name` of the unflipped signature. + + Performs :pc:`delattr(unflipped, name)`, ensuring that, if :pc:`name` refers to a property + deleter, its :pc:`self` argument receives the flipped signature. + """ + ... + + def create(self, *args, path: tuple[str | int, ...] =..., src_loc_at: int =..., **kwargs) -> FlippedInterface: + ... + + def __repr__(self) -> str: + ... + + + +class PureInterface(Generic[_T_Signature]): + """A helper for constructing ad-hoc interfaces. + + The :class:`PureInterface` helper primarily exists to be used by the default implementation of + :meth:`Signature.create`, but it can also be used in any other context where an interface + object needs to be created without the overhead of defining a class for it. + + .. important:: + + Any object can be an interface object; it only needs a :pc:`signature` property containing + a compliant signature. It is **not** necessary to use :class:`PureInterface` in order to + create an interface object, but it may be used either directly or as a base class whenever + it is convenient to do so. + """ + signature: _T_Signature + + def __init__(self, signature: _T_Signature, *, path: tuple[str | int, ...]=..., src_loc_at: int =...) -> None: + """Create attributes from a signature. + + The sole method defined by this helper is its constructor, which only defines + the :pc:`self.signature` attribute as well as the attributes created from the signature + members: + + .. code:: + + def __init__(self, signature, *, path): + self.__dict__.update({ + "signature": signature, + **signature.members.create(path=path) + }) + + .. note:: + + This implementation can be copied and reused in interface objects that *do* include + custom behavior, if the signature serves as the source of truth for attributes + corresponding to its members. Although it is less repetitive, this approach can confuse + IDEs and type checkers. + """ + ... + + def __repr__(self) -> str: + ... + + + +@final +class FlippedInterface(Generic[_T_Signature, _T_Interface]): + """An interface object, with its members' directions flipped. + + An instance of :class:`FlippedInterface` should only be created by calling :func:`flipped`, + which ensures that a :pc:`FlippedInterface(FlippedInterface(...))` object is never created. + + This proxy wraps any interface object and forwards attribute and method access to the wrapped + interface object while flipping its signature and the values of any attributes corresponding to + interface members. It is useful because interface objects may be mutable or include custom + behavior, and explicitly keeping track of whether the interface object is flipped would be very + burdensome. + + For example: + + .. testcode:: + + intf = wiring.PureInterface(wiring.Signature({"foo": Out(1)}), path=()) + + flipped_intf = wiring.flipped(intf) + assert flipped_intf.signature.members["foo"].flow == In + + intf.attr = 1 + assert flipped_intf.attr == 1 + flipped_intf.attr += 1 + assert intf.attr == flipped_intf.attr == 2 + + It is not possible to inherit from :class:`FlippedInterface`. If an interface object class + defines a method or a property and it is called on the flipped interface object, the method + receives the flipped interface object as its :pc:`self` argument. To distinguish being called + on the flipped interface object from being called on the unflipped one, use + :pc:`isinstance(self, FlippedInterface)`: + + .. testcode:: + + class InterfaceKnowsWhenFlipped: + signature = wiring.Signature({}) + + @property + def is_flipped(self): + return isinstance(self, wiring.FlippedInterface) + + intf = InterfaceKnowsWhenFlipped() + assert intf.is_flipped == False + assert wiring.flipped(intf).is_flipped == True + """ + def __init__(self, interface: _T_Interface) -> None: + ... + + # not true -- this is a property -- but required for clean typing + signature: _T_Signature +# @property +# def signature(self) -> _T_Signature: +# """Signature of the flipped interface. +# +# Returns +# ------- +# Signature +# :pc:`unflipped.signature.flip()` +# """ +# ... + + def __eq__(self, other) -> bool: + """Compare this flipped interface with another. + + Returns + ------- + bool + :pc:`True` if :pc:`other` is an instance :pc:`FlippedInterface(other_unflipped)` where + :pc:`unflipped == other_unflipped`, :pc:`False` otherwise. + """ + ... + + def __getattr__(self, name) -> Any: + """Retrieves attribute or method :pc:`name` of the unflipped interface. + + Performs :pc:`getattr(unflipped, name)`, with the following caveats: + + 1. If :pc:`name` refers to a signature member, the returned interface object is flipped. + 2. If :pc:`name` refers to a property getter or a method, its :pc:`self` argument receives + the *flipped* interface. A class method's :pc:`cls` argument receives the class of + the *unflipped* interface, as usual. + """ + ... + + def __setattr__(self, name, value) -> None: + """Assigns attribute :pc:`name` of the unflipped interface to ``value``. + + Performs :pc:`setattr(unflipped, name, value)`, with the following caveats: + + 1. If :pc:`name` refers to a signature member, the assigned interface object is flipped. + 2. If :pc:`name` refers to a property setter, its :pc:`self` argument receives the flipped + interface. + """ + ... + + def __delattr__(self, name) -> None: + """Removes attribute :pc:`name` of the unflipped interface. + + Performs :pc:`delattr(unflipped, name)`, ensuring that, if :pc:`name` refers to a property + deleter, its :pc:`self` argument receives the flipped interface. + """ + ... + + def __repr__(self) -> str: + ... + +@overload +def flipped(interface: FlippedInterface[_T_Signature, _T_Interface]) -> _T_Interface: + ... + +# Can't be typed nicer for now. +@overload +def flipped(interface: _T_Interface) -> FlippedInterface[Any, _T_Interface]: + ... + +def flipped(interface: _T_Interface) -> _T_Interface | FlippedInterface[Any, _T_Interface]: + """ + Flip the data flow of the members of the interface object :pc:`interface`. + + If an interface object is flipped twice, returns the original object: + :pc:`flipped(flipped(interface)) is interface`. Otherwise, wraps :pc:`interface` in + a :class:`FlippedInterface` proxy object that flips the directions of its members. + + See the documentation for the :class:`FlippedInterface` class for a detailed discussion of how + this proxy object works. + """ + ... + +@final +class ConnectionError(Exception): + """Exception raised when the :func:`connect` function is requested to perform an impossible, + meaningless, or forbidden connection.""" + ... + + +def connect(m: ModuleLike, *args: AbstractInterface, **kwargs: AbstractInterface) -> None: + """Connect interface objects to each other. + + This function creates connections between ports of several interface objects. (Any number of + interface objects may be provided; in most cases it is two.) + + The connections can be made only if all of the objects satisfy a number of requirements: + + * Every interface object must have the same set of port members, and they must have the same + :meth:`dimensions `. + * For each path, the port members of every interface object must have the same width and reset + value (for port members corresponding to signals) or constant value (for port members + corresponding to constants). Signedness may differ. + * For each path, at most one interface object must have the corresponding port member be + an output. + * For a given path, if any of the interface objects has an input port member corresponding + to a constant value, then the rest of the interface objects must have output port members + corresponding to the same constant value. + + For example, if :pc:`obj1` is being connected to :pc:`obj2` and :pc:`obj3`, and :pc:`obj1.a.b` + is an output, then :pc:`obj2.a.b` and :pc:`obj2.a.b` must exist and be inputs. If :pc:`obj2.c` + is an input and its value is :pc:`Const(1)`, then :pc:`obj1.c` and :pc:`obj3.c` must be outputs + whose value is also :pc:`Const(1)`. If no ports besides :pc:`obj1.a.b` and :pc:`obj1.c` exist, + then no ports except for those two must exist on :pc:`obj2` and :pc:`obj3` either. + + Once it is determined that the interface objects can be connected, this function performs + an equivalent of: + + .. code:: + + m.d.comb += [ + in1.eq(out1), + in2.eq(out1), + ... + ] + + Where :pc:`out1` is an output and :pc:`in1`, :pc:`in2`, ... are the inputs that have the same + path. (If no interface object has an output for a given path, **no connection at all** is made.) + + The positions (within :pc:`args`) or names (within :pc:`kwargs`) of the arguments do not affect + the connections that are made. There is no difference in behavior between :pc:`connect(m, a, b)` + and :pc:`connect(m, b, a)` or :pc:`connect(m, arbiter=a, decoder=b)`. The names of the keyword + arguments serve only a documentation purpose: they clarify the diagnostic messages when + a connection cannot be made. + """ + ... + +class Component(Elaboratable, Generic[_T_Signature]): + """Base class for elaboratable interface objects. + + A component is an :class:`Elaboratable` whose interaction with other parts of the design is + defined by its signature. Most if not all elaboratables in idiomatic Amaranth code should be + components, as the signature clarifies the direction of data flow at their boundary. See + the :ref:`introduction to interfaces ` section for a practical guide to defining + and using components. + + There are two ways to define a component. If all instances of a component have the same + signature, it can be defined using :term:`variable annotations `: + + .. testcode:: + + class FixedComponent(wiring.Component): + en: In(1) + data: Out(8) + + The variable annotations are collected by the constructor :meth:`Component.__init__`. Only + public (not starting with ``_``) annotations with :class:`In ` or :class:`Out ` + objects are considered; all other annotations are ignored under the assumption that they are + interpreted by some other tool. + + It is possible to use inheritance to extend a component: the component's signature is composed + from the variable annotations in the class that is being constructed as well as all of its + base classes. It is an error to have more than one variable annotation for the same attribute. + + If different instances of a component may need to have different signatures, variable + annotations cannot be used. In this case, the constructor should be overridden, and + the computed signature members should be provided to the superclass constructor: + + .. testcode:: + + class ParametricComponent(wiring.Component): + def __init__(self, data_width): + super().__init__({ + "en": In(1), + "data": Out(data_width) + }) + + It is also possible to pass a :class:`Signature` instance to the superclass constructor. + + Aside from initializing the :attr:`signature` attribute, the :meth:`Component.__init__` + constructor creates attributes corresponding to all of the members defined in the signature. + If an attribute with the same name as that of a member already exists, an error is raied. + + Raises + ------ + :exc:`TypeError` + If the :pc:`signature` object is neither a :class:`Signature` nor a :class:`dict`. + If neither variable annotations nor the :pc:`signature` argument are present, or if + both are present. + :exc:`NameError` + If a name conflict is detected between two variable annotations, or between a member + and an existing attribute. + """ + def __init__(self, signature: Optional[_T_Signature | dict[str, Member]] = None) -> None: + ... + + @property + def signature(self) -> _T_Signature: + """The signature of the component. + + .. important:: + + Do not override this property. Once a component is constructed, its :attr:`signature` + property must always return the same :class:`Signature` instance. The constructor + can be used to customize a component's signature. + """ + ... + + + diff --git a/test/peripherals/test_axi_lite.py b/test/peripherals/test_axi_lite.py index e1d52c107..27821c899 100644 --- a/test/peripherals/test_axi_lite.py +++ b/test/peripherals/test_axi_lite.py @@ -6,7 +6,7 @@ class AXILiteInterfaceWrapper: - def __init__(self, axi_lite_master: Record): + def __init__(self, axi_lite_master: AXILiteInterface): self.axi_lite = axi_lite_master def slave_ra_ready(self, rdy=1): diff --git a/test/peripherals/test_wishbone.py b/test/peripherals/test_wishbone.py index a8332f62b..4dd5485ed 100644 --- a/test/peripherals/test_wishbone.py +++ b/test/peripherals/test_wishbone.py @@ -1,6 +1,8 @@ import random from collections import deque +from amaranth.lib.wiring import connect + from coreblocks.peripherals.wishbone import * from transactron.lib import AdapterTrans @@ -9,8 +11,8 @@ class WishboneInterfaceWrapper: - def __init__(self, wishbone_record): - self.wb = wishbone_record + def __init__(self, wishbone_interface: WishboneInterface): + self.wb = wishbone_interface def master_set(self, addr, data, we): yield self.wb.dat_w.eq(data) @@ -142,10 +144,10 @@ def slave(): class TestWishboneMuxer(TestCaseWithSimulator): def test_manual(self): - wb_master = WishboneInterfaceWrapper(Record(WishboneLayout(WishboneParameters()).wb_layout)) num_slaves = 4 - slaves = [WishboneInterfaceWrapper(Record.like(wb_master.wb, name=f"sl{i}")) for i in range(num_slaves)] - mux = WishboneMuxer(wb_master.wb, [s.wb for s in slaves], Signal(num_slaves)) + mux = WishboneMuxer(WishboneParameters(), num_slaves, Signal(num_slaves)) + slaves = [WishboneInterfaceWrapper(slave) for slave in mux.slaves] + wb_master = WishboneInterfaceWrapper(mux.master_wb) def process(): # check full communiaction @@ -183,9 +185,9 @@ def process(): class TestWishboneAribiter(TestCaseWithSimulator): def test_manual(self): - slave = WishboneInterfaceWrapper(Record(WishboneLayout(WishboneParameters()).wb_layout)) - masters = [WishboneInterfaceWrapper(Record.like(slave.wb, name=f"mst{i}")) for i in range(2)] - arb = WishboneArbiter(slave.wb, [m.wb for m in masters]) + arb = WishboneArbiter(WishboneParameters(), 2) + slave = WishboneInterfaceWrapper(arb.slave_wb) + masters = [WishboneInterfaceWrapper(master) for master in arb.masters] def process(): yield from masters[0].master_set(2, 3, 1) @@ -319,7 +321,7 @@ def elaborate(self, platform): m.submodules.request = self.request = TestbenchIO(AdapterTrans(self.mem_master.request)) m.submodules.result = self.result = TestbenchIO(AdapterTrans(self.mem_master.result)) - m.d.comb += self.mem_master.wb_master.connect(self.mem_slave.bus) + connect(m, self.mem_master.wb_master, self.mem_slave.bus) return m diff --git a/test/regression/pysim.py b/test/regression/pysim.py index 424d83d8e..7eccf3c9d 100644 --- a/test/regression/pysim.py +++ b/test/regression/pysim.py @@ -18,7 +18,7 @@ from coreblocks.core import Core from coreblocks.params import GenParams from coreblocks.params.configurations import full_core_config -from coreblocks.peripherals.wishbone import WishboneBus +from coreblocks.peripherals.wishbone import WishboneSignature class PySimulation(SimulationBackend): @@ -131,8 +131,8 @@ def pretty_dump_metrics(self, metric_values: dict[str, dict[str, int]], filter_r async def run(self, mem_model: CoreMemoryModel, timeout_cycles: int = 5000) -> SimulationExecutionResult: with DependencyContext(DependencyManager()): - wb_instr_bus = WishboneBus(self.gp.wb_params) - wb_data_bus = WishboneBus(self.gp.wb_params) + wb_instr_bus = WishboneSignature(self.gp.wb_params).create() + wb_data_bus = WishboneSignature(self.gp.wb_params).create() core = Core(gen_params=self.gp, wb_instr_bus=wb_instr_bus, wb_data_bus=wb_data_bus) wb_instr_ctrl = WishboneInterfaceWrapper(wb_instr_bus) diff --git a/test/test_core.py b/test/test_core.py index 8bf5c8f1b..a2cfd1d88 100644 --- a/test/test_core.py +++ b/test/test_core.py @@ -1,4 +1,5 @@ from amaranth import Elaboratable, Module +from amaranth.lib.wiring import connect from transactron.lib import AdapterTrans from transactron.utils import align_to_power_of_two, signed_to_int @@ -8,7 +9,7 @@ from coreblocks.core import Core from coreblocks.params import GenParams from coreblocks.params.configurations import CoreConfiguration, basic_core_config, full_core_config -from coreblocks.peripherals.wishbone import WishboneBus, WishboneMemorySlave +from coreblocks.peripherals.wishbone import WishboneSignature, WishboneMemorySlave from typing import Optional import random @@ -33,8 +34,8 @@ def __init__(self, gen_params: GenParams, instr_mem: list[int] = [0], data_mem: def elaborate(self, platform): m = Module() - wb_instr_bus = WishboneBus(self.gen_params.wb_params) - wb_data_bus = WishboneBus(self.gen_params.wb_params) + wb_instr_bus = WishboneSignature(self.gen_params.wb_params).create() + wb_data_bus = WishboneSignature(self.gen_params.wb_params).create() # Align the size of the memory to the length of a cache line. instr_mem_depth = align_to_power_of_two(len(self.instr_mem), self.gen_params.icache_params.block_size_bits) @@ -54,8 +55,8 @@ def elaborate(self, platform): m.submodules.io_in = self.io_in m.submodules.interrupt = self.interrupt - m.d.comb += wb_instr_bus.connect(self.wb_mem_slave.bus) - m.d.comb += wb_data_bus.connect(self.wb_mem_slave_data.bus) + connect(m, wb_instr_bus, self.wb_mem_slave.bus) + connect(m, wb_data_bus, self.wb_mem_slave_data.bus) return m diff --git a/transactron/utils/_typing.py b/transactron/utils/_typing.py index 8f42c1910..a44dc6610 100644 --- a/transactron/utils/_typing.py +++ b/transactron/utils/_typing.py @@ -14,11 +14,12 @@ Any, TYPE_CHECKING, ) -from collections.abc import Iterable, Mapping, Sequence +from collections.abc import Iterable, Iterator, Mapping, Sequence from contextlib import AbstractContextManager from enum import Enum from amaranth import * from amaranth.lib.data import StructLayout, View +from amaranth.lib.wiring import Flow, Member from amaranth.hdl import ShapeCastable, ValueCastable from amaranth.hdl.rec import Direction, Layout @@ -86,6 +87,7 @@ GraphCC: TypeAlias = set[T] +# Protocols for Amaranth classes class _ModuleBuilderDomainsLike(Protocol): def __getattr__(self, name: str) -> "_ModuleBuilderDomain": ... @@ -143,6 +145,74 @@ def next(self, name: str) -> None: ... +class AbstractSignatureMembers(Protocol): + def flip(self) -> "AbstractSignatureMembers": + ... + + def __eq__(self, other) -> bool: + ... + + def __contains__(self, name: str) -> bool: + ... + + def __getitem__(self, name: str) -> Member: + ... + + def __setitem__(self, name: str, member: Member) -> NoReturn: + ... + + def __delitem__(self, name: str) -> NoReturn: + ... + + def __iter__(self) -> Iterator[str]: + ... + + def __len__(self) -> int: + ... + + def flatten(self, *, path: tuple[str | int, ...] = ...) -> Iterator[tuple[tuple[str | int, ...], Member]]: + ... + + def create(self, *, path: tuple[str | int, ...] = ..., src_loc_at: int = ...) -> dict[str, Any]: + ... + + def __repr__(self) -> str: + ... + + +class AbstractSignature(Protocol): + def flip(self) -> "AbstractSignature": + ... + + @property + def members(self) -> AbstractSignatureMembers: + ... + + def __eq__(self, other) -> bool: + ... + + def flatten(self, obj) -> Iterator[tuple[tuple[str | int, ...], Flow, ValueLike]]: + ... + + def is_compliant(self, obj, *, reasons: Optional[list[str]] = ..., path: tuple[str, ...] = ...) -> bool: + ... + + def create( + self, *, path: tuple[str | int, ...] = ..., src_loc_at: int = ... + ) -> "AbstractInterface[AbstractSignature]": + ... + + def __repr__(self) -> str: + ... + + +_T_AbstractSignature = TypeVar("_T_AbstractSignature", bound=AbstractSignature) + + +class AbstractInterface(Protocol, Generic[_T_AbstractSignature]): + signature: _T_AbstractSignature + + class HasElaborate(Protocol): def elaborate(self, platform) -> "HasElaborate": ...