Skip to content

Commit

Permalink
Support dip1008
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Feb 7, 2023
1 parent 7dd10ad commit c0067ea
Show file tree
Hide file tree
Showing 18 changed files with 82 additions and 34 deletions.
4 changes: 2 additions & 2 deletions dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ configuration "unittest" {
dependency "unit-threaded" version="*"
targetType "executable"
mainSourceFile "tests/ut/ut_runner.d"
dflags "-dip1000"
dflags "-dip1000" "-dip1008"
sourcePaths "source" "tests/ut"
importPaths "source" "tests/ut"
}
configuration "unittest-release" {
dependency "unit-threaded" version="*"
targetType "executable"
mainSourceFile "tests/ut/ut_runner.d"
dflags "-dip1000" "-g"
dflags "-dip1000" "-g" "-dip1008"
sourcePaths "source" "tests/ut"
importPaths "source" "tests/ut"
# buildOptions "unittests" "optimize" "inline"
Expand Down
20 changes: 20 additions & 0 deletions source/concurrency/error.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Throwable clone(Throwable t) nothrow @safe {
return new ThrowableClone!RangeError(t.info, r.file, r.line, r.next);
if (auto e = cast(Error)t)
return new ThrowableClone!Error(t.info, t.msg, t.file, t.line, t.next);
if (auto e = cast(Exception)t)
return new ThrowableClone!Exception(t.info, t.msg, t.file, t.line, t.next);
return new ThrowableClone!Throwable(t.info, t.msg, t.file, t.line, t.next);
}

Expand Down Expand Up @@ -53,3 +55,21 @@ class ClonedTraceInfo : Throwable.TraceInfo {
return buf;
}
}

Exception unscopeException(scope Exception e) @trusted nothrow {
if (e.refcount == 0) {
//TODO: what if exception is allocated on stack or TLS?
return cast(Exception)e;
}

return cast(Exception)e.clone();
}

Throwable unscopeException(scope Throwable e) @trusted nothrow {
if (e.refcount == 0) {
//TODO: what if exception is allocated on stack or TLS?
return cast(Throwable)e;
}

return e.clone();
}
3 changes: 2 additions & 1 deletion source/concurrency/nursery.d
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class Nursery : StopSource {
try {
localReceiver.setValue();
} catch (Exception e) {
localReceiver.setError(e);
import concurrency.error;
localReceiver.setError(e.unscopeException);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/onerror.d
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private struct OnErrorReceiver(Value, SideEffect, Receiver) {
try
sideEffect(e);
catch (Exception e2) {
receiver.setError(() @trusted { return Throwable.chainTogether(e2, e); } ());
import concurrency.error;
receiver.setError(() @trusted { return Throwable.chainTogether(e2.unscopeException, e); } ());
return;
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/onresult.d
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ private struct OnResultReceiver(Value, SideEffect, Receiver) {
try {
sideEffect(Result!(Value)(ex));
} catch (Exception e2) {
import concurrency.error;
return receiver.setError(() @trusted {
return Throwable.chainTogether(e2, e);
return Throwable.chainTogether(e2.unscopeException, e);
}());
} catch (Throwable t) {
return receiver.setError(t);
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/repeat.d
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private struct RepeatOp(Receiver, Sender) {
try {
reset();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
private void reset() @trusted scope {
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/operations/retry.d
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ private struct RetryReceiver(Receiver, Sender, Logic) {
// From what I gathered that is what libunifex does
sender.connectHeap(this).start();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions source/concurrency/operations/toshared.d
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
try {
localState.op = sender.connect(SharedSenderReceiver!(Sender, Scheduler, resetLogic)(&state, scheduler));
} catch (Exception e) {
state.process!(resetLogic)(InternalValue(e));
import concurrency.error;
state.process!(resetLogic)(InternalValue(e.unscopeException));
}
localState.op.start();
} else {
Expand Down Expand Up @@ -158,10 +159,12 @@ private struct SharedSenderOp(Sender, Scheduler, ResetLogic resetLogic, Receiver
/// this onValue can sometimes be called immediately,
/// leaving no room to set cb.dispose...
cb.dispose();
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}, (Throwable e){
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}, (Done d){
receiver.setDone();
});
Expand Down
6 changes: 4 additions & 2 deletions source/concurrency/receiver.d
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ void setValueOrError(Receiver)(auto ref Receiver receiver) @safe {
try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand All @@ -85,7 +86,8 @@ void setValueOrError(Receiver, T)(auto ref Receiver receiver, auto ref T value)
try {
receiver.setValue(value);
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
6 changes: 4 additions & 2 deletions source/concurrency/scheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct SchedulerAdapter(Worker) {
try {
worker.schedule(cast(VoidDelegate)()=>receiver.setValueOrError());
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down Expand Up @@ -135,7 +136,8 @@ struct ScheduleAfterOp(Worker, Receiver) {
try {
timer = worker.addTimer(cast(void delegate(TimerTrigger) @safe shared)&trigger, dur);
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
return;
}

Expand Down
8 changes: 5 additions & 3 deletions source/concurrency/sender.d
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ struct JustFromSender(Fun) {
try {
set();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down Expand Up @@ -463,14 +464,15 @@ struct PromiseSenderOp(T, Receiver) {
// calling the receiver's termination functions.
if (cb)
cb.dispose();
value.match!((Sender.ValueRep v){
value.match!((Sender.ValueRep v) @safe {
import concurrency.error;
try {
static if (is(Sender.Value == void))
receiver.setValue();
else
receiver.setValue(v);
} catch (Exception e) {
receiver.setError(e);
receiver.setError(e.unscopeException);
}
}, (Throwable e){
receiver.setError(e);
Expand Down
15 changes: 10 additions & 5 deletions source/concurrency/stream/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,16 @@ auto intervalStream(Duration duration, bool emitAtStart = false) {
}
op.load();
} catch (Exception e) {
op.receiver.setError(e);
import concurrency.error;
op.receiver.setError(e.unscopeException);
}
}
void setDone() @safe nothrow {
op.receiver.setDone();
}
void setError(Throwable t) @safe nothrow {
op.receiver.setError(t);
import concurrency.error;
op.receiver.setError(t.unscopeException);
}
auto getStopToken() @safe {
return op.receiver.getStopToken();
Expand Down Expand Up @@ -165,15 +167,17 @@ auto intervalStream(Duration duration, bool emitAtStart = false) {
}
load();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
private void load() @trusted nothrow {
try {
op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this));
op.start();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
}
}
Expand Down Expand Up @@ -294,9 +298,10 @@ shared struct SharedStream(T) {
try {
dg(element);
} catch (Exception e) {
import concurrency.error;
source.remove(&this.onItem);
cb.dispose();
receiver.setError(e);
receiver.setError(e.unscopeException);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/stream/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ template loopStream(E) {
try {
t.loop(dg, receiver.getStopToken);
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
}
if (receiver.getStopToken().isStopRequested)
receiver.setDone();
Expand Down
9 changes: 6 additions & 3 deletions source/concurrency/stream/throttling.d
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi
dg();
return true;
} catch (Exception e) {
import concurrency.error;
with (flags.lock(ThrottleFlags.doneOrError_produced)) {
if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
throwable = e;
throwable = e.unscopeException;
}
release();
process(newState);
Expand Down Expand Up @@ -136,9 +137,10 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi
dg(t);
return true;
} catch (Exception e) {
import concurrency.error;
with (flags.lock(ThrottleFlags.doneOrError_produced)) {
if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
throwable = e;
throwable = e.unscopeException;
}
release();
process(newState);
Expand Down Expand Up @@ -198,7 +200,8 @@ template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogi
else
dg();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
return;
}
}
Expand Down
3 changes: 2 additions & 1 deletion source/concurrency/syncwait.d
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ package struct SyncWaitReceiver2(Value) {
}

void setError(Throwable e) nothrow @safe {
state.throwable = e;
import concurrency.error;
state.throwable = e.unscopeException;
state.worker.stop();
}
static if (is(Value == void))
Expand Down
9 changes: 6 additions & 3 deletions source/concurrency/thread.d
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ struct ThreadSender {
try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
} catch (Throwable t) {
receiver.setError(t.clone());
}
Expand Down Expand Up @@ -435,7 +436,8 @@ private struct TaskPoolSender {
try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
import concurrency.error;
receiver.setError(e.unscopeException);
} catch (Throwable t) {
receiver.setError(t.clone);
}
Expand All @@ -454,7 +456,8 @@ private struct TaskPoolSender {
try {
pool.put(myTask);
} catch (Exception e) {
myTask.args[0].setError(e);
import concurrency.error;
myTask.args[0].setError(e.unscopeException);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/ut/concurrency/operations.d
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct OutOfBandValueSender(T) {

@("race.exception.double")
@safe unittest {
// TODO: this is flaky. Find another way.
auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast");
Expand Down
8 changes: 4 additions & 4 deletions tests/ut/concurrency/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,11 @@ unittest {

@("flatmap.concat.error")
@safe unittest {
import concurrency.sender : just, ErrorSender;
import concurrency.sender : just, ThrowingSender;
import concurrency.operations : via;

[1,2,3].arrayStream
.flatMapConcat((int i) => ErrorSender())
.flatMapConcat((int i) => ThrowingSender())
.collect(()shared{})
.syncWait
.assumeOk
Expand Down Expand Up @@ -659,11 +659,11 @@ unittest {

@("flatmap.latest.error")
@safe unittest {
import concurrency.sender : just, ErrorSender;
import concurrency.sender : just, ThrowingSender;
import concurrency.operations : via;

[1,2,3].arrayStream
.flatMapLatest((int i) => ErrorSender())
.flatMapLatest((int i) => ThrowingSender())
.collect(()shared{})
.syncWait
.assumeOk
Expand Down

0 comments on commit c0067ea

Please sign in to comment.