Description
Describe the bug
I was running the compliance tests and noticed that they hung indefinitely, on can_stop_receiving_when_too_busy_and_restart_listeners in particular. After further investigation, I found that MarkAsTooBusyAndStopReceivingAsync() is called directly while ListeningAgent creates its semaphore as SemaphoreSlim(0, 1). That setup belongs to the signal pattern, but nothing calls Release() first, so the WaitAsync() never completes. The typical mutex pattern would use SemaphoreSlim(1, 1) instead. The fix is as easy as changing it to SemaphoreSlim(1, 1), but I want to be sure that was the original intention, because actually using the signal pattern requires a slight change, which I confirmed works.
To Reproduce
Steps to reproduce the behavior:
- Run the compliance tests for RabbitMQ or any other transport.
- Confirm that after a certain point the tests just hang and the timer runs indefinitely.
- Make sure it's not just "my machine" lol.
Expected behavior
The compliance tests pass and do not hang for longer than 5 minutes.
Desktop (please complete the following information):
- OS: macOS Sequoia 15.6
- Browser: N/A
- Version: latest main branch
Smartphone (please complete the following information):
N/A
Additional context
I really just want to make sure this is an actual issue, and whether the signal pattern was intended for the listening agent. A signal pattern uses SemaphoreSlim(0, 1) when you want one method to wait until a different method signals it to proceed. The key difference from a mutex:
- Mutex: the same method calls WaitAsync() and Release(); it's a lock you acquire and release yourself.
- Signal: one method calls WaitAsync() to block, and a different method calls Release() to unblock it.
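To make the difference concrete, here is a minimal C# sketch that uses only SemaphoreSlim from System.Threading. The class and method names (SemaphorePatterns, MutexAsync, and so on) are illustrative and not part of Wolverine. The timeouts exist only so that a wait which would otherwise hang returns false instead.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch (not Wolverine code) contrasting the two SemaphoreSlim usages.
public static class SemaphorePatterns
{
    // Mutex pattern: initial count 1, so the first WaitAsync() succeeds at once,
    // and the same method releases in finally, so the lock can be taken again.
    public static async Task<bool> MutexAsync()
    {
        var mutex = new SemaphoreSlim(1, 1);
        for (var i = 0; i < 2; i++)
        {
            if (!await mutex.WaitAsync(TimeSpan.FromMilliseconds(100)))
            {
                return false; // would only happen if a release were missing
            }
            try
            {
                // ... critical section ...
            }
            finally
            {
                mutex.Release();
            }
        }
        return true;
    }

    // Signal pattern with no signaller: initial count 0 and nobody calls Release(),
    // so WaitAsync() can only time out. Without the timeout this await would
    // never complete, which is the kind of hang described in this issue.
    public static Task<bool> SignalWithoutReleaseAsync()
    {
        var signal = new SemaphoreSlim(0, 1);
        return signal.WaitAsync(TimeSpan.FromMilliseconds(100));
    }

    // Signal pattern used as intended: a different task calls Release()
    // to unblock the waiter.
    public static async Task<bool> SignalWithReleaseAsync()
    {
        var signal = new SemaphoreSlim(0, 1);
        Task<bool> waiter = signal.WaitAsync(TimeSpan.FromSeconds(5));
        await Task.Run(() => signal.Release()); // the "other method" signals
        return await waiter;
    }
}
```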
If the intent was to prevent MarkAsTooBusyAndStopReceivingAsync() from running until the listener is fully started, we would only need one additional line in StartAsync():
// ListeningAgent.cs
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);

public async ValueTask StartAsync()
{
    if (Status == ListeningStatus.Accepting)
    {
        return;
    }

    _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync());
    // ... build listeners ...

    Status = ListeningStatus.Accepting;
    _runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, Status));
    _logger.LogInformation("Started message listening at {Uri}", Uri);

    // SIGNAL: Now back pressure handling is allowed to proceed
    _semaphore.Release(); // <-- This line must be added for signal pattern to work
}

public async ValueTask MarkAsTooBusyAndStopReceivingAsync()
{
    if (Status != ListeningStatus.Accepting || Listener == null)
    {
        return;
    }

    await _semaphore.WaitAsync(); // Blocks until StartAsync() has called Release()

    if (Status != ListeningStatus.Accepting || Listener == null)
    {
        _semaphore.Release(); // Release if we're not actually doing anything
        return;
    }

    try
    {
        // ... stop the listener ...
        Status = ListeningStatus.TooBusy;
    }
    finally
    {
        // Do NOT release here - StartAsync() will release when listener restarts
    }
}

I confirmed this change would also work, so I guess it just depends on whether it should be a simple PR updating to SemaphoreSlim(1, 1) or the additional Release() call in StartAsync().
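For comparison, the SemaphoreSlim(1, 1) alternative might look roughly like the sketch below. It assumes the existing MarkAsTooBusyAndStopReceivingAsync() acquires the semaphore and releases it in a finally block. ListeningAgentSketch, the trimmed-down ListeningStatus enum, and the LockCount property are hypothetical stand-ins, not the real Wolverine types, and the actual listener work is replaced by placeholders.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real enum; only the members used here.
public enum ListeningStatus { Accepting, TooBusy, Stopped }

// Hypothetical, heavily simplified stand-in for ListeningAgent (mutex pattern).
public class ListeningAgentSketch
{
    // Mutex pattern: starts "unlocked", so the first caller gets in immediately.
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    public ListeningStatus Status { get; private set; } = ListeningStatus.Accepting;

    // Hypothetical helper so callers can observe that the lock was handed back.
    public int LockCount => _semaphore.CurrentCount;

    public async ValueTask MarkAsTooBusyAndStopReceivingAsync()
    {
        if (Status != ListeningStatus.Accepting)
        {
            return;
        }

        await _semaphore.WaitAsync();
        try
        {
            // Re-check under the lock: another caller may have stopped it already.
            if (Status != ListeningStatus.Accepting)
            {
                return;
            }

            await Task.Yield(); // placeholder for stopping the listener
            Status = ListeningStatus.TooBusy;
        }
        finally
        {
            // The same method that acquired the lock releases it, on every path.
            _semaphore.Release();
        }
    }

    // Placeholder restart; the real method rebuilds the receiver and listeners.
    public ValueTask StartAsync()
    {
        Status = ListeningStatus.Accepting;
        return default; // default(ValueTask) is already completed
    }
}
```

With an initial count of 1, the first caller acquires the lock immediately and the finally block hands it back, so repeated too-busy/restart cycles do not leave the semaphore stuck at zero. One thing worth checking with the signal variant: SemaphoreSlim.Release() throws SemaphoreFullException when the count is already at its maximum, so if StartAsync() can run twice without MarkAsTooBusyAndStopReceivingAsync() consuming the signal in between (for example, a stop followed by a restart), the second Release() would throw.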