Skip to content

Commit

Permalink
much improved cocotb time tracking and sync
Browse files Browse the repository at this point in the history
  • Loading branch information
psychogenic committed Nov 22, 2024
1 parent 84fd126 commit 7acd728
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 95 deletions.
82 changes: 41 additions & 41 deletions src/ttboard/cocotb/clock.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@

'''
Created on Nov 21, 2024
@author: Pat Deegan
@copyright: Copyright (C) 2024 Pat Deegan, https://psychogenic.com
'''
import gc
import ttboard.util.time as time
import ttboard.log as logging
log = logging.getLogger(__name__)
_ClockForSignal = dict()
gc.collect()
from ttboard.cocotb.time import TimeValue, SystemTime
gc.collect()

class TimeConverter:
@classmethod
def scale(cls, units:str):
vals = {
'fs': 1e-15,
'ps': 1e-12,
'ns': 1e-9,
'us': 1e-6,
'ms': 1e-3,
'sec': 1
}
if units not in vals:
raise ValueError(f"Unknown units {units}")
return vals[units]

@classmethod
def time_to_clockticks(cls, clock, t:int, units:str):
if clock is None:
clock = Clock.get()

time_secs = t*cls.scale(units)
return round(time_secs / clock.period_s)

_ClockForSignal = dict()
class Clock:
@classmethod
def get(cls, signal):
Expand All @@ -50,32 +30,52 @@ def all(cls):

def __init__(self, signal, period, units):
self.signal = signal
self.period = period
self.units = units
self.scale = TimeConverter.scale(units)
self.running = False
self.period_s = self.period * self.scale

self.half_period = TimeValue(period/2, units)
self.next_toggle = TimeValue(period/2, units)
self.current_signal_value = 0
self.sleep_us = 0
sleep_us = (1e6*self.period_s/2.0)
if sleep_us >= 500:
self.sleep_us = sleep_us
half_per_secs = float(self.half_period)
if half_per_secs > 250e-6:
self.sleep_us = round(half_per_secs*1e-6)

self._toggle_count = 0


@property
def event_interval(self):
return self.half_period

def start(self):
global _ClockForSignal
_ClockForSignal[self.signal] = self

def numticks_in(self, t:int, units:str):
return TimeConverter.time_to_clockticks(self, t, units)
def num_events_in(self, time_or_timevalue:int, units:str=None):
if isinstance(time_or_timevalue, TimeValue):
tv = time_or_timevalue
elif units is not None:
tv = TimeValue(time_or_timevalue, units)
else:
raise ValueError
return tv / self.half_period

def tick(self):
self.signal.value = 1
if self.sleep_us:
time.sleep_us(self.sleep_us)

self.signal.value = 0
def time_has_passed(self):
while self.next_toggle <= SystemTime.current():
self.toggle()
self.next_toggle += self.half_period

def toggle(self):
# print(f"toggle {self._toggle_count}")
new_val = 1 if not self.current_signal_value else 0
self.signal.value = new_val
self.current_signal_value = new_val
if self.sleep_us:
time.sleep_us(self.sleep_us)

def tick(self):
# clock will go through whole period, end where it started
self.toggle()
self.toggle()


8 changes: 6 additions & 2 deletions src/ttboard/cocotb/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ def __init__(self):
self.segments = self.new_slice_attribute(self.tt.uo_out, 6, 0)

dut = DUT()
clk = Clock(dut.clk, 500, 'us')
clk = Clock(dut.clk, 50, 'us')
clk.start()

async def yo(dut):
dut.display_single_select.value = 1
print("3 cycles")
await ClockCycles(dut.clk, 3)
dut.display_single_select.value = 0
print("5 cycles")
await ClockCycles(dut.clk, 5)
await Timer(2, 'ms')

print("100 us")
await Timer(100, 'us')


109 changes: 109 additions & 0 deletions src/ttboard/cocotb/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
'''
Created on Nov 22, 2024
@author: Pat Deegan
@copyright: Copyright (C) 2024 Pat Deegan, https://psychogenic.com
'''
import gc
gc.collect()
class TimeConverter:
@classmethod
def scale(cls, units:str):
vals = {
'fs': 1e-15,
'ps': 1e-12,
'ns': 1e-9,
'us': 1e-6,
'ms': 1e-3,
'sec': 1
}
if units not in vals:
raise ValueError(f"Unknown units {units}")
return vals[units]

@classmethod
def time_to_clockticks(cls, clock, t:int, units:str):
time_secs = t*cls.scale(units)
return round(time_secs / clock.period_s)

@classmethod
def rescale(cls,t:int, units:str, to_units:str):
return t*(cls.scale(units)/cls.scale(to_units))

class TimeValue:
def __init__(self, time:int, units:str):
self.time = time
self._units = units
self.scale = TimeConverter.scale(units)

@property
def units(self):
return self._units
@units.setter
def units(self, set_to:str):
self._units = set_to
self.scale = TimeConverter.scale(set_to)
def __float__(self):
return self.time*self.scale

def __gt__(self, other):
if isinstance(other, (TimeValue, float)):
return float(self) > float(other)
raise ValueError

def __le__(self, other):
return not (self > other)

def __eq__(self, other):
return float(self) == float(other)

def __iadd__(self, other):
if isinstance(other, TimeValue):
self.time += TimeConverter.rescale(other.time, other.units, self.units)
return self
raise ValueError

def __add__(self, other):
if isinstance(other, TimeValue):
new_time = self.time + TimeConverter.rescale(other.time, other.units, self.units)
return TimeValue(new_time, self.units)
raise ValueError

def __repr__(self):
return f'<TimeValue {round(self.time)} {self.units}>'

def __truediv__(self, other):
if isinstance(other, TimeValue):
other_conv = TimeConverter.rescale(other.time, other.units, self.units)
return self.time / other_conv
raise ValueError

def __mult__(self, other:int):
return TimeValue(self.time*other, self.units)


class SystemTime:
_global_time = TimeValue(0, 'ns')

@classmethod
def reset(cls):
cls._global_time = TimeValue(0, 'ns')

@classmethod
def current(cls):
return cls._global_time

@classmethod
def set_units(cls, units:str):
cls._global_time = TimeValue(cls._global_time.time, units)

@classmethod
def advance(cls, time_or_timevalue, units:str=None):
if isinstance(time_or_timevalue, TimeValue):
cls._global_time += time_or_timevalue
elif isinstance(time_or_timevalue, int) and units is not None:
cls._global_time += TimeValue(time_or_timevalue, units)
else:
raise ValueError


77 changes: 25 additions & 52 deletions src/ttboard/cocotb/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
@author: Pat Deegan
@copyright: Copyright (C) 2024 Pat Deegan, https://psychogenic.com
'''
from ttboard.cocotb.clock import Clock, TimeConverter

from ttboard.cocotb.clock import Clock
from ttboard.cocotb.time import TimeValue, SystemTime
class Awaitable:
def __init__(self, signal=None):
self.signal = signal
Expand All @@ -20,37 +20,30 @@ class ClockCycles(Awaitable):
def __init__(self, sig, num_cycles:int):
super().__init__(sig)
self.num_cycles = num_cycles
self.cycle_count = 0

def __iter__(self):
# print(f"Will tick {self.num_cycles} times ")
self.cycle_count = self.num_cycles
return self

def next(self):
clk = Clock.get(self.signal)

if clk is None:
print("CLK NO CLK")
else:
while self.cycle_count > 0:
self.cycle_count -= 1
clk.tick()
num_transitions = self.num_cycles * 2
target_time = SystemTime.current() + (clk.half_period * num_transitions)
all_clocks = sorted(Clock.all(), key=lambda x: float(x.half_period))
fastest_clock = all_clocks[0]
time_increment = fastest_clock.half_period
while SystemTime.current() < target_time:
SystemTime.advance(time_increment)
for clk in all_clocks:
clk.time_has_passed()
raise StopIteration

def __next__(self):
return self.next()

def deadbeef(self):
clk = Clock.get(self.signal)
if clk is None:
print("CLK NO CLK")
else:
if self.cycle_count > 0:
self.cycle_count -= 1
clk.tick()
yield self.cycle_count
raise StopIteration


def __await__(self):
clk = Clock.get(self.signal)
Expand All @@ -64,49 +57,29 @@ def __await__(self):
class Timer(Awaitable):
def __init__(self, time:int, units:str):
super().__init__()
self.time = time
self.units = units
self.time_s = time * TimeConverter.scale(units)
self.cycle_count = 0


def countdowns(self):
all_clocks = Clock.all()
countdowns = []
max_ticks = 0
for clk in all_clocks:
nt = clk.numticks_in(self.time, self.units)
if nt > max_ticks:
max_ticks = nt
#print(f'Want {nt} ticks for {clk}')
countdowns.append([nt, clk])

#print(f"Set cyclecount {max_ticks}")
self.cycle_count = max_ticks

return countdowns
self.time = TimeValue(time, units)


def run_timer(self):
all_clocks = sorted(Clock.all(), key=lambda x: float(x.half_period))
fastest_clock = all_clocks[0]
time_increment = fastest_clock.half_period
target_time = SystemTime.current() + self.time
while SystemTime.current() < target_time:
SystemTime.advance(time_increment)
for clk in all_clocks:
clk.time_has_passed()

def __iter__(self):
self._countdowns = self.countdowns()
return self

def __next__(self):
while self.cycle_count > 0:
self.cycle_count -= 1
self.tick_all()
self.run_timer()
raise StopIteration

def tick_all(self):
for cidx in range(len(self._countdowns)):
if self._countdowns[cidx][0] > 0:
self._countdowns[cidx][1].tick()
self._countdowns[cidx][0] -= 1

def __await__(self):
self._countdowns = self.countdowns()
for _i in range(self.cycle_count):
self.tick_all()
self.run_timer()
yield
return self

Expand Down

0 comments on commit 7acd728

Please sign in to comment.