From 4e83e3334720f539ce31808c1226e90121cfadeb Mon Sep 17 00:00:00 2001 From: Matt Edwards Date: Wed, 20 Nov 2024 15:20:35 -0500 Subject: [PATCH] WIP --- .github/workflows/format.yml | 2 +- .github/workflows/publish.yml | 2 +- .github/workflows/test.yml | 2 +- .../Hyperbee.Pipeline.Auth.csproj | 2 +- .../Binders/Abstractions/Binder.cs | 55 ++- .../Binders/Abstractions/BlockBinder.cs | 8 +- .../Abstractions/ConditionalBlockBinder.cs | 30 +- .../Binders/Abstractions/StatementBinder.cs | 141 ++----- .../Binders/CallBlockBinder.cs | 41 +-- .../Binders/CallIfBlockBinder.cs | 29 +- .../Binders/CallStatementBinder.cs | 48 +-- .../Binders/ForEachBlockBinder.cs | 72 ++-- src/Hyperbee.Pipeline/Binders/HookBinder.cs | 24 +- .../Binders/PipeBlockBinder.cs | 49 +-- .../Binders/PipeIfBlockBinder.cs | 27 +- .../Binders/PipeStatementBinder.cs | 35 +- .../Binders/ReduceBlockBinder.cs | 79 ++-- .../Binders/WaitAllBlockBinder.cs | 345 +++++++++++++++--- src/Hyperbee.Pipeline/Binders/WrapBinder.cs | 84 +++-- .../Builders/CallBlockBuilder.cs | 24 +- .../Builders/CallStatementBuilder.cs | 1 + .../Builders/ForEachBlockBuilder.cs | 1 - .../Builders/WaitAllBlockBuilder.cs | 24 +- .../Implementation/ContextImplExtensions.cs | 84 ++++- .../Hyperbee.Pipeline.csproj | 6 +- .../Hyperbee.Pipeline.Caching.csproj | 2 +- .../Hyperbee.Pipeline.Auth.Tests.csproj | 4 +- .../Hyperbee.Pipeline.Tests.csproj | 4 +- .../Hyperbee.Pipeline.Caching.Tests.csproj | 6 +- 29 files changed, 713 insertions(+), 518 deletions(-) diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 7be939e..b106b8e 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -15,7 +15,7 @@ on: - develop env: - DOTNET_VERSION: '8.0.x' + DOTNET_VERSION: '9.0.x' jobs: format: diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index f5e2c48..c72124e 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -8,7 +8,7 @@ on: env: BRANCH_NAME: ${{ github.event.release.target_commitish }} SOLUTION_NAME: ${{ vars.SOLUTION_NAME }} - DOTNET_VERSION: '8.0.x' + DOTNET_VERSION: '9.0.x' NUGET_SOURCE: 'https://api.nuget.org/v3/index.json' BUILD_CONFIGURATION: '' VERSION_SUFFIX: '' diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 768bcc3..18240ab 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ on: env: BRANCH_NAME: ${{ github.head_ref || github.ref_name }} SOLUTION_NAME: ${{ vars.SOLUTION_NAME }} - DOTNET_VERSION: '8.0.x' + DOTNET_VERSION: '9.0.x' jobs: test: diff --git a/src/Hyperbee.Pipeline.Auth/Hyperbee.Pipeline.Auth.csproj b/src/Hyperbee.Pipeline.Auth/Hyperbee.Pipeline.Auth.csproj index f69d011..94d7438 100644 --- a/src/Hyperbee.Pipeline.Auth/Hyperbee.Pipeline.Auth.csproj +++ b/src/Hyperbee.Pipeline.Auth/Hyperbee.Pipeline.Auth.csproj @@ -32,7 +32,7 @@ True \ - + all diff --git a/src/Hyperbee.Pipeline/Binders/Abstractions/Binder.cs b/src/Hyperbee.Pipeline/Binders/Abstractions/Binder.cs index 35e616a..bffd309 100644 --- a/src/Hyperbee.Pipeline/Binders/Abstractions/Binder.cs +++ b/src/Hyperbee.Pipeline/Binders/Abstractions/Binder.cs @@ -1,4 +1,5 @@ using System.Linq.Expressions; +using System.Reflection; using Hyperbee.Pipeline.Context; using static System.Linq.Expressions.Expression; using static Hyperbee.Expressions.ExpressionExtensions; @@ -10,25 +11,26 @@ internal abstract class Binder protected Expression> Pipeline { get; } protected Expression> Configure { get; } + private static ConstructorInfo _tupleConstructor = typeof( ValueTuple ).GetConstructor( [typeof( TOutput ), typeof( bool )] )!; + protected Binder( Expression> function, Expression> configure ) { Pipeline = function; Configure = configure; } - // protected virtual Task<(TOutput Result, bool Canceled)> ProcessPipelineAsync( IPipelineContext context, TInput argument ) - // { - // var result = await Pipeline( context, argument ).ConfigureAwait( false ); - // - // var contextControl = (IPipelineContextControl) context; - // var canceled = contextControl.HandleCancellationRequested( result ); - // - // return (canceled ? default : result, canceled); - // } - protected virtual Expression ProcessPipelineAsync( ParameterExpression context, ParameterExpression argument ) { - var tupleCtor = typeof( ValueTuple ).GetConstructor( [typeof( TOutput ), typeof( bool )] )!; + /* + { + var result = await Pipeline( context, argument ).ConfigureAwait( false ); + + var contextControl = (IPipelineContextControl) context; + var canceled = contextControl.HandleCancellationRequested( result ); + + return (canceled ? default : result, canceled); + } + */ var result = Variable( typeof( TOutput ), "result" ); var canceled = Variable( typeof( bool ), "canceled" ); @@ -42,30 +44,25 @@ protected virtual Expression ProcessPipelineAsync( ParameterExpression context, Condition( canceled, - New( tupleCtor, Default( typeof( TOutput ) ), canceled ), - New( tupleCtor, result, canceled ) + New( _tupleConstructor, Default( typeof( TOutput ) ), canceled ), + New( _tupleConstructor, result, canceled ) ) ); } - - - /* - public static bool HandleCancellationRequested( this IPipelineContextControl control, TOutput value ) - { - if ( !control.CancellationToken.IsCancellationRequested ) - return false; - - if ( !control.HasCancellationValue ) - control.CancellationValue = value; - - return true; - } - */ - - private Expression HandleCancellationRequested( Expression contextControl, Expression resultVariable ) { + /* + { + if ( !control.CancellationToken.IsCancellationRequested ) + return false; + + if ( !control.HasCancellationValue ) + control.CancellationValue = value; + + return true; + } + */ var hasCancellationValue = Property( contextControl, "HasCancellationValue" ); var cancellationTokenProperty = Property( contextControl, "CancellationToken" ); var cancellationValueProperty = Property( contextControl, "CancellationValue" ); diff --git a/src/Hyperbee.Pipeline/Binders/Abstractions/BlockBinder.cs b/src/Hyperbee.Pipeline/Binders/Abstractions/BlockBinder.cs index 1f3ba82..0a45d22 100644 --- a/src/Hyperbee.Pipeline/Binders/Abstractions/BlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/Abstractions/BlockBinder.cs @@ -15,15 +15,15 @@ protected BlockBinder( Expression> function, Expr // use cases where the next argument is not the same as the output type // like ReduceBlockBinder and ForEachBlockBinder - // protected virtual async Task ProcessBlockAsync( Expression> blockFunction, IPipelineContext context, TArgument nextArgument ) - // { - // return await blockFunction( context, nextArgument ).ConfigureAwait( false ); - // } protected virtual Expression ProcessBlockAsync( Expression> blockFunction, ParameterExpression context, Expression nextArgument ) { + /* + return blockFunction( context, nextArgument ); + */ + return Invoke( blockFunction, context, nextArgument ); } } diff --git a/src/Hyperbee.Pipeline/Binders/Abstractions/ConditionalBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/Abstractions/ConditionalBlockBinder.cs index 5d7b2a8..d1ec6b8 100644 --- a/src/Hyperbee.Pipeline/Binders/Abstractions/ConditionalBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/Abstractions/ConditionalBlockBinder.cs @@ -16,35 +16,31 @@ protected ConditionalBlockBinder( Expression> condition, Condition = condition; } - // protected override async Task ProcessBlockAsync( FunctionAsync blockFunction, IPipelineContext context, TArgument nextArgument ) - // { - // if ( Condition != null && !Condition( context, CastTypeArg( nextArgument ) ) ) - // { - // return CastTypeArg( nextArgument ); - // } - // - // return await base.ProcessBlockAsync( blockFunction, context, nextArgument ).ConfigureAwait( false ); - // } - - // [MethodImpl( MethodImplOptions.AggressiveInlining )] - // private static TResult CastTypeArg( TType input ) - // { - // return (TResult) (object) input; - // } - protected override Expression ProcessBlockAsync( Expression> blockFunction, ParameterExpression context, Expression nextArgument ) { + /* + { + if (Condition != null && !Condition( context, CastTypeArg(nextArgument ) ) ) + { + return CastTypeArg(nextArgument ); + } + + return await base.ProcessBlockAsync( blockFunction, context, nextArgument ).ConfigureAwait( false ); + } + */ + if ( Condition == null ) return base.ProcessBlockAsync( blockFunction, context, nextArgument ); + // TODO: improve casting return Condition( Not( Invoke( Condition, context, - Convert( Convert( nextArgument, typeof( object ) ), typeof( TOutput ) ) + Convert( Convert( nextArgument, typeof( object ) ), typeof( TOutput ) ) //(TResult) (object) input; ) ), Convert( Convert( nextArgument, typeof( object ) ), typeof( TNext ) ), Await( base.ProcessBlockAsync( blockFunction, context, nextArgument ) ) diff --git a/src/Hyperbee.Pipeline/Binders/Abstractions/StatementBinder.cs b/src/Hyperbee.Pipeline/Binders/Abstractions/StatementBinder.cs index 9576c6d..0252a2b 100644 --- a/src/Hyperbee.Pipeline/Binders/Abstractions/StatementBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/Abstractions/StatementBinder.cs @@ -1,5 +1,6 @@ -using System.Linq.Expressions; +using System.Linq.Expressions; using Hyperbee.Pipeline.Context; +using Hyperbee.Pipeline.Extensions.Implementation; using static System.Linq.Expressions.Expression; using static Hyperbee.Expressions.ExpressionExtensions; @@ -15,39 +16,33 @@ protected StatementBinder( Expression> function, Middleware = middleware; } - // protected MiddlewareAsync Middleware1 { get; } - // protected virtual async Task ProcessStatementAsync( FunctionAsync nextFunction, IPipelineContext context, TOutput nextArgument, string frameName ) - // { - // var contextControl = (IPipelineContextControl) context; - // - // using var _ = contextControl.CreateFrame( context, Configure, frameName ); - // - // if ( Middleware1 == null ) - // return await nextFunction( context, nextArgument ).ConfigureAwait( false ); - // - // return (TNext) await Middleware1( - // context, - // nextArgument, - // async ( context1, argument1 ) => await nextFunction( context1, (TOutput) argument1 ).ConfigureAwait( false ) - // ).ConfigureAwait( false ); - // } - protected virtual Expression ProcessStatementAsync( Expression> nextFunction, ParameterExpression context, Expression nextArgument, string frameName ) { - // if ( Middleware == null ) - // return await nextFunction( context, nextArgument ).ConfigureAwait( false ); - if ( Middleware == null ) + /* { - return Invoke( nextFunction, context, nextArgument ); + var contextControl = (IPipelineContextControl) context; + + using var _ = contextControl.CreateFrame( context, Configure, frameName ); - //using var _ = contextControl.CreateFrame( context, Configure, frameName ); - // return CreateFrameExpression( - // Convert( context, typeof(IPipelineContextControl) ), - // context, - // Configure, - // Await( Invoke( nextFunction, context, nextArgument ), configureAwait: false ), - // frameName ); + if ( Middleware1 == null ) + return await nextFunction( context, nextArgument ).ConfigureAwait( false ); + + return (TNext) await Middleware1( + context, + nextArgument, + async ( context1, argument1 ) => await nextFunction( context1, (TOutput) argument1 ).ConfigureAwait( false ) + ).ConfigureAwait( false ); + } + */ + + if ( Middleware == null ) + { + return BlockAsync( + Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName ); + ContextImplExtensions.CreateFrameExpression( context, Configure, frameName ), + Await( Invoke( nextFunction, context, nextArgument ) ) + ) ); } // async ( context1, argument1 ) => await nextFunction( context1, (TOutput) argument1 ).ConfigureAwait( false ) @@ -56,84 +51,26 @@ protected virtual Expression ProcessStatementAsync( Expression>( BlockAsync( - Convert( Await( - Invoke( nextFunction, context1, Convert( argument1, typeof( TOutput ) ) ), - configureAwait: false ), + Convert( + Await( Invoke( nextFunction, context1, Convert( argument1, typeof( TOutput ) ) ), configureAwait: false ), typeof( object ) ) ), parameters: [context1, argument1] ); - // return (TNext) await Middleware( - // context, - // nextArgument, - // middlewareNext - // ).ConfigureAwait( false ); - //return - //using var _ = contextControl.CreateFrame( context, Configure, frameName ); - // CreateFrameExpression( - // Convert( Constant( context ), typeof(IPipelineContextControl) ), - // context, - // Configure, - - var returnResult = Variable( typeof( TNext ), "returnResult" ); - - var b = BlockAsync( - [returnResult], - //Invoke( LoggerExpression.Log( "StatementBinder.ProcessStatementAsync" + Random.Shared.Next( 0, 1000 ) ), Convert( nextArgument, typeof( object ) ) ), - - Assign( returnResult, Convert( - Await( - Invoke( Middleware, - context, - Convert( nextArgument, typeof( object ) ), - middlewareNext - ), - configureAwait: false ), - typeof( TNext ) ) ) - - //Invoke( LoggerExpression.Log( "StatementBinder.ProcessStatementAsync-" + Random.Shared.Next( 0, 1000 ) ), Convert( returnResult, typeof( object ) ) ) - - , returnResult - ); //, - - - // frameName ); //); - - return b; - } - - public static Expression CreateFrameExpression( - Expression controlParam, - Expression contextParam, - Expression> config, - Expression body, - string defaultName = null - ) - { - var nameVariable = Variable( typeof( string ), "originalName" ); - var idVariable = Variable( typeof( int ), "originalId" ); - - var idProperty = Property( controlParam, "Id" ); - var nameProperty = Property( controlParam, "Name" ); - return BlockAsync( - [nameVariable, idVariable], - Assign( idVariable, idProperty ), - Assign( nameVariable, nameProperty ), - TryFinally( - Block( - Assign( idProperty, Call( controlParam, "GetNextId", Type.EmptyTypes ) ), - Assign( nameProperty, Constant( defaultName ) ), - config != null - ? Invoke( config, contextParam ) - : Empty(), - body - ), - Block( - Assign( idProperty, idVariable ), - Assign( nameProperty, nameVariable ) - ) ) - ); + Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName ); + ContextImplExtensions.CreateFrameExpression( context, Configure, frameName ), + Convert( + Await( + Invoke( Middleware, + context, + Convert( nextArgument, typeof( object ) ), + middlewareNext + ), + configureAwait: false ), + typeof( TNext ) ) + ) + ); } } diff --git a/src/Hyperbee.Pipeline/Binders/CallBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/CallBlockBinder.cs index 34081bb..192d0b5 100644 --- a/src/Hyperbee.Pipeline/Binders/CallBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/CallBlockBinder.cs @@ -1,4 +1,4 @@ -using System.Linq.Expressions; +using System.Linq.Expressions; using Hyperbee.Pipeline.Binders.Abstractions; using Hyperbee.Pipeline.Context; using static System.Linq.Expressions.Expression; @@ -13,22 +13,23 @@ public CallBlockBinder( Expression> function ) { } - // public FunctionAsync Bind( FunctionAsync next ) - // { - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); - // return nextArgument; - // }; - // } - public Expression> Bind( Expression> next ) { + /* + { + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); + return nextArgument; + }; + } + */ + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); @@ -39,27 +40,23 @@ public Expression> Bind( Expression>( BlockAsync( [awaitedResult, result], Assign( awaitedResult, Await( ProcessPipelineAsync( context, argument ), configureAwait: false ) ), IfThenElse( canceled, - Block( - Assign( result, Default( typeof( TOutput ) ) )//, - //Return( returnValue, result ) - ), + Assign( result, Default( typeof( TOutput ) ) ), Block( Await( ProcessBlockAsync( next, context, nextArgument ), configureAwait: false ), - Assign( result, nextArgument )//, - //Return( returnValue, result ) + Assign( result, nextArgument ) ) ), result - //Label( returnValue, result ) ), parameters: [context, argument] ); diff --git a/src/Hyperbee.Pipeline/Binders/CallIfBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/CallIfBlockBinder.cs index 0d6bc04..90a757c 100644 --- a/src/Hyperbee.Pipeline/Binders/CallIfBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/CallIfBlockBinder.cs @@ -13,22 +13,23 @@ public CallIfBlockBinder( Expression> condition, Express { } - // public FunctionAsync Bind( FunctionAsync next ) - // { - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); - // return nextArgument; - // }; - // } - public Expression> Bind( Expression> next ) { + /* + { + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); + return nextArgument; + }; + } + */ + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); diff --git a/src/Hyperbee.Pipeline/Binders/CallStatementBinder.cs b/src/Hyperbee.Pipeline/Binders/CallStatementBinder.cs index c5b7079..f44e72f 100644 --- a/src/Hyperbee.Pipeline/Binders/CallStatementBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/CallStatementBinder.cs @@ -14,29 +14,31 @@ public CallStatementBinder( Expression> function, { } - // public FunctionAsync Bind( ProcedureAsync next, MethodInfo method = null ) - // { - // var defaultName = (method ?? next.Method).Name; - // - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // return await ProcessStatementAsync( - // async ( ctx, arg ) => - // { - // await next( ctx, arg ).ConfigureAwait( false ); - // return arg; - // }, context, nextArgument, defaultName ).ConfigureAwait( false ); - // }; - // } - public Expression> Bind( Expression> next, MethodInfo method = null ) { - var defaultName = method?.Name ?? "defaultName"; + /* + { + var defaultName = (method ?? next.Method).Name; + + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + return await ProcessStatementAsync( + async ( ctx, arg ) => + { + await next( ctx, arg ).ConfigureAwait( false ); + return arg; + }, context, nextArgument, defaultName ).ConfigureAwait( false ); + }; + } + */ + + // TODO: Better way to get Name + var defaultName = method?.Name ?? next.Name ?? "name"; var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); @@ -57,7 +59,7 @@ public Expression> Bind( Expression>( + return Lambda>( BlockAsync( [awaitedResult], Assign( awaitedResult, Await( ProcessPipelineAsync( context, argument ), configureAwait: false ) ), @@ -72,8 +74,6 @@ public Expression> Bind( Expression> function ) { } - // public FunctionAsync Bind( FunctionAsync next ) - // { - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // var nextArguments = (IEnumerable) nextArgument; - // - // foreach ( var elementArgument in nextArguments ) - // { - // await ProcessBlockAsync( next, context, elementArgument ).ConfigureAwait( false ); - // } - // - // return nextArgument; - // }; - // } - public Expression> Bind( Expression> next ) { + /* + { + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + var nextArguments = (IEnumerable) nextArgument; + + foreach ( var elementArgument in nextArguments ) + { + await ProcessBlockAsync( next, context, elementArgument ).ConfigureAwait( false ); + } + + return nextArgument; + }; + } + */ + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); @@ -45,39 +44,20 @@ public Expression> Bind( Expression ), "nextArguments" ); - - var result = Variable( typeof( TOutput ), "blockResult" ); var element = Variable( typeof( TElement ), "element" ); - var enumerator = Variable( typeof( IEnumerator ), "enumerator" ); - - var getEnumeratorMethod = Call( nextArguments, typeof( IEnumerable ).GetMethod( "GetEnumerator" )! ); - var moveNextCall = Call( enumerator, typeof( IEnumerator ).GetMethod( "MoveNext" )! ); - var getCurrentMethod = Call( enumerator, typeof( IEnumerator ).GetProperty( "Current" )!.GetMethod! ); - - var breakLabel = Label( "breakLoop" ); return Lambda>( BlockAsync( - [awaitedResult, nextArguments, enumerator, element, result], + [awaitedResult, nextArguments, element], Assign( awaitedResult, Await( ProcessPipelineAsync( context, argument ), configureAwait: false ) ), Condition( canceled, Default( typeof( TOutput ) ), Block( - Assign( result, nextArgument ), Assign( nextArguments, Convert( nextArgument, typeof( IEnumerable ) ) ), - Assign( enumerator, getEnumeratorMethod ), - Loop( - IfThenElse( moveNextCall, - Block( - Assign( element, Convert( getCurrentMethod, typeof( TElement ) ) ), - Await( ProcessBlockAsync( next, context, element ), configureAwait: false ), - Empty() - ), - Break( breakLabel ) - ), - breakLabel + ForEach( nextArgument, element, + Await( ProcessBlockAsync( next, context, element ), configureAwait: false ) ), - result + nextArgument ) ) ), diff --git a/src/Hyperbee.Pipeline/Binders/HookBinder.cs b/src/Hyperbee.Pipeline/Binders/HookBinder.cs index 0af21e2..5cde0bc 100644 --- a/src/Hyperbee.Pipeline/Binders/HookBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/HookBinder.cs @@ -12,21 +12,23 @@ internal class HookBinder // explicit Type Args due to > middleware ) { - Middleware = middleware; // Note: no need to create empty middleware just don't execute it. + Middleware = middleware; } - // public MiddlewareAsync Bind( MiddlewareAsync middleware ) - // { - // return async ( context, argument, function ) => - // await middleware( - // context, - // argument, - // async ( context1, argument1 ) => await Middleware( context1, argument1, function ).ConfigureAwait( false ) - // ).ConfigureAwait( false ); - // } - public Expression> Bind( Expression> middleware ) { + /* + { + return async ( context, argument, function ) => + await middleware( + context, + argument, + async ( context1, argument1 ) => await Middleware( context1, argument1, function ).ConfigureAwait( false ) + ).ConfigureAwait( false ); + } + */ + + // If there is no middleware, return the original middleware if ( Middleware == null ) return middleware; diff --git a/src/Hyperbee.Pipeline/Binders/PipeBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/PipeBlockBinder.cs index 8af251a..cddf41c 100644 --- a/src/Hyperbee.Pipeline/Binders/PipeBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/PipeBlockBinder.cs @@ -13,21 +13,22 @@ public PipeBlockBinder( Expression> function ) { } - // public Expression> Bind( Expression> next ) - // { - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // return await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); - // }; - // } - public Expression> Bind( Expression> next ) { + /* + { + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + return await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); + }; + } + */ + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TNext ), "argument" ); @@ -40,9 +41,6 @@ public Expression> Bind( Expression> Bind( Expression> Log( string message ) - { - return arg1 => Log( message, arg1 ); - } - - public static void Log( string message, object arg1 ) - { - Console.WriteLine( $"{message} value: {arg1}" ); - } -} diff --git a/src/Hyperbee.Pipeline/Binders/PipeIfBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/PipeIfBlockBinder.cs index 711f442..97a6c8f 100644 --- a/src/Hyperbee.Pipeline/Binders/PipeIfBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/PipeIfBlockBinder.cs @@ -13,21 +13,22 @@ public PipeIfBlockBinder( Expression> condition, Express { } - // public FunctionAsync Bind( FunctionAsync next ) - // { - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // return await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); - // }; - // } - public Expression> Bind( Expression> next ) { + /* + { + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + return await ProcessBlockAsync( next, context, nextArgument ).ConfigureAwait( false ); + }; + } + */ + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); diff --git a/src/Hyperbee.Pipeline/Binders/PipeStatementBinder.cs b/src/Hyperbee.Pipeline/Binders/PipeStatementBinder.cs index 4e7d466..d296b92 100644 --- a/src/Hyperbee.Pipeline/Binders/PipeStatementBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/PipeStatementBinder.cs @@ -14,25 +14,26 @@ public PipeStatementBinder( Expression> function, { } - // public Expression> Bind( Expression> next, MethodInfo method = null ) - // { - // var defaultName = method?.Name ?? "name"; - // - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // return await ProcessStatementAsync( next, context, nextArgument, defaultName ).ConfigureAwait( false ); - // }; - // } - - public Expression> Bind( Expression> next, MethodInfo method = null ) { - var defaultName = method?.Name ?? "defaultName"; + /* + { + var defaultName = method?.Name ?? "name"; + + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + return await ProcessStatementAsync( next, context, nextArgument, defaultName ).ConfigureAwait( false ); + }; + } + */ + + // TODO: Better way to get Name + var defaultName = method?.Name ?? next.Name ?? "name"; var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); diff --git a/src/Hyperbee.Pipeline/Binders/ReduceBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/ReduceBlockBinder.cs index 61286db..417af0e 100644 --- a/src/Hyperbee.Pipeline/Binders/ReduceBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/ReduceBlockBinder.cs @@ -1,6 +1,5 @@ using System.Collections; using System.Linq.Expressions; -using System.Xml.Linq; using Hyperbee.Pipeline.Binders.Abstractions; using Hyperbee.Pipeline.Context; using static System.Linq.Expressions.Expression; @@ -18,74 +17,72 @@ public ReduceBlockBinder( Expression> reducer, Express Reducer = reducer; } - // public FunctionAsync Bind( FunctionAsync next ) - // { - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // var nextArguments = (IEnumerable) nextArgument; - // var accumulator = default( TNext ); - // - // // Process each element and apply the reducer - // foreach ( var elementArgument in nextArguments ) - // { - // var result = await ProcessBlockAsync( next, context, elementArgument ).ConfigureAwait( false ); - // accumulator = Reducer( accumulator, result ); - // } - // - // return accumulator; - // }; - // } - public Expression> Bind( Expression> next ) { + /* + { + return async( context, argument ) => + { + var( nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if (canceled ) + return default; + + var nextArguments = (IEnumerable) nextArgument; + var accumulator = default( TNext ); + + // Process each element and apply the reducer + foreach (var elementArgument in nextArguments ) + { + var result = await ProcessBlockAsync( next, context, elementArgument ).ConfigureAwait( false ); + accumulator = Reducer( accumulator, result ); + } + + return accumulator; + }; + } + */ + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); var awaitedResult = Variable( typeof( (TOutput, bool) ), "awaitedResult" ); var blockResult = Variable( typeof( TNext ), "blockResult" ); var accumulator = Variable( typeof( TNext ), "accumulator" ); + var finalResult = Variable( typeof( TNext ), "finalResult" ); var nextArgument = Field( awaitedResult, "Item1" ); var canceled = Field( awaitedResult, "Item2" ); var nextArguments = Variable( typeof( IEnumerable ), "nextArguments" ); var element = Variable( typeof( TElement ), "element" ); - var enumerator = Variable( typeof( IEnumerator ), "enumerator" ); + var enumerator = Variable( typeof( IEnumerator ), "enumerator" ); var getEnumeratorMethod = Call( nextArguments, typeof( IEnumerable ).GetMethod( "GetEnumerator" )! ); var moveNextCall = Call( enumerator, typeof( IEnumerator ).GetMethod( "MoveNext" )! ); var getCurrentMethod = Call( enumerator, typeof( IEnumerator ).GetProperty( "Current" )!.GetMethod! ); - var breakLabel = Label( "breakLoop" ); + // TODO: IfThenElse should be switched Condition, and we should be able to remove finalResult (bug in expressions) return Lambda>( BlockAsync( - [awaitedResult, nextArguments, enumerator, element, accumulator, blockResult], + [awaitedResult, nextArguments, enumerator, element, accumulator, finalResult, blockResult], Assign( awaitedResult, Await( ProcessPipelineAsync( context, argument ), configureAwait: false ) ), - Condition( canceled, + IfThenElse( canceled, Default( typeof( TNext ) ), Block( Assign( nextArguments, Convert( nextArgument, typeof( IEnumerable ) ) ), Assign( enumerator, getEnumeratorMethod ), - Loop( - IfThenElse( moveNextCall, - Block( - Assign( element, Convert( getCurrentMethod, typeof( TElement ) ) ), - Assign( blockResult, - Await( ProcessBlockAsync( next, context, element ), configureAwait: false ) ), - Assign( accumulator, Invoke( Reducer, accumulator, blockResult ) ) - ), - Break( breakLabel ) - ), - breakLabel + Assign( accumulator, Default( typeof( TNext ) ) ), + ForEach( nextArguments, element, + Block( + Assign( blockResult, Await( ProcessBlockAsync( next, context, element ), configureAwait: false ) ), + Assign( accumulator, Invoke( Reducer, accumulator, blockResult ) ) + ) ), - accumulator + Assign( finalResult, accumulator ) ) - ) + ), + finalResult ), parameters: [context, argument] ); diff --git a/src/Hyperbee.Pipeline/Binders/WaitAllBlockBinder.cs b/src/Hyperbee.Pipeline/Binders/WaitAllBlockBinder.cs index f3d3a3c..c12b15d 100644 --- a/src/Hyperbee.Pipeline/Binders/WaitAllBlockBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/WaitAllBlockBinder.cs @@ -1,6 +1,10 @@ -using System.Linq.Expressions; +using System.Collections; +using System.Collections.Concurrent; +using System.Linq.Expressions; using Hyperbee.Pipeline.Binders.Abstractions; using Hyperbee.Pipeline.Context; +using Hyperbee.Pipeline.Extensions.Implementation; + using static System.Linq.Expressions.Expression; using static Hyperbee.Expressions.ExpressionExtensions; @@ -22,57 +26,300 @@ public WaitAllBlockBinder( Expression> condition, Expres Middleware = middleware; } - public FunctionAsync Bind( FunctionAsync[] nexts, WaitAllReducer reducer ) + public Expression> Bind( + Expression nexts, + Expression> reducer ) { ArgumentNullException.ThrowIfNull( reducer ); - return null; - // return async ( context, argument ) => - // { - // var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); - // - // if ( canceled ) - // return default; - // - // // WaitAllBlockBinder is unique in that it is both a block configure and a step. - // // The reducer is the step action, and because it is a step, we need to ensure - // // that middleware is called. Middleware requires us to pass in the execution - // // function that it wraps. This requires an additional level of wrapping. - // - // return await WaitAllAsync( context, nextArgument, nexts, reducer ).ConfigureAwait( false ); - // }; + /* + { + return async ( context, argument ) => + { + var (nextArgument, canceled) = await ProcessPipelineAsync( context, argument ).ConfigureAwait( false ); + + if ( canceled ) + return default; + + // WaitAllBlockBinder is unique in that it is both a block configure and a step. + // The reducer is the step action, and because it is a step, we need to ensure + // that middleware is called. Middleware requires us to pass in the execution + // function that it wraps. This requires an additional level of wrapping. + + return await WaitAllAsync( context, nextArgument, nexts, reducer ).ConfigureAwait( false ); + }; + } + */ + + var context = Parameter( typeof( IPipelineContext ), "context" ); + var argument = Parameter( typeof( TInput ), "argument" ); + + var awaitedResult = Variable( typeof( (TOutput, bool) ), "awaitedResult" ); + var nextArgument = Field( awaitedResult, "Item1" ); + var canceled = Field( awaitedResult, "Item2" ); + + return Lambda>( + BlockAsync( + [awaitedResult], + Assign( awaitedResult, Await( ProcessPipelineAsync( context, argument ) ) ), + Condition( canceled, + Default( typeof( TNext ) ), + Await( WaitAllAsync( context, nextArgument, nexts, reducer ), configureAwait: false ) + ) + ), + parameters: [context, argument] + ); + } + + private Expression WaitAllAsync( + ParameterExpression context, + Expression nextArgument, + Expression nexts, // FunctionAsync[] + Expression> reducer ) + { + /* + { + var contextControl = (IPipelineContextControl) context; + using var _ = contextControl.CreateFrame( context, Configure, nameof( WaitAllAsync ) ); + + var results = new WaitAllResult[nexts.Length]; + var items = nexts.Select( ( x, i ) => new { next = x, index = i } ); + + await items.ForEachAsync( async item => + { + var innerContext = context.Clone( false ); // context fork + + var result = await ProcessStatementAsync( item.next, innerContext, nextArgument ).ConfigureAwait( false ); + + results[item.index] = new WaitAllResult { Context = innerContext, Result = result }; + } ).ConfigureAwait( false ); + + return reducer( context, nextArgument, results ); + } + */ + + var results = Variable( typeof( WaitAllResult[] ), "results" ); + var result = Variable( typeof( object ), "result" ); + var innerContext = Variable( typeof( IPipelineContext ), "innerContext" ); + + var indexedItem = Parameter( typeof( (FunctionAsync, int) ), "indexedItem" ); + var item = Field( indexedItem, "Item1" ); + var index = Field( indexedItem, "Item2" ); + + var methodInfo = typeof( IPipelineContext ).GetMethod( nameof( IPipelineContext.Clone ) )!; + + var forEachBody = Lambda, int), Task>>( + BlockAsync( + [innerContext, result], + + Assign( innerContext, Call( context, methodInfo, [Constant( false )] ) ), + + Assign( result, Await( ProcessStatementAsync( item, innerContext, nextArgument, "NAME" ), configureAwait: false ) ), + + Invoke( LoggerExpression.Log( "forEachBody.result" ), Convert( result, typeof( object ) ) ), + Invoke( LoggerExpression.Log( "forEachBody.index" ), Convert( index, typeof( object ) ) ), + + Assign( ArrayAccess( results, index ), New( typeof( WaitAllResult ).GetConstructors()[0], innerContext, result ) ), + + Invoke( LoggerExpression.Log( "forEachBody.results" ), Convert( results, typeof( object ) ) ) + ), + parameters: [indexedItem] + ); + + var innerForEach = ForEachAsync( + SelectIndexItem>( nexts ), + forEachBody, + Constant( Environment.ProcessorCount ) ); + + var b = BlockAsync( + [results], + Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName ); + ContextImplExtensions.CreateFrameExpression( context, Configure, nameof( WaitAllAsync ) ), + Block( + Assign( results, NewArrayBounds( typeof( WaitAllResult ), ArrayLength( nexts ) ) ), + Await( innerForEach, configureAwait: false ), + Invoke( reducer, context, nextArgument, results ) + ) + ) + ); + + return b; + + } + + private Expression SelectIndexItem( Expression nexts ) + { + var elementParam = Parameter( typeof( T ), "next" ); + var indexParam = Parameter( typeof( int ), "index" ); + + var tupleType = typeof( ValueTuple ); + var selectMethod = typeof( Enumerable ) + .GetMethods() + .First( m => + m.Name == nameof( Enumerable.Select ) && + m.GetParameters().Length == 2 && // Select with two parameters + m.GetParameters()[1].ParameterType + .GetGenericTypeDefinition() == typeof( Func<,,> ) + ) + .MakeGenericMethod( typeof( T ), tupleType ); + + // nexts.Select((next, index) => (next, index)) + return Call( + selectMethod, + nexts, + Lambda( + New( + tupleType.GetConstructor( [typeof( T ), typeof( int )] ), + elementParam, + indexParam + ), + elementParam, + indexParam + ) + ); } - // - // private async Task WaitAllAsync( IPipelineContext context, TOutput nextArgument, FunctionAsync[] nexts, WaitAllReducer reducer ) - // { - // var contextControl = (IPipelineContextControl) context; - // using var _ = contextControl.CreateFrame( context, Configure, nameof( WaitAllAsync ) ); - // - // var results = new WaitAllResult[nexts.Length]; - // var items = nexts.Select( ( x, i ) => new { next = x, index = i } ); - // - // await items.ForEachAsync( async item => - // { - // var innerContext = context.Clone( false ); // context fork - // - // var result = await ProcessStatementAsync( item.next, innerContext, nextArgument ).ConfigureAwait( false ); - // - // results[item.index] = new WaitAllResult { Context = innerContext, Result = result }; - // } ).ConfigureAwait( false ); - // - // return reducer( context, nextArgument, results ); - // } - // - // private async Task ProcessStatementAsync( FunctionAsync next, IPipelineContext context, TOutput nextArgument ) - // { - // if ( Middleware == null ) - // return await next( context, nextArgument ).ConfigureAwait( false ); - // - // return await Middleware( - // context, - // nextArgument, - // async ( context1, argument1 ) => await next( context1, (TOutput) argument1 ).ConfigureAwait( false ) - // ).ConfigureAwait( false ); - // } + protected virtual Expression ProcessStatementAsync( + Expression nextFunction, + ParameterExpression context, + Expression nextArgument, + string frameName ) + { + /* + { + if ( Middleware == null ) + return await nextFunction( context, nextArgument ).ConfigureAwait( false ); + + return (TNext) await Middleware( + context, + nextArgument, + async ( ctx, arg ) => await nextFunction( ctx, (TOutput) arg ).ConfigureAwait( false ) + ).ConfigureAwait( false ); + } + */ + + if ( Middleware == null ) + return Invoke( nextFunction, context, nextArgument ); + + // async ( ctx, arg ) => await nextFunction( ctx, (TOutput) arg ).ConfigureAwait( false ) + var ctx = Parameter( typeof( IPipelineContext ), "ctx" ); + var arg = Parameter( typeof( object ), "arg" ); + var middlewareNext = Lambda>( + BlockAsync( + Convert( + Await( Invoke( nextFunction, ctx, Convert( arg, typeof( TOutput ) ) ), configureAwait: false ), + typeof( object ) ) + ), + parameters: [ctx, arg] + ); + + return BlockAsync( + Convert( + Await( + Invoke( Middleware, + context, + Convert( nextArgument, typeof( object ) ), + middlewareNext + ), + configureAwait: false ), + typeof( TNext ) ) + ); + } + + public static Expression ForEachAsync( + Expression sourceExpression, + Expression> function, + Expression partitionCount ) + { + /* + return Task.WhenAll( Partitioner + .Create( source ) + .GetPartitions( maxDegreeOfParallelism ) + .Select( partition => Task.Run( async () => + { + using var enumerator = partition; + while ( partition.MoveNext() ) + { + await function( partition.Current ).ConfigureAwait( false ); + } + } ) ) ); + */ + + var createMethod = typeof( Partitioner ) + .GetMethods() + .First( m => m.Name == nameof( Partitioner.Create ) && + m.IsGenericMethod && + m.GetParameters().Length == 1 && + m.GetParameters()[0].ParameterType.GetGenericTypeDefinition() == typeof( IEnumerable<> ) ); + + var createPartitionerCall = Call( + createMethod.MakeGenericMethod( typeof( (TSource, int) ) ), + sourceExpression ); + + var orderablePartitionerType = typeof( OrderablePartitioner<> ).MakeGenericType( typeof( (TSource, int) ) ); + var getPartitionsMethod = orderablePartitionerType.GetMethod( nameof( OrderablePartitioner<(TSource, int)>.GetPartitions ) ); + var getPartitionsCall = Call( + createPartitionerCall, + getPartitionsMethod, + partitionCount ); + + var partition = Variable( typeof( IEnumerator<(TSource, int)> ), "partition" ); + + // Task.Run( async () => { + // while ( partition.MoveNext() ) await function( partition.Current ) + // } + var moveNext = Call( partition, typeof( IEnumerator ).GetMethod( nameof( IEnumerator.MoveNext ) ) ); + var current = Property( partition, nameof( IEnumerator.Current ) ); + var taskRun = Call( + typeof( Task ).GetMethod( nameof( Task.Run ), [typeof( Func )] )!, + Lambda>( + BlockAsync( + // Using( partition, + While( + moveNext, + Await( Invoke( function, current ), configureAwait: false ) + ) + // ) + ) + ) + ); + + /* + * List tasks = new List(); + * foreach( var partition in partitions ) + * { + * tasks.Add( Task.Run( async () => { ... } ) ) ); + * } + */ + var tasks = Variable( typeof( List ), "tasks" ); + var initalizeTasks = Assign( tasks, New( typeof( List ).GetConstructors()[0] ) ); + var addTask = Call( tasks, typeof( List ).GetMethod( nameof( List.Add ) )!, taskRun ); + var foreachPartition = ForEach( getPartitionsCall, partition, addTask ); + + // await Task.WhenAll( tasks ); + var taskWhenAll = Await( Call( + typeof( Task ).GetMethod( nameof( Task.WhenAll ), [typeof( IEnumerable )] ), + tasks ) + ); + + return BlockAsync( + [tasks], + initalizeTasks, + foreachPartition, + taskWhenAll + ); + } +} + +public static class LoggerExpression +{ + public static Expression> Log( string message ) + { + return arg1 => Log( message, arg1 ); + } + + public static void Log( string message, object arg1 ) + { + Console.WriteLine( $"{message} value: {arg1}" ); + } } diff --git a/src/Hyperbee.Pipeline/Binders/WrapBinder.cs b/src/Hyperbee.Pipeline/Binders/WrapBinder.cs index e8c63a2..89636c8 100644 --- a/src/Hyperbee.Pipeline/Binders/WrapBinder.cs +++ b/src/Hyperbee.Pipeline/Binders/WrapBinder.cs @@ -1,9 +1,9 @@ using System.Linq.Expressions; using Hyperbee.Pipeline.Context; +using Hyperbee.Pipeline.Extensions.Implementation; using static System.Linq.Expressions.Expression; using static Hyperbee.Expressions.ExpressionExtensions; - namespace Hyperbee.Pipeline.Binders; internal class WrapBinder @@ -17,68 +17,66 @@ public WrapBinder( Expression> middleware, Expr Configure = configure; } - // public FunctionAsync Bind( FunctionAsync next ) - // { - // var defaultName = next.Method.Name; - // - // return async ( context, argument ) => - // { - // var contextControl = (IPipelineContextControl) context; - // - // using var _ = contextControl.CreateFrame( context, Configure, defaultName ); - // - // return await Middleware( - // context, - // argument, - // async ( context1, argument1 ) => await next( context1, argument1 ).ConfigureAwait( false ) - // ).ConfigureAwait( false ); - // }; - // } - public Expression> Bind( Expression> next ) { + /* + var defaultName = next.Method.Name; + + return async ( context, argument ) => + { + var contextControl = (IPipelineContextControl) context; + + using var _ = contextControl.CreateFrame( context, Configure, defaultName ); + + return await Middleware( + context, + argument, + async ( ctx, arg ) => await next( context1, argument1 ).ConfigureAwait( false ) + ).ConfigureAwait( false ); + }; + */ + + // TODO: Better way to get Name + var frameName = next.Name ?? "name"; + var context = Parameter( typeof( IPipelineContext ), "context" ); var argument = Parameter( typeof( TInput ), "argument" ); - // if ( Middleware == null ) - // return await nextFunction( context, nextArgument ).ConfigureAwait( false ); + // If there is no middleware, there is no need to wrap the next function if ( Middleware == null ) { return Lambda>( - Invoke( next, context, argument ), - parameters: [context, argument] ); + Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName ); + ContextImplExtensions.CreateFrameExpression( context, Configure, frameName ), + Invoke( next, context, argument ) + ) ); } - // async ( context1, argument1 ) => await nextFunction( context1, (TOutput) argument1 ).ConfigureAwait( false ) - var context1 = Parameter( typeof( IPipelineContext ), "context1" ); - var argument1 = Parameter( typeof( TOutput ), "argument1" ); - + var ctx = Parameter( typeof( IPipelineContext ), "ctx" ); + var arg = Parameter( typeof( TOutput ), "arg" ); var middlewareNext = Lambda>( BlockAsync( Convert( Await( - Invoke( next, context1, argument1 ), + Invoke( next, ctx, arg ), configureAwait: false ), typeof( TOutput ) ) ), - parameters: [context1, argument1] + parameters: [ctx, arg] ); - // return (TNext) await Middleware( - // context, - // nextArgument, - // middlewareNext - // ).ConfigureAwait( false ); return Lambda>( BlockAsync( - Convert( - Await( - Invoke( Middleware, - context, - argument, - middlewareNext - ), - configureAwait: false ), - typeof( TOutput ) ) ), + Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName ); + ContextImplExtensions.CreateFrameExpression( context, Configure, frameName ), + Await( + Invoke( Middleware, + context, + argument, + middlewareNext + ), + configureAwait: false ) + ) + ), parameters: [context, argument] ); } diff --git a/src/Hyperbee.Pipeline/Builders/CallBlockBuilder.cs b/src/Hyperbee.Pipeline/Builders/CallBlockBuilder.cs index a3779a5..683e1ff 100644 --- a/src/Hyperbee.Pipeline/Builders/CallBlockBuilder.cs +++ b/src/Hyperbee.Pipeline/Builders/CallBlockBuilder.cs @@ -1,5 +1,4 @@ -using System.Linq.Expressions; -using Hyperbee.Pipeline.Binders; +using Hyperbee.Pipeline.Binders; using Hyperbee.Pipeline.Extensions.Implementation; namespace Hyperbee.Pipeline; @@ -26,27 +25,6 @@ Func, IPipelineBuilder> builder internal static class CallBlockBuilder { - // public static IPipelineBuilder Call( - // IPipelineBuilder parent, - // bool inheritMiddleware, - // Func, IPipelineBuilder> builder - // ) - // { - // ArgumentNullException.ThrowIfNull( builder ); - // - // var (parentFunction, parentMiddleware) = parent.GetPipelineFunction(); - // - // var block = PipelineFactory.Start( inheritMiddleware ? parentMiddleware : null ); - // var (parentFunction, parentMiddleware) = parent.GetPipelineFunction(); - // var function = CastExpression( parent.CastExpression( builder ); // cast because we don't know the final Pipe output value - // builder( block ).CastFunction(); // cast because we don't know the final Pipe output value - // - // return new PipelineBuilder - // { - // Function = new CallBlockBinder( parentFunction ).Bind( block ), - // Middleware = parentMiddleware - // }; - // } public static IPipelineBuilder Call( IPipelineBuilder parent, bool inheritMiddleware, diff --git a/src/Hyperbee.Pipeline/Builders/CallStatementBuilder.cs b/src/Hyperbee.Pipeline/Builders/CallStatementBuilder.cs index d2cf84c..a381fdb 100644 --- a/src/Hyperbee.Pipeline/Builders/CallStatementBuilder.cs +++ b/src/Hyperbee.Pipeline/Builders/CallStatementBuilder.cs @@ -89,6 +89,7 @@ public static IPipelineBuilder CallAsync( Middleware = parentMiddleware }; } + internal static Task AsyncNext( Procedure next, IPipelineContext context, TOutput argument ) { next( context, argument ); diff --git a/src/Hyperbee.Pipeline/Builders/ForEachBlockBuilder.cs b/src/Hyperbee.Pipeline/Builders/ForEachBlockBuilder.cs index 83e604b..71619ba 100644 --- a/src/Hyperbee.Pipeline/Builders/ForEachBlockBuilder.cs +++ b/src/Hyperbee.Pipeline/Builders/ForEachBlockBuilder.cs @@ -37,7 +37,6 @@ Func, IPipelineBuilder> builder var (parentFunction, parentMiddleware) = parent.GetPipelineFunction(); var block = PipelineFactory.Start( inheritMiddleware ? parentMiddleware : null ); - //var function = builder( block ).CastFunction(); // cast because we don't know the final Pipe output value var function = builder( block ).CastExpression(); // cast because we don't know the final Pipe output value return new PipelineBuilder diff --git a/src/Hyperbee.Pipeline/Builders/WaitAllBlockBuilder.cs b/src/Hyperbee.Pipeline/Builders/WaitAllBlockBuilder.cs index 2e8ce3b..50a2fa8 100644 --- a/src/Hyperbee.Pipeline/Builders/WaitAllBlockBuilder.cs +++ b/src/Hyperbee.Pipeline/Builders/WaitAllBlockBuilder.cs @@ -93,18 +93,20 @@ public static IPipelineBuilder WaitAll( var (parentFunction, parentMiddleware) = parent.GetPipelineFunction(); - var functions = builderInstances + var functions = Expression.Constant( builderInstances .Select( builder => new { builder, block = PipelineFactory.Start( inheritMiddleware ? parentMiddleware : null ) } ) .Select( x => x.builder( x.block ).CastFunction() ) - .ToArray(); + .ToArray() ); Expression> configExpression = config == null ? null : ctx => config( ctx ); + Expression> reducerExpression = ( ctx, arg, results ) => reducer( ctx, arg, results ); + return new PipelineBuilder { - //Function = new WaitAllBlockBinder( parentFunction, parentMiddleware, configExpression ).Bind( functions, reducer ), + Function = new WaitAllBlockBinder( parentFunction, parentMiddleware, configExpression ).Bind( functions, reducerExpression ), Middleware = parentMiddleware }; } @@ -122,12 +124,12 @@ public class Builders public Func, IPipelineBuilder>[] Create( params Func, IPipelineBuilder>[] builders ) => builders; } -public sealed record WaitAllResult -{ - internal WaitAllResult() - { - } +public sealed record WaitAllResult( IPipelineContext Context, object Result ); +//{ +// public WaitAllResult() +// { +// } - public object Result { get; init; } - public IPipelineContext Context { get; init; } -} +// public object Result { get; init; } +// public IPipelineContext Context { get; init; } +//} diff --git a/src/Hyperbee.Pipeline/Extensions/Implementation/ContextImplExtensions.cs b/src/Hyperbee.Pipeline/Extensions/Implementation/ContextImplExtensions.cs index 2a1ce57..dd162e8 100644 --- a/src/Hyperbee.Pipeline/Extensions/Implementation/ContextImplExtensions.cs +++ b/src/Hyperbee.Pipeline/Extensions/Implementation/ContextImplExtensions.cs @@ -1,4 +1,7 @@ -using Hyperbee.Pipeline.Context; +using System.Linq.Expressions; +using System.Reflection; +using Hyperbee.Pipeline.Context; +using static System.Linq.Expressions.Expression; namespace Hyperbee.Pipeline.Extensions.Implementation; @@ -15,6 +18,82 @@ public static bool HandleCancellationRequested( this IPipelineContextCo return true; } + public static Expression CreateFrameExpression( + Expression context, + Expression> config, + string defaultName = null + ) + { + /* + { + var name = context.Name; + var id = context.Id; + + try + { + control.Id = control.GetNextId(); + control.Name = defaultName; + + configure?.Invoke( context ); // invoke user configure + + return new Disposable( () => + { + control.Id = id; + control.Name = name; + } ); + } + catch + { + control.Id = id; + control.Name = name; + throw; + } + } + */ + + var control = Convert( context, typeof( IPipelineContextControl ) ); + + var idVariable = Variable( typeof( int ), "originalId" ); + var nameVariable = Variable( typeof( string ), "originalName" ); + + var idProperty = Property( control, "Id" ); + var nameProperty = Property( control, "Name" ); + + var exception = Variable( typeof( Exception ), "exception" ); + + return Block( + [idVariable, nameVariable], + Assign( idVariable, idProperty ), + Assign( nameVariable, nameProperty ), + TryCatch( + Block( + Assign( idProperty, Call( control, "GetNextId", Type.EmptyTypes ) ), + Assign( nameProperty, Constant( defaultName ) ), + config != null + ? Invoke( config, context ) + : Empty(), + New( Disposable.ConstructorInfo, + Lambda( + Block( + Assign( idProperty, idVariable ), + Assign( nameProperty, Constant( "lambdaName" ) ) + ) + ) ) + ), + Catch( + exception, + Block( + [exception], + Assign( idProperty, idVariable ), + Assign( nameProperty, nameVariable ), + Throw( exception, typeof( Disposable ) ) + ) + ) + ) + ); + } + + /* public static IDisposable CreateFrame( this IPipelineContextControl control, IPipelineContext context, Action configure, string defaultName = null ) { var name = context.Name; @@ -40,9 +119,12 @@ public static IDisposable CreateFrame( this IPipelineContextControl control, IPi throw; } } + */ private sealed class Disposable( Action dispose ) : IDisposable { + public static readonly ConstructorInfo ConstructorInfo = typeof( Disposable ).GetConstructors()[0]; + private int _disposed; private Action Disposer { get; } = dispose; diff --git a/src/Hyperbee.Pipeline/Hyperbee.Pipeline.csproj b/src/Hyperbee.Pipeline/Hyperbee.Pipeline.csproj index 038922d..2cf6bd8 100644 --- a/src/Hyperbee.Pipeline/Hyperbee.Pipeline.csproj +++ b/src/Hyperbee.Pipeline/Hyperbee.Pipeline.csproj @@ -30,8 +30,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + \ No newline at end of file diff --git a/src/Hyperbee.Pipline.Caching/Hyperbee.Pipeline.Caching.csproj b/src/Hyperbee.Pipline.Caching/Hyperbee.Pipeline.Caching.csproj index ef35005..00848f2 100644 --- a/src/Hyperbee.Pipline.Caching/Hyperbee.Pipeline.Caching.csproj +++ b/src/Hyperbee.Pipline.Caching/Hyperbee.Pipeline.Caching.csproj @@ -39,7 +39,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/test/Hyperbee.Pipeline.Auth.Tests/Hyperbee.Pipeline.Auth.Tests.csproj b/test/Hyperbee.Pipeline.Auth.Tests/Hyperbee.Pipeline.Auth.Tests.csproj index d6b5970..900d218 100644 --- a/test/Hyperbee.Pipeline.Auth.Tests/Hyperbee.Pipeline.Auth.Tests.csproj +++ b/test/Hyperbee.Pipeline.Auth.Tests/Hyperbee.Pipeline.Auth.Tests.csproj @@ -14,8 +14,8 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - - + + diff --git a/test/Hyperbee.Pipeline.Tests/Hyperbee.Pipeline.Tests.csproj b/test/Hyperbee.Pipeline.Tests/Hyperbee.Pipeline.Tests.csproj index 854a675..41f8b8f 100644 --- a/test/Hyperbee.Pipeline.Tests/Hyperbee.Pipeline.Tests.csproj +++ b/test/Hyperbee.Pipeline.Tests/Hyperbee.Pipeline.Tests.csproj @@ -4,9 +4,9 @@ false - + - + diff --git a/test/Hyperbee.PipelineCaching.Tests/Hyperbee.Pipeline.Caching.Tests.csproj b/test/Hyperbee.PipelineCaching.Tests/Hyperbee.Pipeline.Caching.Tests.csproj index eef89b6..bc5d0ef 100644 --- a/test/Hyperbee.PipelineCaching.Tests/Hyperbee.Pipeline.Caching.Tests.csproj +++ b/test/Hyperbee.PipelineCaching.Tests/Hyperbee.Pipeline.Caching.Tests.csproj @@ -13,10 +13,10 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + - +