Skip to content

Commit

Permalink
Disposing pipeline should not dispose external inner pipeline (#1529)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk authored Aug 31, 2023
1 parent 5baee94 commit 0cbc4c6
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 27 deletions.
5 changes: 0 additions & 5 deletions src/Polly.Core/ResiliencePipelineBuilderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,6 @@ internal PipelineComponent BuildPipelineComponent()

var source = new ResilienceTelemetrySource(Name, InstanceName, null);

if (components.Distinct().Count() != components.Count)
{
throw new InvalidOperationException("The resilience pipeline must contain unique resilience strategies.");
}

return PipelineComponentFactory.CreateComposite(components, new ResilienceStrategyTelemetry(source, TelemetryListener), TimeProvider);
}

Expand Down
28 changes: 28 additions & 0 deletions src/Polly.Core/Utils/Pipeline/ExternalComponent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;

namespace Polly.Utils.Pipeline;

[DebuggerDisplay("{Component}")]
internal class ExternalComponent : PipelineComponent
{
public ExternalComponent(PipelineComponent component) => Component = component;

internal PipelineComponent Component { get; }

internal override ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state) => Component.ExecuteCore(callback, context, state);

public override void Dispose()
{
// don't dispose component that is external
}

public override ValueTask DisposeAsync()
{
// don't dispose component that is external
return default;
}
}
4 changes: 2 additions & 2 deletions src/Polly.Core/Utils/Pipeline/PipelineComponentFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace Polly.Utils.Pipeline;

internal static class PipelineComponentFactory
{
public static PipelineComponent FromPipeline(ResiliencePipeline pipeline) => pipeline.Component;
public static PipelineComponent FromPipeline(ResiliencePipeline pipeline) => new ExternalComponent(pipeline.Component);

public static PipelineComponent FromPipeline<T>(ResiliencePipeline<T> pipeline) => pipeline.Component;
public static PipelineComponent FromPipeline<T>(ResiliencePipeline<T> pipeline) => new ExternalComponent(pipeline.Component);

public static PipelineComponent FromStrategy(ResilienceStrategy strategy) => new BridgeComponent(strategy);

Expand Down
4 changes: 4 additions & 0 deletions src/Polly.Testing/ResiliencePipelineExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ private static void ExpandComponents(this PipelineComponent component, List<Pipe
{
ExpandComponents(callbacks.Component, components);
}
else if (component is ExternalComponent nonDisposable)
{
ExpandComponents(nonDisposable.Component, components);
}
else
{
components.Add(component);
Expand Down
12 changes: 8 additions & 4 deletions test/Polly.Core.Tests/GenericResiliencePipelineBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ public void Build_Ok()
public void AddGenericStrategy_Ok()
{
// arrange
var testStrategy = Substitute.For<ResilienceStrategy<string>>().AsPipeline();
_builder.AddPipeline(testStrategy);
var strategy = Substitute.For<ResilienceStrategy<string>>();
_builder.AddStrategy(strategy);

// act
var strategy = _builder.Build();
var pipeline = _builder.Build();

// assert
strategy.Should().NotBeNull();
((CompositeComponent)strategy.Component).Components[0].Should().Be(testStrategy.Component);
((CompositeComponent)pipeline.Component).Components[0]
.Should()
.BeOfType<BridgeComponent<string>>().Subject.Strategy
.Should()
.Be(strategy);
}
}
81 changes: 65 additions & 16 deletions test/Polly.Core.Tests/ResiliencePipelineBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void AddPipeline_Single_Ok()
After = (_, _) => executions.Add(3),
};

builder.AddPipeline(first.AsPipeline());
builder.AddStrategy(first);

// act
var pipeline = builder.Build();
Expand Down Expand Up @@ -102,21 +102,6 @@ public void AddPipeline_Multiple_Ok()
executions.Should().HaveCount(7);
}

[Fact]
public void AddPipeline_Duplicate_Throws()
{
// arrange
var executions = new List<int>();
var builder = new ResiliencePipelineBuilder()
.AddPipeline(ResiliencePipeline.Empty)
.AddPipeline(ResiliencePipeline.Empty);

builder.Invoking(b => b.Build())
.Should()
.Throw<InvalidOperationException>()
.WithMessage("The resilience pipeline must contain unique resilience strategies.");
}

[Fact]
public void Validator_Ok()
{
Expand Down Expand Up @@ -247,6 +232,70 @@ public void AddPipeline_NullFactory_Throws()
.Be("factory");
}

[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task AddPipeline_EnsureNotDisposed(bool isAsync)
{
var externalComponent = Substitute.For<PipelineComponent>();
var externalBuilder = new ResiliencePipelineBuilder();
externalBuilder.AddPipelineComponent(_ => externalComponent, new TestResilienceStrategyOptions());
var externalPipeline = externalBuilder.Build();

var internalComponent = Substitute.For<PipelineComponent>();
var builder = new ResiliencePipelineBuilder();
builder
.AddPipeline(externalPipeline)
.AddPipelineComponent(_ => internalComponent, new TestResilienceStrategyOptions());
var pipeline = builder.Build();

if (isAsync)
{
await pipeline.DisposeHelper.DisposeAsync();
await externalComponent.Received(0).DisposeAsync();
await internalComponent.Received(1).DisposeAsync();
}
else
{
pipeline.DisposeHelper.Dispose();
externalComponent.Received(0).Dispose();
internalComponent.Received(1).Dispose();
}
}

[InlineData(true)]
[InlineData(false)]
[Theory]
public async Task AddPipeline_Generic_EnsureNotDisposed(bool isAsync)
{
var externalComponent = Substitute.For<PipelineComponent>();
var externalBuilder = new ResiliencePipelineBuilder<string>();
externalBuilder.AddPipelineComponent(_ => externalComponent, new TestResilienceStrategyOptions());
var externalPipeline = externalBuilder.Build();

var internalComponent = Substitute.For<PipelineComponent>();
var builder = new ResiliencePipelineBuilder<string>();
builder
.AddPipeline(externalPipeline)
.AddPipelineComponent(_ => internalComponent, new TestResilienceStrategyOptions());
var pipeline = builder.Build();

pipeline.Execute(_ => string.Empty);

if (isAsync)
{
await pipeline.DisposeHelper.DisposeAsync();
await externalComponent.Received(0).DisposeAsync();
await internalComponent.Received(1).DisposeAsync();
}
else
{
pipeline.DisposeHelper.Dispose();
externalComponent.Received(0).Dispose();
internalComponent.Received(1).Dispose();
}
}

[Fact]
public void AddPipeline_CombinePipelines_Ok()
{
Expand Down
25 changes: 25 additions & 0 deletions test/Polly.Extensions.Tests/DisposablePipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ public void DisposePipeline_EnsureLinkedResourcesDisposedToo()
IsDisposed(limiters[0]).Should().BeTrue();
}

[Fact]
public void DisposePipeline_EnsureExternalPipelineNotDisposed()
{
var registry1 = new ResiliencePipelineRegistry<string>();
var pipeline1 = registry1.GetOrAddPipeline("my-pipeline", builder => builder.AddConcurrencyLimiter(1));
var pipeline2 = registry1.GetOrAddPipeline<string>("my-pipeline", builder => builder.AddConcurrencyLimiter(1));

var registry2 = new ResiliencePipelineRegistry<string>();
var pipeline3 = registry2.GetOrAddPipeline<string>("my-pipeline", builder => builder
.AddPipeline(pipeline1)
.AddPipeline(pipeline2));

pipeline3.Execute(() => string.Empty);
registry2.Dispose();
pipeline3.Invoking(p => p.Execute(() => string.Empty)).Should().Throw<ObjectDisposedException>();

pipeline1.Execute(() => { });
pipeline2.Execute(() => string.Empty);

registry1.Dispose();

pipeline1.Invoking(p => p.Execute(() => { })).Should().Throw<ObjectDisposedException>();
pipeline2.Invoking(p => p.Execute(() => string.Empty)).Should().Throw<ObjectDisposedException>();
}

private static bool IsDisposed(RateLimiter limiter)
{
try
Expand Down
12 changes: 12 additions & 0 deletions test/Polly.Testing.Tests/ResiliencePipelineExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ public void GetPipelineDescriptor_Reloadable_Ok()
descriptor.Strategies[1].StrategyInstance.GetType().Should().Be(typeof(CustomStrategy));
}

[Fact]
public void GetPipelineDescriptor_InnerPipeline_Ok()
{
var descriptor = new ResiliencePipelineBuilder()
.AddPipeline(new ResiliencePipelineBuilder().AddConcurrencyLimiter(1).Build())
.Build()
.GetPipelineDescriptor();

descriptor.Strategies.Should().HaveCount(1);
descriptor.Strategies[0].Options.Should().BeOfType<RateLimiterStrategyOptions>();
}

private sealed class CustomStrategy : ResilienceStrategy
{
protected override ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback, ResilienceContext context, TState state)
Expand Down

0 comments on commit 0cbc4c6

Please sign in to comment.