Skip to content

Commit 2c26e52

Browse files
committed
Redesign the current aspnetcore Concurrency limiter middleware to use the new abstractions & implementations dotnet#38306
1 parent 1e26857 commit 2c26e52

File tree

5 files changed

+143
-216
lines changed

5 files changed

+143
-216
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Collections.Concurrent;
5+
using System.Threading.RateLimiting;
6+
using Microsoft.Extensions.Options;
7+
8+
namespace Microsoft.AspNetCore.ConcurrencyLimiter;
9+
10+
internal class BasePolicy : IQueuePolicy, IDisposable
11+
{
12+
private readonly System.Threading.RateLimiting.ConcurrencyLimiter _limiter;
13+
private readonly ConcurrentQueue<RateLimitLease> _leases = new ConcurrentQueue<RateLimitLease>();
14+
15+
public int TotalRequests => _leases.Count;
16+
17+
public BasePolicy(IOptions<QueuePolicyOptions> options, QueueProcessingOrder order)
18+
{
19+
var queuePolicyOptions = options.Value;
20+
21+
var maxConcurrentRequests = queuePolicyOptions.MaxConcurrentRequests;
22+
if (maxConcurrentRequests <= 0)
23+
{
24+
throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(options));
25+
}
26+
27+
var requestQueueLimit = queuePolicyOptions.RequestQueueLimit;
28+
if (requestQueueLimit < 0)
29+
{
30+
throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(options));
31+
}
32+
33+
_limiter = new System.Threading.RateLimiting.ConcurrencyLimiter(
34+
new System.Threading.RateLimiting.ConcurrencyLimiterOptions(
35+
permitLimit: maxConcurrentRequests, order, queueLimit: requestQueueLimit));
36+
}
37+
38+
public ValueTask<bool> TryEnterAsync()
39+
{
40+
// a return value of 'false' indicates that the request is rejected
41+
// a return value of 'true' indicates that the request may proceed
42+
43+
var lease = _limiter.Acquire();
44+
if (lease.IsAcquired)
45+
{
46+
_leases.Enqueue(lease);
47+
return new ValueTask<bool>(true);
48+
}
49+
50+
var task = _limiter.WaitAsync();
51+
if (task.IsCompletedSuccessfully)
52+
{
53+
lease = task.Result;
54+
if (lease.IsAcquired)
55+
{
56+
_leases.Enqueue(lease);
57+
return new ValueTask<bool>(true);
58+
}
59+
60+
return new ValueTask<bool>(false);
61+
}
62+
63+
return Awaited(task);
64+
}
65+
66+
public void OnExit()
67+
{
68+
if (!_leases.TryDequeue(out var lease))
69+
{
70+
throw new InvalidOperationException("No outstanding leases.");
71+
}
72+
73+
lease.Dispose();
74+
}
75+
76+
public void Dispose()
77+
{
78+
_limiter.Dispose();
79+
}
80+
81+
private async ValueTask<bool> Awaited(ValueTask<RateLimitLease> task)
82+
{
83+
var lease = await task;
84+
85+
if (lease.IsAcquired)
86+
{
87+
_leases.Enqueue(lease);
88+
return true;
89+
}
90+
91+
return false;
92+
}
93+
}
Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,15 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Threading.RateLimiting;
45
using Microsoft.Extensions.Options;
56

67
namespace Microsoft.AspNetCore.ConcurrencyLimiter;
78

8-
internal class QueuePolicy : IQueuePolicy, IDisposable
9+
internal class QueuePolicy : BasePolicy
910
{
10-
private readonly int _maxTotalRequest;
11-
private readonly SemaphoreSlim _serverSemaphore;
12-
13-
private int _totalRequests;
14-
15-
public int TotalRequests => _totalRequests;
16-
1711
public QueuePolicy(IOptions<QueuePolicyOptions> options)
12+
: base(options, QueueProcessingOrder.OldestFirst)
1813
{
19-
var queuePolicyOptions = options.Value;
20-
21-
var maxConcurrentRequests = queuePolicyOptions.MaxConcurrentRequests;
22-
if (maxConcurrentRequests <= 0)
23-
{
24-
throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(options));
25-
}
26-
27-
var requestQueueLimit = queuePolicyOptions.RequestQueueLimit;
28-
if (requestQueueLimit < 0)
29-
{
30-
throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(options));
31-
}
32-
33-
_serverSemaphore = new SemaphoreSlim(maxConcurrentRequests);
34-
35-
_maxTotalRequest = maxConcurrentRequests + requestQueueLimit;
36-
}
37-
38-
public ValueTask<bool> TryEnterAsync()
39-
{
40-
// a return value of 'false' indicates that the request is rejected
41-
// a return value of 'true' indicates that the request may proceed
42-
// _serverSemaphore.Release is *not* called in this method, it is called externally when requests leave the server
43-
44-
int totalRequests = Interlocked.Increment(ref _totalRequests);
45-
46-
if (totalRequests > _maxTotalRequest)
47-
{
48-
Interlocked.Decrement(ref _totalRequests);
49-
return new ValueTask<bool>(false);
50-
}
51-
52-
Task task = _serverSemaphore.WaitAsync();
53-
if (task.IsCompletedSuccessfully)
54-
{
55-
return new ValueTask<bool>(true);
56-
}
57-
58-
return SemaphoreAwaited(task);
59-
}
60-
61-
public void OnExit()
62-
{
63-
_serverSemaphore.Release();
64-
65-
Interlocked.Decrement(ref _totalRequests);
66-
}
67-
68-
public void Dispose()
69-
{
70-
_serverSemaphore.Dispose();
71-
}
72-
73-
private static async ValueTask<bool> SemaphoreAwaited(Task task)
74-
{
75-
await task;
76-
77-
return true;
7814
}
7915
}
Lines changed: 3 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,15 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Threading.RateLimiting;
45
using Microsoft.Extensions.Options;
56

67
namespace Microsoft.AspNetCore.ConcurrencyLimiter;
78

8-
internal class StackPolicy : IQueuePolicy
9+
internal class StackPolicy : BasePolicy
910
{
10-
private readonly List<ResettableBooleanCompletionSource> _buffer;
11-
public ResettableBooleanCompletionSource? _cachedResettableTCS;
12-
13-
private readonly int _maxQueueCapacity;
14-
private readonly int _maxConcurrentRequests;
15-
private bool _hasReachedCapacity;
16-
private int _head;
17-
private int _queueLength;
18-
19-
private readonly object _bufferLock = new Object();
20-
21-
private int _freeServerSpots;
22-
2311
public StackPolicy(IOptions<QueuePolicyOptions> options)
12+
: base(options, QueueProcessingOrder.NewestFirst)
2413
{
25-
_buffer = new List<ResettableBooleanCompletionSource>();
26-
_maxQueueCapacity = options.Value.RequestQueueLimit;
27-
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
28-
_freeServerSpots = options.Value.MaxConcurrentRequests;
29-
}
30-
31-
public ValueTask<bool> TryEnterAsync()
32-
{
33-
lock (_bufferLock)
34-
{
35-
if (_freeServerSpots > 0)
36-
{
37-
_freeServerSpots--;
38-
return new ValueTask<bool>(true);
39-
}
40-
41-
// if queue is full, cancel oldest request
42-
if (_queueLength == _maxQueueCapacity)
43-
{
44-
_hasReachedCapacity = true;
45-
_buffer[_head].Complete(false);
46-
_queueLength--;
47-
}
48-
49-
var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
50-
_cachedResettableTCS = null;
51-
52-
if (_hasReachedCapacity || _queueLength < _buffer.Count)
53-
{
54-
_buffer[_head] = tcs;
55-
}
56-
else
57-
{
58-
_buffer.Add(tcs);
59-
}
60-
_queueLength++;
61-
62-
// increment _head for next time
63-
_head++;
64-
if (_head == _maxQueueCapacity)
65-
{
66-
_head = 0;
67-
}
68-
69-
return tcs.GetValueTask();
70-
}
71-
}
72-
73-
public void OnExit()
74-
{
75-
lock (_bufferLock)
76-
{
77-
if (_queueLength == 0)
78-
{
79-
_freeServerSpots++;
80-
81-
if (_freeServerSpots > _maxConcurrentRequests)
82-
{
83-
_freeServerSpots--;
84-
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
85-
}
86-
87-
return;
88-
}
89-
90-
// step backwards and launch a new task
91-
if (_head == 0)
92-
{
93-
_head = _maxQueueCapacity - 1;
94-
}
95-
else
96-
{
97-
_head--;
98-
}
99-
100-
_buffer[_head].Complete(true);
101-
_queueLength--;
102-
}
10314
}
10415
}

src/Middleware/ConcurrencyLimiter/test/MiddlewareTests.cs

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -176,40 +176,4 @@ public async Task ExceptionThrownDuringOnRejected()
176176
Assert.Equal(0, concurrent);
177177
Assert.Equal(0, testQueue.QueuedRequests);
178178
}
179-
180-
[Fact]
181-
public async Task MiddlewareOnlyCallsGetResultOnce()
182-
{
183-
var flag = false;
184-
185-
var queue = new TestQueueForResettableBoolean();
186-
var middleware = TestUtils.CreateTestMiddleware(
187-
queue,
188-
next: async context =>
189-
{
190-
await Task.CompletedTask;
191-
flag = true;
192-
});
193-
194-
queue.Source.Complete(true);
195-
await middleware.Invoke(new DefaultHttpContext());
196-
197-
Assert.True(flag);
198-
}
199-
200-
private class TestQueueForResettableBoolean : IQueuePolicy
201-
{
202-
public ResettableBooleanCompletionSource Source;
203-
public TestQueueForResettableBoolean()
204-
{
205-
Source = new ResettableBooleanCompletionSource(TestUtils.CreateStackPolicy(1));
206-
}
207-
208-
public ValueTask<bool> TryEnterAsync()
209-
{
210-
return Source.GetValueTask();
211-
}
212-
213-
public void OnExit() { }
214-
}
215179
}

0 commit comments

Comments
 (0)