Skip to content

Commit

Permalink
First version with batch adjusting algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
lizmat committed May 20, 2024
1 parent 44f8f5d commit b0a1b29
Showing 1 changed file with 104 additions and 30 deletions.
134 changes: 104 additions & 30 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,6 @@ my uint $default-degree = Kernel.cpu-cores-but-one;
# can nqp::shift
my class ParaQueue is repr('ConcBlockingQueue') { }

#- ParaStats -------------------------------------------------------------------
# A class for keeping stats about the production of a single result queue
# from a batch of values

my class ParaStats {
has uint $.ordinal;
has uint $.elems;
has uint $.nsecs;

method new(uint $ordinal, uint $elems, uint $nsecs) {
my $self := nqp::create(self);
nqp::bindattr_i($self, ParaStats, '$!ordinal',$ordinal);
nqp::bindattr_i($self, ParaStats, '$!elems', $elems );
nqp::bindattr_i($self, ParaStats, '$!nsecs', $nsecs );
$self
}
}

#- BufferIterator --------------------------------------------------------------
# An iterator that takes a buffer and an iterator, and first produces all
# of the values from the buffer, and then from the iterator
Expand Down Expand Up @@ -125,6 +107,30 @@ my class ParaIterator does Iterator {
}
}

#- ParaStats -------------------------------------------------------------------
# A class for keeping stats about the production of a single result queue
# from a batch of values

class ParaStats {
has uint $.ordinal;
has uint $.batch;
has uint $.nsecs;

method new(uint $ordinal, uint $batch, uint $nsecs) {
my $self := nqp::create(self);
nqp::bindattr_i($self, ParaStats, '$!ordinal',$ordinal);
nqp::bindattr_i($self, ParaStats, '$!batch', $batch );
nqp::bindattr_i($self, ParaStats, '$!nsecs', $nsecs );
$self
}

method average-nsecs(ParaStats:D:) { $!nsecs div $!batch }

multi method gist(ParaStats:D:) {
"#$!ordinal: $!batch (" ~ $!nsecs div $!batch ~ " nsecs)"
}
}

#- ParaSeq ---------------------------------------------------------------------
# The class containing all of the logic for parallel sequences
class ParaSeq {
Expand Down Expand Up @@ -166,22 +172,28 @@ class ParaSeq {
my sub granulize(uint $batch) {
$batch %% $granularity
?? $batch
!! $batch div $granularity * $granularity || $granularity
!! ($batch div $granularity * $granularity) || $granularity
}

# Local copy of first buffer
# Local copy of first buffer, make sure it is granulized correctly
my $first := $!buffer;
$!buffer := Mu; # release buffer in object
nqp::until(
nqp::elems($first) %% $granularity
|| nqp::eqaddr((my $pulled := $!source.pull-one),IterationEnd),
nqp::push($first,$pulled)
);

# Set up initial back pressure queue allowing for one batch to
# be created for each worker thread
my $pressure := nqp::create(ParaQueue);
nqp::push($pressure, Mu) for ^$!degree;
my uint $degree = $!degree;
my $pressure := nqp::create(ParaQueue);
nqp::push($pressure, Mu) for ^$degree;

# Queue the first buffer we already filled, and set up the
# result iterator
$!result := ParaIterator.new(queue-buffer(0, $first), $pressure);
$!stats := nqp::list;
$!stats := nqp::create(IterationBuffer);

# Make sure the scheduling of further batches actually happen in a
# separate thread
Expand All @@ -197,23 +209,78 @@ class ParaSeq {
# Make sure batch size has the right granularity
my uint $batch = granulize($!batch);

# Set up initial batch sizes to introduce sufficient noise
# to make sure we won't be influenced too much by the random
# noise that is already involved in doing multi-threading
my int $checkpoints = 3 * $degree;
my uint @initial = (^$checkpoints).map: {
granulize(nqp::coerce_ni($batch * (0.5e0 + 10 * 1.rand)))
}

# Initial noisifying in setting the batch size
my multi sub tweak-batch(Mu $ok) {
$batch = nqp::shift_i(@initial) if nqp::elems(@initial);
}

# Shortcut to the last N results
my uint @avg-nsecs;
my uint @batch;

# Logic for tweaking the batch size by looking at the last N
# stats that have been collected. If enough stats collected
# then zero in on the batch size with the lowest average nsecs
my multi sub tweak-batch(ParaStats:D $ok) {
my uint $lowest = $ok.average-nsecs;

# Still some initially fuzzed batch sizes
if nqp::elems(@initial) {
$batch = nqp::shift_i(@initial);
}

# No more fuzzed, need to calculate
else {
my uint $new = $ok.batch;
my uint $m = nqp::elems(@avg-nsecs);
my int $i = -1;
nqp::while(
++$i < $m,
nqp::if(
nqp::isle_i(nqp::atpos_i(@avg-nsecs,$i),$lowest),
nqp::stmts(
($lowest = nqp::atpos_i(@avg-nsecs,$i)),
($new = nqp::atpos_i(@batch,$i))
)
)
);

# Remove oldest
nqp::shift_i(@avg-nsecs);
nqp::shift_i(@batch);

# Set new batch size with a higher tendency
$batch = granulize(nqp::coerce_ni($new * 1.2e0));
}

# Now update the lists of stats
nqp::push_i(@avg-nsecs, $lowest);
nqp::push_i(@batch, $ok.batch);
nqp::push($stats, $ok);
}

# Until we're halted or have a buffer that's not full
nqp::until(
$!stop # complete shutdown requested
|| $exhausted, # nothing left to batch
nqp::stmts(
nqp::if( # wait for ok to proceed
nqp::isconcrete(my $ok := nqp::shift($pressure)),
nqp::bindpos($stats, $ok.ordinal, $ok)
),
tweak-batch(nqp::shift($pressure)), # wait for ok to proceed
($exhausted = nqp::eqaddr(
$source.push-exactly(
(my $buffer := nqp::create(IterationBuffer)),
$batch
),
IterationEnd
)),
nqp::if( # add if something to add
nqp::if( # add if something to add
nqp::elems($buffer),
$result.add-queue(queue-buffer(++$ordinal, $buffer))
)
Expand Down Expand Up @@ -304,8 +371,15 @@ class ParaSeq {
method default-degree() { $default-degree }
method default-batch() { $default-batch }

method degree(ParaSeq:D:) { $!degree }
method batch( ParaSeq:D:) { $!batch }
method degree(ParaSeq:D:) { $!degree }
method batch( ParaSeq:D:) { $!batch }
method stats( ParaSeq:D:) { $!stats.List }

method stopped(ParaSeq:D:) { nqp::hllbool($!stop) }

method average-nsecs(ParaSeq:D:) {
$!stats.List.map(*.average-nsecs).sum div $!stats.elems
}

#- map -------------------------------------------------------------------------

Expand Down

0 comments on commit b0a1b29

Please sign in to comment.