Skip to content

Commit

Permalink
Keep all statistics with the object
Browse files Browse the repository at this point in the history
- added $!stats attribute to ParaSeq
- added $!ordinal attribute to ParaStats
- make sure all ParaStats objects are created with ordinal number
- removed the ParaSeq $!batch attribute, it is not needed
- renamed the ParaSeq $!initial attribute to $!batch
  • Loading branch information
lizmat committed May 19, 2024
1 parent 6c003e0 commit 44f8f5d
Showing 1 changed file with 79 additions and 71 deletions.
150 changes: 79 additions & 71 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ my class ParaQueue is repr('ConcBlockingQueue') { }
# from a batch of values

# Per-batch statistics record: which batch it was, how many elements it
# held, and how long it took to process.
# NOTE(review): this excerpt is a rendered diff, so the superseded
# two-argument constructor lines appear right next to their replacements.
my class ParaStats {
has uint $.ordinal;  # ordinal number of the batch
has uint $.elems;  # number of elements in the batch buffer
has uint $.nsecs;  # elapsed nqp::time() delta; presumably nanoseconds - confirm

# Positional constructor: creates the object with nqp::create and binds
# the native integer attributes directly with nqp::bindattr_i.
method new(uint $elems, uint $nsecs) {
method new(uint $ordinal, uint $elems, uint $nsecs) {
my $self := nqp::create(self);
nqp::bindattr_i($self,ParaStats,'$!elems',$elems);
nqp::bindattr_i($self,ParaStats,'$!nsecs',$nsecs);
nqp::bindattr_i($self, ParaStats, '$!ordinal',$ordinal);
nqp::bindattr_i($self, ParaStats, '$!elems', $elems );
nqp::bindattr_i($self, ParaStats, '$!nsecs', $nsecs );
$self
}
}
Expand Down Expand Up @@ -129,87 +131,95 @@ class ParaSeq {
has $!buffer; # first buffer
has $!source; # iterator producing source values
has $!result; # iterator producing result values
has $!stats; # list with ParaStats objects
has $!SCHEDULER; # $*SCHEDULER to be used
has uint $.degree; # number of CPUs, must be > 1
has uint $.initial; # initial batch size, must be > 0
has uint $.batch; # current batch size, must be > 0
has uint $.batch; # initial batch size, must be > 0
has uint $!stop; # stop all processing if 1

#- private helper methods ------------------------------------------------------

# Validate the arguments and, if they are acceptable, create and return a
# new ParaSeq object.
#   $method  - name reported in the X::Invalid::Value exception
#   $batch   - batch size, rejected when <= 0
#   $degree  - degree of parallelism, rejected when <= 1
#   $buffer  - first buffer, stored decontainerized
#   $source  - iterator producing source values
# NOTE(review): this excerpt is a rendered diff; the lines that mention
# $initial / '$!initial' appear to be the superseded version of the lines
# that mention $batch.
method !setup(
str $method, uint $initial, uint $degree, $buffer, $source
str $method, uint $batch, uint $degree, $buffer, $source
) is hidden-from-backtrace {
X::Invalid::Value.new(:$method, :name<batch>, :value($initial)).throw
if $initial <= 0;
X::Invalid::Value.new(:$method, :name<batch>, :value($batch)).throw
if $batch <= 0;
X::Invalid::Value.new(:$method, :name<degree>, :value($degree)).throw
if $degree <= 1;

# Set it up!
# Attributes are bound directly on a bare nqp::create'd object; the
# scheduler is captured from the dynamic $*SCHEDULER at this point.
my $self := nqp::create(self);
nqp::bindattr_i($self, ParaSeq, '$!degree', $degree);
nqp::bindattr_i($self, ParaSeq, '$!batch',
nqp::bindattr_i($self, ParaSeq, '$!initial', $initial)
);
nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $*SCHEDULER );
nqp::bindattr($self, ParaSeq, '$!buffer', nqp::decont($buffer));
nqp::bindattr($self, ParaSeq, '$!source', $source );
nqp::bindattr_i($self, ParaSeq, '$!batch', $batch );
nqp::bindattr_i($self, ParaSeq, '$!degree', $degree );
nqp::bindattr( $self, ParaSeq, '$!SCHEDULER', $*SCHEDULER );
nqp::bindattr( $self, ParaSeq, '$!buffer', nqp::decont($buffer));
nqp::bindattr( $self, ParaSeq, '$!source', $source );
$self
}

# Start the async process with the first buffer and the given buffer
# queuing logic
# queuing logic and required granularity for producing values
method !start(&queue-buffer, uint $granularity = 1) {

# Logic for making sure batch size has correct granularity
my sub granulize(uint $batch) {
$batch %% $granularity
?? $batch
!! $batch div $granularity * $granularity || $granularity
}

# Local copy of first buffer
my $first := $!buffer;
$!buffer := Mu; # release buffer in object

# Set up back pressure queue
# Set up initial back pressure queue allowing for one batch to
# be created for each worker thread
my $pressure := nqp::create(ParaQueue);
nqp::push($pressure, Mu) for ^$!degree;

# Queue the first buffer we already filled, and set up the
# result iterator
$!result := my $result :=
ParaIterator.new(queue-buffer($first), $pressure);

# Make sure batch size has the right granularity
$!batch = $!batch div $granularity * $granularity || $granularity
unless $!batch %% $granularity;
$!result := ParaIterator.new(queue-buffer(0, $first), $pressure);
$!stats := nqp::list;

# Make sure the scheduling of further batches actually happens in a
# separate thread
my $source := $!source;
$!SCHEDULER.cue: {
my uint $ordinal; # ordinal number of batch
my uint $exhausted; # flag: 1 if exhausted

# some shortcuts
my $source := $!source;
my $result := $!result;
my $stats := $!stats;

# Make sure batch size has the right granularity
my uint $batch = granulize($!batch);

# Until we're halted or have a buffer that's not full
nqp::until(
$!stop
|| nqp::eqaddr(
(my $stats := nqp::shift($pressure)),
IterationEnd
),
$!stop # complete shutdown requested
|| $exhausted, # nothing left to batch
nqp::stmts(
nqp::if(
nqp::eqaddr(
$source.push-exactly(
(my $buffer := nqp::create(IterationBuffer)),
$!batch # intentionally use the attribute so that any
# changes to it will be reflected then
),
IterationEnd
nqp::if( # wait for ok to proceed
nqp::isconcrete(my $ok := nqp::shift($pressure)),
nqp::bindpos($stats, $ok.ordinal, $ok)
),
($exhausted = nqp::eqaddr(
$source.push-exactly(
(my $buffer := nqp::create(IterationBuffer)),
$batch
),
($!stop = 1),
$result.add-queue(queue-buffer($buffer))
IterationEnd
)),
nqp::if( # add if something to add
nqp::elems($buffer),
$result.add-queue(queue-buffer(++$ordinal, $buffer))
)
)
);

# Some leftovers to process?
$result.add-queue(queue-buffer($buffer)) if nqp::elems($buffer);

# No more result queues will come
$result.add-queue(IterationEnd);
}
Expand All @@ -221,17 +231,15 @@ class ParaSeq {
# Entry point in chain, from an Iterator: create a new ParaSeq that takes
# the given iterator as its source and copies the degree, batch size and
# scheduler from the invocant. No '$!buffer' is bound here.
# NOTE(review): this excerpt is a rendered diff; the nested bindattr_i
# that mentions '$!initial' appears to be the superseded version.
method !pass-the-chain(\source) {
my $self := nqp::create(self);
nqp::bindattr_i($self, ParaSeq, '$!degree', $!degree);
nqp::bindattr_i($self, ParaSeq, '$!batch',
nqp::bindattr_i($self, ParaSeq, '$!initial', $!initial)
);
nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $!SCHEDULER);
nqp::bindattr($self, ParaSeq, '$!source', source );
nqp::bindattr_i($self, ParaSeq, '$!degree', $!degree );
nqp::bindattr_i($self, ParaSeq, '$!batch', $!batch );
nqp::bindattr( $self, ParaSeq, '$!SCHEDULER', $!SCHEDULER);
nqp::bindattr( $self, ParaSeq, '$!source', source );
$self
}

# Mark the given queue as done
method !queue-done(uint $then, $buffer, $queue) {
method !queue-done(uint $ordinal, uint $then, $buffer, $queue) {
my uint $delta = nqp::time() - $then;

# Indicate this result queue is done
Expand All @@ -245,6 +253,7 @@ class ParaSeq {
nqp::push(
nqp::decont($queue),
ParaStats.new(
$ordinal,
nqp::elems(nqp::decont($buffer)),
nqp::time() - $then
)
Expand Down Expand Up @@ -292,12 +301,11 @@ class ParaSeq {

#- introspection ---------------------------------------------------------------

# Defaults: return the values of $default-batch / $default-degree, which
# are defined outside this excerpt.
# NOTE(review): this excerpt is a rendered diff, so superseded and
# replacement accessor lines appear together.
method default-batch() { $default-batch }
method default-degree() { $default-degree }
method default-batch() { $default-batch }

# Per-instance settings; the ParaSeq:D: invocant constraint requires a
# defined ParaSeq object.
method degree( ParaSeq:D:) { $!degree }
method initial-batch(ParaSeq:D:) { $!initial }
method current-batch(ParaSeq:D:) { $!batch }
method degree(ParaSeq:D:) { $!degree }
method batch( ParaSeq:D:) { $!batch }

#- map -------------------------------------------------------------------------

Expand All @@ -306,7 +314,7 @@ class ParaSeq {

# Logic for queuing a buffer for map
my $SCHEDULER := $!SCHEDULER;
sub queue-buffer($buffer) {
sub queue-buffer(uint $ordinal, $buffer) {
my $queue := nqp::create(ParaQueue);
$SCHEDULER.cue: {
my uint $then = nqp::time;
Expand All @@ -316,7 +324,7 @@ class ParaSeq {
nqp::eqaddr((my $pulled := $iterator.pull-one),IterationEnd),
nqp::push($queue,$pulled)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}
Expand All @@ -334,7 +342,7 @@ class ParaSeq {
my uint $base; # base offset for :k, :kv, :p

# Logic for queuing a buffer for bare grep { }, producing values
sub v($buffer) {
sub v(uint $ordinal, $buffer) {
my $queue := nqp::create(ParaQueue);
$SCHEDULER.cue: {
my uint $then = nqp::time;
Expand All @@ -344,13 +352,13 @@ class ParaSeq {
nqp::eqaddr((my $pulled := $iterator.pull-one),IterationEnd),
nqp::push($queue,$pulled)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}

# Logic for queuing a buffer for grep { } :k
sub k($buffer) {
sub k(uint $ordinal, $buffer) {
my uint $offset = $base;
$base = $base + nqp::elems(nqp::decont($buffer));

Expand All @@ -363,13 +371,13 @@ class ParaSeq {
nqp::eqaddr((my $key := $iterator.pull-one),IterationEnd),
nqp::push($queue,$offset + $key)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}

# Logic for queuing a buffer for grep { } :kv
sub kv($buffer) {
sub kv(uint $ordinal, $buffer) {
my uint $offset = $base;
$base = $base + nqp::elems(nqp::decont($buffer));

Expand All @@ -385,13 +393,13 @@ class ParaSeq {
nqp::push($queue,$iterator.pull-one) # value
)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}

# Logic for queuing a buffer for grep { } :p
sub p($buffer) {
sub p(uint $ordinal, $buffer) {
my uint $offset = $base;
$base = $base + nqp::elems(nqp::decont($buffer));

Expand All @@ -407,7 +415,7 @@ class ParaSeq {
Pair.new($offset + $key, $iterator.pull-one)
)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}
Expand All @@ -424,7 +432,7 @@ class ParaSeq {
my uint $base; # base offset for :k, :kv, :p

# Logic for queuing a buffer for grep /.../
sub v($buffer) {
sub v(uint $ordinal, $buffer) {
my $queue := nqp::create(ParaQueue);
$SCHEDULER.cue: {
my uint $then = nqp::time;
Expand All @@ -434,13 +442,13 @@ class ParaSeq {
nqp::eqaddr((my $pulled := $iterator.pull-one),IterationEnd),
nqp::push($queue,$pulled)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}

# Logic for queuing a buffer for grep /.../ :k
sub k($buffer) {
sub k(uint $ordinal, $buffer) {
my uint $offset = $base;
$base = $base + nqp::elems(nqp::decont($buffer));

Expand All @@ -453,13 +461,13 @@ class ParaSeq {
nqp::eqaddr((my $key := $iterator.pull-one),IterationEnd),
nqp::push($queue,$offset + $key)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}

# Logic for queuing a buffer for grep /.../ :kv
sub kv($buffer) {
sub kv(uint $ordinal, $buffer) {
my uint $offset = $base;
$base = $base + nqp::elems(nqp::decont($buffer));

Expand All @@ -475,13 +483,13 @@ class ParaSeq {
nqp::push($queue,$iterator.pull-one) # value
)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}

# Logic for queuing a buffer for grep /.../ :p
sub p($buffer) {
sub p(uint $ordinal, $buffer) {
my uint $offset = $base;
$base = $base + nqp::elems(nqp::decont($buffer));

Expand All @@ -497,7 +505,7 @@ class ParaSeq {
Pair.new($offset + $key, $iterator.pull-one)
)
);
self!queue-done($then, $buffer, $queue);
self!queue-done($ordinal, $then, $buffer, $queue);
}
$queue
}
Expand Down

0 comments on commit 44f8f5d

Please sign in to comment.