Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propose to replace AsyncLock with SemaphoreSlim #2168

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ namespace MQTTnet.AspNetCore;
public sealed class MqttConnectionContext : IMqttChannelAdapter
{
readonly ConnectionContext _connection;
readonly AsyncLock _writerLock = new();
readonly SemaphoreSlim _writerLock = new(1, 1);

PipeReader _input;
PipeWriter _output;
@@ -197,30 +197,29 @@ public void ResetStatistics()

public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken)
{
using (await _writerLock.EnterAsync(cancellationToken).ConfigureAwait(false))
await _writerLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var buffer = PacketFormatterAdapter.Encode(packet);

if (buffer.Payload.Length == 0)
{
// zero copy
// https://github.com/dotnet/runtime/blob/e31ddfdc4f574b26231233dc10c9a9c402f40590/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs#L279
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);
}
else
{
WritePacketBuffer(_output, buffer);
await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}
var buffer = PacketFormatterAdapter.Encode(packet);

BytesSent += buffer.Length;
if (buffer.Payload.Length == 0)
{
// zero copy
// https://github.com/dotnet/runtime/blob/e31ddfdc4f574b26231233dc10c9a9c402f40590/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs#L279
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);
}
finally
else
{
PacketFormatterAdapter.Cleanup();
WritePacketBuffer(_output, buffer);
await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}

BytesSent += buffer.Length;
}
finally
{
_writerLock.Release();
PacketFormatterAdapter.Cleanup();
}
}

66 changes: 0 additions & 66 deletions Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs

This file was deleted.

10 changes: 8 additions & 2 deletions Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ namespace MQTTnet.Server.Internal;
public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification, IDisposable
{
readonly Dictionary<string, MqttConnectedClient> _clients = new(4096);
readonly AsyncLock _createConnectionSyncRoot = new();
readonly SemaphoreSlim _createConnectionSyncRoot = new(1, 1);
readonly MqttServerEventContainer _eventContainer;
readonly MqttNetSourceLogger _logger;
readonly MqttServerOptions _options;
@@ -544,7 +544,9 @@ async Task<MqttConnectedClient> CreateClientConnection(
{
MqttConnectedClient connectedClient;

using (await _createConnectionSyncRoot.EnterAsync().ConfigureAwait(false))
await _createConnectionSyncRoot.WaitAsync().ConfigureAwait(false);

try
{
MqttSession oldSession;
MqttConnectedClient oldConnectedClient;
@@ -629,6 +631,10 @@ async Task<MqttConnectedClient> CreateClientConnection(

oldSession?.Dispose();
}
finally
{
_createConnectionSyncRoot.Release();
}

return connectedClient;
}
18 changes: 14 additions & 4 deletions Source/MQTTnet.Server/Internal/MqttRetainedMessagesManager.cs
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.ObjectModel;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Internal;

@@ -11,7 +10,7 @@ namespace MQTTnet.Server.Internal
public sealed class MqttRetainedMessagesManager
{
readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>(4096);
readonly AsyncLock _storageAccessLock = new AsyncLock();
readonly SemaphoreSlim _storageAccessLock = new(1, 1);

readonly MqttServerEventContainer _eventContainer;
readonly MqttNetSourceLogger _logger;
@@ -98,11 +97,17 @@ public async Task UpdateMessage(string clientId, MqttApplicationMessage applicat

if (saveIsRequired)
{
using (await _storageAccessLock.EnterAsync().ConfigureAwait(false))
await _storageAccessLock.WaitAsync().ConfigureAwait(false);

try
{
var eventArgs = new RetainedMessageChangedEventArgs(clientId, applicationMessage, messagesForSave);
await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
finally
{
_storageAccessLock.Release();
}
}
}
catch (Exception exception)
@@ -140,10 +145,15 @@ public async Task ClearMessages()
_messages.Clear();
}

using (await _storageAccessLock.EnterAsync().ConfigureAwait(false))
await _storageAccessLock.WaitAsync().ConfigureAwait(false);
try
{
await _eventContainer.RetainedMessagesClearedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false);
}
finally
{
_storageAccessLock.Release();
}
}
}
}
28 changes: 0 additions & 28 deletions Source/MQTTnet.TestApp/AsyncLockTest.cs

This file was deleted.

4 changes: 0 additions & 4 deletions Source/MQTTnet.TestApp/Program.cs
Original file line number Diff line number Diff line change
@@ -83,10 +83,6 @@ public static void Main()
{
Task.Run(new MessageThroughputTest().Run);
}
else if (pressedKey.KeyChar == 'f')
{
Task.Run(new AsyncLockTest().Run);
}

Thread.Sleep(Timeout.Infinite);
}
291 changes: 0 additions & 291 deletions Source/MQTTnet.Tests/Internal/AsyncLock_Tests.cs

This file was deleted.

80 changes: 41 additions & 39 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ public sealed class MqttChannelAdapter : Disposable, IMqttChannelAdapter
readonly byte[] _fixedHeaderBuffer = new byte[2];
readonly MqttNetSourceLogger _logger;
readonly byte[] _singleByteBuffer = new byte[1];
readonly AsyncLock _syncRoot = new();
readonly SemaphoreSlim _syncRoot = new(1, 1);

Statistics _statistics; // mutable struct, don't make readonly!

@@ -210,58 +210,60 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat
// This lock makes sure that multiple threads can send packets at the same time.
// This is required when a disconnect is sent from another thread while the
// worker thread is still sending publish packets etc.
using (await _syncRoot.EnterAsync(cancellationToken).ConfigureAwait(false))

await _syncRoot.WaitAsync(cancellationToken).ConfigureAwait(false);

try
{
// Check for cancellation here again because "WaitAsync" might take some time.
cancellationToken.ThrowIfCancellationRequested();

try
{
var packetBuffer = PacketFormatterAdapter.Encode(packet);

var localPacketInspector = PacketInspector;
if (localPacketInspector != null)
{
await localPacketInspector.BeginSendPacket(packetBuffer).ConfigureAwait(false);
}
var packetBuffer = PacketFormatterAdapter.Encode(packet);

_logger.Verbose("TX ({0} bytes) >>> {1}", packetBuffer.Length, packet);
var localPacketInspector = PacketInspector;
if (localPacketInspector != null)
{
await localPacketInspector.BeginSendPacket(packetBuffer).ConfigureAwait(false);
}

if (packetBuffer.Payload.Length == 0 || !AllowPacketFragmentation)
{
await _channel.WriteAsync(new ReadOnlySequence<byte>(packetBuffer.Join()), true, cancellationToken).ConfigureAwait(false);
}
else
{
await _channel.WriteAsync(new ReadOnlySequence<byte>(packetBuffer.Packet), false, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
}
_logger.Verbose("TX ({0} bytes) >>> {1}", packetBuffer.Length, packet);

Interlocked.Add(ref _statistics._bytesReceived, packetBuffer.Length);
if (packetBuffer.Payload.Length == 0 || !AllowPacketFragmentation)
{
await _channel.WriteAsync(new ReadOnlySequence<byte>(packetBuffer.Join()), true, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
else
{
if (!WrapAndThrowException(exception))
{
throw;
}
await _channel.WriteAsync(new ReadOnlySequence<byte>(packetBuffer.Packet), false, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
}
finally

Interlocked.Add(ref _statistics._bytesReceived, packetBuffer.Length);
}
catch (Exception exception)
{
if (!WrapAndThrowException(exception))
{
PacketFormatterAdapter.Cleanup();
throw;
}
}
finally
{
_syncRoot.Release();

PacketFormatterAdapter.Cleanup();
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
_channel.Dispose();
_syncRoot.Dispose();
}

base.Dispose(disposing);
}

async Task<int> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken)
@@ -474,14 +476,14 @@ static bool WrapAndThrowException(Exception exception)
}

struct Statistics
{
public long _bytesReceived;
public long _bytesSent;
{
public long _bytesReceived;
public long _bytesSent;

public void Reset()
{
Volatile.Write(ref _bytesReceived, 0);
Volatile.Write(ref _bytesSent, 0);
}
public void Reset()
{
Volatile.Write(ref _bytesReceived, 0);
Volatile.Write(ref _bytesSent, 0);
}
}
}
7 changes: 6 additions & 1 deletion Source/MQTTnet/Implementations/CrossPlatformSocket.cs
Original file line number Diff line number Diff line change
@@ -193,7 +193,12 @@ public Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags
return _socket.ReceiveAsync(buffer, socketFlags);
}

public Task SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
public Task<int> SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
return _socket.SendAsync(buffer, socketFlags);
}

public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags)
{
return _socket.SendAsync(buffer, socketFlags);
}
2 changes: 1 addition & 1 deletion Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ public sealed class MqttWebSocketChannel : IMqttChannel
{
readonly MqttClientWebSocketOptions _options;

AsyncLock _sendLock = new AsyncLock();
SemaphoreSlim _sendLock = new(1, 1);
WebSocket _webSocket;

public MqttWebSocketChannel(MqttClientWebSocketOptions options)
160 changes: 0 additions & 160 deletions Source/MQTTnet/Internal/AsyncLock.cs

This file was deleted.