diff --git a/lib/ParaSeq.rakumod b/lib/ParaSeq.rakumod index c4075de..88e2cc4 100644 --- a/lib/ParaSeq.rakumod +++ b/lib/ParaSeq.rakumod @@ -63,10 +63,12 @@ my class ParaIterator does Iterator { has atomicint $!stop; # stop all processing if 1 method new(\parent, \pressure) { + # Initial buffer being empty, force fetching first batch immediately + my constant $empty = nqp::create(IterationBuffer); + my $self := nqp::create(self); nqp::bindattr($self,ParaIterator,'$!parent',parent); - nqp::push((my $current := nqp::create(IterationBuffer)),Mu); - nqp::bindattr($self,ParaIterator,'$!current',$current); + nqp::bindattr($self,ParaIterator,'$!current',$empty); nqp::bindattr($self,ParaIterator,'$!queues',nqp::create(ParaQueue)); nqp::bindattr($self,ParaIterator,'$!pressure',pressure); $self @@ -90,22 +92,26 @@ my class ParaIterator does Iterator { method !next-batch(\next) { nqp::push($!pressure,Mu); # initiate more work + $!delivered = nqp::add_i( $!delivered, - nqp::sub_i(nqp::elems($!current := nqp::shift(next)),1) + nqp::elems(my $buffer := nqp::shift(next)) ); - $!current + $buffer } method pull-one() { - my $pulled := nqp::shift($!current); - nqp::until( + nqp::if( nqp::elems($!current), + (my $pulled := nqp::shift($!current)), nqp::if( nqp::eqaddr((my $next := nqp::shift($!queues)),IterationEnd), (return IterationEnd), - ($pulled := nqp::shift(self!next-batch($next))) + nqp::stmts( + ($!current := self!next-batch($next)), + ($pulled := self.pull-one) + ) ) ); @@ -121,7 +127,7 @@ my class ParaIterator does Iterator { nqp::until( nqp::atomicload_i($!stop), nqp::stmts( - (my uint $elems = nqp::elems($current) - 1), + (my uint $elems = nqp::elems($current)), nqp::if( $elems > $toskip, # can't ignore whole buffer, only part nqp::stmts( @@ -134,7 +140,7 @@ my class ParaIterator does Iterator { (return 0) # really done ), ($toskip = nqp::sub_i($toskip,$elems)), - ($current := self!next-batch($next)) + ($current := $!current := self!next-batch($next)) ) ); @@ -149,13 +155,12 @@ my class ParaIterator does Iterator { nqp::until( nqp::atomicload_i($!stop), nqp::stmts( - (my $stats := nqp::pop($current)), target.append($current), nqp::if( nqp::eqaddr((my $next := nqp::shift($queues)),IterationEnd), (return IterationEnd), ($current := self!next-batch($next)) - ), + ) ) ); @@ -518,7 +523,7 @@ class ParaSeq { nqp::atomicadd_i($!processed,$processed); nqp::atomicadd_i($!produced, $produced ); nqp::atomicadd_i($!nsecs, $nsecs ); - ParaStats.new($ordinal, $processed, $produced, $nsecs) +# ParaStats.new($ordinal, $processed, $produced, $nsecs) } # Mark the given queue as done @@ -529,14 +534,14 @@ class ParaSeq { # If we're still running unless nqp::atomicload_i($!stop) { # Push the statistics as the last element in the output - nqp::push( - nqp::decont($output), +# nqp::push( +# nqp::decont($output), self!add-stats( $ordinal, nqp::elems(nqp::decont($input)), nqp::elems(nqp::decont($output)), nqp::time() - $then - ) +# ) ); # Make the produced values available to the result iterator