18 changes: 16 additions & 2 deletions codex-rs/core/src/codex.rs
@@ -764,6 +764,16 @@ impl Session {
state.get_total_token_usage()
}

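/// Auto-compaction reads this instead of the raw reported usage: take the
/// larger of the provider-reported total and a local history estimate, so a
/// provider that omits usage data (reporting zero) cannot mask a large context.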
async fn get_total_token_usage_with_estimate(&self, turn_context: &TurnContext) -> i64 {
let reported = self.get_total_token_usage().await;
let estimated = self
.clone_history()
.await
.estimate_token_count(turn_context)
.unwrap_or(0);
reported.max(estimated)
}

async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_default_turn().await;
match conversation_history {
@@ -2233,7 +2243,9 @@ pub(crate) async fn run_task(
.get_model_family()
.auto_compact_token_limit()
.unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
let total_usage_tokens = sess
.get_total_token_usage_with_estimate(&turn_context)
.await;
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
}
@@ -2315,7 +2327,9 @@ pub(crate) async fn run_task(
needs_follow_up,
last_agent_message: turn_last_agent_message,
} = turn_output;
let total_usage_tokens = sess.get_total_token_usage().await;
let total_usage_tokens = sess
.get_total_token_usage_with_estimate(&turn_context)
.await;
let token_limit_reached = total_usage_tokens >= auto_compact_limit;

// As long as compaction reliably brings total usage well below the token limit, this check won't spin in an infinite loop of repeated compactions.
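
Net effect of the two call sites above: the trigger condition changes from reported >= limit to max(reported, estimated) >= limit. A minimal sketch of that decision rule, with should_auto_compact as a hypothetical standalone helper (the real values come from Session state, the cloned history's estimate_token_count, and the model family's auto_compact_token_limit):

#[cfg(test)]
mod auto_compact_rule_sketch {
    /// Hypothetical distillation of the trigger: compaction fires when either
    /// the provider-reported usage or the local estimate reaches the limit.
    fn should_auto_compact(reported: i64, estimated: i64, limit: i64) -> bool {
        reported.max(estimated) >= limit
    }

    #[test]
    fn zero_reported_usage_still_triggers() {
        // Provider omitted usage, but the estimated history size is large.
        assert!(should_auto_compact(0, 20_000, 15_000));
        // A small history under the limit does not trigger.
        assert!(!should_auto_compact(0, 1_000, 15_000));
        // Reported usage alone can still trigger, as before this change.
        assert!(should_auto_compact(16_000, 0, 15_000));
    }
}
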
136 changes: 121 additions & 15 deletions codex-rs/core/tests/suite/compact.rs
@@ -1228,6 +1228,106 @@ async fn auto_compact_runs_after_token_limit_hit() {
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_uses_estimate_when_usage_is_zero() {
skip_if_no_network!();

let server = start_mock_server().await;
let large_user_msg = "x".repeat(40_000);

let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", AUTO_SUMMARY_TEXT),
ev_completed("r2"),
]);
let sse3 = sse(vec![
ev_assistant_message("m3", FINAL_REPLY),
ev_completed("r3"),
]);

let follow_up_msg = POST_AUTO_USER_MSG;
let large_user_msg_fragment = large_user_msg.clone();
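    // Each matcher keys on text unique to its request (the large message, the
    // summarization prompt, the follow-up) so the three mocks cannot shadow one another.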
let first_matcher = move |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(&large_user_msg_fragment)
&& !body_contains_text(body, follow_up_msg)
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, first_matcher, sse1).await;

let auto_compact_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, auto_compact_matcher, sse2).await;

let follow_up_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, follow_up_msg) && !body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, follow_up_matcher, sse3).await;

let model_provider = non_openai_model_provider(&server);
let home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&home).await;
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(15_000);
let conversation_manager = ConversationManager::with_models_provider(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
);
let codex = conversation_manager
.new_conversation(config)
.await
.unwrap()
.conversation;

codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: large_user_msg.clone(),
}],
})
.await
.unwrap();

wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;

codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: follow_up_msg.to_string(),
}],
})
.await
.unwrap();

wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;

let requests = get_responses_requests(&server).await;
assert_eq!(
requests.len(),
3,
"expected user turn, auto compact, and follow-up request"
);
let auto_compact_index = requests
.iter()
.enumerate()
.find_map(|(idx, req)| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT).then_some(idx)
})
.expect("auto compact request missing");
assert_eq!(
auto_compact_index, 1,
"auto compact should run before the follow-up turn when usage is zero",
);
}
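
This test leans on estimate_token_count growing with the size of the conversation history, so the 40,000-character message lifts the estimate past the configured 15,000-token limit even though the mock reports zero usage. A rough sketch of one such character-based heuristic, with rough_token_estimate and its chars-per-token ratio as illustrative assumptions rather than the actual estimator:

#[cfg(test)]
mod estimate_sketch {
    // Hypothetical chars-per-token heuristic: the real estimate_token_count
    // lives in the history code and may use a different ratio and weighting.
    fn rough_token_estimate(history_texts: &[&str]) -> i64 {
        const CHARS_PER_TOKEN: usize = 4; // illustrative ratio only
        let total_chars: usize = history_texts.iter().map(|t| t.len()).sum();
        (total_chars / CHARS_PER_TOKEN) as i64
    }

    #[test]
    fn estimate_scales_with_history_size() {
        let large = "x".repeat(40_000);
        // The large pending message dominates the estimate even when the
        // provider reports zero usage for the turn.
        assert!(rough_token_estimate(&[large.as_str()]) > rough_token_estimate(&["hi"]));
    }
}
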

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_persists_rollout_entries() {
skip_if_no_network!();
@@ -1701,29 +1801,29 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_

let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 500),
ev_completed_with_tokens("r1", 5000),
]);
let first_summary_payload = auto_summary(FIRST_AUTO_SUMMARY);
let sse2 = sse(vec![
ev_assistant_message("m2", &first_summary_payload),
ev_completed_with_tokens("r2", 50),
ev_completed_with_tokens("r2", 500),
]);
let sse3 = sse(vec![
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r3", 150),
ev_completed_with_tokens("r3", 1500),
]);
let sse4 = sse(vec![
ev_assistant_message("m4", SECOND_LARGE_REPLY),
ev_completed_with_tokens("r4", 450),
ev_completed_with_tokens("r4", 4500),
]);
let second_summary_payload = auto_summary(SECOND_AUTO_SUMMARY);
let sse5 = sse(vec![
ev_assistant_message("m5", &second_summary_payload),
ev_completed_with_tokens("r5", 60),
ev_completed_with_tokens("r5", 600),
]);
let sse6 = sse(vec![
ev_assistant_message("m6", FINAL_REPLY),
ev_completed_with_tokens("r6", 120),
ev_completed_with_tokens("r6", 1200),
]);
let follow_up_user = "FOLLOW_UP_AUTO_COMPACT";
let final_user = "FINAL_AUTO_COMPACT";
@@ -1736,7 +1836,9 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
let mut config = load_default_config_for_test(&home).await;
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(200);
// Keep base instructions empty so token estimates align with mocked usage.
config.base_instructions = Some(String::new());
config.model_auto_compact_token_limit = Some(2000);
let conversation_manager = ConversationManager::with_models_provider(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
@@ -1814,14 +1916,14 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {

let server = start_mock_server().await;

let context_window = 100;
let context_window = 1000;
let limit = context_window * 90 / 100;
let over_limit_tokens = context_window * 95 / 100 + 1;
let follow_up_user = "FOLLOW_UP_AFTER_LIMIT";

let first_turn = sse(vec![
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r1", 50),
ev_completed_with_tokens("r1", 500),
]);
let function_call_follow_up = sse(vec![
ev_assistant_message("m2", FINAL_REPLY),
@@ -1830,9 +1932,9 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
let auto_summary_payload = auto_summary(AUTO_SUMMARY_TEXT);
let auto_compact_turn = sse(vec![
ev_assistant_message("m3", &auto_summary_payload),
ev_completed_with_tokens("r3", 10),
ev_completed_with_tokens("r3", 100),
]);
let post_auto_compact_turn = sse(vec![ev_completed_with_tokens("r4", 10)]);
let post_auto_compact_turn = sse(vec![ev_completed_with_tokens("r4", 100)]);

// Mount responses in order and keep mocks only for the ones we assert on.
let first_turn_mock = mount_sse_once(&server, first_turn).await;
@@ -1847,6 +1949,8 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
let mut config = load_default_config_for_test(&home).await;
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
// Keep base instructions empty so token estimates align with mocked usage.
config.base_instructions = Some(String::new());
config.model_context_window = Some(context_window);
config.model_auto_compact_token_limit = Some(limit);

@@ -1926,16 +2030,16 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
let second_user = "TRIGGER_COMPACT_AT_LIMIT";
let third_user = "AFTER_REMOTE_COMPACT";

let pre_last_reasoning_content = "a".repeat(2_400);
let post_last_reasoning_content = "b".repeat(4_000);
let pre_last_reasoning_content = "a".repeat(24_000);
let post_last_reasoning_content = "b".repeat(40_000);

let first_turn = sse(vec![
ev_reasoning_item("pre-reasoning", &["pre"], &[&pre_last_reasoning_content]),
ev_completed_with_tokens("r1", 10),
ev_completed_with_tokens("r1", 100),
]);
let second_turn = sse(vec![
ev_reasoning_item("post-reasoning", &["post"], &[&post_last_reasoning_content]),
ev_completed_with_tokens("r2", 80),
ev_completed_with_tokens("r2", 800),
]);
let third_turn = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
@@ -1974,6 +2078,8 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
set_test_compact_prompt(config);
// Keep base instructions empty so token estimates align with mocked usage.
config.base_instructions = Some(String::new());
config.model_auto_compact_token_limit = Some(300);
config.features.enable(Feature::RemoteCompaction);
})