Skip to content

Add new structured listen API#619

Merged
fabianfett merged 10 commits into
vapor:mainfrom
fpseverino:listen-unlisten
Feb 10, 2026
Merged

Add new structured listen API#619
fabianfett merged 10 commits into
vapor:mainfrom
fpseverino:listen-unlisten

Conversation

@fpseverino
Copy link
Copy Markdown
Member

@fpseverino fpseverino commented Feb 3, 2026

  • Adds a new structured API for listen, inspired by valkey-swift, passing the AsyncSequence to a closure and sending the UNLISTEN task to the channel handler once the closure finishes executing
    • Initially I tried sending the UNLISTEN query directly, but given the current implementation of the channel handler and PostgresNotificationSequence, that ended up sending two UNLISTEN each time (which is not a protocol error, but undesirable)
  • Add unit and integration tests for the new API
  • Improve the "Listen & Notify" article showcasing the new API
  • Deprecate the old listen method

Comment thread Sources/PostgresNIO/Connection/PostgresConnection.swift
@codecov
Copy link
Copy Markdown

codecov Bot commented Feb 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 76.53%. Comparing base (d578b86) to head (a9b259e).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #619      +/-   ##
==========================================
+ Coverage   76.40%   76.53%   +0.12%     
==========================================
  Files         134      134              
  Lines       10161    10170       +9     
==========================================
+ Hits         7764     7784      +20     
+ Misses       2397     2386      -11     
Files with missing lines Coverage Δ
...es/PostgresNIO/Connection/PostgresConnection.swift 83.00% <100.00%> (+0.37%) ⬆️

... and 6 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment thread Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated
Comment thread Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated
Comment thread Tests/IntegrationTests/AsyncTests.swift Outdated
Comment on lines +388 to +390
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use singleton eventloop instead.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct?

let eventLoopGroup = MultiThreadedEventLoopGroup.singleton
let eventLoop = eventLoopGroup.next()

Comment thread Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated
Comment thread Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated
Comment thread Tests/PostgresNIOTests/New/PostgresConnectionTests.swift Outdated
Copy link
Copy Markdown
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This already looks really good. A few small issues, but this will be a major improvement over the status quo.

Copy link
Copy Markdown
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just had a look at the statemachine: I'm afraid this code might break us:

        mutating func cancelListening(id: Int) -> CancelAction {
            switch self.state {
            case .initialized:
                fatalError("Invalid state: \(self.state)")

can we add an integration test case:

@Test func leavingTheScopeSecondsAfterCancellationDoesNotCrash() async throws {
  withThrowingTaskGroup { taskGroup in
    taskGroup.addTask {
      try await client.listen("Foo") {
        do {
          for try await not in $0 {

          }
        } catch {
          try await Task {
            try? await Task.sleep(.seconds(1))
          }.result
        }
        // scope is left long after task is cancelled
      }
    }
    // wait until listen has started by using an AsyncStream.
    taskGroup.cancelAll()
  }
}

@fpseverino
Copy link
Copy Markdown
Member Author

@fabianfett I added the test, it passes without throwing any fatalError both with the current implementation and with this one. Let me know what you think!

@fabianfett
Copy link
Copy Markdown
Collaborator

fabianfett commented Feb 4, 2026

@fpseverino I hear you that it doesn't crash in your test, however I remain sceptical that something else is wrong here:

        mutating func stopListeningSucceeded() -> StopListeningSuccessAction {
            switch self.state {
            case .initialized, .listening, .starting:
                fatalError("Invalid state: \(self.state)")
                
            case .stopping(let listeners):
                if listeners.isEmpty {
                    self.state = .initialized
                    return .none
                } else {
                    self.state = .starting(listeners)
                    return .startListening
                }
                
            case .failed:
                return .none
            }
        }

Especially given this code it SHOULD crash. This resets it to initialized. Another cancel would then hit the initialized.

@fpseverino
Copy link
Copy Markdown
Member Author

Using the new listen normally, without any Task cancellation:

  • the defer block is executed and the cancelListening task is written to PostgresChannelHandler
    • the write method of PostgresChannelHandler checks the state machine
      • the channel is found in the listening state
      • the stopListening action is returned by the state machine
      • makeUnlistenQuery is called and the UNLISTEN command is sent
  • the onTermination handler of NotificationListener is called
    • like the in write method, the state machine is checked
      • the channel is found in the stopping state
      • the none action is returned by the state machine
      • does nothing, exits from the onTermination handler
  • the promise inside makeUnlistenQuery completes with success
    • stopListeningSucceeded is called
      • the channel is found in the stopping state
      • stopListeningSucceeded sets the channel state to initialized and returns the none action
      • does nothing, the test finishes execution

In the test you proposed, where the task is cancelled and the closure of listen finishes executing much later:

  • the Task is cancelled an we start sleeping for one second inside the closure
  • the onTermination handler of NotificationListener is called
    • the state machine is checked
      • the channel is found in the listening state
      • the stopListening action is returned by the state machine
      • makeUnlistenQuery is called and the UNLISTEN command is sent
  • the promise inside makeUnlistenQuery completes with success
    • stopListeningSucceeded is called
      • the channel is found in the stopping state
      • stopListeningSucceeded sets the channel state to initialized and returns the none action
      • does nothing
  • Task.sleep finishes and we exit the closure
  • the defer block is executed and the cancelListening task is written to PostgresChannelHandler
    • the write method of PostgresChannelHandler checks the state machine
      • the state machine finds no channel in the channels dictionary of ListenStateMachine
      • the none action is returned since no channels were found
      • does nothing, exits from the write method of PostgresChannelHandler

The initialized state for a channel is only set on initialization, if starting listening fails and by stopListeningSucceeded, the function you mentioned in your last comment, and stopListeningSucceeded is only called by the whenComplete handler of the promise inside makeUnlistenQuery

@fabianfett
Copy link
Copy Markdown
Collaborator

The magic is here:

    mutating func stopListeningSucceeded(channel: String) -> StopListeningSuccessAction {
        switch self.channels[channel]!.stopListeningSucceeded() {
        case .none:
            self.channels.removeValue(forKey: channel)
            return .none

        case .startListening:
            return .startListening
        }
    }

Copy link
Copy Markdown
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pushing this through. Really great API change.

@fabianfett fabianfett merged commit 88e1545 into vapor:main Feb 10, 2026
18 of 20 checks passed
@fabianfett fabianfett added the semver-minor Adds new public API. label Feb 10, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

semver-minor Adds new public API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants