Skip to content

Commit

Permalink
Remove the Stats entry from each buffer
Browse files Browse the repository at this point in the history
As we're moving away from using the stats for load control, to being
a completely optional introspection / debugging feature
  • Loading branch information
lizmat committed May 22, 2024
1 parent 329f211 commit 614c388
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ my class ParaIterator does Iterator {
has atomicint $!stop; # stop all processing if 1

method new(\parent, \pressure) {
# Initial buffer being empty, force fetching first batch immediately
my constant $empty = nqp::create(IterationBuffer);

my $self := nqp::create(self);
nqp::bindattr($self,ParaIterator,'$!parent',parent);
nqp::push((my $current := nqp::create(IterationBuffer)),Mu);
nqp::bindattr($self,ParaIterator,'$!current',$current);
nqp::bindattr($self,ParaIterator,'$!current',$empty);
nqp::bindattr($self,ParaIterator,'$!queues',nqp::create(ParaQueue));
nqp::bindattr($self,ParaIterator,'$!pressure',pressure);
$self
Expand All @@ -90,22 +92,26 @@ my class ParaIterator does Iterator {

method !next-batch(\next) {
nqp::push($!pressure,Mu); # initiate more work

$!delivered = nqp::add_i(
$!delivered,
nqp::sub_i(nqp::elems($!current := nqp::shift(next)),1)
nqp::elems(my $buffer := nqp::shift(next))
);
$!current
$buffer
}

method pull-one() {
my $pulled := nqp::shift($!current);

nqp::until(
nqp::if(
nqp::elems($!current),
(my $pulled := nqp::shift($!current)),
nqp::if(
nqp::eqaddr((my $next := nqp::shift($!queues)),IterationEnd),
(return IterationEnd),
($pulled := nqp::shift(self!next-batch($next)))
nqp::stmts(
($!current := self!next-batch($next)),
($pulled := self.pull-one)
)
)
);

Expand All @@ -121,7 +127,7 @@ my class ParaIterator does Iterator {
nqp::until(
nqp::atomicload_i($!stop),
nqp::stmts(
(my uint $elems = nqp::elems($current) - 1),
(my uint $elems = nqp::elems($current)),
nqp::if(
$elems > $toskip, # can't ignore whole buffer, only part
nqp::stmts(
Expand All @@ -134,7 +140,7 @@ my class ParaIterator does Iterator {
(return 0) # really done
),
($toskip = nqp::sub_i($toskip,$elems)),
($current := self!next-batch($next))
($current := $!current := self!next-batch($next))
)
);

Expand All @@ -149,13 +155,12 @@ my class ParaIterator does Iterator {
nqp::until(
nqp::atomicload_i($!stop),
nqp::stmts(
(my $stats := nqp::pop($current)),
target.append($current),
nqp::if(
nqp::eqaddr((my $next := nqp::shift($queues)),IterationEnd),
(return IterationEnd),
($current := self!next-batch($next))
),
)
)
);

Expand Down Expand Up @@ -518,7 +523,7 @@ class ParaSeq {
nqp::atomicadd_i($!processed,$processed);
nqp::atomicadd_i($!produced, $produced );
nqp::atomicadd_i($!nsecs, $nsecs );
ParaStats.new($ordinal, $processed, $produced, $nsecs)
# ParaStats.new($ordinal, $processed, $produced, $nsecs)
}

# Mark the given queue as done
Expand All @@ -529,14 +534,14 @@ class ParaSeq {
# If we're still running
unless nqp::atomicload_i($!stop) {
# Push the statistics as the last element in the output
nqp::push(
nqp::decont($output),
# nqp::push(
# nqp::decont($output),
self!add-stats(
$ordinal,
nqp::elems(nqp::decont($input)),
nqp::elems(nqp::decont($output)),
nqp::time() - $then
)
# )
);

# Make the produced values available to the result iterator
Expand Down

0 comments on commit 614c388

Please sign in to comment.