SemaphoreSlim(0, 1) in ListeningAgent causes back pressure to block indefinitely #1998

@thedonmon

Describe the bug
While running the compliance tests, I noticed that they hang indefinitely, on can_stop_receiving_when_too_busy_and_restart_listeners in particular. After further investigation I found that the test calls MarkAsTooBusyAndStopReceivingAsync() directly, and that ListeningAgent creates its semaphore as SemaphoreSlim(0, 1). That initial count is what you would typically use for the signal pattern, as opposed to the mutex pattern's (1, 1), but the code does not actually follow the signal pattern. Changing this to SemaphoreSlim(1, 1) is an easy fix, but I want to be sure what the original intention was, because making the signal pattern work requires only a slight change, which I confirmed works.
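
To illustrate why the initial count matters, here is a minimal standalone sketch (not Wolverine code; the class and method names are hypothetical). It tries to acquire a fresh semaphore with a timeout, so the example terminates instead of hanging the way an untimed WaitAsync() would:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of ListeningAgent.
public static class SemaphoreInitialCountDemo
{
    // Returns true if a freshly created SemaphoreSlim(initialCount, 1)
    // can be acquired before the timeout elapses.
    public static async Task<bool> TryAcquireAsync(int initialCount, int timeoutMs = 200)
    {
        using var semaphore = new SemaphoreSlim(initialCount, 1);

        // WaitAsync(int) completes with false when the timeout expires
        // without the semaphore being entered.
        return await semaphore.WaitAsync(timeoutMs);
    }
}
```

With an initial count of 0 and no call to Release(), TryAcquireAsync(0) returns false once the timeout expires; without the timeout, the wait would never complete. TryAcquireAsync(1) returns true immediately.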

To Reproduce
Steps to reproduce the behavior:

  1. Run compliance tests for RabbitMQ or any other transport
  2. Confirm that after a certain point the tests hang and the timer runs indefinitely.
  3. Make sure it's not just "my machine" lol.

Expected behavior
Compliance tests pass and do not hang for longer than 5 minutes.

Desktop (please complete the following information):

  • OS: MacOS Sequoia 15.6
  • Browser N/A
  • Version : latest main branch

Smartphone (please complete the following information):
N/A

Additional context
I mainly want to confirm that this is an actual issue, and whether the signal pattern was intended for the listening agent. The signal pattern uses SemaphoreSlim(0, 1) when you want one method to wait until a different method signals it to proceed. The key difference from a mutex:

  • Mutex: Same method calls WaitAsync() and Release() - it's a lock you acquire and release yourself
  • Signal: One method calls WaitAsync() to block, a different method calls Release() to unblock it
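
As a rough standalone sketch of the two patterns above (the class, method, and variable names are hypothetical and unrelated to ListeningAgent):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class contrasting the two SemaphoreSlim patterns.
public static class SemaphorePatterns
{
    // Mutex pattern: the count starts at 1, and the same method
    // acquires and releases the semaphore around a critical section.
    public static async Task<int> MutexAsync()
    {
        var gate = new SemaphoreSlim(1, 1);
        var counter = 0;

        async Task IncrementAsync()
        {
            await gate.WaitAsync();
            try
            {
                var snapshot = counter;
                await Task.Yield();      // give other callers a chance to interleave
                counter = snapshot + 1;  // safe because the gate is held
            }
            finally
            {
                gate.Release();
            }
        }

        await Task.WhenAll(IncrementAsync(), IncrementAsync(), IncrementAsync());
        return counter;
    }

    // Signal pattern: the count starts at 0, so the waiter cannot proceed
    // until a different piece of code calls Release().
    public static async Task<bool> SignalAsync()
    {
        var signal = new SemaphoreSlim(0, 1);

        var waiter = signal.WaitAsync();            // pending: nothing has signalled yet
        var wasBlockedBeforeSignal = !waiter.IsCompleted;

        signal.Release();                           // the "other method" signals
        await waiter;                               // completes only after Release()

        return wasBlockedBeforeSignal;
    }
}
```

MutexAsync() returns 3 because each increment runs while holding the gate. SignalAsync() returns true because the wait is still pending until Release() is called. If that Release() call is missing, the awaited task never completes, which matches the hang described in this issue.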

If the intent was to prevent MarkAsTooBusyAndStopReceivingAsync from running until the listener is fully started, we would only need one additional line in StartAsync():

// ListeningAgent.cs
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);

public async ValueTask StartAsync()
{
    if (Status == ListeningStatus.Accepting)
    {
        return;
    }
    
    _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync());

    // ... build listeners ...

    Status = ListeningStatus.Accepting;
    _runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, Status));
    _logger.LogInformation("Started message listening at {Uri}", Uri);

    // SIGNAL: Now back pressure handling is allowed to proceed
    _semaphore.Release();  // <-- This line must be added for signal pattern to work
}

public async ValueTask MarkAsTooBusyAndStopReceivingAsync()
{
    if (Status != ListeningStatus.Accepting || Listener == null)
    {
        return;
    }

    await _semaphore.WaitAsync();  // Blocks until StartAsync() has called Release()
    
    if (Status != ListeningStatus.Accepting || Listener == null)
    {
        _semaphore.Release();  // Release if we're not actually doing anything
        return;
    }

    try
    {
        // ... stop the listener ...
        Status = ListeningStatus.TooBusy;
    }
    finally
    {
        // Do NOT release here - StartAsync() will release when listener restarts
    }
}

I confirmed that this change works as well, so it just depends on whether the fix should be a simple PR that switches to SemaphoreSlim(1, 1) or the additional Release() call in StartAsync().
