Skip to content

Commit

Permalink
Added AXI-Lite master
Browse files Browse the repository at this point in the history
  • Loading branch information
Durchbruchswagen committed Nov 26, 2023
1 parent 964065c commit afd1f29
Showing 1 changed file with 283 additions and 0 deletions.
283 changes: 283 additions & 0 deletions coreblocks/peripherals/axi_lite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
from amaranth import *
from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT
from transactron import Method, def_method, TModule
from transactron.core import Transaction
from transactron.lib.connectors import Forwarder
from enum import Enum, unique


class AXILiteParameters:
"""Parameters of the AXI-Lite bus.
Parameters
----------
data_width: int
Width of "data" signals for "write data" and "read data" channels. Must be either 32 or 64 bits. Defaults to 64
addr_width: int
Width of "addr" signals for "write address" and "read address" channels. Defaults to 64 bits.
"""

def __init__(self, *, data_width=64, addr_width=64):
self.data_width = data_width
self.addr_width = addr_width


class AXILiteLayout:
"""AXI-Lite bus layout generator
Parameters
----------
axil_params: AXILiteParameters
Patameters used to generate AXI-Lite layout
master: Boolean
Whether the layout should be generated for master side
(if false it's generatd for the slave side)
Attributes
----------
axil_layout: Record
Record of a AXI-Lite bus.
"""

def __init__(self, axil_params, *, master=True):
write_address = [
("val", 1, DIR_FANOUT if master else DIR_FANIN),
("rdy", 1, DIR_FANIN if master else DIR_FANOUT),
("addr", axil_params.addr_width, DIR_FANOUT if master else DIR_FANIN),
("prot", 3, DIR_FANOUT if master else DIR_FANIN),
]

write_data = [
("val", 1, DIR_FANOUT if master else DIR_FANIN),
("rdy", 1, DIR_FANIN if master else DIR_FANOUT),
("data", axil_params.data_width, DIR_FANOUT if master else DIR_FANIN),
("strb", axil_params.data_width // 8, DIR_FANOUT if master else DIR_FANIN),
]

write_response = [
("val", 1, DIR_FANIN if master else DIR_FANOUT),
("rdy", 1, DIR_FANOUT if master else DIR_FANIN),
("resp", 2, DIR_FANIN if master else DIR_FANOUT),
]

read_address = [
("val", 1, DIR_FANOUT if master else DIR_FANIN),
("rdy", 1, DIR_FANIN if master else DIR_FANOUT),
("addr", axil_params.addr_width, DIR_FANOUT if master else DIR_FANIN),
("prot", 3, DIR_FANOUT if master else DIR_FANIN),
]

read_data = [
("val", 1, DIR_FANIN if master else DIR_FANOUT),
("rdy", 1, DIR_FANOUT if master else DIR_FANIN),
("data", axil_params.data_width, DIR_FANIN if master else DIR_FANOUT),
("resp", 2, DIR_FANIN if master else DIR_FANOUT),
]

self.axil_layout = [
("write_address", write_address),
("write_data", write_data),
("write_response", write_response),
("read_address", read_address),
("read_data", read_data),
]


@unique
class States(Enum):
IDLE = 0
ACTIVE = 1


class AXILiteMaster(Elaboratable):
"""AXI-Lite master interface.
Parameters
----------
axil_params: AXILiteParameters
Parameters for bus generation/
Attributes
----------
axil_master: Record(like AXILiteLayout)
AXI-Lite bust output
ra_request: Method
Transactional method for initiating request on read address channel.
Ready when no request or only one is being executed.
Takes 'ra_request_layout' as argument.
rd_response: Method
Transactional method for reading response from read data channel.
Ready when there is request response availabe.
Returns data and response state as 'rd_response_layout'.
wa_request: Method
Transactional method for initiating request on write address channel.
Ready when no request or only one is being executed.
Takes 'wa_request_layout' as argument.
wd_request: Method
Transactional method for initiating request on write data channel.
Ready when no request or only one is being executed.
Takes 'wd_request_layout' as argument.
wr_response: Method
Transactional method for reading response from write response channel.
Ready when there is request response availabe.
Returns response state as 'wr_response_layout'.
"""

def __init__(self, axil_params: AXILiteParameters):
self.axil_params = axil_params
self.axil_layout = AXILiteLayout(self.axil_params).axil_layout
self.axil_master = Record(self.axil_layout)

self.FSM_ra = Signal(States, reset=States.IDLE)
self.FSM_wa = Signal(States, reset=States.IDLE)
self.FSM_wd = Signal(States, reset=States.IDLE)

self.generate_layouts(axil_params)

self.ra_request = Method(i=self.ra_request_layout)
self.rd_response = Method(o=self.rd_response_layout)
self.wa_request = Method(i=self.wa_request_layout)
self.wd_request = Method(i=self.wd_request_layout)
self.wr_response = Method(o=self.wr_response_layout)

def generate_layouts(self, axil_params: AXILiteParameters):
self.ra_request_layout = [
("addr", axil_params.addr_width, DIR_FANIN),
("prot", 3, DIR_FANIN),
]

self.wa_request_layout = [
("addr", axil_params.addr_width, DIR_FANIN),
("prot", 3, DIR_FANIN),
]

self.wd_request_layout = [
("data", axil_params.data_width, DIR_FANIN),
("strb", axil_params.data_width // 8, DIR_FANIN),
]

self.rd_response_layout = [
("data", axil_params.data_width),
("resp", 2),
]

self.ra_forwarder_layout = [
("addr", axil_params.addr_width),
("prot", 3),
]

self.wa_forwarder_layout = [
("addr", axil_params.addr_width),
("prot", 3),
]

self.wd_forwarder_layout = [
("data", axil_params.data_width),
("strb", axil_params.data_width // 8),
]

self.wr_response_layout = [
("resp", 2),
]

def state_machine_request(self, state, prot, address, channel, forwarder, method, m):
with m.If(state == States.IDLE):
m.d.sync += channel.val.eq(0)
with m.If(method.run):
with Transaction().body(m):
rtval = forwarder.read(m)
if address:
m.d.sync += channel.addr.eq(rtval.addr)
else:
m.d.sync += channel.data.eq(rtval.data)
if prot:
m.d.sync += channel.prot.eq(rtval.prot)
else:
m.d.sync += channel.strb.eq(rtval.strb)
m.d.sync += channel.val.eq(1)
m.d.sync += state.eq(States.ACTIVE)

with m.Elif(state == States.ACTIVE):
with m.If(channel.rdy):
with m.If(forwarder.read.ready | forwarder.write.run):
with Transaction().body(m):
rtval = forwarder.read(m)
if address:
m.d.sync += channel.addr.eq(rtval.addr)
else:
m.d.sync += channel.data.eq(rtval.data)
if prot:
m.d.sync += channel.prot.eq(rtval.prot)
else:
m.d.sync += channel.strb.eq(rtval.strb)
with m.Else():
m.d.sync += channel.val.eq(0)
m.d.sync += state.eq(States.IDLE)

def result_handler(self, data, channel, forwarder, m):
with m.If(channel.rdy & channel.val):
m.d.sync += channel.rdy.eq(forwarder.read.run)
with Transaction().body(m):
if data:
forwarder.write(m, data=channel.data, resp=channel.resp)
else:
forwarder.write(m, resp=channel.resp)
with m.Else():
m.d.sync += channel.rdy.eq(forwarder.write.ready)

def elaborate(self, platform):
m = TModule()

m.submodules.ra_forwarder = ra_forwarder = Forwarder(self.ra_forwarder_layout)
m.submodules.rd_forwarder = rd_forwarder = Forwarder(self.rd_response_layout)
m.submodules.wa_forwarder = wa_forwarder = Forwarder(self.wa_forwarder_layout)
m.submodules.wd_forwarder = wd_forwarder = Forwarder(self.wd_forwarder_layout)
m.submodules.wr_forwarder = wr_forwarder = Forwarder(self.wr_response_layout)

# read_address
self.state_machine_request(
self.FSM_ra, True, True, self.axil_master.read_address, ra_forwarder, self.ra_request, m
)

@def_method(m, self.ra_request, ready=ra_forwarder.write.ready)
def _(arg):
ra_forwarder.write(m, addr=arg.addr, prot=arg.prot)

# read_data
self.result_handler(True, self.axil_master.read_data, rd_forwarder, m)

@def_method(m, self.rd_response)
def _():
return rd_forwarder.read(m)

# write_adress
self.state_machine_request(
self.FSM_wa, True, True, self.axil_master.write_address, wa_forwarder, self.wa_request, m
)

@def_method(m, self.wa_request, ready=wa_forwarder.write.ready)
def _(arg):
wa_forwarder.write(m, addr=arg.addr, prot=arg.prot)

# write_data
self.state_machine_request(
self.FSM_wd, False, False, self.axil_master.write_data, wd_forwarder, self.wd_request, m
)

@def_method(m, self.wd_request, ready=wd_forwarder.write.ready)
def _(arg):
wd_forwarder.write(m, data=arg.data, strb=arg.strb)

# write_response
self.result_handler(False, self.axil_master.write_response, wr_forwarder, m)

@def_method(m, self.wr_response)
def _():
return wr_forwarder.read(m)

return m

0 comments on commit afd1f29

Please sign in to comment.