Skip to content

Commit

Permalink
Fix some issues with storing from a ParaSeq
Browse files Browse the repository at this point in the history
  • Loading branch information
lizmat committed Aug 24, 2024
1 parent e701a2c commit c6ba033
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 36 deletions.
61 changes: 25 additions & 36 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -619,49 +619,39 @@ my class ParaIterator does Iterator {

# Optimized version appending to an IterationBuffer
multi method push-all(ParaIterator:D: IterationBuffer:D \target) {
my $current := $!current;
my $current := $!current;

nqp::while(
1,
nqp::stmts(
nqp::splice(target,$current,nqp::elems(target),0),
nqp::if(
nqp::eqaddr((my $next := self!next),IE),
(return IE),
($current := self!next-batch($next))
)
)
);
loop {
nqp::splice(target,$current,nqp::elems(target),0);
nqp::eqaddr((my $next := self!next),IE)
?? (return IE)
!! ($current := self!next-batch($next));
}
}

# Slower generic version that needs to coerce each buffer to a List
# to ensure the correct semantics with .append
multi method push-all(ParaIterator:D: \target) {
my $current := $!current;
multi method push-all(ParaIterator:D: Any:D \target) {
my $current := $!current;

nqp::while(
1,
nqp::stmts(
target.append($current.List),
nqp::if(
nqp::eqaddr((my $next := self!next),IE),
(return IE),
($current := self!next-batch($next))
)
)
);
loop {
nqp::while(
nqp::elems($current),
target.push(nqp::shift($current))
);
nqp::eqaddr((my $next := self!next),IE)
?? (return IE)
!! ($current := self!next-batch($next));
}
}

# Optimized version used for its side-effects
method sink-all(ParaIterator:D:) {
nqp::while(
1,
nqp::if(
nqp::eqaddr((my $next := self!next),IE),
(return IE),
self!next-batch($next)
)
);
loop {
nqp::eqaddr((my $next := self!next),IE)
?? (return IE)
!! self!next-batch($next);
}
}

method close-queues(ParaIterator:D:) is implementation-detail {
Expand Down Expand Up @@ -791,11 +781,11 @@ class ParaSeq is Seq {

# Perform the actual cueing
$!catch
?? $!SCHEDULER.cue: &code.clone, :catch({
?? $!SCHEDULER.cue: nqp::p6capturelex(&code.clone), :catch({
nqp::push($exceptions,$_);
try .resume;
})
!! $!SCHEDULER.cue: &code.clone
!! $!SCHEDULER.cue: nqp::p6capturelex(&code.clone)
}

# Set up object and return it
Expand Down Expand Up @@ -883,7 +873,6 @@ class ParaSeq is Seq {
my uint $ordinal; # ordinal number of batch
my uint $exhausted; # flag: 1 if exhausted


# Queue the first buffer we already filled, and set up the
# result iterator
if nqp::isconcrete($first) {
Expand Down
22 changes: 22 additions & 0 deletions t/51-store.rakutest
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use Test;
use ParaSeq;

plan 4;

my constant multiplier = 10000;
my $code = $*PROGRAM.slurp x multiplier;
my @codes = $code xx 100;
my atomicint $seen;
my atomicint $started;

my @a = @codes.&hyperize.map({
++⚛$started;
.lines.&hyperize.map({ ++⚛$seen, $_ if .contains("clean")}).elems
});

is-deeply $started, +@codes, 'did we start all parts';
is-deeply @a.elems, +@codes, 'did we get all parts';
is-deeply $seen, multiplier * @codes, 'did we see all occurrences';
is-deeply @a.sum, multiplier * @codes, 'did we get all occurrences';

# vim: expandtab shiftwidth=4

0 comments on commit c6ba033

Please sign in to comment.