-
Notifications
You must be signed in to change notification settings - Fork 129
/
Demo11_MultipleConcurrencyLimiters.cs
197 lines (174 loc) · 8.32 KB
/
Demo11_MultipleConcurrencyLimiters.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
using System.Collections.Concurrent;
using PollyDemos.Helpers;
using PollyDemos.OutputHelpers;
namespace PollyDemos;
/// <summary>
/// <para>
/// Same scenario as previous demo:
/// <list type="bullet">
/// <item>Imagine a microservice or web front end (the upstream caller) trying to call two endpoints on a downstream system.</item>
/// <item>The 'good' endpoint responds quickly. The 'faulting' endpoint faults, and responds slowly.</item>
/// <item>Imagine the caller has limited capacity (all single instances of services/webapps eventually hit some capacity limit).</item>
/// </list>
/// </para>
/// <para>
/// Compared to demo 10, this demo isolates the calls
/// to the 'good' and 'faulting' endpoints in separate concurrency limiters. <br/>
/// A random combination of calls to the 'good' and 'faulting' endpoint are made.
/// </para>
/// <para>
/// Observations:
/// <list type="bullet">
/// <item>Because the separate 'good' and 'faulting' streams are isolated in separate concurrency limiters,</item>
/// <item>the 'faulting' calls still back up (high pending and failing number),</item>
/// <item>but 'good' calls (in a separate limiter) are <strong>unaffected</strong> (all succeed; none pending or failing).</item>
/// </list>
/// </para>
/// <para>
/// Concurrency limiters can be used to implement the bulkhead resiliency pattern. <br/>
/// Bulkheads' motto: making sure one fault doesn't sink the whole ship!
/// </para>
/// <para>
/// How to read the demo logs:
/// <list type="bullet">
/// <item>"Response: Fast ... to request #N": Response received from nonthrottledgood route.</item>
/// <item>"Response: Slow... to request #N": Response received from nonthrottledfaulting route.</item>
/// <item>"Request N failed with: ... rate limiter": Concurrency limit is reached, request is rejected.</item>
/// </list>
/// </para>
/// </summary>
public class Demo11_MultipleConcurrencyLimiters : ConcurrencyLimiterDemoBase
{
    // Let's imagine this caller has some theoretically limited capacity.
    private const int CallerParallelCapacity = 8; // artificially low - but easier to follow to illustrate the principle

    // Bulkhead isolation: each endpoint gets its own concurrency limiter with half of
    // the caller's capacity, so a backlog on one stream cannot starve the other.
    private readonly ResiliencePipeline limiterForGoodCalls = new ResiliencePipelineBuilder()
        .AddConcurrencyLimiter(
            permitLimit: CallerParallelCapacity / 2,
            queueLimit: 10)
        .Build();

    private readonly ResiliencePipeline limiterForFaultingCalls = new ResiliencePipelineBuilder()
        .AddConcurrencyLimiter(
            permitLimit: CallerParallelCapacity / 2,
            queueLimit: 10)
        .Build();

    public override string Description =>
        "Demonstrates a good call stream and faulting call stream separated into separate concurrency limiters. The faulting call stream is isolated from affecting the good call stream.";

    /// <summary>
    /// Runs the demo loop: issues one request every 200 ms, randomly choosing the 'good'
    /// or the 'faulting' endpoint, routing each stream through its own concurrency
    /// limiter, and reporting queued log messages to <paramref name="progress"/>.
    /// Terminates when the external token is cancelled or the user presses a key.
    /// </summary>
    /// <param name="externalCancellationToken">Token signalled by the host to stop the demo.</param>
    /// <param name="progress">Sink for log messages and colour-coded statistics.</param>
    /// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
    public override async Task ExecuteAsync(CancellationToken externalCancellationToken, IProgress<DemoProgress> progress)
    {
        ArgumentNullException.ThrowIfNull(progress);

        PrintHeader(progress);
        TotalRequests = 0;

        var tasks = new List<Task>();
        var internalCancellationTokenSource = new CancellationTokenSource();
        var combinedToken = CancellationTokenSource
            .CreateLinkedTokenSource(externalCancellationToken, internalCancellationTokenSource.Token)
            .Token;

        // Request tasks enqueue their log output here; the loop below drains it onto `progress`.
        var messages = new ConcurrentQueue<(string Message, Color Color)>();

        // NOTE(review): the HttpClient and the cancellation token sources are deliberately
        // not disposed here — the request tasks are fire-and-forget (collected in `tasks`
        // but never awaited) and may still be using them when the loop exits.
        var client = new HttpClient();
        var internalCancel = false;

        while (!(internalCancel || externalCancellationToken.IsCancellationRequested))
        {
            TotalRequests++;
            var thisRequest = TotalRequests;

            // Randomly pick a stream; each goes through its own limiter, so the faulting
            // stream backing up leaves the good stream unaffected.
            if (Random.Shared.Next(0, 2) == 0)
            {
                GoodRequestsMade++;
                tasks.Add(CallGoodEndpoint(client, messages, thisRequest, combinedToken));
            }
            else
            {
                FaultingRequestsMade++;
                tasks.Add(CallFaultingEndpoint(client, messages, thisRequest, combinedToken));
            }

            // Drain whatever the in-flight request tasks have logged so far.
            while (messages.TryDequeue(out var tuple))
            {
                progress.Report(ProgressWithMessage(tuple.Message, tuple.Color));
            }

            // Pace the demo: one new request every 200 ms.
            await Task.Delay(TimeSpan.FromSeconds(0.2), externalCancellationToken).ConfigureAwait(false);
            internalCancel = ShouldTerminateByKeyPress();
        }
    }

    /// <summary>
    /// Issues one request to the faulting endpoint through its dedicated concurrency
    /// limiter. Failures inside the call are caught and logged by the lambda itself;
    /// the <see cref="Task.ContinueWith(Action{Task, object?}, object?, TaskContinuationOptions)"/>
    /// continuation handles the case where the limiter rejects the request outright
    /// (the pipeline task faults before the lambda ever runs).
    /// </summary>
    private Task CallFaultingEndpoint(HttpClient client, ConcurrentQueue<(string Message, Color Color)> messages, int thisRequest, CancellationToken cancellationToken)
    {
        ValueTask issueRequest = limiterForFaultingCalls.ExecuteAsync(async token =>
        {
            try
            {
                var responseBody = await IssueFaultingRequestAndProcessResponseAsync(client, token).ConfigureAwait(false);

                if (!cancellationToken.IsCancellationRequested)
                {
                    messages.Enqueue(($"Response: {responseBody}", Color.Green));
                }

                // NOTE(review): counters are incremented from pool threads without
                // synchronization — counts are approximate, which the demo tolerates.
                FaultingRequestsSucceeded++;
            }
            catch (Exception e)
            {
                if (!token.IsCancellationRequested)
                {
                    messages.Enqueue(($"Request {thisRequest} eventually failed with: {e.Message}", Color.Red));
                }

                FaultingRequestsFailed++;
            }
        }, cancellationToken);

        // Runs only when the pipeline task does NOT run to completion, i.e. the limiter
        // rejected (fault) or the token fired (cancellation) before/while queued.
        Task handleFailure = issueRequest
            .AsTask()
            .ContinueWith((failedTask, state) =>
            {
                if (failedTask.IsFaulted)
                {
                    var message = $"Request {state} failed with: {failedTask.Exception!.Flatten().InnerExceptions.First().Message}";
                    messages.Enqueue((message, Color.Red));
                }

                FaultingRequestsFailed++;
            }, thisRequest, TaskContinuationOptions.NotOnRanToCompletion);

        return handleFailure;
    }

    /// <summary>
    /// Issues one request to the good endpoint through its dedicated concurrency limiter.
    /// Mirrors <see cref="CallFaultingEndpoint"/> but updates the 'good' counters.
    /// </summary>
    private Task CallGoodEndpoint(HttpClient client, ConcurrentQueue<(string Message, Color Color)> messages, int thisRequest, CancellationToken cancellationToken)
    {
        ValueTask issueRequest = limiterForGoodCalls.ExecuteAsync(async token =>
        {
            try
            {
                var responseBody = await IssueGoodRequestAndProcessResponseAsync(client, token).ConfigureAwait(false);

                if (!cancellationToken.IsCancellationRequested)
                {
                    messages.Enqueue(($"Response: {responseBody}", Color.Green));
                }

                GoodRequestsSucceeded++;
            }
            catch (Exception e)
            {
                if (!token.IsCancellationRequested)
                {
                    messages.Enqueue(($"Request {thisRequest} eventually failed with: {e.Message}", Color.Red));
                }

                GoodRequestsFailed++;
            }
        }, cancellationToken);

        // Handles limiter rejection / cancellation of the pipeline task itself;
        // see the note in CallFaultingEndpoint.
        Task handleFailure = issueRequest
            .AsTask()
            .ContinueWith((failedTask, state) =>
            {
                if (failedTask.IsFaulted)
                {
                    var message = $"Request {state} failed with: {failedTask.Exception!.Flatten().InnerExceptions.First().Message}";
                    messages.Enqueue((message, Color.Red));
                }

                GoodRequestsFailed++;
            }, thisRequest, TaskContinuationOptions.NotOnRanToCompletion);

        return handleFailure;
    }

    // Snapshot of the demo counters; "pending" is derived as made - succeeded - failed.
    public override Statistic[] LatestStatistics => new Statistic[]
    {
        new("Total requests made", TotalRequests, Color.Default),
        new("Good endpoint: requested", GoodRequestsMade, Color.Default),
        new("Good endpoint: succeeded", GoodRequestsSucceeded, Color.Green),
        new("Good endpoint: pending", GoodRequestsMade - GoodRequestsSucceeded - GoodRequestsFailed, Color.Yellow),
        new("Good endpoint: failed", GoodRequestsFailed, Color.Red),
        new("Faulting endpoint: requested", FaultingRequestsMade, Color.Default),
        new("Faulting endpoint: succeeded", FaultingRequestsSucceeded, Color.Green),
        new("Faulting endpoint: pending", FaultingRequestsMade - FaultingRequestsSucceeded - FaultingRequestsFailed, Color.Yellow),
        new("Faulting endpoint: failed", FaultingRequestsFailed, Color.Red),
    };
}