-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathHedgingExecutionContext.cs
More file actions
211 lines (174 loc) · 7.17 KB
/
HedgingExecutionContext.cs
File metadata and controls
211 lines (174 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
using Polly.Hedging.Controller;
namespace Polly.Hedging.Utils;
/// <summary>
/// The context associated with an execution of hedging resilience strategy.
/// It holds the resources for all executed hedged tasks (primary + secondary) and is responsible for resource disposal.
/// </summary>
internal sealed class HedgingExecutionContext<T> : IAsyncDisposable
{
public readonly record struct ExecutionInfo<TResult>(TaskExecution<T>? Execution, bool Loaded, Outcome<TResult>? Outcome);
private readonly List<TaskExecution<T>> _tasks = [];
private readonly List<TaskExecution<T>> _executingTasks = [];
private readonly ObjectPool<TaskExecution<T>> _executionPool;
private readonly TimeProvider _timeProvider;
private readonly int _maxAttempts;
private readonly Action<HedgingExecutionContext<T>> _onReset;
public HedgingExecutionContext(
ObjectPool<TaskExecution<T>> executionPool,
TimeProvider timeProvider,
int maxAttempts,
Action<HedgingExecutionContext<T>> onReset)
{
_executionPool = executionPool;
_timeProvider = timeProvider;
_maxAttempts = maxAttempts;
_onReset = onReset;
}
internal void Initialize(ResilienceContext context) => PrimaryContext = context;
public int LoadedTasks => _tasks.Count;
public ResilienceContext? PrimaryContext { get; private set; }
public bool IsInitialized => PrimaryContext != null;
public IReadOnlyList<TaskExecution<T>> Tasks => _tasks;
private bool ContinueOnCapturedContext => PrimaryContext!.ContinueOnCapturedContext;
public async ValueTask<ExecutionInfo<T>> LoadExecutionAsync<TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<T>>> primaryCallback,
TState state)
{
if (LoadedTasks >= _maxAttempts)
{
return CreateExecutionInfoWhenNoExecution();
}
// determine what type of task we are creating
var type = LoadedTasks switch
{
0 => HedgedTaskType.Primary,
_ => HedgedTaskType.Secondary
};
var execution = _executionPool.Get();
if (await execution.InitializeAsync(type, PrimaryContext!, primaryCallback, state, LoadedTasks).ConfigureAwait(ContinueOnCapturedContext))
{
// we were able to start a new execution, register it
_tasks.Add(execution);
_executingTasks.Add(execution);
return new ExecutionInfo<T>(execution, true, null);
}
else
{
_executionPool.Return(execution);
return CreateExecutionInfoWhenNoExecution();
}
}
public async ValueTask DisposeAsync()
{
UpdateOriginalContext();
// first, cancel any pending tasks
foreach (var pair in _executingTasks)
{
pair.Cancel();
}
foreach (var task in _tasks)
{
await task.ExecutionTaskSafe!.ConfigureAwait(false);
await task.ResetAsync().ConfigureAwait(false);
_executionPool.Return(task);
}
Reset();
}
public async ValueTask<TaskExecution<T>?> TryWaitForCompletedExecutionAsync(TimeSpan hedgingDelay)
{
// before doing anything expensive, let's check whether any existing task is already completed
if (TryRemoveExecutedTask() is TaskExecution<T> execution)
{
return execution;
}
if (LoadedTasks == _maxAttempts)
{
await WaitForTaskCompetitionAsync().ConfigureAwait(ContinueOnCapturedContext);
var task = TryRemoveExecutedTask();
Debug.Assert(task != null, "There must be a completed task after awaiting for an executing task");
return task;
}
if (hedgingDelay == TimeSpan.Zero || LoadedTasks == 0)
{
// just load the next task
return null;
}
// Stryker disable once equality : no means to test this, stryker changes '<' to '<=' where 0 is already covered in the branch above
if (hedgingDelay < TimeSpan.Zero)
{
await WaitForTaskCompetitionAsync().ConfigureAwait(ContinueOnCapturedContext);
var task = TryRemoveExecutedTask();
Debug.Assert(task != null, "There must be a completed task after awaiting for an executing task");
return task;
}
#if NET8_0_OR_GREATER
var whenAnyHedgedTask = WaitForTaskCompetitionAsync();
await whenAnyHedgedTask.WaitAsync(hedgingDelay, _timeProvider, PrimaryContext!.CancellationToken)
.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing | (ContinueOnCapturedContext ? ConfigureAwaitOptions.ContinueOnCapturedContext : 0));
if (!whenAnyHedgedTask.IsCompleted)
{
return null;
}
#else
using var delayTaskCancellation = CancellationTokenSource.CreateLinkedTokenSource(PrimaryContext!.CancellationToken);
var delayTask = _timeProvider.Delay(hedgingDelay, delayTaskCancellation.Token);
var whenAnyHedgedTask = WaitForTaskCompetitionAsync();
var completedTask = await Task.WhenAny(whenAnyHedgedTask, delayTask).ConfigureAwait(ContinueOnCapturedContext);
if (completedTask == delayTask)
{
return null;
}
delayTaskCancellation.Cancel();
#endif
var completed = TryRemoveExecutedTask();
Debug.Assert(completed != null, "There must be a completed task after awaiting for an executing task");
return completed;
}
private ExecutionInfo<T> CreateExecutionInfoWhenNoExecution()
{
// if there are no more executing tasks we need to check finished ones
if (_executingTasks.Count == 0)
{
var finishedExecution = _tasks.First(static t => t.ExecutionTaskSafe!.IsCompleted);
finishedExecution.AcceptOutcome();
return new ExecutionInfo<T>(null, false, finishedExecution.Outcome);
}
return new ExecutionInfo<T>(null, false, null);
}
private Task WaitForTaskCompetitionAsync()
{
#pragma warning disable S109 // Magic numbers should not be used
return _executingTasks.Count switch
{
1 => _executingTasks[0].ExecutionTaskSafe!,
2 => Task.WhenAny(_executingTasks[0].ExecutionTaskSafe!, _executingTasks[1].ExecutionTaskSafe!),
_ => Task.WhenAny(_executingTasks.Select(v => v.ExecutionTaskSafe!))
};
#pragma warning restore S109 // Magic numbers should not be used
}
private TaskExecution<T>? TryRemoveExecutedTask()
{
var i = _executingTasks.FindIndex(static v => v.ExecutionTaskSafe!.IsCompleted);
if (i != -1)
{
var execution = _executingTasks[i];
_executingTasks.RemoveAt(i);
return execution;
}
return null;
}
private void UpdateOriginalContext()
{
if (_tasks.Find(static t => t.IsAccepted) is TaskExecution<T> acceptedExecution)
{
PrimaryContext!.Properties.AddOrReplaceProperties(acceptedExecution.Context.Properties);
}
}
private void Reset()
{
_tasks.Clear();
_executingTasks.Clear();
PrimaryContext = null;
_onReset(this);
}
}