Skip to content

Commit

Permalink
More SlimResult, Standardize patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
to11mtm committed Jan 7, 2024
1 parent 43bd56b commit 26a5854
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 147 deletions.
8 changes: 7 additions & 1 deletion src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2528,7 +2528,13 @@ public readonly struct SlimResult<T>

public static readonly SlimResult<T> NotYetReady =
new SlimResult<T>(NotYetThereSentinel.Instance, default);


public static SlimResult<T> FromTask(Task<T> task)
{
return task.IsCanceled || task.IsFaulted
? new SlimResult<T>(task.Exception, default)
: new SlimResult<T>(default, task.Result);
}
public SlimResult(Exception errorOrSentinel, T result)
{
if (result == null)
Expand Down

This file was deleted.

38 changes: 27 additions & 11 deletions src/core/Akka.Streams/Implementation/Sources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Annotations;
using Akka.Pattern;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Fusing;
using Akka.Streams.Implementation.Stages;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
Expand Down Expand Up @@ -779,8 +780,8 @@ private sealed class Logic : OutGraphStageLogic
private readonly Lazy<Decider> _decider;
private Option<TSource> _state = Option<TSource>.None;

private readonly PooledValueTaskContinuationHelper<Option<TOut>>
_pooledContinuation;
private ValueTask<Option<TOut>> _currentReadVt;
private readonly Action _valueTaskAwaiterOnCompleteAction;
public Logic(UnfoldResourceSourceValueTaskAsync<TOut, TCreateState, TSource> stage, Attributes inheritedAttributes)
: base(stage.Shape)
{
Expand All @@ -790,12 +791,11 @@ public Logic(UnfoldResourceSourceValueTaskAsync<TOut, TCreateState, TSource> sta
var strategy = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
return strategy != null ? strategy.Decider : Deciders.StoppingDecider;
});
_pooledContinuation =
new PooledValueTaskContinuationHelper<Option<TOut>>(
ReadCallback);
_valueTaskAwaiterOnCompleteAction = SelfReadCallback;
SetHandler(_stage.Out, this);
}


private Action<Try<TSource>> CreatedCallback => GetAsyncCallback<Try<TSource>>(resource =>
{
if (resource.IsSuccess)
Expand Down Expand Up @@ -830,12 +830,26 @@ private void ErrorHandler(Exception ex)
throw new ArgumentOutOfRangeException();
}
}

private Action<Try<Option<TOut>>> ReadCallback => GetAsyncCallback<Try<Option<TOut>>>(read =>


private void SelfReadCallback()
{
if (read.IsSuccess)
var swap = _currentReadVt;
_currentReadVt = default;
if (swap.IsCompletedSuccessfully)
{
var data = read.Success.Value;
ReadCallback(new SlimResult<Option<TOut>>(default,swap.Result));
}
else
{
ReadCallback(SlimResult<Option<TOut>>.FromTask(swap.AsTask()));
}
}
private Action<SlimResult<Option<TOut>>> ReadCallback => GetAsyncCallback<SlimResult<Option<TOut>>>(read =>
{
if (read.IsSuccess())
{
var data = read.Result;
if (data.HasValue)
{
var some = data.Value;
Expand All @@ -855,7 +869,7 @@ private void ErrorHandler(Exception ex)
}
}
}
else ErrorHandler(read.Failure.Value);
else ErrorHandler(read.Error);
});

private void CloseResource()
Expand Down Expand Up @@ -894,7 +908,9 @@ public override void OnPull()
}
else
{
_pooledContinuation.AttachAwaiter(vt);
_currentReadVt = vt;
_currentReadVt.GetAwaiter().OnCompleted(_valueTaskAwaiterOnCompleteAction);
//_pooledContinuation.AttachAwaiter(vt);
}


Expand Down
39 changes: 22 additions & 17 deletions src/core/Akka.Streams/Implementation/Unfold.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Akka.Annotations;
using Akka.Streams.Implementation.Fusing;
using Akka.Streams.Stage;
using Akka.Streams.Util;
using Akka.Util;
Expand Down Expand Up @@ -103,7 +104,7 @@ private sealed class Logic : OutGraphStageLogic
{
private readonly UnfoldValueTaskAsync<TState, TElement> _stage;
private TState _state;
private Action<Result<Option<(TState, TElement)>>> _asyncHandler;
private Action<SlimResult<Option<(TState, TElement)>>> _asyncHandler;
private ValueTask<Option<(TState, TElement)>> _currentTask;
public Logic(UnfoldValueTaskAsync<TState, TElement> stage) : base(stage.Shape)
{
Expand All @@ -116,38 +117,42 @@ public Logic(UnfoldValueTaskAsync<TState, TElement> stage) : base(stage.Shape)
public override void OnPull()
{
var vt = _stage.UnfoldFunc(_state);
var peeker = Unsafe.As<ValueTask<Option<(TState,TElement)>>,ValueTaskCheatingPeeker<Option<(TState,TElement)>>>(ref vt);
if (peeker._obj == null)
{
_asyncHandler(Result.Success<Option<(TState, TElement)>>(peeker._result));
}
else
{
_currentTask = vt;
vt.GetAwaiter().OnCompleted(CompletionAction);
}
if (vt.IsCompletedSuccessfully)
{
_asyncHandler(
new SlimResult<Option<(TState, TElement)>>(default,
vt.Result));
}
else
{
_currentTask = vt;
vt.GetAwaiter().OnCompleted(CompletionAction);
}
}
private void CompletionAction()
{
if (_currentTask.IsCompletedSuccessfully)
{
_asyncHandler.Invoke(Result.Success(_currentTask.Result));
_asyncHandler.Invoke(
new SlimResult<Option<(TState, TElement)>>(default,
_currentTask.Result));
}
else
{
_asyncHandler.Invoke(
Result.FromTask(_currentTask.AsTask()));
SlimResult<Option<(TState, TElement)>>.FromTask(
_currentTask.AsTask()));
}
}
public override void PreStart()
{
var ac = GetAsyncCallback<Result<Option<(TState, TElement)>>>(result =>
var ac = GetAsyncCallback<SlimResult<Option<(TState, TElement)>>>(result =>
{
if (!result.IsSuccess)
Fail(_stage.Out, result.Exception);
if (!result.IsSuccess())
Fail(_stage.Out, result.Error);
else
{
var option = result.Value;
var option = result.Result;
if (!option.HasValue)
Complete(_stage.Out);
else
Expand Down

0 comments on commit 26a5854

Please sign in to comment.