[Invoker] Drain invocation response stream #4497

Open
muhamadazmy wants to merge 1 commit into restatedev:main from muhamadazmy:pr4497

Conversation

@muhamadazmy muhamadazmy commented Mar 18, 2026

[Invoker] Drain invocation response stream

Summary

  • Fixes #4456 (Invoker should await response EOS) by making sure that:
    • The request stream is closed immediately after we receive a terminal state
    • The response stream is drained. This also fixes a connection thrashing issue

Stack created with Sapling. Best reviewed with ReviewStack.

claude bot commented Mar 18, 2026

⚠️ Code review skipped — your organization's overage spend limit has been reached.

Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit in Settings → Usage.

Once credits are available, reopen this pull request to trigger a review.

@tillrohrmann tillrohrmann left a comment

Thanks for creating this PR @muhamadazmy. The changes look good to me. I left a few questions around when to drain and when to log information about it.

http_stream_tx,
&mut decoder_stream,
)
.await;
Contributor

nit: line break seems to be missing.

Contributor Author

rustfmt changes it to be this way 🤷🏼‍♂️

Comment on lines +195 to +200
if decoder_stream.inner().has_remaining() {
warn_it!(
InvokerError::WriteAfterEndOfStream,
"The read buffer is non empty after the stream has been closed."
);
}
Contributor

Does this only happen if the SDK misbehaves? If it can also happen when the connection is lost for some reason, then warn might be a little too severe, since connection losses can always happen.

Contributor

It seems that we are also logging something if TerminalLoopState::Failed which is different to before.

Contributor Author

Good catch!

Contributor Author

Actually, after looking at this block of code again (the sanity check), it seems that it only runs "sometimes" after a failure. Other times it won't, depending on where the error happens.

Wondering if this sanity check has any value in the first place. If we want to keep it I think it should only run after a successful termination state "Closed" or "Suspended".
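The gating proposed here can be sketched as follows. This is only an illustration under assumptions, not the PR's code: `TerminalState` and `leftover_warning` are hypothetical names, and the three variants simply mirror the states mentioned in this thread ("Closed", "Suspended", and a failure).

```rust
/// Hypothetical terminal states, named after the ones mentioned in this thread.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TerminalState {
    Closed,
    Suspended,
    Failed,
}

/// Returns a warning message only when the invocation ended successfully
/// (`Closed` or `Suspended`) and the decoder still holds unread bytes.
pub fn leftover_warning(state: TerminalState, remaining_bytes: usize) -> Option<String> {
    match state {
        // The guard applies to both alternatives of the or-pattern.
        TerminalState::Closed | TerminalState::Suspended if remaining_bytes > 0 => {
            Some(format!(
                "read buffer holds {remaining_bytes} unread byte(s) after the stream was closed"
            ))
        }
        // After a failure the check is skipped, as proposed above.
        _ => None,
    }
}
```

With this shape the check behaves the same regardless of where a failure occurred, because failed invocations never reach the warning branch.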

.await;
// Sanity check of the stream decoder
if decoder_stream.inner().has_remaining() {
warn_it!(
Contributor

Should we also include the invocation id to know which invocation was affected?

Contributor

Including information about the deployment would actually be more helpful when trying to investigate.

.is_err()
{
warn!(
restate.invocation.id = %self.invocation_task.invocation_id,
Contributor

Including deployment information would be helpful as well.


let inner_stream = &mut decoder_stream.inner_pin_mut().inner;

if tokio::time::timeout(Duration::from_secs(60), async {
Contributor

It would be good to log on debug that we are starting to drain the inner stream.


let inner_stream = &mut decoder_stream.inner_pin_mut().inner;

if tokio::time::timeout(Duration::from_secs(60), async {
Contributor

Should we be draining in every case or only if there was an orderly termination of the h2 connection (having received an end or a suspended message)?

Contributor Author

@muhamadazmy muhamadazmy Mar 19, 2026

IMHO draining should always happen because it ensures that the underlying HTTP/2 connection remains healthy. Failing to do so caused connections to drop abruptly after thousands of invocations, even with 100% successful invocations. If there is no data left to drain, this should return immediately with no delay.


let inner_stream = &mut decoder_stream.inner_pin_mut().inner;

if tokio::time::timeout(Duration::from_secs(60), async {
Contributor

Wondering whether 60s is too long. What if there is a problem with the SDK and it does not close the response stream? Then we would be occupying the concurrency slot for 60s.

Contributor Author

You are right! Wondering if 5s is good enough?
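
To make the drain-with-timeout trade-off discussed in the threads above concrete, here is a minimal sketch. It is not the PR's code: the PR wraps an async stream in `tokio::time::timeout`, whereas this sketch assumes a blocking `std::sync::mpsc` channel as a stand-in for the response stream so that it stays self-contained. `drain_with_budget` and `DrainOutcome` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Result of a drain attempt (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
pub enum DrainOutcome {
    /// The peer closed the stream; `discarded` counts the bytes thrown away.
    EndOfStream { discarded: usize },
    /// The budget ran out before the peer closed the stream.
    TimedOut { discarded: usize },
}

/// Reads and discards chunks until the sender side is dropped (standing in
/// for end-of-stream) or until `budget` has elapsed in total.
pub fn drain_with_budget(rx: &Receiver<Vec<u8>>, budget: Duration) -> DrainOutcome {
    let deadline = Instant::now() + budget;
    let mut discarded = 0;
    loop {
        // The remaining time shrinks on every iteration, so a slow trickle
        // of chunks cannot extend the drain beyond the overall budget.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(chunk) => discarded += chunk.len(),
            Err(RecvTimeoutError::Disconnected) => {
                return DrainOutcome::EndOfStream { discarded };
            }
            Err(RecvTimeoutError::Timeout) => {
                return DrainOutcome::TimedOut { discarded };
            }
        }
    }
}
```

In this model, a stream that is already closed and empty returns `EndOfStream` right away, while a peer that never closes its side holds the caller for at most `budget`. That upper bound is the quantity being debated above (60s versus 5s), since the caller keeps its concurrency slot for the whole wait.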


Development

Successfully merging this pull request may close these issues.

Invoker should await response EOS

2 participants