From beb83225e53aa4de370a415a091c55a85a100fd2 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 17:59:59 -0800 Subject: [PATCH 01/14] etag remove --- codex-rs/codex-api/src/endpoint/models.rs | 54 +++++++++---------- .../codex-api/tests/models_integration.rs | 7 ++- .../core/src/openai_models/models_manager.rs | 10 +--- codex-rs/core/tests/common/responses.rs | 9 +--- codex-rs/core/tests/suite/remote_models.rs | 4 -- codex-rs/protocol/src/openai_models.rs | 2 - 6 files changed, 30 insertions(+), 56 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/models.rs b/codex-rs/codex-api/src/endpoint/models.rs index b15f07fca2a..7f21c776352 100644 --- a/codex-rs/codex-api/src/endpoint/models.rs +++ b/codex-rs/codex-api/src/endpoint/models.rs @@ -5,6 +5,7 @@ use crate::provider::Provider; use crate::telemetry::run_with_request_telemetry; use codex_client::HttpTransport; use codex_client::RequestTelemetry; +use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelsResponse; use http::HeaderMap; use http::Method; @@ -41,7 +42,7 @@ impl ModelsClient { &self, client_version: &str, extra_headers: HeaderMap, - ) -> Result { + ) -> Result<(Vec, Option), ApiError> { let builder = || { let mut req = self.provider.build_request(Method::GET, self.path()); req.headers.extend(extra_headers.clone()); @@ -66,7 +67,7 @@ impl ModelsClient { .and_then(|value| value.to_str().ok()) .map(ToString::to_string); - let ModelsResponse { models, etag } = serde_json::from_slice::(&resp.body) + let ModelsResponse { models } = serde_json::from_slice::(&resp.body) .map_err(|e| { ApiError::Stream(format!( "failed to decode models response: {e}; body: {}", @@ -74,9 +75,7 @@ impl ModelsClient { )) })?; - let etag = header_etag.unwrap_or(etag); - - Ok(ModelsResponse { models, etag }) + Ok((models, header_etag)) } } @@ -102,16 +101,15 @@ mod tests { struct CapturingTransport { last_request: Arc>>, body: Arc, + response_etag: Arc>, } impl Default for CapturingTransport { fn default() -> Self { Self { last_request: Arc::new(Mutex::new(None)), - body: Arc::new(ModelsResponse { - models: Vec::new(), - etag: String::new(), - }), + body: Arc::new(ModelsResponse { models: Vec::new() }), + response_etag: Arc::new(None), } } } @@ -122,8 +120,8 @@ mod tests { *self.last_request.lock().unwrap() = Some(req); let body = serde_json::to_vec(&*self.body).unwrap(); let mut headers = HeaderMap::new(); - if !self.body.etag.is_empty() { - headers.insert(ETAG, self.body.etag.parse().unwrap()); + if let Some(etag) = self.response_etag.as_ref().as_deref() { + headers.insert(ETAG, etag.parse().unwrap()); } Ok(Response { status: StatusCode::OK, @@ -166,14 +164,12 @@ mod tests { #[tokio::test] async fn appends_client_version_query() { - let response = ModelsResponse { - models: Vec::new(), - etag: String::new(), - }; + let response = ModelsResponse { models: Vec::new() }; let transport = CapturingTransport { last_request: Arc::new(Mutex::new(None)), body: Arc::new(response), + response_etag: Arc::new(None), }; let client = ModelsClient::new( @@ -182,12 +178,12 @@ mod tests { DummyAuth, ); - let result = client + let (models, _etag) = client .list_models("0.99.0", HeaderMap::new()) .await .expect("request should succeed"); - assert_eq!(result.models.len(), 0); + assert_eq!(models.len(), 0); let url = transport .last_request @@ -232,12 +228,12 @@ mod tests { })) .unwrap(), ], - etag: String::new(), }; let transport = CapturingTransport { last_request: Arc::new(Mutex::new(None)), body: Arc::new(response), + response_etag: Arc::new(None), }; let client = ModelsClient::new( @@ -246,27 +242,25 @@ mod tests { DummyAuth, ); - let result = client + let (models, _etag) = client .list_models("0.99.0", HeaderMap::new()) .await .expect("request should succeed"); - assert_eq!(result.models.len(), 1); - assert_eq!(result.models[0].slug, "gpt-test"); - assert_eq!(result.models[0].supported_in_api, true); - assert_eq!(result.models[0].priority, 1); + assert_eq!(models.len(), 1); + assert_eq!(models[0].slug, "gpt-test"); + assert_eq!(models[0].supported_in_api, true); + assert_eq!(models[0].priority, 1); } #[tokio::test] async fn list_models_includes_etag() { - let response = ModelsResponse { - models: Vec::new(), - etag: "\"abc\"".to_string(), - }; + let response = ModelsResponse { models: Vec::new() }; let transport = CapturingTransport { last_request: Arc::new(Mutex::new(None)), body: Arc::new(response), + response_etag: Arc::new(Some("\"abc\"".to_string())), }; let client = ModelsClient::new( @@ -275,12 +269,12 @@ mod tests { DummyAuth, ); - let result = client + let (models, etag) = client .list_models("0.1.0", HeaderMap::new()) .await .expect("request should succeed"); - assert_eq!(result.models.len(), 0); - assert_eq!(result.etag, "\"abc\""); + assert_eq!(models.len(), 0); + assert_eq!(etag.as_deref(), Some("\"abc\"")); } } diff --git a/codex-rs/codex-api/tests/models_integration.rs b/codex-rs/codex-api/tests/models_integration.rs index 93baffd3560..0b3e95ee303 100644 --- a/codex-rs/codex-api/tests/models_integration.rs +++ b/codex-rs/codex-api/tests/models_integration.rs @@ -90,7 +90,6 @@ async fn models_client_hits_models_endpoint() { reasoning_summary_format: ReasoningSummaryFormat::None, experimental_supported_tools: Vec::new(), }], - etag: String::new(), }; Mock::given(method("GET")) @@ -106,13 +105,13 @@ async fn models_client_hits_models_endpoint() { let transport = ReqwestTransport::new(reqwest::Client::new()); let client = ModelsClient::new(transport, provider(&base_url), DummyAuth); - let result = client + let (models, _etag) = client .list_models("0.1.0", HeaderMap::new()) .await .expect("models request should succeed"); - assert_eq!(result.models.len(), 1); - assert_eq!(result.models[0].slug, "gpt-test"); + assert_eq!(models.len(), 1); + assert_eq!(models[0].slug, "gpt-test"); let received = server .received_requests() diff --git a/codex-rs/core/src/openai_models/models_manager.rs b/codex-rs/core/src/openai_models/models_manager.rs index 7f54c4f8525..d6ca13e76bb 100644 --- a/codex-rs/core/src/openai_models/models_manager.rs +++ b/codex-rs/core/src/openai_models/models_manager.rs @@ -94,12 +94,12 @@ impl ModelsManager { let client = ModelsClient::new(transport, api_provider, api_auth); let client_version = format_client_version_to_whole(); - let ModelsResponse { models, etag } = client + let (models, etag) = client .list_models(&client_version, HeaderMap::new()) .await .map_err(map_api_error)?; - let etag = (!etag.is_empty()).then_some(etag); + let etag = etag.filter(|value| !value.is_empty()); self.apply_remote_models(models.clone()).await; *self.etag.write().await = etag.clone(); @@ -389,7 +389,6 @@ mod tests { &server, ModelsResponse { models: remote_models.clone(), - etag: String::new(), }, ) .await; @@ -446,7 +445,6 @@ mod tests { &server, ModelsResponse { models: remote_models.clone(), - etag: String::new(), }, ) .await; @@ -501,7 +499,6 @@ mod tests { &server, ModelsResponse { models: initial_models.clone(), - etag: String::new(), }, ) .await; @@ -542,7 +539,6 @@ mod tests { &server, ModelsResponse { models: updated_models.clone(), - etag: String::new(), }, ) .await; @@ -576,7 +572,6 @@ mod tests { &server, ModelsResponse { models: initial_models, - etag: String::new(), }, ) .await; @@ -605,7 +600,6 @@ mod tests { &server, ModelsResponse { models: refreshed_models, - etag: String::new(), }, ) .await; diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index b98b29625eb..8fcabdda541 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -677,14 +677,7 @@ pub async fn start_mock_server() -> MockServer { .await; // Provide a default `/models` response so tests remain hermetic when the client queries it. - let _ = mount_models_once( - &server, - ModelsResponse { - models: Vec::new(), - etag: String::new(), - }, - ) - .await; + let _ = mount_models_once(&server, ModelsResponse { models: Vec::new() }).await; server } diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index 3c4d389ec05..4086732c2b9 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -93,7 +93,6 @@ async fn remote_models_remote_model_uses_unified_exec() -> Result<()> { &server, ModelsResponse { models: vec![remote_model], - etag: String::new(), }, ) .await; @@ -232,7 +231,6 @@ async fn remote_models_apply_remote_base_instructions() -> Result<()> { &server, ModelsResponse { models: vec![remote_model], - etag: String::new(), }, ) .await; @@ -310,7 +308,6 @@ async fn remote_models_preserve_builtin_presets() -> Result<()> { &server, ModelsResponse { models: vec![remote_model.clone()], - etag: String::new(), }, ) .await; @@ -368,7 +365,6 @@ async fn remote_models_hide_picker_only_models() -> Result<()> { &server, ModelsResponse { models: vec![remote_model], - etag: String::new(), }, ) .await; diff --git a/codex-rs/protocol/src/openai_models.rs b/codex-rs/protocol/src/openai_models.rs index 28b25bb604e..4cdc0e4ba6f 100644 --- a/codex-rs/protocol/src/openai_models.rs +++ b/codex-rs/protocol/src/openai_models.rs @@ -197,8 +197,6 @@ pub struct ModelInfo { #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, TS, JsonSchema, Default)] pub struct ModelsResponse { pub models: Vec, - #[serde(default)] - pub etag: String, } // convert ModelInfo to ModelPreset From 677532f97bf1ead8740e71fb74e4686585ba89e7 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 18:25:07 -0800 Subject: [PATCH 02/14] progress --- codex-rs/core/src/client.rs | 12 ++++++++++-- codex-rs/core/src/codex.rs | 8 ++++++++ codex-rs/core/src/openai_models/models_manager.rs | 4 ++++ codex-rs/core/tests/chat_completions_payload.rs | 1 + codex-rs/core/tests/chat_completions_sse.rs | 1 + codex-rs/core/tests/responses_headers.rs | 3 +++ codex-rs/core/tests/suite/client.rs | 1 + 7 files changed, 28 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index aaf3b0ea353..53212f72ffa 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -58,6 +58,7 @@ pub struct ModelClient { config: Arc, auth_manager: Option>, model_family: ModelFamily, + models_etag: Option, otel_manager: OtelManager, provider: ModelProviderInfo, conversation_id: ConversationId, @@ -72,6 +73,7 @@ impl ModelClient { config: Arc, auth_manager: Option>, model_family: ModelFamily, + models_etag: Option, otel_manager: OtelManager, provider: ModelProviderInfo, effort: Option, @@ -83,6 +85,7 @@ impl ModelClient { config, auth_manager, model_family, + models_etag, otel_manager, provider, conversation_id, @@ -262,7 +265,7 @@ impl ModelClient { store_override: None, conversation_id: Some(conversation_id.clone()), session_source: Some(session_source.clone()), - extra_headers: beta_feature_headers(&self.config), + extra_headers: beta_feature_headers(&self.config, self.models_etag.clone()), }; let stream_result = client @@ -398,7 +401,7 @@ fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec ApiHeaderMap { +fn beta_feature_headers(config: &Config, models_etag: Option) -> ApiHeaderMap { let enabled = FEATURES .iter() .filter_map(|spec| { @@ -416,6 +419,11 @@ fn beta_feature_headers(config: &Config) -> ApiHeaderMap { { headers.insert("x-codex-beta-features", header_value); } + if let Some(etag) = models_etag + && let Ok(header_value) = HeaderValue::from_str(&etag) + { + headers.insert("X-If-Models-Match", header_value); + } headers } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index f0d2056587c..649bac18c42 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -492,6 +492,7 @@ impl Session { session_configuration: &SessionConfiguration, per_turn_config: Config, model_family: ModelFamily, + models_etag: Option, conversation_id: ConversationId, sub_id: String, ) -> TurnContext { @@ -505,6 +506,7 @@ impl Session { per_turn_config.clone(), auth_manager, model_family.clone(), + models_etag, otel_manager, provider, session_configuration.model_reasoning_effort, @@ -919,6 +921,7 @@ impl Session { .models_manager .construct_model_family(session_configuration.model.as_str(), &per_turn_config) .await; + let models_etag = self.services.models_manager.get_models_etag().await; let mut turn_context: TurnContext = Self::make_turn_context( Some(Arc::clone(&self.services.auth_manager)), &self.services.otel_manager, @@ -926,6 +929,7 @@ impl Session { &session_configuration, per_turn_config, model_family, + models_etag, self.conversation_id, sub_id, ); @@ -2105,6 +2109,7 @@ async fn spawn_review_thread( .models_manager .construct_model_family(&model, &config) .await; + let models_etag = sess.services.models_manager.get_models_etag().await; // For reviews, disable web_search and view_image regardless of global settings. let mut review_features = sess.features.clone(); review_features @@ -2137,6 +2142,7 @@ async fn spawn_review_thread( per_turn_config.clone(), auth_manager, model_family.clone(), + models_etag, otel_manager, provider, per_turn_config.model_reasoning_effort, @@ -3163,6 +3169,7 @@ mod tests { &session_configuration, per_turn_config, model_family, + None, conversation_id, "turn_id".to_string(), ); @@ -3249,6 +3256,7 @@ mod tests { &session_configuration, per_turn_config, model_family, + None, conversation_id, "turn_id".to_string(), )); diff --git a/codex-rs/core/src/openai_models/models_manager.rs b/codex-rs/core/src/openai_models/models_manager.rs index d6ca13e76bb..fa8ea4b12d6 100644 --- a/codex-rs/core/src/openai_models/models_manager.rs +++ b/codex-rs/core/src/openai_models/models_manager.rs @@ -131,6 +131,10 @@ impl ModelsManager { .with_config_overrides(config) } + pub async fn get_models_etag(&self) -> Option { + self.etag.read().await.clone() + } + pub async fn get_model(&self, model: &Option, config: &Config) -> String { if let Some(model) = model.as_ref() { return model.to_string(); diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index 5867935470e..b0086ef5186 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -92,6 +92,7 @@ async fn run_request(input: Vec) -> Value { Arc::clone(&config), None, model_family, + None, otel_manager, provider, effort, diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index f58b039220e..74f7730504c 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -93,6 +93,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { Arc::clone(&config), None, model_family, + None, otel_manager, provider, effort, diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 5c32685cc92..f005ec5be9e 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -86,6 +86,7 @@ async fn responses_stream_includes_subagent_header_on_review() { Arc::clone(&config), None, model_family, + None, otel_manager, provider, effort, @@ -181,6 +182,7 @@ async fn responses_stream_includes_subagent_header_on_other() { Arc::clone(&config), None, model_family, + None, otel_manager, provider, effort, @@ -275,6 +277,7 @@ async fn responses_respects_model_family_overrides_from_config() { Arc::clone(&config), None, model_family, + None, otel_manager, provider, effort, diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index bda232433da..65b1a7f4c8e 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1146,6 +1146,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { Arc::clone(&config), None, model_family, + None, otel_manager, provider, effort, From f8ba48d995e346c8e7abbd109a74429f2efeaee9 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 18:32:41 -0800 Subject: [PATCH 03/14] progress --- codex-rs/core/src/api_bridge.rs | 5 +++++ codex-rs/core/src/error.rs | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/codex-rs/core/src/api_bridge.rs b/codex-rs/core/src/api_bridge.rs index 79fd67d6501..c7da55da853 100644 --- a/codex-rs/core/src/api_bridge.rs +++ b/codex-rs/core/src/api_bridge.rs @@ -67,6 +67,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr { status, request_id: extract_request_id(headers.as_ref()), }) + } else if status == http::StatusCode::PRECONDITION_FAILED + && body_text + .contains("Models catalog has changed. Please refresh your models list.") + { + CodexErr::OutdatedModels } else { CodexErr::UnexpectedStatus(UnexpectedResponseError { status, diff --git a/codex-rs/core/src/error.rs b/codex-rs/core/src/error.rs index e8fa91d26e8..e85eaf0dbfe 100644 --- a/codex-rs/core/src/error.rs +++ b/codex-rs/core/src/error.rs @@ -90,6 +90,10 @@ pub enum CodexErr { #[error("spawn failed: child stdout/stderr not captured")] Spawn, + /// Returned when the models list is outdated and needs to be refreshed. + #[error("remote models list is outdated")] + OutdatedModels, + /// Returned by run_command_stream when the user pressed Ctrl‑C (SIGINT). Session uses this to /// surface a polite FunctionCallOutput back to the model instead of crashing the CLI. #[error("interrupted (Ctrl-C). Something went wrong? Hit `/feedback` to report the issue.")] From 09693d259b0de8debf1bfa0a07b85b515352b400 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 19:02:58 -0800 Subject: [PATCH 04/14] rwlock --- codex-rs/core/src/client.rs | 41 ++++++++++++++++++++++++++++++------ codex-rs/core/src/codex.rs | 29 ++++++++++++++++++++++++- codex-rs/core/src/compact.rs | 2 +- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 53212f72ffa..c929e8e8610 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::RwLock; use crate::api_bridge::auth_provider_from_auth; use crate::api_bridge::map_api_error; @@ -53,12 +54,12 @@ use crate::openai_models::model_family::ModelFamily; use crate::tools::spec::create_tools_json_for_chat_completions_api; use crate::tools::spec::create_tools_json_for_responses_api; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ModelClient { config: Arc, auth_manager: Option>, - model_family: ModelFamily, - models_etag: Option, + model_family: RwLock, + models_etag: RwLock>, otel_manager: OtelManager, provider: ModelProviderInfo, conversation_id: ConversationId, @@ -84,8 +85,8 @@ impl ModelClient { Self { config, auth_manager, - model_family, - models_etag, + model_family: RwLock::new(model_family), + models_etag: RwLock::new(models_etag), otel_manager, provider, conversation_id, @@ -111,6 +112,22 @@ impl ModelClient { &self.provider } + pub fn update_models_etag(&self, models_etag: Option) { + let mut guard = self + .models_etag + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *guard = models_etag; + } + + pub fn update_model_family(&self, model_family: ModelFamily) { + let mut guard = self + .model_family + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *guard = model_family; + } + /// Streams a single model turn using either the Responses or Chat /// Completions wire API, depending on the configured provider. /// @@ -265,7 +282,7 @@ impl ModelClient { store_override: None, conversation_id: Some(conversation_id.clone()), session_source: Some(session_source.clone()), - extra_headers: beta_feature_headers(&self.config, self.models_etag.clone()), + extra_headers: beta_feature_headers(&self.config, self.get_models_etag()), }; let stream_result = client @@ -306,7 +323,17 @@ impl ModelClient { /// Returns the currently configured model family. pub fn get_model_family(&self) -> ModelFamily { - self.model_family.clone() + self.model_family + .read() + .map(|model_family| model_family.clone()) + .unwrap_or_else(|err| err.into_inner().clone()) + } + + fn get_models_etag(&self) -> Option { + self.models_etag + .read() + .map(|models_etag| models_etag.clone()) + .unwrap_or_else(|err| err.into_inner().clone()) } /// Returns the current reasoning effort setting. diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 649bac18c42..402b4db2565 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2459,6 +2459,34 @@ async fn run_turn( Err(e @ CodexErr::InvalidRequest(_)) => return Err(e), Err(e @ CodexErr::RefreshTokenFailed(_)) => return Err(e), Err(e) => { + // Refresh models if we got an outdated models error + if matches!(e, CodexErr::OutdatedModels) { + let config = { + let state = sess.state.lock().await; + state + .session_configuration + .original_config_do_not_use + .clone() + }; + if let Err(err) = sess + .services + .models_manager + .refresh_available_models(&config) + .await + { + error!("failed to refresh models after outdated models error: {err}"); + } + let models_etag = sess.services.models_manager.get_models_etag().await; + let model = turn_context.client.get_model(); + let model_family = sess + .services + .models_manager + .construct_model_family(&model, &config) + .await; + turn_context.client.update_model_family(model_family); + turn_context.client.update_models_etag(models_etag); + } + // Use the configured provider-specific stream retry budget. let max_retries = turn_context.client.get_provider().stream_max_retries(); if retries < max_retries { @@ -2543,7 +2571,6 @@ async fn try_run_turn( sess.persist_rollout_items(&[rollout_item]).await; let mut stream = turn_context .client - .clone() .stream(prompt) .instrument(trace_span!("stream_request")) .or_cancel(&cancellation_token) diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 1a90b7b223f..3290d1b321b 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -290,7 +290,7 @@ async fn drain_to_completed( turn_context: &TurnContext, prompt: &Prompt, ) -> CodexResult<()> { - let mut stream = turn_context.client.clone().stream(prompt).await?; + let mut stream = turn_context.client.stream(prompt).await?; loop { let maybe_event = stream.next().await; let Some(event) = maybe_event else { From e01610f762a640da6549595aeecf9dbd680b6839 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 19:37:24 -0800 Subject: [PATCH 05/14] unit test --- codex-rs/core/src/codex.rs | 2 +- .../core/src/openai_models/models_manager.rs | 28 +- codex-rs/core/tests/common/responses.rs | 18 ++ codex-rs/core/tests/suite/remote_models.rs | 276 +++++++++++++++++- 4 files changed, 312 insertions(+), 12 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 402b4db2565..f0370c0e733 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -249,7 +249,7 @@ impl Codex { let config = Arc::new(config); if config.features.enabled(Feature::RemoteModels) - && let Err(err) = models_manager.refresh_available_models(&config).await + && let Err(err) = models_manager.try_refresh_available_models(&config).await { error!("failed to refresh available models: {err:?}"); } diff --git a/codex-rs/core/src/openai_models/models_manager.rs b/codex-rs/core/src/openai_models/models_manager.rs index fa8ea4b12d6..999ac20164d 100644 --- a/codex-rs/core/src/openai_models/models_manager.rs +++ b/codex-rs/core/src/openai_models/models_manager.rs @@ -77,7 +77,7 @@ impl ModelsManager { } /// Fetch the latest remote models, using the on-disk cache when still fresh. - pub async fn refresh_available_models(&self, config: &Config) -> CoreResult<()> { + pub async fn try_refresh_available_models(&self, config: &Config) -> CoreResult<()> { if !config.features.enabled(Feature::RemoteModels) || self.auth_manager.get_auth_mode() == Some(AuthMode::ApiKey) { @@ -86,7 +86,15 @@ impl ModelsManager { if self.try_load_cache().await { return Ok(()); } + self.refresh_available_models(config).await + } + pub async fn refresh_available_models(&self, config: &Config) -> CoreResult<()> { + if !config.features.enabled(Feature::RemoteModels) + || self.auth_manager.get_auth_mode() == Some(AuthMode::ApiKey) + { + return Ok(()); + } let auth = self.auth_manager.auth(); let api_provider = self.provider.to_api_provider(Some(AuthMode::ChatGPT))?; let api_auth = auth_provider_from_auth(auth.clone(), &self.provider).await?; @@ -108,7 +116,7 @@ impl ModelsManager { } pub async fn list_models(&self, config: &Config) -> Vec { - if let Err(err) = self.refresh_available_models(config).await { + if let Err(err) = self.try_refresh_available_models(config).await { error!("failed to refresh available models: {err}"); } let remote_models = self.remote_models(config).await; @@ -139,7 +147,7 @@ impl ModelsManager { if let Some(model) = model.as_ref() { return model.to_string(); } - if let Err(err) = self.refresh_available_models(config).await { + if let Err(err) = self.try_refresh_available_models(config).await { error!("failed to refresh available models: {err}"); } // if codex-auto-balanced exists & signed in with chatgpt mode, return it, otherwise return the default model @@ -410,7 +418,7 @@ mod tests { let manager = ModelsManager::with_provider(auth_manager, provider); manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("refresh succeeds"); let cached_remote = manager.remote_models(&config).await; @@ -469,7 +477,7 @@ mod tests { let manager = ModelsManager::with_provider(auth_manager, provider); manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("first refresh succeeds"); assert_eq!( @@ -480,7 +488,7 @@ mod tests { // Second call should read from cache and avoid the network. manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("cached refresh succeeds"); assert_eq!( @@ -523,7 +531,7 @@ mod tests { let manager = ModelsManager::with_provider(auth_manager, provider); manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("initial refresh succeeds"); @@ -548,7 +556,7 @@ mod tests { .await; manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("second refresh succeeds"); assert_eq!( @@ -594,7 +602,7 @@ mod tests { manager.cache_ttl = Duration::ZERO; manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("initial refresh succeeds"); @@ -609,7 +617,7 @@ mod tests { .await; manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("second refresh succeeds"); diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 8fcabdda541..76133a9fa39 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -670,6 +670,24 @@ pub async fn mount_models_once(server: &MockServer, body: ModelsResponse) -> Mod models_mock } +pub async fn mount_models_once_with_etag( + server: &MockServer, + body: ModelsResponse, + etag: &str, +) -> ModelsMock { + let (mock, models_mock) = models_mock(); + mock.respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "application/json") + .insert_header("etag", etag) + .set_body_json(body.clone()), + ) + .up_to_n_times(1) + .mount(server) + .await; + models_mock +} + pub async fn start_mock_server() -> MockServer { let server = MockServer::builder() .body_print_limit(BodyPrintLimit::Limited(80_000)) diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index 4086732c2b9..6cbec027a17 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -33,8 +33,12 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_response_created; +use core_test_support::responses::ev_shell_command_call; use core_test_support::responses::mount_models_once; +use core_test_support::responses::mount_models_once_with_etag; +use core_test_support::responses::mount_response_once_match; use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::skip_if_no_network; @@ -42,6 +46,7 @@ use core_test_support::skip_if_sandbox; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; +use serde_json::Value; use serde_json::json; use tempfile::TempDir; use tokio::time::Duration; @@ -49,9 +54,92 @@ use tokio::time::Instant; use tokio::time::sleep; use wiremock::BodyPrintLimit; use wiremock::MockServer; +use wiremock::ResponseTemplate; const REMOTE_MODEL_SLUG: &str = "codex-test"; +#[derive(Clone, Default)] +struct ResponsesMatch { + etag: Option, + user_text: Option, + call_id: Option, +} + +impl ResponsesMatch { + fn with_etag(mut self, etag: &str) -> Self { + self.etag = Some(etag.to_string()); + self + } + + fn with_user_text(mut self, text: &str) -> Self { + self.user_text = Some(text.to_string()); + self + } + + fn with_function_call_output(mut self, call_id: &str) -> Self { + self.call_id = Some(call_id.to_string()); + self + } +} + +impl wiremock::Match for ResponsesMatch { + fn matches(&self, request: &wiremock::Request) -> bool { + if let Some(expected_etag) = &self.etag { + let header = request + .headers + .get("X-If-Models-Match") + .and_then(|value| value.to_str().ok()); + if header != Some(expected_etag.as_str()) { + return false; + } + } + + let Ok(body): Result = request.body_json() else { + return false; + }; + let Some(items) = body.get("input").and_then(Value::as_array) else { + return false; + }; + + if let Some(expected_text) = &self.user_text + && !input_has_user_text(items, expected_text) + { + return false; + } + + if let Some(expected_call_id) = &self.call_id + && !input_has_function_call_output(items, expected_call_id) + { + return false; + } + + true + } +} + +fn input_has_user_text(items: &[Value], expected: &str) -> bool { + items.iter().any(|item| { + item.get("type").and_then(Value::as_str) == Some("message") + && item.get("role").and_then(Value::as_str) == Some("user") + && item + .get("content") + .and_then(Value::as_array) + .is_some_and(|content| { + content.iter().any(|span| { + span.get("type").and_then(Value::as_str) == Some("input_text") + && span.get("text").and_then(Value::as_str) == Some(expected) + }) + }) + }) +} + +fn input_has_function_call_output(items: &[Value], call_id: &str) -> bool { + items.iter().any(|item| { + item.get("type").and_then(Value::as_str) == Some("function_call_output") + && item.get("call_id").and_then(Value::as_str) == Some(call_id) + }) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_models_remote_model_uses_unified_exec() -> Result<()> { skip_if_no_network!(Ok(())); @@ -297,6 +385,192 @@ async fn remote_models_apply_remote_base_instructions() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = MockServer::builder() + .body_print_limit(BodyPrintLimit::Limited(80_000)) + .start() + .await; + + let remote_model = test_remote_model("remote-etag", ModelVisibility::List, 1); + let initial_etag = "models-etag-initial"; + let refreshed_etag = "models-etag-refreshed"; + + // Phase 1: start Codex with remote models and capture the initial ETag. + let models_mock = mount_models_once_with_etag( + &server, + ModelsResponse { + models: vec![remote_model.clone()], + }, + initial_etag, + ) + .await; + + let harness = build_remote_models_harness(&server, |config| { + config.features.enable(Feature::RemoteModels); + config.model = Some("gpt-5.1".to_string()); + }) + .await?; + + let RemoteModelsHarness { + codex, + cwd, + config, + conversation_manager, + .. + } = harness; + + let models_manager = conversation_manager.get_models_manager(); + wait_for_model_available(&models_manager, "remote-etag", &config).await; + + assert_eq!( + models_manager.get_models_etag().await.as_deref(), + Some(initial_etag), + ); + assert_eq!( + models_mock.requests().len(), + 1, + "expected an initial /models request", + ); + assert_eq!(models_mock.requests()[0].url.path(), "/v1/models"); + + // Phase 2: the tool output turn hits a 412 and triggers a models refresh. + server.reset().await; + let refreshed_models_mock = mount_models_once_with_etag( + &server, + ModelsResponse { + models: vec![remote_model], + }, + refreshed_etag, + ) + .await; + + let call_id = "shell-command-call"; + let first_prompt = "run a shell command"; + let followup_prompt = "send another message"; + + let first_response = mount_sse_once_match( + &server, + ResponsesMatch::default() + .with_etag(initial_etag) + .with_user_text(first_prompt), + sse(vec![ + ev_response_created("resp-1"), + ev_shell_command_call(call_id, "echo refreshed"), + ev_completed("resp-1"), + ]), + ) + .await; + + let stale_response = mount_response_once_match( + &server, + ResponsesMatch::default() + .with_etag(initial_etag) + .with_function_call_output(call_id), + ResponseTemplate::new(412) + .set_body_string("Models catalog has changed. Please refresh your models list."), + ) + .await; + + let refreshed_response = mount_sse_once_match( + &server, + ResponsesMatch::default() + .with_etag(refreshed_etag) + .with_function_call_output(call_id), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ) + .await; + + // Phase 3: the next user turn should send the refreshed ETag. + let next_turn_response = mount_sse_once_match( + &server, + ResponsesMatch::default() + .with_etag(refreshed_etag) + .with_user_text(followup_prompt), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-2", "ok"), + ev_completed("resp-3"), + ]), + ) + .await; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: first_prompt.into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: "gpt-5.1".to_string(), + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + assert_eq!( + refreshed_models_mock.requests().len(), + 1, + "expected a refreshed /models request", + ); + assert_eq!( + models_manager.get_models_etag().await.as_deref(), + Some(refreshed_etag), + ); + + assert_eq!( + first_response.single_request().header("X-If-Models-Match"), + Some(initial_etag.to_string()), + ); + assert_eq!( + stale_response.single_request().header("X-If-Models-Match"), + Some(initial_etag.to_string()), + ); + assert_eq!( + refreshed_response + .single_request() + .header("X-If-Models-Match"), + Some(refreshed_etag.to_string()), + ); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: followup_prompt.into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: "gpt-5.1".to_string(), + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + assert_eq!( + next_turn_response + .single_request() + .header("X-If-Models-Match"), + Some(refreshed_etag.to_string()), + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_models_preserve_builtin_presets() -> Result<()> { skip_if_no_network!(Ok(())); @@ -327,7 +601,7 @@ async fn remote_models_preserve_builtin_presets() -> Result<()> { ); manager - .refresh_available_models(&config) + .try_refresh_available_models(&config) .await .expect("refresh succeeds"); From 985333feffc2ad54fb66898b5bd56ee7eb08646e Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 19:44:55 -0800 Subject: [PATCH 06/14] comments --- codex-rs/core/tests/suite/remote_models.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index 6cbec027a17..d54405fe6dc 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -385,6 +385,12 @@ async fn remote_models_apply_remote_base_instructions() -> Result<()> { Ok(()) } +/// Exercises the remote-models retry flow: +/// 1) initial `/models` fetch stores an ETag, +/// 2) `/responses` uses that ETag for a tool call, +/// 3) the tool-output turn receives a 412 (stale models), +/// 4) Codex refreshes `/models` to get a new ETag and retries, +/// 5) subsequent user turns keep sending the refreshed ETag. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { skip_if_no_network!(Ok(())); @@ -399,7 +405,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { let initial_etag = "models-etag-initial"; let refreshed_etag = "models-etag-refreshed"; - // Phase 1: start Codex with remote models and capture the initial ETag. + // Phase 1a: seed the initial `/models` response with an ETag. let models_mock = mount_models_once_with_etag( &server, ModelsResponse { @@ -409,6 +415,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { ) .await; + // Phase 1b: boot a Codex session configured for remote models. let harness = build_remote_models_harness(&server, |config| { config.features.enable(Feature::RemoteModels); config.model = Some("gpt-5.1".to_string()); @@ -426,6 +433,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { let models_manager = conversation_manager.get_models_manager(); wait_for_model_available(&models_manager, "remote-etag", &config).await; + // Phase 1c: confirm the ETag is stored and `/models` was called. assert_eq!( models_manager.get_models_etag().await.as_deref(), Some(initial_etag), @@ -437,8 +445,9 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { ); assert_eq!(models_mock.requests()[0].url.path(), "/v1/models"); - // Phase 2: the tool output turn hits a 412 and triggers a models refresh. + // Phase 2a: reset mocks so the next `/models` call must be explicit. server.reset().await; + // Phase 2b: mount a refreshed `/models` response with a new ETag. let refreshed_models_mock = mount_models_once_with_etag( &server, ModelsResponse { @@ -452,6 +461,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { let first_prompt = "run a shell command"; let followup_prompt = "send another message"; + // Phase 2c: first `/responses` turn uses the initial ETag and emits a tool call. let first_response = mount_sse_once_match( &server, ResponsesMatch::default() @@ -465,6 +475,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { ) .await; + // Phase 2d: the tool-output follow-up returns 412 (stale models). let stale_response = mount_response_once_match( &server, ResponsesMatch::default() @@ -475,6 +486,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { ) .await; + // Phase 2e: retry tool-output follow-up should use the refreshed ETag. let refreshed_response = mount_sse_once_match( &server, ResponsesMatch::default() @@ -488,7 +500,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { ) .await; - // Phase 3: the next user turn should send the refreshed ETag. + // Phase 3a: next user turn should also use the refreshed ETag. let next_turn_response = mount_sse_once_match( &server, ResponsesMatch::default() @@ -502,6 +514,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { ) .await; + // Phase 3b: run the first user turn and let retries complete. codex .submit(Op::UserTurn { items: vec![UserInput::Text { @@ -519,6 +532,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + // Phase 3c: assert the refresh happened and the ETag was updated. assert_eq!( refreshed_models_mock.requests().len(), 1, @@ -529,6 +543,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { Some(refreshed_etag), ); + // Phase 3d: assert the ETag header progression across the retry sequence. assert_eq!( first_response.single_request().header("X-If-Models-Match"), Some(initial_etag.to_string()), @@ -544,6 +559,7 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { Some(refreshed_etag.to_string()), ); + // Phase 3e: execute a new user turn and ensure the refreshed ETag persists. codex .submit(Op::UserTurn { items: vec![UserInput::Text { From ecff4d4f721a256c778ae947f2a439834f7cec19 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 19:55:38 -0800 Subject: [PATCH 07/14] comments --- codex-rs/core/src/codex.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index f0370c0e733..033229124ba 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2409,21 +2409,22 @@ async fn run_turn( ), )); - let model_supports_parallel = turn_context - .client - .get_model_family() - .supports_parallel_tool_calls; - - let prompt = Prompt { - input, - tools: router.specs(), - parallel_tool_calls: model_supports_parallel && sess.enabled(Feature::ParallelToolCalls), - base_instructions_override: turn_context.base_instructions.clone(), - output_schema: turn_context.final_output_json_schema.clone(), - }; - let mut retries = 0; loop { + let model_supports_parallel = turn_context + .client + .get_model_family() + .supports_parallel_tool_calls; + + let prompt = Prompt { + input: input.clone(), + tools: router.specs(), + parallel_tool_calls: model_supports_parallel + && sess.enabled(Feature::ParallelToolCalls), + base_instructions_override: turn_context.base_instructions.clone(), + output_schema: turn_context.final_output_json_schema.clone(), + }; + match try_run_turn( Arc::clone(&router), Arc::clone(&sess), From 359142f22f47e80646005f581041892dedee217e Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 20:03:11 -0800 Subject: [PATCH 08/14] comments --- codex-rs/core/src/codex.rs | 99 ++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 033229124ba..b0b789da4bb 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2411,19 +2411,12 @@ async fn run_turn( let mut retries = 0; loop { - let model_supports_parallel = turn_context - .client - .get_model_family() - .supports_parallel_tool_calls; - - let prompt = Prompt { - input: input.clone(), - tools: router.specs(), - parallel_tool_calls: model_supports_parallel - && sess.enabled(Feature::ParallelToolCalls), - base_instructions_override: turn_context.base_instructions.clone(), - output_schema: turn_context.final_output_json_schema.clone(), - }; + let prompt = build_prompt( + sess.as_ref(), + turn_context.as_ref(), + router.as_ref(), + &input, + ); match try_run_turn( Arc::clone(&router), @@ -2460,38 +2453,15 @@ async fn run_turn( Err(e @ CodexErr::InvalidRequest(_)) => return Err(e), Err(e @ CodexErr::RefreshTokenFailed(_)) => return Err(e), Err(e) => { - // Refresh models if we got an outdated models error - if matches!(e, CodexErr::OutdatedModels) { - let config = { - let state = sess.state.lock().await; - state - .session_configuration - .original_config_do_not_use - .clone() - }; - if let Err(err) = sess - .services - .models_manager - .refresh_available_models(&config) - .await - { - error!("failed to refresh models after outdated models error: {err}"); - } - let models_etag = sess.services.models_manager.get_models_etag().await; - let model = turn_context.client.get_model(); - let model_family = sess - .services - .models_manager - .construct_model_family(&model, &config) - .await; - turn_context.client.update_model_family(model_family); - turn_context.client.update_models_etag(models_etag); - } - // Use the configured provider-specific stream retry budget. let max_retries = turn_context.client.get_provider().stream_max_retries(); if retries < max_retries { retries += 1; + // Refresh models if we got an outdated models error + if matches!(e, CodexErr::OutdatedModels) { + refresh_models_after_outdated_error(sess.as_ref(), turn_context.as_ref()) + .await; + } let delay = match e { CodexErr::Stream(_, Some(delay)) => delay, _ => backoff(retries), @@ -2519,6 +2489,53 @@ async fn run_turn( } } +fn build_prompt( + sess: &Session, + turn_context: &TurnContext, + router: &ToolRouter, + input: &[ResponseItem], +) -> Prompt { + let model_supports_parallel = turn_context + .client + .get_model_family() + .supports_parallel_tool_calls; + + Prompt { + input: input.to_vec(), + tools: router.specs(), + parallel_tool_calls: model_supports_parallel && sess.enabled(Feature::ParallelToolCalls), + base_instructions_override: turn_context.base_instructions.clone(), + output_schema: turn_context.final_output_json_schema.clone(), + } +} + +async fn refresh_models_after_outdated_error(sess: &Session, turn_context: &TurnContext) { + let config = { + let state = sess.state.lock().await; + state + .session_configuration + .original_config_do_not_use + .clone() + }; + if let Err(err) = sess + .services + .models_manager + .refresh_available_models(&config) + .await + { + error!("failed to refresh models after outdated models error: {err}"); + } + let models_etag = sess.services.models_manager.get_models_etag().await; + let model = turn_context.client.get_model(); + let model_family = sess + .services + .models_manager + .construct_model_family(&model, &config) + .await; + turn_context.client.update_model_family(model_family); + turn_context.client.update_models_etag(models_etag); +} + #[derive(Debug)] struct TurnRunResult { needs_follow_up: bool, From 1a5289a4eff274945f0ab4f2760fa367b6a65270 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 20:32:51 -0800 Subject: [PATCH 09/14] test --- codex-rs/core/src/client.rs | 47 +++------- codex-rs/core/src/client_common.rs | 25 ++++++ codex-rs/core/src/codex.rs | 100 ++++++++------------- codex-rs/core/tests/suite/remote_models.rs | 11 +++ 4 files changed, 83 insertions(+), 100 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index c929e8e8610..bd22ab2e565 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::sync::RwLock; use crate::api_bridge::auth_provider_from_auth; use crate::api_bridge::map_api_error; @@ -58,8 +57,8 @@ use crate::tools::spec::create_tools_json_for_responses_api; pub struct ModelClient { config: Arc, auth_manager: Option>, - model_family: RwLock, - models_etag: RwLock>, + model_family: ModelFamily, + models_etag: Option, otel_manager: OtelManager, provider: ModelProviderInfo, conversation_id: ConversationId, @@ -85,8 +84,8 @@ impl ModelClient { Self { config, auth_manager, - model_family: RwLock::new(model_family), - models_etag: RwLock::new(models_etag), + model_family, + models_etag, otel_manager, provider, conversation_id, @@ -112,22 +111,6 @@ impl ModelClient { &self.provider } - pub fn update_models_etag(&self, models_etag: Option) { - let mut guard = self - .models_etag - .write() - .unwrap_or_else(std::sync::PoisonError::into_inner); - *guard = models_etag; - } - - pub fn update_model_family(&self, model_family: ModelFamily) { - let mut guard = self - .model_family - .write() - .unwrap_or_else(std::sync::PoisonError::into_inner); - *guard = model_family; - } - /// Streams a single model turn using either the Responses or Chat /// Completions wire API, depending on the configured provider. /// @@ -167,7 +150,7 @@ impl ModelClient { let auth_manager = self.auth_manager.clone(); let model_family = self.get_model_family(); - let instructions = prompt.get_full_instructions(&model_family).into_owned(); + let instructions = prompt.get_full_instructions(model_family).into_owned(); let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?; let api_prompt = build_api_prompt(prompt, instructions, tools_json); let conversation_id = self.conversation_id.to_string(); @@ -221,7 +204,7 @@ impl ModelClient { let auth_manager = self.auth_manager.clone(); let model_family = self.get_model_family(); - let instructions = prompt.get_full_instructions(&model_family).into_owned(); + let instructions = prompt.get_full_instructions(model_family).into_owned(); let tools_json: Vec = create_tools_json_for_responses_api(&prompt.tools)?; let reasoning = if model_family.supports_reasoning_summaries { @@ -282,7 +265,7 @@ impl ModelClient { store_override: None, conversation_id: Some(conversation_id.clone()), session_source: Some(session_source.clone()), - extra_headers: beta_feature_headers(&self.config, self.get_models_etag()), + extra_headers: beta_feature_headers(&self.config, self.get_models_etag().clone()), }; let stream_result = client @@ -322,18 +305,12 @@ impl ModelClient { } /// Returns the currently configured model family. - pub fn get_model_family(&self) -> ModelFamily { - self.model_family - .read() - .map(|model_family| model_family.clone()) - .unwrap_or_else(|err| err.into_inner().clone()) + pub fn get_model_family(&self) -> &ModelFamily { + &self.model_family } - fn get_models_etag(&self) -> Option { - self.models_etag - .read() - .map(|models_etag| models_etag.clone()) - .unwrap_or_else(|err| err.into_inner().clone()) + fn get_models_etag(&self) -> &Option { + &self.models_etag } /// Returns the current reasoning effort setting. @@ -370,7 +347,7 @@ impl ModelClient { .with_telemetry(Some(request_telemetry)); let instructions = prompt - .get_full_instructions(&self.get_model_family()) + .get_full_instructions(self.get_model_family()) .into_owned(); let payload = ApiCompactionInput { model: &self.get_model(), diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index 4a3bc8de235..ca8a142a3a8 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -1,6 +1,10 @@ use crate::client_common::tools::ToolSpec; +use crate::codex::Session; +use crate::codex::TurnContext; use crate::error::Result; +use crate::features::Feature; use crate::openai_models::model_family::ModelFamily; +use crate::tools::ToolRouter; pub use codex_api::common::ResponseEvent; use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS; use codex_protocol::models::ResponseItem; @@ -44,6 +48,27 @@ pub struct Prompt { } impl Prompt { + pub(crate) fn new( + sess: &Session, + turn_context: &TurnContext, + router: &ToolRouter, + input: &[ResponseItem], + ) -> Prompt { + let model_supports_parallel = turn_context + .client + .get_model_family() + .supports_parallel_tool_calls; + + Prompt { + input: input.to_vec(), + tools: router.specs(), + parallel_tool_calls: model_supports_parallel + && sess.enabled(Feature::ParallelToolCalls), + base_instructions_override: turn_context.base_instructions.clone(), + output_schema: turn_context.final_output_json_schema.clone(), + } + } + pub(crate) fn get_full_instructions<'a>(&'a self, model: &'a ModelFamily) -> Cow<'a, str> { let base = self .base_instructions_override diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b0b789da4bb..cbe80df26b3 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2226,7 +2226,7 @@ fn errors_to_info(errors: &[SkillError]) -> Vec { /// pub(crate) async fn run_task( sess: Arc, - turn_context: Arc, + mut turn_context: Arc, input: Vec, cancellation_token: CancellationToken, ) -> Option { @@ -2309,7 +2309,7 @@ pub(crate) async fn run_task( .collect::>(); match run_turn( Arc::clone(&sess), - Arc::clone(&turn_context), + &mut turn_context, Arc::clone(&turn_diff_tracker), turn_input, cancellation_token.child_token(), @@ -2386,7 +2386,7 @@ async fn run_auto_compact(sess: &Arc, turn_context: &Arc) )] async fn run_turn( sess: Arc, - turn_context: Arc, + turn_context: &mut Arc, turn_diff_tracker: SharedTurnDiffTracker, input: Vec, cancellation_token: CancellationToken, @@ -2399,19 +2399,20 @@ async fn run_turn( .list_all_tools() .or_cancel(&cancellation_token) .await?; - let router = Arc::new(ToolRouter::from_config( - &turn_context.tools_config, - Some( - mcp_tools - .into_iter() - .map(|(name, tool)| (name, tool.tool)) - .collect(), - ), - )); let mut retries = 0; loop { - let prompt = build_prompt( + let router = Arc::new(ToolRouter::from_config( + &turn_context.tools_config, + Some( + mcp_tools + .clone() + .into_iter() + .map(|(name, tool)| (name, tool.tool)) + .collect(), + ), + )); + let prompt = Prompt::new( sess.as_ref(), turn_context.as_ref(), router.as_ref(), @@ -2421,7 +2422,7 @@ async fn run_turn( match try_run_turn( Arc::clone(&router), Arc::clone(&sess), - Arc::clone(&turn_context), + Arc::clone(turn_context), Arc::clone(&turn_diff_tracker), &prompt, cancellation_token.child_token(), @@ -2437,13 +2438,13 @@ async fn run_turn( Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), Err(e @ CodexErr::Fatal(_)) => return Err(e), Err(e @ CodexErr::ContextWindowExceeded) => { - sess.set_total_tokens_full(&turn_context).await; + sess.set_total_tokens_full(turn_context).await; return Err(e); } Err(CodexErr::UsageLimitReached(e)) => { let rate_limits = e.rate_limits.clone(); if let Some(rate_limits) = rate_limits { - sess.update_rate_limits(&turn_context, rate_limits).await; + sess.update_rate_limits(turn_context, rate_limits).await; } return Err(CodexErr::UsageLimitReached(e)); } @@ -2459,7 +2460,23 @@ async fn run_turn( retries += 1; // Refresh models if we got an outdated models error if matches!(e, CodexErr::OutdatedModels) { - refresh_models_after_outdated_error(sess.as_ref(), turn_context.as_ref()) + let config = { + let state = sess.state.lock().await; + state + .session_configuration + .original_config_do_not_use + .clone() + }; + if let Err(err) = sess + .services + .models_manager + .refresh_available_models(&config) + .await + { + error!("failed to refresh models after outdated models error: {err}"); + } + *turn_context = sess + .new_default_turn_with_sub_id(turn_context.sub_id.clone()) .await; } let delay = match e { @@ -2474,7 +2491,7 @@ async fn run_turn( // user understands what is happening instead of staring // at a seemingly frozen screen. sess.notify_stream_error( - &turn_context, + turn_context, format!("Reconnecting... {retries}/{max_retries}"), e, ) @@ -2489,53 +2506,6 @@ async fn run_turn( } } -fn build_prompt( - sess: &Session, - turn_context: &TurnContext, - router: &ToolRouter, - input: &[ResponseItem], -) -> Prompt { - let model_supports_parallel = turn_context - .client - .get_model_family() - .supports_parallel_tool_calls; - - Prompt { - input: input.to_vec(), - tools: router.specs(), - parallel_tool_calls: model_supports_parallel && sess.enabled(Feature::ParallelToolCalls), - base_instructions_override: turn_context.base_instructions.clone(), - output_schema: turn_context.final_output_json_schema.clone(), - } -} - -async fn refresh_models_after_outdated_error(sess: &Session, turn_context: &TurnContext) { - let config = { - let state = sess.state.lock().await; - state - .session_configuration - .original_config_do_not_use - .clone() - }; - if let Err(err) = sess - .services - .models_manager - .refresh_available_models(&config) - .await - { - error!("failed to refresh models after outdated models error: {err}"); - } - let models_etag = sess.services.models_manager.get_models_etag().await; - let model = turn_context.client.get_model(); - let model_family = sess - .services - .models_manager - .construct_model_family(&model, &config) - .await; - turn_context.client.update_model_family(model_family); - turn_context.client.update_models_etag(models_etag); -} - #[derive(Debug)] struct TurnRunResult { needs_follow_up: bool, diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index d54405fe6dc..b03f3ce09ff 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -530,6 +530,17 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { }) .await?; + let stream_error = + wait_for_event(&codex, |event| matches!(event, EventMsg::StreamError(_))).await; + let EventMsg::StreamError(stream_error) = stream_error else { + unreachable!(); + }; + assert!( + stream_error.message.starts_with("Reconnecting..."), + "unexpected stream error message: {}", + stream_error.message + ); + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; // Phase 3c: assert the refresh happened and the ETag was updated. From 27cec53ddc67a994a68732f9f3e9b3d9cc531859 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 21:44:02 -0800 Subject: [PATCH 10/14] error --- codex-rs/core/src/codex.rs | 44 +++++++++++++--------- codex-rs/core/tests/suite/remote_models.rs | 11 ------ 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index cbe80df26b3..d4a87d46f5a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2368,6 +2368,30 @@ pub(crate) async fn run_task( last_agent_message } +async fn refresh_models_and_reset_turn_context( + sess: &Arc, + turn_context: &mut Arc, +) { + let config = { + let state = sess.state.lock().await; + state + .session_configuration + .original_config_do_not_use + .clone() + }; + if let Err(err) = sess + .services + .models_manager + .refresh_available_models(&config) + .await + { + error!("failed to refresh models after outdated models error: {err}"); + } + *turn_context = sess + .new_default_turn_with_sub_id(turn_context.sub_id.clone()) + .await; +} + async fn run_auto_compact(sess: &Arc, turn_context: &Arc) { if should_use_remote_compact_task(sess.as_ref(), &turn_context.client.get_provider()) { run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await; @@ -2460,24 +2484,8 @@ async fn run_turn( retries += 1; // Refresh models if we got an outdated models error if matches!(e, CodexErr::OutdatedModels) { - let config = { - let state = sess.state.lock().await; - state - .session_configuration - .original_config_do_not_use - .clone() - }; - if let Err(err) = sess - .services - .models_manager - .refresh_available_models(&config) - .await - { - error!("failed to refresh models after outdated models error: {err}"); - } - *turn_context = sess - .new_default_turn_with_sub_id(turn_context.sub_id.clone()) - .await; + refresh_models_and_reset_turn_context(&sess, turn_context).await; + continue; } let delay = match e { CodexErr::Stream(_, Some(delay)) => delay, diff --git a/codex-rs/core/tests/suite/remote_models.rs b/codex-rs/core/tests/suite/remote_models.rs index b03f3ce09ff..d54405fe6dc 100644 --- a/codex-rs/core/tests/suite/remote_models.rs +++ b/codex-rs/core/tests/suite/remote_models.rs @@ -530,17 +530,6 @@ async fn remote_models_refresh_etag_after_outdated_models() -> Result<()> { }) .await?; - let stream_error = - wait_for_event(&codex, |event| matches!(event, EventMsg::StreamError(_))).await; - let EventMsg::StreamError(stream_error) = stream_error else { - unreachable!(); - }; - assert!( - stream_error.message.starts_with("Reconnecting..."), - "unexpected stream error message: {}", - stream_error.message - ); - wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; // Phase 3c: assert the refresh happened and the ETag was updated. From 6912ba9fda24610d6dc80d78f181160cbb3e6890 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 22:06:04 -0800 Subject: [PATCH 11/14] final_output_json_schema --- codex-rs/core/src/codex.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d4a87d46f5a..d9e5974cbb5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2387,8 +2387,14 @@ async fn refresh_models_and_reset_turn_context( { error!("failed to refresh models after outdated models error: {err}"); } + let session_configuration = sess.state.lock().await.session_configuration.clone(); *turn_context = sess - .new_default_turn_with_sub_id(turn_context.sub_id.clone()) + .new_turn_from_configuration( + turn_context.sub_id.clone(), + session_configuration, + Some(turn_context.final_output_json_schema.clone()), + false, + ) .await; } From 348d37950949ede1fc2eba14debfd40bb545074b Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 22:20:35 -0800 Subject: [PATCH 12/14] rwlock --- codex-rs/core/src/client.rs | 54 ++++++++++++-------- codex-rs/core/src/client_common.rs | 3 +- codex-rs/core/src/codex.rs | 46 +++++++++-------- codex-rs/core/src/compact.rs | 4 +- codex-rs/core/src/compact_remote.rs | 2 +- codex-rs/core/src/context_manager/history.rs | 4 +- codex-rs/core/src/tasks/user_shell.rs | 2 +- 7 files changed, 65 insertions(+), 50 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index bd22ab2e565..3426da78c17 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -33,6 +33,7 @@ use http::StatusCode as HttpStatusCode; use reqwest::StatusCode; use serde_json::Value; use std::time::Duration; +use tokio::sync::RwLock; use tokio::sync::mpsc; use tracing::warn; @@ -57,8 +58,8 @@ use crate::tools::spec::create_tools_json_for_responses_api; pub struct ModelClient { config: Arc, auth_manager: Option>, - model_family: ModelFamily, - models_etag: Option, + model_family: RwLock, + models_etag: RwLock>, otel_manager: OtelManager, provider: ModelProviderInfo, conversation_id: ConversationId, @@ -84,8 +85,8 @@ impl ModelClient { Self { config, auth_manager, - model_family, - models_etag, + model_family: RwLock::new(model_family), + models_etag: RwLock::new(models_etag), otel_manager, provider, conversation_id, @@ -95,8 +96,8 @@ impl ModelClient { } } - pub fn get_model_context_window(&self) -> Option { - let model_family = self.get_model_family(); + pub async fn get_model_context_window(&self) -> Option { + let model_family = self.get_model_family().await; let effective_context_window_percent = model_family.effective_context_window_percent; model_family .context_window @@ -149,8 +150,8 @@ impl ModelClient { } let auth_manager = self.auth_manager.clone(); - let model_family = self.get_model_family(); - let instructions = prompt.get_full_instructions(model_family).into_owned(); + let model_family = self.get_model_family().await; + let instructions = prompt.get_full_instructions(&model_family).into_owned(); let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?; let api_prompt = build_api_prompt(prompt, instructions, tools_json); let conversation_id = self.conversation_id.to_string(); @@ -170,7 +171,7 @@ impl ModelClient { let stream_result = client .stream_prompt( - &self.get_model(), + &self.get_model().await, &api_prompt, Some(conversation_id.clone()), Some(session_source.clone()), @@ -203,8 +204,8 @@ impl ModelClient { } let auth_manager = self.auth_manager.clone(); - let model_family = self.get_model_family(); - let instructions = prompt.get_full_instructions(model_family).into_owned(); + let model_family = self.get_model_family().await; + let instructions = prompt.get_full_instructions(&model_family).into_owned(); let tools_json: Vec = create_tools_json_for_responses_api(&prompt.tools)?; let reasoning = if model_family.supports_reasoning_summaries { @@ -265,11 +266,14 @@ impl ModelClient { store_override: None, conversation_id: Some(conversation_id.clone()), session_source: Some(session_source.clone()), - extra_headers: beta_feature_headers(&self.config, self.get_models_etag().clone()), + extra_headers: beta_feature_headers( + &self.config, + self.get_models_etag().await.clone(), + ), }; let stream_result = client - .stream_prompt(&self.get_model(), &api_prompt, options) + .stream_prompt(&self.get_model().await, &api_prompt, options) .await; match stream_result { @@ -300,17 +304,25 @@ impl ModelClient { } /// Returns the currently configured model slug. - pub fn get_model(&self) -> String { - self.get_model_family().get_model_slug().to_string() + pub async fn get_model(&self) -> String { + self.get_model_family().await.get_model_slug().to_string() } /// Returns the currently configured model family. - pub fn get_model_family(&self) -> &ModelFamily { - &self.model_family + pub async fn get_model_family(&self) -> ModelFamily { + self.model_family.read().await.clone() } - fn get_models_etag(&self) -> &Option { - &self.models_etag + pub async fn get_models_etag(&self) -> Option { + self.models_etag.read().await.clone() + } + + pub async fn update_models_etag(&self, etag: Option) { + *self.models_etag.write().await = etag; + } + + pub async fn update_model_family(&self, model_family: ModelFamily) { + *self.model_family.write().await = model_family; } /// Returns the current reasoning effort setting. @@ -347,10 +359,10 @@ impl ModelClient { .with_telemetry(Some(request_telemetry)); let instructions = prompt - .get_full_instructions(self.get_model_family()) + .get_full_instructions(&self.get_model_family().await) .into_owned(); let payload = ApiCompactionInput { - model: &self.get_model(), + model: &self.get_model().await, input: &prompt.input, instructions: &instructions, }; diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index ca8a142a3a8..db1a7333d18 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -48,7 +48,7 @@ pub struct Prompt { } impl Prompt { - pub(crate) fn new( + pub(crate) async fn new( sess: &Session, turn_context: &TurnContext, router: &ToolRouter, @@ -57,6 +57,7 @@ impl Prompt { let model_supports_parallel = turn_context .client .get_model_family() + .await .supports_parallel_tool_calls; Prompt { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d9e5974cbb5..90e6a0b2c12 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -790,7 +790,7 @@ impl Session { } }) { - let curr = turn_context.client.get_model(); + let curr = turn_context.client.get_model().await; if prev != curr { warn!( "resuming session with different model: previous={prev}, current={curr}" @@ -1338,7 +1338,7 @@ impl Session { if let Some(token_usage) = token_usage { state.update_token_info_from_usage( token_usage, - turn_context.client.get_model_context_window(), + turn_context.client.get_model_context_window().await, ); } } @@ -1350,6 +1350,7 @@ impl Session { .clone_history() .await .estimate_token_count(turn_context) + .await else { return; }; @@ -1370,7 +1371,7 @@ impl Session { }; if info.model_context_window.is_none() { - info.model_context_window = turn_context.client.get_model_context_window(); + info.model_context_window = turn_context.client.get_model_context_window().await; } state.set_token_info(Some(info)); @@ -1400,7 +1401,7 @@ impl Session { } pub(crate) async fn set_total_tokens_full(&self, turn_context: &TurnContext) { - let context_window = turn_context.client.get_model_context_window(); + let context_window = turn_context.client.get_model_context_window().await; if let Some(context_window) = context_window { { let mut state = self.state.lock().await; @@ -2226,7 +2227,7 @@ fn errors_to_info(errors: &[SkillError]) -> Vec { /// pub(crate) async fn run_task( sess: Arc, - mut turn_context: Arc, + turn_context: Arc, input: Vec, cancellation_token: CancellationToken, ) -> Option { @@ -2237,6 +2238,7 @@ pub(crate) async fn run_task( let auto_compact_limit = turn_context .client .get_model_family() + .await .auto_compact_token_limit() .unwrap_or(i64::MAX); let total_usage_tokens = sess.get_total_token_usage().await; @@ -2244,7 +2246,7 @@ pub(crate) async fn run_task( run_auto_compact(&sess, &turn_context).await; } let event = EventMsg::TaskStarted(TaskStartedEvent { - model_context_window: turn_context.client.get_model_context_window(), + model_context_window: turn_context.client.get_model_context_window().await, }); sess.send_event(&turn_context, event).await; @@ -2309,7 +2311,7 @@ pub(crate) async fn run_task( .collect::>(); match run_turn( Arc::clone(&sess), - &mut turn_context, + &turn_context, Arc::clone(&turn_diff_tracker), turn_input, cancellation_token.child_token(), @@ -2370,7 +2372,7 @@ pub(crate) async fn run_task( async fn refresh_models_and_reset_turn_context( sess: &Arc, - turn_context: &mut Arc, + turn_context: &Arc, ) { let config = { let state = sess.state.lock().await; @@ -2387,15 +2389,15 @@ async fn refresh_models_and_reset_turn_context( { error!("failed to refresh models after outdated models error: {err}"); } - let session_configuration = sess.state.lock().await.session_configuration.clone(); - *turn_context = sess - .new_turn_from_configuration( - turn_context.sub_id.clone(), - session_configuration, - Some(turn_context.final_output_json_schema.clone()), - false, - ) + let model = turn_context.client.get_model().await; + let model_family = sess + .services + .models_manager + .construct_model_family(&model, &config) .await; + let models_etag = sess.services.models_manager.get_models_etag().await; + turn_context.client.update_model_family(model_family).await; + turn_context.client.update_models_etag(models_etag).await; } async fn run_auto_compact(sess: &Arc, turn_context: &Arc) { @@ -2410,13 +2412,13 @@ async fn run_auto_compact(sess: &Arc, turn_context: &Arc) skip_all, fields( turn_id = %turn_context.sub_id, - model = %turn_context.client.get_model(), + model = %turn_context.client.get_model().await, cwd = %turn_context.cwd.display() ) )] async fn run_turn( sess: Arc, - turn_context: &mut Arc, + turn_context: &Arc, turn_diff_tracker: SharedTurnDiffTracker, input: Vec, cancellation_token: CancellationToken, @@ -2454,7 +2456,7 @@ async fn run_turn( Arc::clone(&sess), Arc::clone(turn_context), Arc::clone(&turn_diff_tracker), - &prompt, + &prompt.await, cancellation_token.child_token(), ) .await @@ -2490,7 +2492,7 @@ async fn run_turn( retries += 1; // Refresh models if we got an outdated models error if matches!(e, CodexErr::OutdatedModels) { - refresh_models_and_reset_turn_context(&sess, turn_context).await; + refresh_models_and_reset_turn_context(&sess, &turn_context).await; continue; } let delay = match e { @@ -2550,7 +2552,7 @@ async fn drain_in_flight( skip_all, fields( turn_id = %turn_context.sub_id, - model = %turn_context.client.get_model() + model = %turn_context.client.get_model().await, ) )] async fn try_run_turn( @@ -2565,7 +2567,7 @@ async fn try_run_turn( cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), - model: turn_context.client.get_model(), + model: turn_context.client.get_model().await, effort: turn_context.client.get_reasoning_effort(), summary: turn_context.client.get_reasoning_summary(), }); diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 3290d1b321b..3ccbfb61284 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -55,7 +55,7 @@ pub(crate) async fn run_compact_task( input: Vec, ) { let start_event = EventMsg::TaskStarted(TaskStartedEvent { - model_context_window: turn_context.client.get_model_context_window(), + model_context_window: turn_context.client.get_model_context_window().await, }); sess.send_event(&turn_context, start_event).await; run_compact_task_inner(sess.clone(), turn_context, input).await; @@ -83,7 +83,7 @@ async fn run_compact_task_inner( cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), - model: turn_context.client.get_model(), + model: turn_context.client.get_model().await, effort: turn_context.client.get_reasoning_effort(), summary: turn_context.client.get_reasoning_summary(), }); diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index b855f28d39d..3419b2e51fd 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -20,7 +20,7 @@ pub(crate) async fn run_inline_remote_auto_compact_task( pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Arc) { let start_event = EventMsg::TaskStarted(TaskStartedEvent { - model_context_window: turn_context.client.get_model_context_window(), + model_context_window: turn_context.client.get_model_context_window().await, }); sess.send_event(&turn_context, start_event).await; diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index c18ad7df8ec..841be4bae58 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -79,8 +79,8 @@ impl ContextManager { // Estimate token usage using byte-based heuristics from the truncation helpers. // This is a coarse lower bound, not a tokenizer-accurate count. - pub(crate) fn estimate_token_count(&self, turn_context: &TurnContext) -> Option { - let model_family = turn_context.client.get_model_family(); + pub(crate) async fn estimate_token_count(&self, turn_context: &TurnContext) -> Option { + let model_family = turn_context.client.get_model_family().await; let base_tokens = i64::try_from(approx_token_count(model_family.base_instructions.as_str())) .unwrap_or(i64::MAX); diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index aec09514ca3..b053020bdb9 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -59,7 +59,7 @@ impl SessionTask for UserShellCommandTask { cancellation_token: CancellationToken, ) -> Option { let event = EventMsg::TaskStarted(TaskStartedEvent { - model_context_window: turn_context.client.get_model_context_window(), + model_context_window: turn_context.client.get_model_context_window().await, }); let session = session.clone_session(); session.send_event(turn_context.as_ref(), event).await; From 0a1323747b5b54e0aad0bd4b626faec6e1c99269 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 22:38:40 -0800 Subject: [PATCH 13/14] rwlock --- codex-rs/core/src/codex.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 90e6a0b2c12..4ca56685701 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2412,7 +2412,7 @@ async fn run_auto_compact(sess: &Arc, turn_context: &Arc) skip_all, fields( turn_id = %turn_context.sub_id, - model = %turn_context.client.get_model().await, + model = tracing::field::Empty, cwd = %turn_context.cwd.display() ) )] @@ -2423,6 +2423,8 @@ async fn run_turn( input: Vec, cancellation_token: CancellationToken, ) -> CodexResult { + let model = turn_context.client.get_model().await; + tracing::Span::current().record("model", field::display(&model)); let mcp_tools = sess .services .mcp_connection_manager @@ -2492,7 +2494,7 @@ async fn run_turn( retries += 1; // Refresh models if we got an outdated models error if matches!(e, CodexErr::OutdatedModels) { - refresh_models_and_reset_turn_context(&sess, &turn_context).await; + refresh_models_and_reset_turn_context(&sess, turn_context).await; continue; } let delay = match e { @@ -2552,7 +2554,7 @@ async fn drain_in_flight( skip_all, fields( turn_id = %turn_context.sub_id, - model = %turn_context.client.get_model().await, + model = tracing::field::Empty, ) )] async fn try_run_turn( @@ -2563,11 +2565,13 @@ async fn try_run_turn( prompt: &Prompt, cancellation_token: CancellationToken, ) -> CodexResult { + let model = turn_context.client.get_model().await; + tracing::Span::current().record("model", field::display(&model)); let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), - model: turn_context.client.get_model().await, + model, effort: turn_context.client.get_reasoning_effort(), summary: turn_context.client.get_reasoning_summary(), }); From 124c7fc2bec6bf27aadd5f3e457676c7ef2db71e Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 18 Dec 2025 22:56:49 -0800 Subject: [PATCH 14/14] compact --- codex-rs/core/src/codex.rs | 2 +- codex-rs/core/src/compact.rs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 4ca56685701..0c6b5739894 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2370,7 +2370,7 @@ pub(crate) async fn run_task( last_agent_message } -async fn refresh_models_and_reset_turn_context( +pub(crate) async fn refresh_models_and_reset_turn_context( sess: &Arc, turn_context: &Arc, ) { diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 3ccbfb61284..a864774d69c 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -6,6 +6,7 @@ use crate::client_common::ResponseEvent; use crate::codex::Session; use crate::codex::TurnContext; use crate::codex::get_last_assistant_message_from_turn; +use crate::codex::refresh_models_and_reset_turn_context; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::features::Feature; @@ -132,6 +133,10 @@ async fn run_compact_task_inner( Err(e) => { if retries < max_retries { retries += 1; + if matches!(e, CodexErr::OutdatedModels) { + refresh_models_and_reset_turn_context(&sess, &turn_context).await; + continue; + } let delay = backoff(retries); sess.notify_stream_error( turn_context.as_ref(),