Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ enum TerminalLoopState<T> {
Failed(InvokerError),
}

impl<T> TerminalLoopState<T> {
fn is_closed(&self) -> bool {
matches!(self, Self::Closed)
}

fn is_suspend(&self) -> bool {
matches!(self, Self::Suspended(_) | Self::SuspendedV2(_))
}
}

impl<T, E: Into<InvokerError>> From<Result<T, E>> for TerminalLoopState<T> {
fn from(value: Result<T, E>) -> Self {
match value {
Expand Down
104 changes: 83 additions & 21 deletions crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ where
.try_into()
.expect("must be able to build a valid invocation path");

let journal_size = journal_metadata.length;

debug!(
restate.invocation.id = %self.invocation_task.invocation_id,
deployment.address = %deployment.address_display(),
Expand All @@ -159,15 +157,15 @@ where

// Create an arc of the parent SpanContext.
// We send this with every journal entry to correctly link new spans generated from journal entries.
let service_invocation_span_context = journal_metadata.span_context;

let deployment_id = deployment.id;
// Prepare the request
let (mut http_stream_tx, request) = Self::prepare_request(
let (http_stream_tx, request) = Self::prepare_request(
path,
deployment,
self.service_protocol_version,
&self.invocation_task.invocation_id,
&service_invocation_span_context,
&journal_metadata.span_context,
);

// Initialize the response stream state
Expand All @@ -183,6 +181,78 @@ where
.throttle(self.invocation_task.action_token_bucket.take())
);

let result = self
.run_inner(
txn,
protocol_type,
journal_metadata,
keyed_service_id,
cached_journal_items,
http_stream_tx,
&mut decoder_stream,
)
.await;
Comment thread
tillrohrmann marked this conversation as resolved.

// After successful termination decoder should
// not have any remaining buffer
if result.is_closed() || result.is_suspend() {
// Sanity check of the stream decoder
if decoder_stream.inner().has_remaining() {
warn_it!(
InvokerError::WriteAfterEndOfStream,
restate.invocation.id = %self.invocation_task.invocation_id,
deployment.id = %deployment_id,
deployment.service_protocol_version = %self.service_protocol_version.as_repr(),
"The read buffer is non empty after the stream has been closed."
);
}
}

let inner_stream = &mut decoder_stream.inner_pin_mut().inner;

if tokio::time::timeout(Duration::from_secs(5), async {
loop {
match inner_stream.next().await {
None => break,
Some(Ok(_)) => {}
Some(Err(err)) => {
debug!(%err, "Error while draining invocation response stream");
break;
}
}
}
})
.await
.is_err()
{
warn!(
restate.invocation.id = %self.invocation_task.invocation_id,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including deployment information would be helpful as well.

deployment.id = %deployment_id,
deployment.service_protocol_version = %self.service_protocol_version.as_repr(),
"Response stream draining timeout!"
);
}

result
}

#[allow(clippy::too_many_arguments)]
async fn run_inner<Txn, S>(
&mut self,
txn: Txn,
protocol_type: ProtocolType,
journal_metadata: JournalMetadata,
keyed_service_id: Option<ServiceId>,
cached_journal_items: Option<Vec<JournalEntry>>,
mut http_stream_tx: mpsc::Sender<Result<Frame<Bytes>, Infallible>>,
decoder_stream: &mut S,
) -> TerminalLoopState<()>
where
Txn: InvocationReaderTransaction,
S: Stream<Item = Result<DecoderStreamItem, InvokerError>> + Unpin,
{
let journal_size = journal_metadata.length;
let service_invocation_span_context = journal_metadata.span_context;
// === Replay phase (transaction alive) ===
{
// Read state if needed (state is collected for the START message)
Expand Down Expand Up @@ -215,7 +285,7 @@ where
crate::shortcircuit!(
self.replay_loop(
&mut http_stream_tx,
&mut decoder_stream,
decoder_stream,
journal_stream,
journal_metadata.length
)
Expand All @@ -234,7 +304,7 @@ where
crate::shortcircuit!(
self.replay_loop(
&mut http_stream_tx,
&mut decoder_stream,
decoder_stream,
journal_stream,
journal_metadata.length
)
Expand All @@ -255,32 +325,24 @@ where
self.bidi_stream_loop(
&service_invocation_span_context,
http_stream_tx,
&mut decoder_stream
decoder_stream
)
.await
);
} else {
trace!("Protocol is in bidi stream mode, will now drop the sender side of the request");
trace!(
"Protocol is not in bidi stream mode, will now drop the sender side of the request"
);
// Drop the http_stream_tx.
// This is required in HTTP/1.1 to let the deployment send the headers back
drop(http_stream_tx)
}

// We don't have the invoker_rx, so we simply consume the response
trace!("Sender side of the request has been dropped, now processing the response");
let result = self
.response_stream_loop(&service_invocation_span_context, &mut decoder_stream)
.await;

// Sanity check of the stream decoder
if decoder_stream.inner().has_remaining() {
warn_it!(
InvokerError::WriteAfterEndOfStream,
"The read buffer is non empty after the stream has been closed."
);
}

result
self.response_stream_loop(&service_invocation_span_context, decoder_stream)
.await
}

fn prepare_request(
Expand Down
Loading