Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::task::JoinHandle;
use toml::Value as TomlValue;
use uuid::Uuid;
mod agent_message_consolidation;
mod agent_navigation;
mod app_server_event_targets;
mod app_server_events;
Expand Down
92 changes: 92 additions & 0 deletions codex-rs/tui/src/app/agent_message_consolidation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Transcript consolidation for finalized streaming agent messages.
//!
//! During streaming, the chat widget emits transient `AgentMessageCell`s so it
//! can animate stable lines into scrollback while keeping the active mutable
//! tail in the bottom pane. Once the answer finishes, the app replaces that
//! trailing run with a single source-backed `AgentMarkdownCell`. This makes the
//! transcript the canonical owner of the raw markdown source used for future
//! resize re-renders.

use std::path::PathBuf;
use std::sync::Arc;

use color_eyre::eyre::Result;

use super::App;
use super::resize_reflow::trailing_run_start;
use crate::app_event::ConsolidationScrollbackReflow;
use crate::history_cell;
use crate::history_cell::HistoryCell;
use crate::pager_overlay::Overlay;
use crate::tui;

impl App {
pub(super) fn handle_consolidate_agent_message(
&mut self,
tui: &mut tui::Tui,
source: String,
cwd: PathBuf,
scrollback_reflow: ConsolidationScrollbackReflow,
deferred_history_cell: Option<Box<dyn HistoryCell>>,
) -> Result<()> {
// Some finalize paths must preserve a last provisional stream cell long
// enough for queue ordering, then fold it into the canonical
// source-backed cell during consolidation.
if let Some(cell) = deferred_history_cell {
let cell: Arc<dyn HistoryCell> = cell.into();
if let Some(Overlay::Transcript(t)) = &mut self.overlay {
t.insert_cell(cell.clone());
}
self.transcript_cells.push(cell);
}

// Walk backward to find the contiguous run of streaming AgentMessageCells that
// belong to the just-finalized stream.
let end = self.transcript_cells.len();
tracing::debug!(
"ConsolidateAgentMessage: transcript_cells.len()={end}, source_len={}",
source.len()
);
let start = trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells);
if start < end {
tracing::debug!(
"ConsolidateAgentMessage: replacing cells [{start}..{end}] with AgentMarkdownCell"
);
let consolidated: Arc<dyn HistoryCell> =
Arc::new(history_cell::AgentMarkdownCell::new(source, &cwd));
self.transcript_cells
.splice(start..end, std::iter::once(consolidated.clone()));

if let Some(Overlay::Transcript(t)) = &mut self.overlay {
t.consolidate_cells(start..end, consolidated.clone());
tui.frame_requester().schedule_frame();
}

self.finish_agent_message_consolidation(tui, scrollback_reflow)?;
} else {
tracing::debug!(
"ConsolidateAgentMessage: no cells to consolidate(start={start}, end={end})",
);
self.maybe_finish_stream_reflow(tui)?;
}

Ok(())
}

fn finish_agent_message_consolidation(
&mut self,
tui: &mut tui::Tui,
scrollback_reflow: ConsolidationScrollbackReflow,
) -> Result<()> {
match scrollback_reflow {
ConsolidationScrollbackReflow::IfResizeReflowRan => {
self.maybe_finish_stream_reflow(tui)?;
}
ConsolidationScrollbackReflow::Required => {
self.finish_required_stream_reflow(tui)?;
}
}

Ok(())
}
}
36 changes: 13 additions & 23 deletions codex-rs/tui/src/app/event_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,29 +217,19 @@ impl App {
AppEvent::EndInitialHistoryReplayBuffer => {
self.finish_initial_history_replay_buffer(tui);
}
AppEvent::ConsolidateAgentMessage { source, cwd } => {
if !self.terminal_resize_reflow_enabled() {
self.transcript_reflow.clear();
return Ok(AppRunControl::Continue);
}
let end = self.transcript_cells.len();
let start =
trailing_run_start::<history_cell::AgentMessageCell>(&self.transcript_cells);
if start < end {
let consolidated: Arc<dyn HistoryCell> =
Arc::new(history_cell::AgentMarkdownCell::new(source, &cwd));
self.transcript_cells
.splice(start..end, std::iter::once(consolidated.clone()));

if let Some(Overlay::Transcript(t)) = &mut self.overlay {
t.consolidate_cells(start..end, consolidated.clone());
tui.frame_requester().schedule_frame();
}

self.maybe_finish_stream_reflow(tui)?;
} else {
self.maybe_finish_stream_reflow(tui)?;
}
AppEvent::ConsolidateAgentMessage {
source,
cwd,
scrollback_reflow,
deferred_history_cell,
} => {
self.handle_consolidate_agent_message(
tui,
source,
cwd,
scrollback_reflow,
deferred_history_cell,
)?;
}
AppEvent::ConsolidateProposedPlan(source) => {
if !self.terminal_resize_reflow_enabled() {
Expand Down
12 changes: 12 additions & 0 deletions codex-rs/tui/src/app_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ impl RealtimeAudioDeviceKind {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConsolidationScrollbackReflow {
IfResizeReflowRan,
Required,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(not(target_os = "windows"), allow(dead_code))]
pub(crate) enum WindowsSandboxEnableMode {
Expand Down Expand Up @@ -518,9 +524,15 @@ pub(crate) enum AppEvent {
/// finalization. The `App` handler walks backward through `transcript_cells`
/// to find the `AgentMessageCell` run and splices in the consolidated cell.
/// The `cwd` keeps local file-link display stable across the final re-render.
/// `scrollback_reflow` lets table-tail finalization force the already-emitted
/// terminal scrollback to be rebuilt from the consolidated source-backed cell.
/// `deferred_history_cell` lets callers add the final stream tail to the
/// transcript without first writing its provisional render to scrollback.
ConsolidateAgentMessage {
source: String,
cwd: PathBuf,
scrollback_reflow: ConsolidationScrollbackReflow,
deferred_history_cell: Option<Box<dyn HistoryCell>>,
},

/// Replace the contiguous run of streaming `ProposedPlanStreamCell`s at the
Expand Down
115 changes: 101 additions & 14 deletions codex-rs/tui/src/chatwidget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1547,16 +1547,30 @@ impl ChatWidget {
fn flush_answer_stream_with_separator(&mut self) {
let had_stream_controller = self.stream_controller.is_some();
if let Some(mut controller) = self.stream_controller.take() {
let scrollback_reflow = if controller.has_live_tail() {
crate::app_event::ConsolidationScrollbackReflow::Required
} else {
crate::app_event::ConsolidationScrollbackReflow::IfResizeReflowRan
};
self.clear_active_stream_tail();
let (cell, source) = controller.finalize();
if let Some(cell) = cell {
self.add_boxed_history(cell);
}
let deferred_history_cell =
if scrollback_reflow == crate::app_event::ConsolidationScrollbackReflow::Required {
cell
} else {
if let Some(cell) = cell {
self.add_boxed_history(cell);
}
None
};
// Consolidate the run of streaming AgentMessageCells into a single AgentMarkdownCell
// that can re-render from source on resize.
if let Some(source) = source {
self.app_event_tx.send(AppEvent::ConsolidateAgentMessage {
source,
cwd: self.config.cwd.to_path_buf(),
scrollback_reflow,
deferred_history_cell,
});
}
}
Expand Down Expand Up @@ -2115,11 +2129,10 @@ impl ChatWidget {
self.transcript.plan_delta_buffer.clear();
}
self.transcript.plan_delta_buffer.push_str(&delta);
// Before streaming plan content, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();

if self.plan_stream_controller.is_none() {
// Before starting a plan stream, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();
self.plan_stream_controller = Some(PlanStreamController::new(
self.current_stream_width(/*reserved_cols*/ 4),
&self.config.cwd,
Expand All @@ -2132,6 +2145,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::StartCommitAnimation);
self.run_catch_up_commit_tick();
}
self.sync_active_stream_tail();
self.request_redraw();
}

Expand All @@ -2154,7 +2168,14 @@ impl ChatWidget {
self.transcript.saw_plan_item_this_turn = true;
let (finalized_streamed_cell, consolidated_plan_source) =
if let Some(mut controller) = self.plan_stream_controller.take() {
controller.finalize()
let had_live_tail = controller.has_live_tail();
self.clear_active_stream_tail();
let (cell, source) = controller.finalize();
if had_live_tail {
(None, source)
} else {
(cell, source)
}
} else {
(None, None)
};
Expand Down Expand Up @@ -2286,8 +2307,10 @@ impl ChatWidget {
// If a stream is currently active, finalize it.
self.flush_answer_stream_with_separator();
if let Some(mut controller) = self.plan_stream_controller.take() {
let had_live_tail = controller.has_live_tail();
self.clear_active_stream_tail();
let (cell, source) = controller.finalize();
if let Some(cell) = cell {
if !had_live_tail && let Some(cell) = cell {
self.add_boxed_history(cell);
}
if let Some(source) = source {
Expand Down Expand Up @@ -2772,6 +2795,9 @@ impl ChatWidget {
/// This does not clear MCP startup tracking, because MCP startup can overlap with turn cleanup
/// and should continue to drive the bottom-pane running indicator while it is in progress.
fn finalize_turn(&mut self) {
// Drop preview-only stream tail content on any termination path before
// failed-cell finalization, so transient tail cells are never persisted.
self.clear_active_stream_tail();
// Ensure any spinner is replaced by a red ✗ and flushed into history.
self.finalize_active_cell_as_failed();
// Turn-scoped hook rows are transient live state; once the turn is over,
Expand Down Expand Up @@ -4093,6 +4119,7 @@ impl ChatWidget {
self.bottom_pane.hide_status_indicator();
self.add_boxed_history(cell);
}
self.sync_active_stream_tail();

if outcome.has_controller && outcome.all_idle {
self.maybe_restore_status_indicator_after_stream_idle();
Expand Down Expand Up @@ -4137,11 +4164,10 @@ impl ChatWidget {

#[inline]
fn handle_streaming_delta(&mut self, delta: String) {
// Before streaming agent content, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();

if self.stream_controller.is_none() {
// Before starting an agent stream, flush any active exec cell group.
self.flush_unified_exec_wait_streak();
self.flush_active_cell();
// If the previous turn inserted non-stream history (exec output, patch status, MCP
// calls), render a separator before starting the next streamed assistant message.
if self.transcript.needs_final_message_separator && self.transcript.had_work_activity {
Expand All @@ -4165,6 +4191,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::StartCommitAnimation);
self.run_catch_up_commit_tick();
}
self.sync_active_stream_tail();
self.request_redraw();
}

Expand Down Expand Up @@ -5322,12 +5349,70 @@ impl ChatWidget {

if !keep_placeholder_header_active && !cell.display_lines(u16::MAX).is_empty() {
// Only break exec grouping if the cell renders visible lines.
self.flush_active_cell();
if !self.has_active_stream_tail() {
self.flush_active_cell();
}
self.transcript.needs_final_message_separator = true;
}
self.app_event_tx.send(AppEvent::InsertHistoryCell(cell));
}

fn active_cell_is_stream_tail(&self) -> bool {
self.transcript.active_cell.as_ref().is_some_and(|cell| {
cell.as_any().is::<history_cell::StreamingAgentTailCell>()
|| cell.as_any().is::<history_cell::StreamingPlanTailCell>()
})
}

fn has_active_stream_tail(&self) -> bool {
(self.stream_controller.is_some() || self.plan_stream_controller.is_some())
&& self.active_cell_is_stream_tail()
}

fn sync_active_stream_tail(&mut self) {
if let Some(controller) = self.stream_controller.as_ref() {
let tail_lines = controller.current_tail_lines();
if tail_lines.is_empty() {
self.clear_active_stream_tail();
return;
}

self.bottom_pane.hide_status_indicator();
self.transcript.active_cell =
Some(Box::new(history_cell::StreamingAgentTailCell::new(
tail_lines,
controller.tail_starts_stream(),
)));
self.bump_active_cell_revision();
return;
}

if let Some(controller) = self.plan_stream_controller.as_ref() {
let tail_lines = controller.current_tail_display_lines();
if tail_lines.is_empty() {
self.clear_active_stream_tail();
return;
}

self.bottom_pane.hide_status_indicator();
self.transcript.active_cell = Some(Box::new(history_cell::StreamingPlanTailCell::new(
tail_lines,
!controller.tail_starts_stream(),
)));
self.bump_active_cell_revision();
return;
}

self.clear_active_stream_tail();
}

fn clear_active_stream_tail(&mut self) {
if self.active_cell_is_stream_tail() {
self.transcript.active_cell = None;
self.bump_active_cell_revision();
}
}

fn queue_user_message(&mut self, user_message: UserMessage) {
self.queue_user_message_with_options(user_message, QueuedInputAction::Plain);
}
Expand Down Expand Up @@ -9723,6 +9808,7 @@ impl ChatWidget {
if let Some(controller) = self.plan_stream_controller.as_mut() {
controller.set_width(plan_stream_width);
}
self.sync_active_stream_tail();
if !had_rendered_width {
self.request_redraw();
}
Expand Down Expand Up @@ -9918,6 +10004,7 @@ impl ChatWidget {
if let Some(controller) = self.plan_stream_controller.as_mut() {
controller.clear_queue();
}
self.clear_active_stream_tail();
self.request_redraw();
}
}
Expand Down
Loading
Loading