Skip to content

Commit

Permalink
Add ParaIterator $!delivered attribute
Browse files Browse the repository at this point in the history
This attribute indicates how many values have been delivered so far,
including all values from the current batch.

This should be used later to get a better approach to parallelization,
instead of going full out on all available CPUs, potentiallyi significantly
overshooting the target of number of values that actually needed to be
produced
  • Loading branch information
lizmat committed May 22, 2024
1 parent df1a128 commit 329f211
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ my class BufferIterator does Iterator {
# and removed in a thread-safe manner

my class ParaIterator does Iterator {
has $!parent; # ParaSeq parent object
has $!current; # current producer
has $!queues; # other queues to produce from
has $!pressure; # backpressure provider
has atomicint $!stop; # stop all processing if 1
has $!parent; # ParaSeq parent object
has $!current; # current producer
has $!queues; # other queues to produce from
has $!pressure; # backpressure provider
has uint $!delivered; # number of values that can be delivered
has atomicint $!stop; # stop all processing if 1

method new(\parent, \pressure) {
my $self := nqp::create(self);
Expand Down Expand Up @@ -87,20 +88,24 @@ my class ParaIterator does Iterator {
nqp::push($!queues,nqp::create(ParaQueue))
}

method !next-batch(\next) {
nqp::push($!pressure,Mu); # initiate more work
$!delivered = nqp::add_i(
$!delivered,
nqp::sub_i(nqp::elems($!current := nqp::shift(next)),1)
);
$!current
}

method pull-one() {
my $pulled := nqp::shift($!current);

nqp::until(
nqp::elems($!current), # queue exhausted
nqp::elems($!current),
nqp::if(
nqp::eqaddr((my $next := nqp::shift($!queues)),IterationEnd),
(return IterationEnd), # no queue left, done
nqp::stmts( # one more queue
nqp::push($!pressure,$pulled), # allow more work
$pulled := nqp::shift( # first value of
$!current := nqp::shift($next) # the next queue
)
)
(return IterationEnd),
($pulled := nqp::shift(self!next-batch($next)))
)
);

Expand Down Expand Up @@ -128,9 +133,8 @@ my class ParaIterator does Iterator {
nqp::eqaddr((my $next := nqp::shift($queues)),IterationEnd),
(return 0) # really done
),
nqp::push($pressure,nqp::pop($current)), # allow more work
($toskip = $toskip - $elems), # seen these
$current := nqp::shift($next) # the next queue
($toskip = nqp::sub_i($toskip,$elems)),
($current := self!next-batch($next))
)
);

Expand All @@ -149,10 +153,9 @@ my class ParaIterator does Iterator {
target.append($current),
nqp::if(
nqp::eqaddr((my $next := nqp::shift($queues)),IterationEnd),
(return IterationEnd) # really done
(return IterationEnd),
($current := self!next-batch($next))
),
nqp::push($pressure,$stats), # allow more work
$current := nqp::shift($next) # next queue
)
);

Expand Down

0 comments on commit 329f211

Please sign in to comment.