[Invoker] Drain invocation response stream#4497
[Invoker] Drain invocation response stream#4497muhamadazmy wants to merge 1 commit intorestatedev:mainfrom
Conversation
|
Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit in Settings → Usage. Once credits are available, reopen this pull request to trigger a review. |
a8e597d to
f495b0d
Compare
tillrohrmann
left a comment
There was a problem hiding this comment.
Thanks for creating this PR @muhamadazmy. The changes look good to me. I left a few questions around when to drain and when to log information about it.
| http_stream_tx, | ||
| &mut decoder_stream, | ||
| ) | ||
| .await; |
There was a problem hiding this comment.
nit: line break seems to be missing.
There was a problem hiding this comment.
rustfmt changes it to be this way 🤷🏼♂️
| if decoder_stream.inner().has_remaining() { | ||
| warn_it!( | ||
| InvokerError::WriteAfterEndOfStream, | ||
| "The read buffer is non empty after the stream has been closed." | ||
| ); | ||
| } |
There was a problem hiding this comment.
This is something that only happens if the SDK misbehaves? If this can also happen if the connection was lost due to some reason, then warn might be a little bit too serious as connection losses can always happen.
There was a problem hiding this comment.
It seems that we are also logging something if TerminalLoopState::Failed which is different to before.
There was a problem hiding this comment.
Actually after looking at this block of code again (the sanity check) it seems that it can run only "sometimes" after failure. Other times it won't depends on where the error happens.
Wondering if this sanity check has any value in the first place. If we want to keep it I think it should only run after a successful termination state "Closed" or "Suspended".
| .await; | ||
| // Sanity check of the stream decoder | ||
| if decoder_stream.inner().has_remaining() { | ||
| warn_it!( |
There was a problem hiding this comment.
Should we also include the invocation id to know which invocation was affected?
There was a problem hiding this comment.
Including information about the deployment would actually be more helpful when trying to investigate.
| .is_err() | ||
| { | ||
| warn!( | ||
| restate.invocation.id = %self.invocation_task.invocation_id, |
There was a problem hiding this comment.
Including deployment information would be helpful as well.
|
|
||
| let inner_stream = &mut decoder_stream.inner_pin_mut().inner; | ||
|
|
||
| if tokio::time::timeout(Duration::from_secs(60), async { |
There was a problem hiding this comment.
It would be good to log on debug that we are starting to drain the inner stream.
|
|
||
| let inner_stream = &mut decoder_stream.inner_pin_mut().inner; | ||
|
|
||
| if tokio::time::timeout(Duration::from_secs(60), async { |
There was a problem hiding this comment.
Should we be draining in every case or only if there was an orderly termination of the h2 connection (having received an end or a suspended message)?
There was a problem hiding this comment.
IMHO draining should happen all the time because it ensure that the underlying http2 connection remains healthy. Failure to do so caused connections to drop abruptly after thousand of invocations, even with 100% successful invocations. if there is no data left to drain this should return immediately with no delays.
|
|
||
| let inner_stream = &mut decoder_stream.inner_pin_mut().inner; | ||
|
|
||
| if tokio::time::timeout(Duration::from_secs(60), async { |
There was a problem hiding this comment.
Wondering whether 60s is quite long. What if there is a problem with the SDK so that it does not close the response stream. Then we would be occupying the concurrency slot for 60s.
There was a problem hiding this comment.
You are right! Wondering if 5s is good enough ?
# Summary - Fixes restatedev#4456 by making sure: - Request stream is closed immediately after the we receive a terminal state - Drain the response stream. This also fixes a connection thrashing issue
[Invoker] Drain invocation response stream
Summary
Stack created with Sapling. Best reviewed with ReviewStack.