Skip to content

Commit

Permalink
fix ooms (#92)
Browse files Browse the repository at this point in the history
Co-authored-by: FranchuFranchu <[email protected]>
  • Loading branch information
tjjfvi and FranchuFranchu authored Mar 25, 2024
1 parent aa58a6d commit fee98fb
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hvm-core"
version = "0.2.20"
version = "0.2.21"
edition = "2021"
description = "HVM-Core is a massively parallel Interaction Combinator evaluator."
license = "MIT"
Expand Down
1 change: 1 addition & 0 deletions cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"combinators",
"condvar",
"ctrs",
"deque",
"dereferencable",
"dref",
"dups",
Expand Down
3 changes: 1 addition & 2 deletions src/host/readback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ impl Host {
let mut net = Net::default();

net.root = state.read_wire(rt_net.root.clone());

for (a, b) in &rt_net.redexes {
for (a, b) in rt_net.redexes.iter() {
net.redexes.push((state.read_port(a.clone(), None), state.read_port(b.clone(), None)))
}

Expand Down
3 changes: 3 additions & 0 deletions src/run/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl<'a, M: Mode> Net<'a, M> {
// TODO: fast copy?
} else if false && !M::LAZY && port.tag() == Num || port.tag() == Ref && lab >= port.lab() {
self.rwts.comm += 1;
self.free_trg(trg);
(Trg::port(port.clone()), Trg::port(port))
} else {
let n = self.create_node(Ctr, lab);
Expand All @@ -140,6 +141,7 @@ impl<'a, M: Mode> Net<'a, M> {
n.p1.wire().set_target(Port::new_num(port.num()));
(Trg::port(n.p0), Trg::port(n.p2))
} else if !M::LAZY && port == Port::ERA {
self.free_trg(trg);
(Trg::port(Port::ERA), Trg::port(Port::ERA))
} else {
let n = self.create_node(Op, op as Lab);
Expand All @@ -157,6 +159,7 @@ impl<'a, M: Mode> Net<'a, M> {
self.free_trg(trg);
Trg::port(Port::new_num(op.op(port.num(), rhs)))
} else if !M::LAZY && port == Port::ERA {
self.free_trg(trg);
Trg::port(Port::ERA)
} else {
let n = self.create_node(Op, op as Lab);
Expand Down
1 change: 1 addition & 0 deletions src/run/interact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl<'a, M: Mode> Net<'a, M> {
let a1 = a.p1.load_target();
if a1.tag() == Num {
self.rwts.oper += 1;
self.half_free(a.p1.addr());
let out = op.op(b.num(), a1.num());
self.link_wire_port(a.p2, Port::new_num(out));
} else {
Expand Down
62 changes: 58 additions & 4 deletions src/run/linker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ pub(super) struct Header {
/// non-atomically (because they must be locked).
pub struct Linker<'h, M: Mode> {
pub(super) allocator: Allocator<'h>,
pub redexes: Vec<(Port, Port)>,
pub rwts: Rewrites,
pub redexes: RedexQueue,
headers: IntMap<Addr, Header>,
_mode: PhantomData<M>,
}
Expand All @@ -30,7 +30,7 @@ impl<'h, M: Mode> Linker<'h, M> {
pub fn new(heap: &'h Heap) -> Self {
Linker {
allocator: Allocator::new(heap),
redexes: Vec::new(),
redexes: RedexQueue::default(),
rwts: Default::default(),
headers: Default::default(),
_mode: PhantomData,
Expand Down Expand Up @@ -84,12 +84,20 @@ impl<'h, M: Mode> Linker<'h, M> {
/// Pushes an active pair to the redex queue; `a` and `b` must both be
/// principal ports.
#[inline(always)]
fn redux(&mut self, a: Port, b: Port) {
pub fn redux(&mut self, a: Port, b: Port) {
trace!(self, a, b);
debug_assert!(!(a.is(Tag::Var) || a.is(Tag::Red) || b.is(Tag::Var) || b.is(Tag::Red)));
if a.is_skippable() && b.is_skippable() {
self.rwts.eras += 1;
} else if !M::LAZY {
self.redexes.push((a, b));
// Prioritize redexes that do not allocate memory,
// to prevent OOM errors that can be avoided
// by reducing redexes in a different order (see #91)
if redex_would_shrink(&a, &b) {
self.redexes.fast.push((a, b));
} else {
self.redexes.slow.push((a, b));
}
} else {
self.set_header(a.clone(), b.clone());
self.set_header(b.clone(), a.clone());
Expand Down Expand Up @@ -341,3 +349,49 @@ impl<'h, M: Mode> Linker<'h, M> {
self.headers[&port.addr()].targ.clone()
}
}

/// A two-tier queue of pending redexes.
///
/// Redexes are kept in two separate stacks, `fast` and `slow`;
/// [`RedexQueue::pop`] always serves `fast` before it touches `slow`.
#[derive(Debug, Default)]
pub struct RedexQueue {
  /// Redexes that are popped first.
  pub(super) fast: Vec<(Port, Port)>,
  /// Redexes that are popped only once `fast` is empty.
  pub(super) slow: Vec<(Port, Port)>,
}

impl RedexQueue {
  /// Removes and returns the highest-priority redex in the queue, if any.
  ///
  /// The most recently pushed `fast` redex wins; `slow` is consulted only
  /// when `fast` has nothing left.
  #[inline(always)]
  pub fn pop(&mut self) -> Option<(Port, Port)> {
    match self.fast.pop() {
      None => self.slow.pop(),
      found => found,
    }
  }

  /// Total number of redexes held across both stacks.
  #[inline(always)]
  pub fn len(&self) -> usize {
    let Self { fast, slow } = self;
    fast.len() + slow.len()
  }

  /// Returns `true` when neither stack holds a redex.
  #[inline(always)]
  pub fn is_empty(&self) -> bool {
    self.len() == 0
  }

  /// Removes every redex from the queue, yielding the `fast` ones first and
  /// the `slow` ones afterwards.
  #[inline(always)]
  pub fn drain(&mut self) -> impl Iterator<Item = (Port, Port)> + '_ {
    let Self { fast, slow } = self;
    let fast_redexes = fast.drain(..);
    let slow_redexes = slow.drain(..);
    fast_redexes.chain(slow_redexes)
  }

  /// Iterates over all redexes by shared reference, `fast` before `slow`.
  #[inline(always)]
  pub fn iter(&self) -> impl Iterator<Item = &(Port, Port)> {
    let Self { fast, slow } = self;
    fast.iter().chain(slow)
  }

  /// Iterates over all redexes by mutable reference, `fast` before `slow`.
  #[inline(always)]
  pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (Port, Port)> {
    let Self { fast, slow } = self;
    fast.iter_mut().chain(slow)
  }

  /// Discards every redex from both stacks.
  #[inline(always)]
  pub fn clear(&mut self) {
    for stack in [&mut self.fast, &mut self.slow] {
      stack.clear();
    }
  }
}

/// Returns whether a redex does not allocate memory.
///
/// A pair `(a, b)` qualifies when any of the following holds:
/// - either port equals `Port::ERA`;
/// - both ports are tagged `Ctr` and carry the same label;
/// - neither port is tagged `Ref` and at least one is tagged `Num`.
fn redex_would_shrink(a: &Port, b: &Port) -> bool {
  // Anything paired with an eraser.
  if *a == Port::ERA || *b == Port::ERA {
    return true;
  }

  // Two constructors with matching labels.
  if a.tag() == Tag::Ctr && b.tag() == Tag::Ctr && a.lab() == b.lab() {
    return true;
  }

  // A number against anything, as long as no reference is involved.
  let neither_is_ref = a.tag() != Tag::Ref && b.tag() != Tag::Ref;
  neither_is_ref && (a.tag() == Tag::Num || b.tag() == Tag::Num)
}
3 changes: 2 additions & 1 deletion src/run/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::*;

/// An interaction combinator net.
pub struct Net<'a, M: Mode> {
linker: Linker<'a, M>,
pub(super) linker: Linker<'a, M>,
pub tid: usize, // thread id
pub tids: usize, // thread count
pub trgs: Box<[MaybeUninit<Trg>]>,
Expand Down Expand Up @@ -44,6 +44,7 @@ impl<'a, M: Mode> Net<'a, M> {
pub fn reduce(&mut self, limit: usize) -> usize {
assert!(!M::LAZY);
let mut count = 0;

while let Some((a, b)) = self.redexes.pop() {
self.interact(a, b);
count += 1;
Expand Down
36 changes: 19 additions & 17 deletions src/run/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ use super::*;
impl<'h, M: Mode> Net<'h, M> {
/// Forks the net into `tids` child nets, for parallel operation.
pub fn fork(&mut self, tids: usize) -> impl Iterator<Item = Self> + '_ {
let mut redexes = std::mem::take(&mut self.redexes).into_iter();
let redexes_len = self.linker.redexes.len();
let mut redexes = self.linker.redexes.drain();
let heap = &self.linker.allocator.heap;
let next = &self.linker.allocator.next;
let root = &self.root;
(0 .. tids).map(move |tid| {
let heap_size = (self.heap.0.len() / tids) & !63; // round down to needed alignment
let heap_size = (heap.0.len() / tids) & !63; // round down to needed alignment
let heap_start = heap_size * tid;
let area = unsafe { std::mem::transmute(&self.heap.0[heap_start .. heap_start + heap_size]) };
let mut net = Net::new_with_root(area, self.root.clone());
net.next = self.next.saturating_sub(heap_start);
let area = unsafe { std::mem::transmute(&heap.0[heap_start .. heap_start + heap_size]) };
let mut net = Net::new_with_root(area, root.clone());
net.next = next.saturating_sub(heap_start);
net.head = if tid == 0 { net.head } else { Addr::NULL };
net.tid = tid;
net.tids = tids;
net.tracer.set_tid(tid);
let count = redexes.len() / (tids - tid);
net.redexes.extend((&mut redexes).take(count));
let count = redexes_len / (tids - tid);
(&mut redexes).take(count).for_each(|i| net.redux(i.0, i.1));
net
})
}
Expand All @@ -37,7 +41,7 @@ impl<'h, M: Mode> Net<'h, M> {
net: Net<'a, M>, // thread's own net object
delta: &'a AtomicRewrites, // global delta rewrites
share: &'a Vec<(AtomicU64, AtomicU64)>, // global share buffer
rlens: &'a Vec<AtomicUsize>, // global redex lengths
rlens: &'a Vec<AtomicUsize>, // global redex lengths (only counting shareable ones)
total: &'a AtomicUsize, // total redex length
barry: Arc<Barrier>, // synchronization barrier
}
Expand Down Expand Up @@ -70,8 +74,6 @@ impl<'h, M: Mode> Net<'h, M> {
}
});

// Clear redexes and sum stats
self.redexes.clear();
delta.add_to(&mut self.rwts);

// Main reduction loop
Expand Down Expand Up @@ -106,7 +108,7 @@ impl<'h, M: Mode> Net<'h, M> {
ctx.barry.wait();
ctx.total.store(0, Relaxed);
ctx.barry.wait();
ctx.rlens[ctx.tid].store(ctx.net.redexes.len(), Relaxed);
ctx.rlens[ctx.tid].store(ctx.net.redexes.slow.len(), Relaxed);
ctx.total.fetch_add(ctx.net.redexes.len(), Relaxed);
ctx.barry.wait();
ctx.total.load(Relaxed)
Expand All @@ -120,16 +122,16 @@ impl<'h, M: Mode> Net<'h, M> {
let shift = (1 << (plog2 - 1)) >> (ctx.tick % plog2);
let a_tid = ctx.tid;
let b_tid = if side == 1 { a_tid - shift } else { a_tid + shift };
let a_len = ctx.net.redexes.len();
let a_len = ctx.net.redexes.slow.len();
let b_len = ctx.rlens[b_tid].load(Relaxed);
let send = if a_len > b_len { (a_len - b_len) / 2 } else { 0 };
let recv = if b_len > a_len { (b_len - a_len) / 2 } else { 0 };
let send = std::cmp::min(send, SHARE_LIMIT);
let recv = std::cmp::min(recv, SHARE_LIMIT);
for i in 0 .. send {
let init = a_len - send * 2;
let rdx0 = ctx.net.redexes.get_unchecked(init + i * 2 + 0).clone();
let rdx1 = ctx.net.redexes.get_unchecked(init + i * 2 + 1).clone();
let rdx0 = ctx.net.redexes.slow[init + i * 2 + 0].clone();
let rdx1 = ctx.net.redexes.slow[init + i * 2 + 1].clone();
//let init = 0;
//let ref0 = ctx.net.redexes.get_unchecked_mut(init + i * 2 + 0);
//let rdx0 = *ref0;
Expand All @@ -138,15 +140,15 @@ impl<'h, M: Mode> Net<'h, M> {
//let rdx1 = *ref1;
//*ref1 = (Ptr(0), Ptr(0));
let targ = ctx.share.get_unchecked(b_tid * SHARE_LIMIT + i);
*ctx.net.redexes.get_unchecked_mut(init + i) = rdx0;
ctx.net.redexes.slow[init + i] = rdx0;
targ.0.store(rdx1.0.0, Relaxed);
targ.1.store(rdx1.1.0, Relaxed);
}
ctx.net.redexes.truncate(a_len - send);
ctx.net.redexes.slow.truncate(a_len - send);
ctx.barry.wait();
for i in 0 .. recv {
let got = ctx.share.get_unchecked(a_tid * SHARE_LIMIT + i);
ctx.net.redexes.push((Port(got.0.load(Relaxed)), Port(got.1.load(Relaxed))));
ctx.net.redexes.slow.push((Port(got.0.load(Relaxed)), Port(got.1.load(Relaxed))));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/transform/pre_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl<'a> State<'a> {
self.rewrites += rt.rwts;

// Move interactions with inert defs back into the net redexes array
rt.redexes.extend(self.captured_redexes.lock().unwrap().drain(..));
self.captured_redexes.lock().unwrap().drain(..).for_each(|r| rt.redux(r.0, r.1));

let net = self.host.readback(&mut rt);

Expand Down
7 changes: 6 additions & 1 deletion tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,14 @@ fn test_run(name: &str, host: Arc<Mutex<Host>>) {
fn test_pre_reduce_run(path: &str, mut book: Book) {
print!("{path}...");
print!(" pre-reduce");
io::stdout().flush().unwrap();

let start = Instant::now();
let pre_stats = book.pre_reduce(&|x| x == "main", None, u64::MAX);
let host = hvmc::stdlib::create_host(&book);
print!(" {:.3?}...", start.elapsed());
io::stdout().flush().unwrap();

let host = hvmc::stdlib::create_host(&book);
let Some((mut rwts, net)) = execute_host(host) else {
assert_snapshot!(show_rewrites(&pre_stats.rewrites));
return;
Expand Down

0 comments on commit fee98fb

Please sign in to comment.