Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 5 additions & 73 deletions codex-rs/core/src/goals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ use codex_otel::GOAL_USAGE_LIMITED_METRIC;
use codex_protocol::config_types::ModeKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ThreadGoal;
use codex_protocol::protocol::ThreadGoalStatus;
use codex_protocol::protocol::ThreadGoalUpdatedEvent;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::validate_thread_goal_objective;
use codex_rollout::state_db::reconcile_rollout;
use codex_thread_store::LocalThreadStore;
Expand Down Expand Up @@ -156,7 +154,6 @@ pub(crate) enum GoalRuntimeEvent<'a> {
MaybeContinueIfIdle,
TaskAborted {
turn_context: Option<&'a TurnContext>,
reason: TurnAbortReason,
},
UsageLimitReached {
turn_context: &'a TurnContext,
Expand Down Expand Up @@ -337,8 +334,8 @@ impl Session {
/// starts capture the active goal and token baseline, tool completions
/// account usage and may inject budget steering, completion accounting
/// suppresses that steering, external mutations account best-effort before
/// changing state, interrupts pause active goals, thread resumes restore
/// runtime state for already-active goals, explicit maybe-continue events
/// changing state, thread resumes restore runtime state for already-active
/// goals, explicit maybe-continue events
/// start idle goal continuation turns, and continuation turns with no counted
/// autonomous activity suppress the next automatic continuation until
/// user/tool/external activity resets it.
Expand Down Expand Up @@ -390,12 +387,8 @@ impl Session {
self.maybe_continue_goal_if_idle_runtime().await;
Ok(())
}),
GoalRuntimeEvent::TaskAborted {
turn_context,
reason,
} => Box::pin(async move {
self.handle_thread_goal_task_abort(turn_context, reason)
.await;
GoalRuntimeEvent::TaskAborted { turn_context } => Box::pin(async move {
self.handle_thread_goal_task_abort(turn_context).await;
Ok(())
}),
GoalRuntimeEvent::UsageLimitReached { turn_context } => Box::pin(async move {
Expand Down Expand Up @@ -947,11 +940,7 @@ impl Session {
}
}

async fn handle_thread_goal_task_abort(
&self,
turn_context: Option<&TurnContext>,
reason: TurnAbortReason,
) {
async fn handle_thread_goal_task_abort(&self, turn_context: Option<&TurnContext>) {
Comment thread
etraut-openai marked this conversation as resolved.
if let Some(turn_context) = turn_context {
self.take_thread_goal_continuation_turn(&turn_context.sub_id)
.await;
Expand All @@ -974,12 +963,6 @@ impl Session {
accounting.turn = None;
}
}

if reason == TurnAbortReason::Interrupted
&& let Err(err) = self.pause_active_thread_goal_for_interrupt().await
{
tracing::warn!("failed to pause active thread goal after interrupt: {err}");
}
}

async fn account_thread_goal_progress(
Expand Down Expand Up @@ -1187,57 +1170,6 @@ impl Session {
}
}

async fn pause_active_thread_goal_for_interrupt(&self) -> anyhow::Result<()> {
if should_ignore_goal_for_mode(self.collaboration_mode().await.mode) {
return Ok(());
}

if !self.enabled(Feature::Goals) {
return Ok(());
}

let _continuation_guard = self
.goal_runtime
.continuation_lock
.acquire()
.await
.context("goal continuation semaphore closed")?;
let Some(state_db) = self.state_db_for_thread_goals().await? else {
return Ok(());
};
self.account_thread_goal_wall_clock_usage(
&state_db,
codex_state::ThreadGoalAccountingMode::ActiveStatusOnly,
TerminalMetricEmission::Emit,
)
.await?;
let Some(goal) = state_db
.thread_goals()
.pause_active_thread_goal(self.conversation_id)
.await?
else {
return Ok(());
};
let goal = protocol_goal_from_state(goal);
*self.goal_runtime.budget_limit_reported_goal_id.lock().await = None;
self.goal_runtime
.accounting
.lock()
.await
.wall_clock
.clear_active_goal();
self.send_event_raw(Event {
id: uuid::Uuid::new_v4().to_string(),
msg: EventMsg::ThreadGoalUpdated(ThreadGoalUpdatedEvent {
thread_id: self.conversation_id,
turn_id: None,
goal,
}),
})
.await;
Ok(())
}

async fn usage_limit_active_thread_goal_for_turn(
&self,
turn_context: &TurnContext,
Expand Down
1 change: 0 additions & 1 deletion codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3355,7 +3355,6 @@ impl Session {
pub async fn interrupt_task(self: &Arc<Self>) {
info!("interrupt received: abort current task, if any");
let had_active_turn = self.active_turn.lock().await.is_some();
// Even without an active task, interrupt handling pauses any active goal.
self.abort_all_tasks(TurnAbortReason::Interrupted).await;
if !had_active_turn {
self.cancel_mcp_startup().await;
Expand Down
32 changes: 30 additions & 2 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8243,7 +8243,7 @@ async fn abort_empty_active_turn_preserves_pending_input() {
}

#[tokio::test]
async fn interrupt_accounts_active_goal_before_pausing() -> anyhow::Result<()> {
async fn interrupt_accounts_active_goal_without_pausing() -> anyhow::Result<()> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a regression for the no-active-turn/unload path to?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Done.

let (sess, tc, _rx, _codex_home) = make_goal_session_and_context_with_rx().await;
sess.set_thread_goal(
tc.as_ref(),
Expand Down Expand Up @@ -8273,7 +8273,7 @@ async fn interrupt_accounts_active_goal_before_pausing() -> anyhow::Result<()> {
.await?
.expect("goal should remain persisted after interrupt");
assert_eq!(
codex_protocol::protocol::ThreadGoalStatus::Paused,
codex_protocol::protocol::ThreadGoalStatus::Active,
goal.status
);
assert_eq!(70, goal.tokens_used);
Expand All @@ -8283,6 +8283,34 @@ async fn interrupt_accounts_active_goal_before_pausing() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test]
async fn shutdown_without_active_turn_keeps_active_goal_active() -> anyhow::Result<()> {
let (sess, tc, _rx, _codex_home) = make_goal_session_and_context_with_rx().await;
sess.set_thread_goal(
tc.as_ref(),
SetGoalRequest {
objective: Some("Keep improving the benchmark".to_string()),
status: None,
token_budget: None,
},
)
.await?;

assert!(sess.active_turn.lock().await.is_none());
assert!(handlers::shutdown(&sess, "shutdown".to_string()).await);

let goal = sess
.get_thread_goal()
.await?
.expect("goal should remain persisted after shutdown");
assert_eq!(
codex_protocol::protocol::ThreadGoalStatus::Active,
goal.status
);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn active_goal_continuation_runs_again_after_no_tool_turn() -> anyhow::Result<()> {
let server = start_mock_server().await;
Expand Down
2 changes: 0 additions & 2 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ impl Session {
&& let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
turn_context: turn_context.as_deref(),
reason: reason.clone(),
})
.await
{
Expand Down Expand Up @@ -561,7 +560,6 @@ impl Session {
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
turn_context: turn_context.as_deref(),
reason: reason.clone(),
})
.await
{
Expand Down
Loading