From fee98fbd77bc234eed2eebe9f4b1b5b78e22bd94 Mon Sep 17 00:00:00 2001
From: T6
Date: Mon, 25 Mar 2024 15:03:04 -0400
Subject: [PATCH] fix ooms (#92)

Co-authored-by: FranchuFranchu
---
 Cargo.lock                  |  2 +-
 Cargo.toml                  |  2 +-
 cspell.json                 |  1 +
 src/host/readback.rs        |  3 +-
 src/run/instruction.rs      |  3 ++
 src/run/interact.rs         |  1 +
 src/run/linker.rs           | 62 ++++++++++++++++++++++++++++++++++---
 src/run/net.rs              |  3 +-
 src/run/parallel.rs         | 36 +++++++++++----------
 src/transform/pre_reduce.rs |  2 +-
 tests/tests.rs              |  7 ++++-
 11 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c52bd1d5..7fe20073 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -280,7 +280,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"

 [[package]]
 name = "hvm-core"
-version = "0.2.20"
+version = "0.2.21"
 dependencies = [
  "arrayvec",
  "clap",
diff --git a/Cargo.toml b/Cargo.toml
index 43e51b6e..dff6ed6e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hvm-core"
-version = "0.2.20"
+version = "0.2.21"
 edition = "2021"
 description = "HVM-Core is a massively parallel Interaction Combinator evaluator."
 license = "MIT"
diff --git a/cspell.json b/cspell.json
index 45f0acb7..a968ffbf 100644
--- a/cspell.json
+++ b/cspell.json
@@ -10,6 +10,7 @@
     "combinators",
     "condvar",
     "ctrs",
+    "deque",
     "dereferencable",
     "dref",
     "dups",
diff --git a/src/host/readback.rs b/src/host/readback.rs
index 72126391..d3665330 100644
--- a/src/host/readback.rs
+++ b/src/host/readback.rs
@@ -18,8 +18,7 @@ impl Host {
     let mut net = Net::default();

     net.root = state.read_wire(rt_net.root.clone());
-
-    for (a, b) in &rt_net.redexes {
+    for (a, b) in rt_net.redexes.iter() {
       net.redexes.push((state.read_port(a.clone(), None), state.read_port(b.clone(), None)))
     }
diff --git a/src/run/instruction.rs b/src/run/instruction.rs
index 7aaf3787..b0090f38 100644
--- a/src/run/instruction.rs
+++ b/src/run/instruction.rs
@@ -121,6 +121,7 @@ impl<'a, M: Mode> Net<'a, M> {
       // TODO: fast copy?
     } else if false && !M::LAZY && port.tag() == Num || port.tag() == Ref && lab >= port.lab() {
       self.rwts.comm += 1;
+      self.free_trg(trg);
       (Trg::port(port.clone()), Trg::port(port))
     } else {
       let n = self.create_node(Ctr, lab);
@@ -140,6 +141,7 @@ impl<'a, M: Mode> Net<'a, M> {
       n.p1.wire().set_target(Port::new_num(port.num()));
       (Trg::port(n.p0), Trg::port(n.p2))
     } else if !M::LAZY && port == Port::ERA {
+      self.free_trg(trg);
       (Trg::port(Port::ERA), Trg::port(Port::ERA))
     } else {
       let n = self.create_node(Op, op as Lab);
@@ -157,6 +159,7 @@ impl<'a, M: Mode> Net<'a, M> {
       self.free_trg(trg);
       Trg::port(Port::new_num(op.op(port.num(), rhs)))
     } else if !M::LAZY && port == Port::ERA {
+      self.free_trg(trg);
       Trg::port(Port::ERA)
     } else {
       let n = self.create_node(Op, op as Lab);
diff --git a/src/run/interact.rs b/src/run/interact.rs
index 80c5e3bb..6b0b37eb 100644
--- a/src/run/interact.rs
+++ b/src/run/interact.rs
@@ -259,6 +259,7 @@ impl<'a, M: Mode> Net<'a, M> {
     let a1 = a.p1.load_target();
     if a1.tag() == Num {
       self.rwts.oper += 1;
+      self.half_free(a.p1.addr());
       let out = op.op(b.num(), a1.num());
       self.link_wire_port(a.p2, Port::new_num(out));
     } else {
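The `free_trg` and `half_free` calls added above are the leak-fix half of this patch: interactions that consume their target without allocating anything new (copying a Num or Ref through a node, erasing, or folding a numeric op) previously left the consumed slot allocated forever. A minimal sketch of the underlying idea, using a toy free-list arena rather than hvm-core's real allocator (all names below are hypothetical):

    // Toy illustration: an interaction that allocates nothing still consumes
    // a node, and that node's slot must go back on the free list. In the
    // patch, `free_trg` / `half_free` play the role of `free` here.
    #[derive(Default)]
    struct Arena {
        nodes: Vec<[u64; 2]>, // backing storage
        free: Vec<usize>,     // free list of reusable slots
    }

    impl Arena {
        fn alloc(&mut self) -> usize {
            match self.free.pop() {
                Some(idx) => idx, // reuse a freed slot
                None => {
                    self.nodes.push([0; 2]); // otherwise grow the arena
                    self.nodes.len() - 1
                }
            }
        }
        fn free(&mut self, idx: usize) {
            self.free.push(idx); // slot becomes reusable
        }
        fn live(&self) -> usize {
            self.nodes.len() - self.free.len()
        }
    }

    fn main() {
        let mut arena = Arena::default();
        let node = arena.alloc();
        // Without this free, the slot would be leaked even though the
        // interaction created no new nodes.
        arena.free(node);
        assert_eq!(arena.live(), 0);
    }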
diff --git a/src/run/linker.rs b/src/run/linker.rs
index ea91e7d6..82b1091a 100644
--- a/src/run/linker.rs
+++ b/src/run/linker.rs
@@ -18,8 +18,8 @@ pub(super) struct Header {
 /// non-atomically (because they must be locked).
 pub struct Linker<'h, M: Mode> {
   pub(super) allocator: Allocator<'h>,
-  pub redexes: Vec<(Port, Port)>,
   pub rwts: Rewrites,
+  pub redexes: RedexQueue,
   headers: IntMap<Addr, Header>,
   _mode: PhantomData<M>,
 }
@@ -30,7 +30,7 @@ impl<'h, M: Mode> Linker<'h, M> {
   pub fn new(heap: &'h Heap) -> Self {
     Linker {
       allocator: Allocator::new(heap),
-      redexes: Vec::new(),
+      redexes: RedexQueue::default(),
       rwts: Default::default(),
       headers: Default::default(),
       _mode: PhantomData,
@@ -84,12 +84,20 @@ impl<'h, M: Mode> Linker<'h, M> {
   /// Pushes an active pair to the redex queue; `a` and `b` must both be
   /// principal ports.
   #[inline(always)]
-  fn redux(&mut self, a: Port, b: Port) {
+  pub fn redux(&mut self, a: Port, b: Port) {
     trace!(self, a, b);
+    debug_assert!(!(a.is(Tag::Var) || a.is(Tag::Red) || b.is(Tag::Var) || b.is(Tag::Red)));
     if a.is_skippable() && b.is_skippable() {
       self.rwts.eras += 1;
     } else if !M::LAZY {
-      self.redexes.push((a, b));
+      // Prioritize redexes that do not allocate memory,
+      // to prevent OOM errors that can be avoided
+      // by reducing redexes in a different order (see #91)
+      if redex_would_shrink(&a, &b) {
+        self.redexes.fast.push((a, b));
+      } else {
+        self.redexes.slow.push((a, b));
+      }
     } else {
       self.set_header(a.clone(), b.clone());
       self.set_header(b.clone(), a.clone());
@@ -341,3 +349,49 @@ impl<'h, M: Mode> Linker<'h, M> {
     self.headers[&port.addr()].targ.clone()
   }
 }
+
+#[derive(Debug, Default)]
+pub struct RedexQueue {
+  pub(super) fast: Vec<(Port, Port)>,
+  pub(super) slow: Vec<(Port, Port)>,
+}
+
+impl RedexQueue {
+  /// Returns the highest-priority redex in the queue, if any
+  #[inline(always)]
+  pub fn pop(&mut self) -> Option<(Port, Port)> {
+    self.fast.pop().or_else(|| self.slow.pop())
+  }
+  #[inline(always)]
+  pub fn len(&self) -> usize {
+    self.fast.len() + self.slow.len()
+  }
+  #[inline(always)]
+  pub fn is_empty(&self) -> bool {
+    self.fast.is_empty() && self.slow.is_empty()
+  }
+  #[inline(always)]
+  pub fn drain(&mut self) -> impl Iterator<Item = (Port, Port)> + '_ {
+    self.fast.drain(..).chain(self.slow.drain(..))
+  }
+  #[inline(always)]
+  pub fn iter(&self) -> impl Iterator<Item = &(Port, Port)> {
+    self.fast.iter().chain(self.slow.iter())
+  }
+  #[inline(always)]
+  pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (Port, Port)> {
+    self.fast.iter_mut().chain(self.slow.iter_mut())
+  }
+  #[inline(always)]
+  pub fn clear(&mut self) {
+    self.fast.clear();
+    self.slow.clear();
+  }
+}
+
+// Returns whether a redex does not allocate memory
+fn redex_would_shrink(a: &Port, b: &Port) -> bool {
+  (*a == Port::ERA || *b == Port::ERA)
+    || (a.tag() == Tag::Ctr && b.tag() == Tag::Ctr && a.lab() == b.lab())
+    || (!(a.tag() == Tag::Ref || b.tag() == Tag::Ref) && (a.tag() == Tag::Num || b.tag() == Tag::Num))
+}
diff --git a/src/run/net.rs b/src/run/net.rs
index 89b1cbe2..d4da5068 100644
--- a/src/run/net.rs
+++ b/src/run/net.rs
@@ -4,7 +4,7 @@ use super::*;

 /// An interaction combinator net.
 pub struct Net<'a, M: Mode> {
-  linker: Linker<'a, M>,
+  pub(super) linker: Linker<'a, M>,
   pub tid: usize,  // thread id
   pub tids: usize, // thread count
   pub trgs: Box<[MaybeUninit<Trg>]>,
@@ -44,6 +44,7 @@ impl<'a, M: Mode> Net<'a, M> {
   pub fn reduce(&mut self, limit: usize) -> usize {
     assert!(!M::LAZY);
     let mut count = 0;
+
     while let Some((a, b)) = self.redexes.pop() {
       self.interact(a, b);
       count += 1;
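The `RedexQueue` introduced above is the heart of the OOM fix: `pop` always drains `fast` (redexes whose reduction frees memory) before `slow` (redexes whose reduction may allocate), so memory-freeing work is never starved by memory-hungry work. A self-contained sketch of the same discipline, with plain integers standing in for `(Port, Port)` pairs (this `PriorityQueue` type is illustrative only):

    // Two-priority queue: "fast" items always come out before "slow" ones,
    // mirroring the shape of RedexQueue::pop in the patch.
    #[derive(Default)]
    struct PriorityQueue {
        fast: Vec<u32>, // redexes whose reduction shrinks the net
        slow: Vec<u32>, // redexes whose reduction may grow the net
    }

    impl PriorityQueue {
        fn push(&mut self, item: u32, shrinks: bool) {
            if shrinks { self.fast.push(item) } else { self.slow.push(item) }
        }
        fn pop(&mut self) -> Option<u32> {
            // Drain `fast` completely before touching `slow`.
            self.fast.pop().or_else(|| self.slow.pop())
        }
    }

    fn main() {
        let mut q = PriorityQueue::default();
        q.push(1, false); // allocating redex
        q.push(2, true);  // shrinking redex
        q.push(3, true);  // shrinking redex
        assert_eq!(q.pop(), Some(3)); // shrinking work first...
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), Some(1)); // ...allocating work only when none is left
    }

Per the comment in `redux`, this reordering alone prevents the OOMs reported in #91, which only occurred because allocating redexes happened to be reduced before the shrinking ones.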
diff --git a/src/run/parallel.rs b/src/run/parallel.rs
index 1fc97fde..65e4acf8 100644
--- a/src/run/parallel.rs
+++ b/src/run/parallel.rs
@@ -3,19 +3,23 @@ use super::*;
 impl<'h, M: Mode> Net<'h, M> {
   /// Forks the net into `tids` child nets, for parallel operation.
   pub fn fork(&mut self, tids: usize) -> impl Iterator<Item = Net<'h, M>> + '_ {
-    let mut redexes = std::mem::take(&mut self.redexes).into_iter();
+    let redexes_len = self.linker.redexes.len();
+    let mut redexes = self.linker.redexes.drain();
+    let heap = &self.linker.allocator.heap;
+    let next = &self.linker.allocator.next;
+    let root = &self.root;
     (0 .. tids).map(move |tid| {
-      let heap_size = (self.heap.0.len() / tids) & !63; // round down to needed alignment
+      let heap_size = (heap.0.len() / tids) & !63; // round down to needed alignment
       let heap_start = heap_size * tid;
-      let area = unsafe { std::mem::transmute(&self.heap.0[heap_start .. heap_start + heap_size]) };
-      let mut net = Net::new_with_root(area, self.root.clone());
-      net.next = self.next.saturating_sub(heap_start);
+      let area = unsafe { std::mem::transmute(&heap.0[heap_start .. heap_start + heap_size]) };
+      let mut net = Net::new_with_root(area, root.clone());
+      net.next = next.saturating_sub(heap_start);
       net.head = if tid == 0 { net.head } else { Addr::NULL };
       net.tid = tid;
       net.tids = tids;
       net.tracer.set_tid(tid);
-      let count = redexes.len() / (tids - tid);
-      net.redexes.extend((&mut redexes).take(count));
+      let count = redexes_len / (tids - tid);
+      (&mut redexes).take(count).for_each(|i| net.redux(i.0, i.1));
       net
     })
   }
@@ -37,7 +41,7 @@ impl<'h, M: Mode> Net<'h, M> {
     net: Net<'a, M>,                        // thread's own net object
     delta: &'a AtomicRewrites,              // global delta rewrites
     share: &'a Vec<(AtomicU64, AtomicU64)>, // global share buffer
-    rlens: &'a Vec<AtomicUsize>,            // global redex lengths
+    rlens: &'a Vec<AtomicUsize>,            // global redex lengths (only counting shareable ones)
     total: &'a AtomicUsize,                 // total redex length
     barry: Arc<Barrier>,                    // synchronization barrier
   }
@@ -70,8 +74,6 @@ impl<'h, M: Mode> Net<'h, M> {
       }
     });

-    // Clear redexes and sum stats
-    self.redexes.clear();
     delta.add_to(&mut self.rwts);

     // Main reduction loop
@@ -106,7 +108,7 @@ impl<'h, M: Mode> Net<'h, M> {
       ctx.barry.wait();
       ctx.total.store(0, Relaxed);
       ctx.barry.wait();
-      ctx.rlens[ctx.tid].store(ctx.net.redexes.len(), Relaxed);
+      ctx.rlens[ctx.tid].store(ctx.net.redexes.slow.len(), Relaxed);
       ctx.total.fetch_add(ctx.net.redexes.len(), Relaxed);
       ctx.barry.wait();
       ctx.total.load(Relaxed)
@@ -120,7 +122,7 @@ impl<'h, M: Mode> Net<'h, M> {
       let shift = (1 << (plog2 - 1)) >> (ctx.tick % plog2);
       let a_tid = ctx.tid;
       let b_tid = if side == 1 { a_tid - shift } else { a_tid + shift };
-      let a_len = ctx.net.redexes.len();
+      let a_len = ctx.net.redexes.slow.len();
       let b_len = ctx.rlens[b_tid].load(Relaxed);
       let send = if a_len > b_len { (a_len - b_len) / 2 } else { 0 };
       let recv = if b_len > a_len { (b_len - a_len) / 2 } else { 0 };
@@ -128,8 +130,8 @@ impl<'h, M: Mode> Net<'h, M> {
       let send = std::cmp::min(send, SHARE_LIMIT);
       let recv = std::cmp::min(recv, SHARE_LIMIT);
       for i in 0 .. send {
         let init = a_len - send * 2;
-        let rdx0 = ctx.net.redexes.get_unchecked(init + i * 2 + 0).clone();
-        let rdx1 = ctx.net.redexes.get_unchecked(init + i * 2 + 1).clone();
+        let rdx0 = ctx.net.redexes.slow[init + i * 2 + 0].clone();
+        let rdx1 = ctx.net.redexes.slow[init + i * 2 + 1].clone();
         //let init = 0;
         //let ref0 = ctx.net.redexes.get_unchecked_mut(init + i * 2 + 0);
         //let rdx0 = *ref0;
@@ -138,15 +140,15 @@ impl<'h, M: Mode> Net<'h, M> {
         //let rdx1 = *ref1;
         //*ref1 = (Ptr(0), Ptr(0));
         let targ = ctx.share.get_unchecked(b_tid * SHARE_LIMIT + i);
-        *ctx.net.redexes.get_unchecked_mut(init + i) = rdx0;
+        ctx.net.redexes.slow[init + i] = rdx0;
         targ.0.store(rdx1.0.0, Relaxed);
         targ.1.store(rdx1.1.0, Relaxed);
       }
-      ctx.net.redexes.truncate(a_len - send);
+      ctx.net.redexes.slow.truncate(a_len - send);
       ctx.barry.wait();
       for i in 0 .. recv {
         let got = ctx.share.get_unchecked(a_tid * SHARE_LIMIT + i);
-        ctx.net.redexes.push((Port(got.0.load(Relaxed)), Port(got.1.load(Relaxed))));
+        ctx.net.redexes.slow.push((Port(got.0.load(Relaxed)), Port(got.1.load(Relaxed))));
       }
     }
   }
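One detail `fork` preserves from the old code is the heap split: each child thread gets an equal slice of the shared heap, rounded down to a multiple of 64 nodes (`& !63`) so every slice keeps the allocator's alignment. A small illustration of that arithmetic (the helper name is hypothetical):

    // Each thread's slice of the heap, rounded down to a multiple of 64.
    fn slice_bounds(heap_len: usize, tids: usize, tid: usize) -> (usize, usize) {
        let heap_size = (heap_len / tids) & !63; // round down to needed alignment
        let heap_start = heap_size * tid;
        (heap_start, heap_start + heap_size)
    }

    fn main() {
        // With a 1000-node heap and 4 threads, 1000 / 4 = 250 rounds down to 192.
        assert_eq!(slice_bounds(1000, 4, 0), (0, 192));
        assert_eq!(slice_bounds(1000, 4, 3), (576, 768));
        // The tail (768..1000) is left unused rather than misaligned.
    }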
diff --git a/src/transform/pre_reduce.rs b/src/transform/pre_reduce.rs
index cc968e34..b8cf6916 100644
--- a/src/transform/pre_reduce.rs
+++ b/src/transform/pre_reduce.rs
@@ -143,7 +143,7 @@ impl<'a> State<'a> {
     self.rewrites += rt.rwts;

     // Move interactions with inert defs back into the net redexes array
-    rt.redexes.extend(self.captured_redexes.lock().unwrap().drain(..));
+    self.captured_redexes.lock().unwrap().drain(..).for_each(|r| rt.redux(r.0, r.1));

     let net = self.host.readback(&mut rt);
diff --git a/tests/tests.rs b/tests/tests.rs
index 5e79be24..eee8a7de 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -91,9 +91,14 @@ fn test_run(name: &str, host: Arc<Mutex<Host>>) {
 fn test_pre_reduce_run(path: &str, mut book: Book) {
   print!("{path}...");
   print!(" pre-reduce");
+  io::stdout().flush().unwrap();
+
+  let start = Instant::now();
   let pre_stats = book.pre_reduce(&|x| x == "main", None, u64::MAX);
-  let host = hvmc::stdlib::create_host(&book);
+  print!(" {:.3?}...", start.elapsed());
+  io::stdout().flush().unwrap();

+  let host = hvmc::stdlib::create_host(&book);
   let Some((mut rwts, net)) = execute_host(host) else {
     assert_snapshot!(show_rewrites(&pre_stats.rewrites));
     return;
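For reference, `redex_would_shrink` classifies an active pair as non-allocating when either side is an eraser, when two constructors with equal labels annihilate, or when a number interacts with anything that is not a `Ref` (a `Ref` may expand into an arbitrarily large net, so it stays in the slow queue). A toy restatement over a simplified tag type, to make the classification concrete (this `Tag` enum is illustrative, not hvm-core's real one):

    // Simplified stand-in for hvm-core's port tags.
    #[derive(Clone, Copy)]
    enum Tag { Era, Ctr(u16), Num, Ref }

    // Same decision procedure as redex_would_shrink, over the toy tags.
    fn would_shrink(a: Tag, b: Tag) -> bool {
        match (a, b) {
            (Tag::Era, _) | (_, Tag::Era) => true,        // erasure only frees nodes
            (Tag::Ctr(x), Tag::Ctr(y)) if x == y => true, // annihilation only frees nodes
            (Tag::Ref, _) | (_, Tag::Ref) => false,       // expansion may allocate a lot
            (Tag::Num, _) | (_, Tag::Num) => true,        // numeric ops fold in place
            _ => false,                                   // commutation allocates
        }
    }

    fn main() {
        assert!(would_shrink(Tag::Era, Tag::Ctr(0)));
        assert!(would_shrink(Tag::Ctr(1), Tag::Ctr(1)));
        assert!(!would_shrink(Tag::Ctr(0), Tag::Ctr(1))); // different labels commute
        assert!(!would_shrink(Tag::Num, Tag::Ref));       // the Ref dominates: slow
    }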