From afd1f29854ed423f24f6714752ae3d5a23c3b224 Mon Sep 17 00:00:00 2001 From: Mateusz Marszalek Date: Sun, 26 Nov 2023 21:19:59 +0100 Subject: [PATCH] Added AXI-Lite master --- coreblocks/peripherals/axi_lite.py | 283 +++++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 coreblocks/peripherals/axi_lite.py diff --git a/coreblocks/peripherals/axi_lite.py b/coreblocks/peripherals/axi_lite.py new file mode 100644 index 000000000..b6b7c2a94 --- /dev/null +++ b/coreblocks/peripherals/axi_lite.py @@ -0,0 +1,283 @@ +from amaranth import * +from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT +from transactron import Method, def_method, TModule +from transactron.core import Transaction +from transactron.lib.connectors import Forwarder +from enum import Enum, unique + + +class AXILiteParameters: + """Parameters of the AXI-Lite bus. + + Parameters + ---------- + data_width: int + Width of "data" signals for "write data" and "read data" channels. Must be either 32 or 64 bits. Defaults to 64 + addr_width: int + Width of "addr" signals for "write address" and "read address" channels. Defaults to 64 bits. + """ + + def __init__(self, *, data_width=64, addr_width=64): + self.data_width = data_width + self.addr_width = addr_width + + +class AXILiteLayout: + """AXI-Lite bus layout generator + + Parameters + ---------- + axil_params: AXILiteParameters + Patameters used to generate AXI-Lite layout + master: Boolean + Whether the layout should be generated for master side + (if false it's generatd for the slave side) + + Attributes + ---------- + axil_layout: Record + Record of a AXI-Lite bus. + """ + + def __init__(self, axil_params, *, master=True): + write_address = [ + ("val", 1, DIR_FANOUT if master else DIR_FANIN), + ("rdy", 1, DIR_FANIN if master else DIR_FANOUT), + ("addr", axil_params.addr_width, DIR_FANOUT if master else DIR_FANIN), + ("prot", 3, DIR_FANOUT if master else DIR_FANIN), + ] + + write_data = [ + ("val", 1, DIR_FANOUT if master else DIR_FANIN), + ("rdy", 1, DIR_FANIN if master else DIR_FANOUT), + ("data", axil_params.data_width, DIR_FANOUT if master else DIR_FANIN), + ("strb", axil_params.data_width // 8, DIR_FANOUT if master else DIR_FANIN), + ] + + write_response = [ + ("val", 1, DIR_FANIN if master else DIR_FANOUT), + ("rdy", 1, DIR_FANOUT if master else DIR_FANIN), + ("resp", 2, DIR_FANIN if master else DIR_FANOUT), + ] + + read_address = [ + ("val", 1, DIR_FANOUT if master else DIR_FANIN), + ("rdy", 1, DIR_FANIN if master else DIR_FANOUT), + ("addr", axil_params.addr_width, DIR_FANOUT if master else DIR_FANIN), + ("prot", 3, DIR_FANOUT if master else DIR_FANIN), + ] + + read_data = [ + ("val", 1, DIR_FANIN if master else DIR_FANOUT), + ("rdy", 1, DIR_FANOUT if master else DIR_FANIN), + ("data", axil_params.data_width, DIR_FANIN if master else DIR_FANOUT), + ("resp", 2, DIR_FANIN if master else DIR_FANOUT), + ] + + self.axil_layout = [ + ("write_address", write_address), + ("write_data", write_data), + ("write_response", write_response), + ("read_address", read_address), + ("read_data", read_data), + ] + + +@unique +class States(Enum): + IDLE = 0 + ACTIVE = 1 + + +class AXILiteMaster(Elaboratable): + """AXI-Lite master interface. + + Parameters + ---------- + axil_params: AXILiteParameters + Parameters for bus generation/ + + Attributes + ---------- + axil_master: Record(like AXILiteLayout) + AXI-Lite bust output + + ra_request: Method + Transactional method for initiating request on read address channel. + Ready when no request or only one is being executed. + Takes 'ra_request_layout' as argument. + + rd_response: Method + Transactional method for reading response from read data channel. + Ready when there is request response availabe. + Returns data and response state as 'rd_response_layout'. + + wa_request: Method + Transactional method for initiating request on write address channel. + Ready when no request or only one is being executed. + Takes 'wa_request_layout' as argument. + + wd_request: Method + Transactional method for initiating request on write data channel. + Ready when no request or only one is being executed. + Takes 'wd_request_layout' as argument. + + wr_response: Method + Transactional method for reading response from write response channel. + Ready when there is request response availabe. + Returns response state as 'wr_response_layout'. + """ + + def __init__(self, axil_params: AXILiteParameters): + self.axil_params = axil_params + self.axil_layout = AXILiteLayout(self.axil_params).axil_layout + self.axil_master = Record(self.axil_layout) + + self.FSM_ra = Signal(States, reset=States.IDLE) + self.FSM_wa = Signal(States, reset=States.IDLE) + self.FSM_wd = Signal(States, reset=States.IDLE) + + self.generate_layouts(axil_params) + + self.ra_request = Method(i=self.ra_request_layout) + self.rd_response = Method(o=self.rd_response_layout) + self.wa_request = Method(i=self.wa_request_layout) + self.wd_request = Method(i=self.wd_request_layout) + self.wr_response = Method(o=self.wr_response_layout) + + def generate_layouts(self, axil_params: AXILiteParameters): + self.ra_request_layout = [ + ("addr", axil_params.addr_width, DIR_FANIN), + ("prot", 3, DIR_FANIN), + ] + + self.wa_request_layout = [ + ("addr", axil_params.addr_width, DIR_FANIN), + ("prot", 3, DIR_FANIN), + ] + + self.wd_request_layout = [ + ("data", axil_params.data_width, DIR_FANIN), + ("strb", axil_params.data_width // 8, DIR_FANIN), + ] + + self.rd_response_layout = [ + ("data", axil_params.data_width), + ("resp", 2), + ] + + self.ra_forwarder_layout = [ + ("addr", axil_params.addr_width), + ("prot", 3), + ] + + self.wa_forwarder_layout = [ + ("addr", axil_params.addr_width), + ("prot", 3), + ] + + self.wd_forwarder_layout = [ + ("data", axil_params.data_width), + ("strb", axil_params.data_width // 8), + ] + + self.wr_response_layout = [ + ("resp", 2), + ] + + def state_machine_request(self, state, prot, address, channel, forwarder, method, m): + with m.If(state == States.IDLE): + m.d.sync += channel.val.eq(0) + with m.If(method.run): + with Transaction().body(m): + rtval = forwarder.read(m) + if address: + m.d.sync += channel.addr.eq(rtval.addr) + else: + m.d.sync += channel.data.eq(rtval.data) + if prot: + m.d.sync += channel.prot.eq(rtval.prot) + else: + m.d.sync += channel.strb.eq(rtval.strb) + m.d.sync += channel.val.eq(1) + m.d.sync += state.eq(States.ACTIVE) + + with m.Elif(state == States.ACTIVE): + with m.If(channel.rdy): + with m.If(forwarder.read.ready | forwarder.write.run): + with Transaction().body(m): + rtval = forwarder.read(m) + if address: + m.d.sync += channel.addr.eq(rtval.addr) + else: + m.d.sync += channel.data.eq(rtval.data) + if prot: + m.d.sync += channel.prot.eq(rtval.prot) + else: + m.d.sync += channel.strb.eq(rtval.strb) + with m.Else(): + m.d.sync += channel.val.eq(0) + m.d.sync += state.eq(States.IDLE) + + def result_handler(self, data, channel, forwarder, m): + with m.If(channel.rdy & channel.val): + m.d.sync += channel.rdy.eq(forwarder.read.run) + with Transaction().body(m): + if data: + forwarder.write(m, data=channel.data, resp=channel.resp) + else: + forwarder.write(m, resp=channel.resp) + with m.Else(): + m.d.sync += channel.rdy.eq(forwarder.write.ready) + + def elaborate(self, platform): + m = TModule() + + m.submodules.ra_forwarder = ra_forwarder = Forwarder(self.ra_forwarder_layout) + m.submodules.rd_forwarder = rd_forwarder = Forwarder(self.rd_response_layout) + m.submodules.wa_forwarder = wa_forwarder = Forwarder(self.wa_forwarder_layout) + m.submodules.wd_forwarder = wd_forwarder = Forwarder(self.wd_forwarder_layout) + m.submodules.wr_forwarder = wr_forwarder = Forwarder(self.wr_response_layout) + + # read_address + self.state_machine_request( + self.FSM_ra, True, True, self.axil_master.read_address, ra_forwarder, self.ra_request, m + ) + + @def_method(m, self.ra_request, ready=ra_forwarder.write.ready) + def _(arg): + ra_forwarder.write(m, addr=arg.addr, prot=arg.prot) + + # read_data + self.result_handler(True, self.axil_master.read_data, rd_forwarder, m) + + @def_method(m, self.rd_response) + def _(): + return rd_forwarder.read(m) + + # write_adress + self.state_machine_request( + self.FSM_wa, True, True, self.axil_master.write_address, wa_forwarder, self.wa_request, m + ) + + @def_method(m, self.wa_request, ready=wa_forwarder.write.ready) + def _(arg): + wa_forwarder.write(m, addr=arg.addr, prot=arg.prot) + + # write_data + self.state_machine_request( + self.FSM_wd, False, False, self.axil_master.write_data, wd_forwarder, self.wd_request, m + ) + + @def_method(m, self.wd_request, ready=wd_forwarder.write.ready) + def _(arg): + wd_forwarder.write(m, data=arg.data, strb=arg.strb) + + # write_response + self.result_handler(False, self.axil_master.write_response, wr_forwarder, m) + + @def_method(m, self.wr_response) + def _(): + return wr_forwarder.read(m) + + return m