
[scheduler] Distributed Scheduler Orchestrator#2192

Draft
DiegoTavares wants to merge 9 commits into AcademySoftwareFoundation:master from DiegoTavares:orchestrator

Conversation


DiegoTavares (Collaborator) commented Mar 4, 2026

TBD

Summary by CodeRabbit

Release Notes

  • New Features

    • Added orchestrated scheduling mode with --orchestrated flag for distributed cluster management across multiple scheduler instances.
    • Introduced automatic leader election and cluster distribution for improved load balancing in distributed environments.
  • Chores

    • Added orchestrator configuration options for heartbeat intervals, election cycles, failure thresholds, and instance capacity.
    • Added Prometheus metrics to monitor orchestrator cluster assignments, leader status, and instance health.

The orchestrator needs to expire assignments to allow rebalancing of the clusters when new instances
join the system.

Change the cluster_id logic to account for changing cluster tags without triggering a cluster
reassignment. The new logic computes a cluster_id as facility_id:show_id:type:tag for alloc clusters
and facility_id:show_id:type:chunk_index for chunked tags (manual, hostname, and hardware).
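The id scheme described above can be sketched as follows. This is an illustrative reconstruction, not the PR's actual code: the function name, parameter names, and string types are assumptions.

```rust
// Hypothetical sketch of the stable cluster_id scheme described above.
// Alloc clusters key on the tag name; chunked tag types (manual, hostname,
// hardware) key on the chunk index, so renaming tags inside a chunk does
// not change the id and does not force a cluster reassignment.
fn cluster_id(
    facility_id: &str,
    show_id: &str,
    cluster_type: &str,
    tag_or_chunk: &str,
) -> String {
    format!("{facility_id}:{show_id}:{cluster_type}:{tag_or_chunk}")
}

fn main() {
    // An alloc cluster keyed by tag name, and a manual cluster keyed by chunk index.
    println!("{}", cluster_id("fac1", "show1", "alloc", "general"));
    println!("{}", cluster_id("fac1", "show1", "manual", "3"));
}
```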

Entire-Checkpoint: d04a08f3d901
Signed-off-by: Diego Tavares <dtavares@imageworks.com>

coderabbitai bot commented Apr 7, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6d140c6d-8ae3-4224-9808-90ec10600d32

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Signed-off-by: Diego Tavares <dtavares@imageworks.com>
coderabbitai bot left a comment

Actionable comments posted: 9

🧹 Nitpick comments (6)
.gitignore (1)

49-49: Consider anchoring the ignore rule to repository root.

Line 49 works, but /.claude/settings.json is more precise if you only intend to ignore the root-level file (and not any nested .claude/settings.json path).

Diff
-.claude/settings.json
+/.claude/settings.json
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.gitignore at line 49, Replace the unanchored ignore entry
".claude/settings.json" with a rooted pattern "/.claude/settings.json" in the
.gitignore so only the repository-root settings file is ignored (preventing
accidental ignoring of identically named files in subdirectories); update the
existing ".claude/settings.json" line to "/.claude/settings.json".
rust/crates/scheduler/src/orchestrator/distributor.rs (1)

209-242: Minor timestamp inconsistency in compute_rates.

Line 213 captures now at the start, but line 235 uses Instant::now() again when storing the new snapshot. This means timestamp in the snapshot could be slightly later than the now used for delta calculation. While negligible in practice (sub-millisecond), using the same now would be more precise.

♻️ Use consistent timestamp
             // Update snapshots for next cycle
             self.previous_snapshots.insert(
                 id,
                 RateSnapshot {
                     jobs_queried: instance.float_jobs_queried,
-                    timestamp: Instant::now(),
+                    timestamp: now,
                 },
             );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust/crates/scheduler/src/orchestrator/distributor.rs` around lines 209 -
242, The compute_rates function captures now at the start but writes a new
Instant::now() into RateSnapshot, causing a tiny inconsistency; update the
snapshot insertion to use the same now variable (the one used for delta_secs)
instead of calling Instant::now() again so previous_snapshots,
RateSnapshot.timestamp, and the delta calculation all use a single consistent
timestamp; modify the insert for self.previous_snapshots in compute_rates to
store RateSnapshot { jobs_queried: instance.float_jobs_queried, timestamp: now
}.
rust/crates/scheduler/src/orchestrator/mod.rs (2)

76-84: Comment mentions SIGTERM but only SIGINT (ctrl_c) is handled.

The comment on line 76 mentions "SIGTERM/SIGINT handler" but only ctrl_c() (SIGINT) is actually registered. If SIGTERM handling is needed for container orchestration (e.g., Kubernetes), consider adding explicit SIGTERM support.

♻️ Optional: Add SIGTERM handling for container environments
     // 6. Set up SIGTERM/SIGINT handler for graceful shutdown
     let shutdown_tx_clone = shutdown_tx.clone();
     let shutdown_handle = tokio::spawn(async move {
-        tokio::signal::ctrl_c()
-            .await
-            .expect("Failed to listen for ctrl-c");
-        info!("Received shutdown signal");
+        #[cfg(unix)]
+        {
+            use tokio::signal::unix::{signal, SignalKind};
+            let mut sigterm = signal(SignalKind::terminate())
+                .expect("Failed to register SIGTERM handler");
+            tokio::select! {
+                _ = tokio::signal::ctrl_c() => {
+                    info!("Received SIGINT");
+                }
+                _ = sigterm.recv() => {
+                    info!("Received SIGTERM");
+                }
+            }
+        }
+        #[cfg(not(unix))]
+        {
+            tokio::signal::ctrl_c()
+                .await
+                .expect("Failed to listen for ctrl-c");
+            info!("Received shutdown signal");
+        }
         let _ = shutdown_tx_clone.send(true);
     });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust/crates/scheduler/src/orchestrator/mod.rs` around lines 76 - 84, The
comment says SIGTERM/SIGINT but the code only registers tokio::signal::ctrl_c();
update the shutdown task (the async block spawned into shutdown_handle) to also
listen for SIGTERM on Unix (e.g., using tokio::signal::unix::signal or
equivalent) and use tokio::select! to await either ctrl_c() or the SIGTERM
stream, then log and send on shutdown_tx_clone as currently done; keep the
existing send(true) behavior so Shutdown handling (shutdown_tx,
shutdown_tx_clone, shutdown_handle) works for both signals.

93-100: Timeout awaits background tasks but doesn't log if timeout is exceeded.

When the timeout fires, there's no indication whether the tasks completed gracefully or were abandoned. Consider logging when the timeout is hit to aid debugging slow shutdowns.

♻️ Optional: Log timeout expiration
     // Wait for background tasks with a timeout
     let timeout = CONFIG.orchestrator.shutdown_timeout;
-    let _ = tokio::time::timeout(timeout, async {
+    let shutdown_result = tokio::time::timeout(timeout, async {
         let _ = heartbeat_handle.await;
         let _ = leader_handle.await;
         let _ = sync_handle.await;
     })
     .await;
+    if shutdown_result.is_err() {
+        warn!("Shutdown timeout exceeded, some background tasks may not have completed");
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust/crates/scheduler/src/orchestrator/mod.rs` around lines 93 - 100, The
shutdown timeout call using tokio::time::timeout around awaiting
heartbeat_handle, leader_handle, and sync_handle currently discards the result;
change it to inspect the outcome of timeout(...).await and log when the timeout
elapses (i.e., when the result is Err). Specifically, after calling
tokio::time::timeout(timeout, async { let _ = heartbeat_handle.await; let _ =
leader_handle.await; let _ = sync_handle.await; }).await, match the Result and
emit a warning/error via the project logger/tracing (include
CONFIG.orchestrator.shutdown_timeout value in the message) so it’s clear the
shutdown timed out and which background tasks (heartbeat_handle, leader_handle,
sync_handle) may have been abandoned.
rust/crates/scheduler/src/metrics/mod.rs (1)

157-157: Log message doesn't mention the new /health endpoint.

The info log only mentions /metrics but the server now also serves /health.

♻️ Update log message
-    info!("Metrics server listening on http://{}/metrics", addr);
+    info!("Metrics server listening on http://{} (endpoints: /metrics, /health)", addr);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust/crates/scheduler/src/metrics/mod.rs` at line 157, The info log in
metrics::mod (the info! call that currently prints "Metrics server listening on
http://{}/metrics") should be updated to include the new /health endpoint;
modify the info! invocation so it mentions both endpoints (e.g., "/metrics and
/health") and still interpolates the addr variable so the log clearly shows both
routes and the listening address.
rust/crates/scheduler/src/orchestrator/sync.rs (1)

58-103: Consider adding backoff on repeated database errors.

When dao.get_assignments_for_instance fails (lines 89-95), the loop immediately retries on the next tick without any backoff. If the database is temporarily unavailable, this could generate excessive error logs and unnecessary load.

♻️ Optional: Add simple error backoff
         tokio::spawn(async move {
             let mut ticker = tokio::time::interval(poll_interval);
+            let mut consecutive_errors = 0u32;

             loop {
                 tokio::select! {
                     _ = ticker.tick() => {
+                        // Skip ticks during backoff
+                        if consecutive_errors > 0 {
+                            let backoff = std::cmp::min(consecutive_errors, 6); // max ~64s
+                            if consecutive_errors % (1 << backoff) != 0 {
+                                continue;
+                            }
+                        }
+
                         match dao.get_assignments_for_instance(instance_id).await {
                             Ok(assignments) => {
+                                consecutive_errors = 0;
                                 // ... existing code ...
                             }
                             Err(e) => {
+                                consecutive_errors = consecutive_errors.saturating_add(1);
                                 error!(
                                     instance_id = %instance_id,
                                     "Failed to poll cluster assignments: {}",
                                     e
                                 );
                             }
                         }
                     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rust/crates/scheduler/src/orchestrator/sync.rs` around lines 58 - 103, The
polling loop retries immediately after dao.get_assignments_for_instance fails,
which can spam errors and DB load; add a simple backoff by tracking consecutive
failures (e.g., consecutive_db_errors) and when Err(e) increment it and compute
a backoff duration (capped exponential, e.g., min(base * 2^n, max)), then await
a tokio::time::sleep(backoff) before continuing while still selecting on
shutdown.changed; on success reset the counter to zero. Update the block around
ticker.tick handling (where dao.get_assignments_for_instance is called) and use
tokio::select with the sleep future and shutdown.changed so shutdown still
aborts promptly; keep cluster_feed.update_clusters and
crate::metrics::set_orchestrator_assigned_clusters logic unchanged on success.
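The capped exponential backoff the sync.rs comment suggests can be sketched as a pure helper. The function name and the base/max parameters are illustrative; the real loop would feed this delay into `tokio::time::sleep` while still selecting on the shutdown channel.

```rust
/// Capped exponential backoff: base * 2^n, saturating, bounded by `max_secs`.
/// The exponent is clamped so the shift cannot overflow for large error counts.
fn backoff_delay(consecutive_errors: u32, base_secs: u64, max_secs: u64) -> u64 {
    base_secs
        .saturating_mul(1u64 << consecutive_errors.min(6))
        .min(max_secs)
}

fn main() {
    // With a 1s base and 64s cap: 1, 2, 4, ... capped at 64.
    for n in 0..10 {
        println!("errors={} -> sleep {}s", n, backoff_delay(n, 1, 64));
    }
}
```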
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rust/crates/scheduler/src/cluster.rs`:
- Around line 247-250: The write that replaces the vector in self.clusters must
also reset self.current_index while still holding the write lock to avoid
exposing a stale index; move the current_index.store(0, Ordering::Relaxed) into
the same write-lock block that assigns *clusters = new_clusters; additionally,
add a defensive clamp in stream() where it loads current_index (e.g., load with
current_index.load(...)) to bound the index to clusters.len().saturating_sub(1)
before indexing so a concurrent read cannot panic if a race occurs.
- Line 116: The sleep_map currently keys by the full Cluster struct which causes
equivalent clusters with the same stable id to miss matches; change sleep_map:
Mutex<HashMap<Cluster, SystemTime>> to use the cluster's stable identifier type
(e.g., Mutex<HashMap<ClusterId, SystemTime>> or Mutex<HashMap<String,
SystemTime>> depending on Cluster::id type) and update all accesses (insertion,
lookup, removal/retention code paths that reference sleep_map, including where
you currently index with a Cluster) to use cluster.id (or clone/borrow the id)
as the key; ensure the ClusterId type implements Eq+Hash (or convert id to a
hashable primitive) so existing logic that checks/retains sleep entries will
correctly match tag-only refreshes.
- Around line 91-100: from_tags currently accepts empty or mixed-type Tag lists
and builds an ambiguous ID by using the first tag's type and joining all names;
change from_tags to validate inputs up-front: if tags.is_empty() return an Err
(or panic if API policy dictates), and ensure all tags have the same ttype
(e.g., check tags.iter().all(|t| t.ttype == tags[0].ttype)) returning an Err for
mixed types; only after validation compute tag_type from the first tag, collect
a sorted, deduplicated set of tag names (you already use BTreeSet<Tag>) and
build the id from those verified values; update the signature of from_tags (and
callers) to return Result<Self, SomeError> or document the panic behavior so the
public constructor no longer produces ambiguous IDs.

In `@rust/crates/scheduler/src/metrics/mod.rs`:
- Around line 135-146: health_handler currently returns 503 whenever
ORCHESTRATOR_ASSIGNED_CLUSTERS.get() == 0 which breaks non-orchestrated
deployments; modify health_handler so it only treats zero assigned clusters as
unhealthy when running in orchestrated mode: introduce or reuse a boolean flag
(e.g. ORCHESTRATOR_ENABLED set in main.rs) and change health_handler to return
SERVICE_UNAVAILABLE only if ORCHESTRATOR_ENABLED.get() is true AND
ORCHESTRATOR_ASSIGNED_CLUSTERS.get() == 0, otherwise return OK; ensure the new
flag is initialized in the same startup path that sets
ORCHESTRATOR_ASSIGNED_CLUSTERS so non-orchestrated instances report healthy.
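The health gating described above can be sketched with plain atomics. The static names mirror the ones the review mentions (`ORCHESTRATOR_ENABLED`, the assigned-clusters gauge) but are stand-ins, not the crate's actual Prometheus types.

```rust
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};

// Illustrative stand-ins for the flag set at startup and the assigned-clusters gauge.
static ORCHESTRATOR_ENABLED: AtomicBool = AtomicBool::new(false);
static ASSIGNED_CLUSTERS: AtomicI64 = AtomicI64::new(0);

/// Healthy unless the instance runs orchestrated *and* holds zero assignments.
/// Non-orchestrated deployments therefore always report healthy here.
fn is_healthy() -> bool {
    !(ORCHESTRATOR_ENABLED.load(Ordering::Relaxed)
        && ASSIGNED_CLUSTERS.load(Ordering::Relaxed) == 0)
}

fn main() {
    println!("non-orchestrated healthy: {}", is_healthy());
    ORCHESTRATOR_ENABLED.store(true, Ordering::Relaxed);
    println!("orchestrated, 0 clusters healthy: {}", is_healthy());
    ASSIGNED_CLUSTERS.store(4, Ordering::Relaxed);
    println!("orchestrated, 4 clusters healthy: {}", is_healthy());
}
```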

In `@rust/crates/scheduler/src/orchestrator/dao.rs`:
- Around line 152-156: The UPDATE_HEARTBEAT query currently ignores the number
of rows affected, so when delete_dead_instances() races and returns 0 rows the
caller never learns it must re-register; change the code in the method that runs
sqlx::query(UPDATE_HEARTBEAT) (the heartbeat/update_heartbeat function in
dao.rs) to capture the execute() result, check result.rows_affected(), and treat
0 as a failure: return an error (or a specific variant like
InstanceNotFound/NeedsReregister) so the caller can re-register the instance
rather than silently returning Ok(()). Ensure you reference UPDATE_HEARTBEAT and
preserve existing error types/propagation contract when returning the new error.
- Around line 263-269: The try_acquire_leader_lock implementation currently runs
TRY_ADVISORY_LOCK against &*self.connection_pool which returns only a bool and
immediately returns the pooled connection (releasing the session-scoped advisory
lock); update the design so the lock is held on a dedicated session: either add
a dedicated persistent connection field on the orchestrator DAO (e.g., store a
PgConnection in the struct), open that connection when attempting leadership and
run TRY_ADVISORY_LOCK on that connection so the session retains the lock for the
leader lifetime, or change try_acquire_leader_lock to return a guard/handle
(including the owned PgConnection or transaction) that the caller keeps until
demotion and then explicitly runs the unlock; ensure you stop using
&*self.connection_pool for acquiring the session-scoped lock and use the
dedicated connection/guard so the advisory lock is not released when the pooled
connection is returned.
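The guard-based design the comment proposes can be modeled without a database: the lock is released exactly when the guard that owns the dedicated session is dropped. Here the "session" is mocked with a closure; with sqlx the guard would own a `PgConnection` and run `pg_advisory_unlock` on release. All names are hypothetical.

```rust
/// Guard modeling a session-scoped advisory lock: whatever owns the guard
/// holds the lock; dropping the guard releases it. The release action is a
/// closure here, standing in for an unlock query on a dedicated connection.
struct LeaderGuard<F: FnMut()> {
    release: F,
}

impl<F: FnMut()> Drop for LeaderGuard<F> {
    fn drop(&mut self) {
        (self.release)(); // release the lock exactly when leadership ends
    }
}

fn main() {
    use std::cell::Cell;
    let released = Cell::new(false);
    {
        let _guard = LeaderGuard { release: || released.set(true) };
        println!("lock held: {}", !released.get());
    }
    println!("lock released after drop: {}", released.get());
}
```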

In `@rust/crates/scheduler/src/orchestrator/instance.rs`:
- Around line 51-66: The code casts CONFIG.orchestrator.capacity (u32) into an
i32 when constructing InstanceManager in new; this can overflow if capacity >
i32::MAX. Update the InstanceManager::new logic to validate or saturate the
value before assigning to capacity: check CONFIG.orchestrator.capacity against
i32::MAX and either return an Err with a clear message (validation failure) or
clamp/saturate to i32::MAX (e.g., using min with i32::MAX as u32) and document
the behavior; ensure the change refers to the capacity local variable and the
CONFIG.orchestrator.capacity source so callers and logs reflect the enforced
limit.
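The saturating variant of the capacity conversion suggested above is a one-liner; a sketch (the function name is illustrative, and the real code may prefer returning a configuration error instead of clamping):

```rust
/// Convert a configured u32 capacity to the i32 a database column expects,
/// saturating at i32::MAX instead of wrapping to a negative value.
fn clamp_capacity(configured: u32) -> i32 {
    configured.min(i32::MAX as u32) as i32
}

fn main() {
    println!("{}", clamp_capacity(500));
    println!("{}", clamp_capacity(u32::MAX)); // would wrap negative with a bare cast
}
```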

In `@rust/crates/scheduler/src/orchestrator/leader.rs`:
- Around line 125-146: The leader lock is being acquired via
try_acquire_leader_lock but the implementation uses the pooled connection
(self.connection_pool) so the session-level pg_try_advisory_lock is released
when the query returns; change try_acquire_leader_lock in the DAO to obtain and
hold a dedicated long-lived connection for the lifetime of leadership (do not
use .fetch_one(&*self.connection_pool) for the lock), or alternatively switch
the leader election to a pool-compatible approach (e.g., transaction-scoped
advisory locks with a periodic heartbeat or store a dedicated Connection in the
DAO that is acquired when acquiring ORCHESTRATOR_LOCK_ID and only released when
releasing the lock). Ensure the leader acquisition path in leader.rs (where
try_acquire_leader_lock, is_leader, and distributor are used) uses the new API
so the connection remains open while is_leader is true.

In `@rust/crates/scheduler/tests/smoke_tests.rs`:
- Around line 937-946: The test currently calls the non-existent
Cluster::multiple_tag(...) to build manual_cluster; replace that call with
Cluster::from_tags(...) using the same arguments (test_data.facility_id,
test_data.show_id, vec![Tag { ... }]) so manual_cluster is created correctly,
then leave ClusterFeed::from_clusters(vec![manual_cluster], &[]) unchanged to
produce cluster_feed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2ac90a2c-c58e-416c-9d4f-866e51175ed1

📥 Commits

Reviewing files that changed from the base of the PR and between a008061 and 42b911d.

📒 Files selected for processing (19)
  • .gitignore
  • cuebot/src/main/resources/conf/ddl/postgres/migrations/V38__Add_scheduler_orchestrator_tables.sql
  • rust/crates/scheduler/Cargo.toml
  • rust/crates/scheduler/src/cluster.rs
  • rust/crates/scheduler/src/cluster_key.rs
  • rust/crates/scheduler/src/config/mod.rs
  • rust/crates/scheduler/src/dao/cluster_dao.rs
  • rust/crates/scheduler/src/main.rs
  • rust/crates/scheduler/src/metrics/mod.rs
  • rust/crates/scheduler/src/orchestrator/dao.rs
  • rust/crates/scheduler/src/orchestrator/distributor.rs
  • rust/crates/scheduler/src/orchestrator/instance.rs
  • rust/crates/scheduler/src/orchestrator/leader.rs
  • rust/crates/scheduler/src/orchestrator/mod.rs
  • rust/crates/scheduler/src/orchestrator/sync.rs
  • rust/crates/scheduler/src/pipeline/entrypoint.rs
  • rust/crates/scheduler/tests/smoke_tests.rs
  • rust/crates/scheduler/tests/stress_tests.rs
  • rust/crates/scheduler/tests/util.rs

Comment on lines +91 to +100
pub fn from_tags(facility_id: Uuid, show_id: Uuid, tags: Vec<Tag>) -> Self {
let tag_type = tags.first().map_or("unknown", |t| t.ttype.as_str());
let sorted_tags: BTreeSet<Tag> = tags.into_iter().collect();
let tag_names: Vec<&str> = sorted_tags.iter().map(|t| t.name.as_str()).collect();
let id = format!(
"{}:{}:{}:{}",
facility_id,
show_id,
tag_type,
tag_names.join(",")
⚠️ Potential issue | 🟠 Major

Reject empty or mixed-type inputs in from_tags().

Line 92 falls back to unknown for empty input, and with mixed TagTypes it encodes only the first tag's type while joining every tag name. That makes the new stable ID ambiguous and order-dependent for a public constructor.

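A validating variant of the constructor, as the comment suggests, could look like the sketch below. The `Tag` struct, error enum, and function name are simplified stand-ins for the crate's types, not the PR's actual code.

```rust
use std::collections::BTreeSet;

// Simplified stand-in for the crate's Tag type; Ord gives BTreeSet a stable order.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Tag {
    ttype: String,
    name: String,
}

#[derive(Debug, PartialEq)]
enum ClusterIdError {
    EmptyTags,
    MixedTagTypes,
}

/// Reject empty and mixed-type inputs up front, then build the id from a
/// sorted, deduplicated tag-name set so the result is order-independent.
fn stable_id(facility_id: &str, show_id: &str, tags: Vec<Tag>) -> Result<String, ClusterIdError> {
    let first_type = tags.first().ok_or(ClusterIdError::EmptyTags)?.ttype.clone();
    if !tags.iter().all(|t| t.ttype == first_type) {
        return Err(ClusterIdError::MixedTagTypes);
    }
    let sorted: BTreeSet<Tag> = tags.into_iter().collect();
    let names: Vec<&str> = sorted.iter().map(|t| t.name.as_str()).collect();
    Ok(format!("{facility_id}:{show_id}:{first_type}:{}", names.join(",")))
}

fn main() {
    let tag = |t: &str, n: &str| Tag { ttype: t.into(), name: n.into() };
    // Tag order no longer matters: b,a and a,b produce the same id.
    let id = stable_id("f", "s", vec![tag("alloc", "b"), tag("alloc", "a")]).unwrap();
    println!("{id}");
    println!("empty rejected: {}", stable_id("f", "s", vec![]).is_err());
}
```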

pub clusters: RwLock<Vec<Cluster>>,
current_index: AtomicUsize,
stop_flag: AtomicBool,
sleep_map: Mutex<HashMap<Cluster, SystemTime>>,
⚠️ Potential issue | 🟠 Major

Sleep entries should be keyed by cluster.id.

Line 116 stores sleep state under the entire Cluster, and Line 244 retains only exact structural matches. For chunked clusters, a tag-only refresh with the same stable id is therefore treated as a different key and loses its sleep/backoff state.

Also applies to: 243-244

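Keying the sleep state by the stable id string, as suggested, can be sketched like this. The `SleepMap` type and method names are illustrative; the real field lives behind a `Mutex` inside the cluster feed.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

// Illustrative sketch: key backoff state by the cluster's stable id string
// rather than the whole Cluster struct, so a tag-only refresh that keeps
// the same id also keeps its sleep entry.
struct SleepMap {
    entries: HashMap<String, SystemTime>,
}

impl SleepMap {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    fn put(&mut self, cluster_id: &str, until: SystemTime) {
        self.entries.insert(cluster_id.to_string(), until);
    }

    /// After a refresh, retain only entries whose cluster id is still live.
    fn retain_known(&mut self, live_ids: &[&str]) {
        self.entries.retain(|id, _| live_ids.contains(&id.as_str()));
    }

    fn is_sleeping(&self, cluster_id: &str, now: SystemTime) -> bool {
        self.entries.get(cluster_id).is_some_and(|until| *until > now)
    }
}

fn main() {
    let now = SystemTime::now();
    let mut map = SleepMap::new();
    map.put("f:s:manual:3", now + Duration::from_secs(30));
    // A refresh rebuilds the Cluster structs but keeps the same stable id,
    // so the backoff entry survives.
    map.retain_known(&["f:s:manual:3"]);
    println!("still sleeping: {}", map.is_sleeping("f:s:manual:3", now));
}
```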

Comment on lines +247 to +250
let mut clusters = self.clusters.write().unwrap_or_else(|p| p.into_inner());
*clusters = new_clusters;
}
self.current_index.store(0, Ordering::Relaxed);
⚠️ Potential issue | 🔴 Critical

Reset current_index under the write-lock before exposing a shorter list.

After Line 248 swaps in a shorter vector, stream() can acquire the read lock before Line 250 runs and then panic on Line 493 by indexing with the stale larger current_index. Move the reset under the same write-lock; clamping the loaded index in stream() would be a good second guard.

Suggested local fix
         {
             let mut clusters = self.clusters.write().unwrap_or_else(|p| p.into_inner());
             *clusters = new_clusters;
+            self.current_index.store(0, Ordering::Relaxed);
         }
-        self.current_index.store(0, Ordering::Relaxed);

Also applies to: 492-496

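The defensive clamp suggested as a second guard is a small pure function; a sketch (the real stream() also advances the cursor, which is omitted here):

```rust
/// Bound a round-robin cursor read outside the lock by the current list
/// length, so a racing shrink of the cluster list cannot produce an
/// out-of-bounds index. Returns None when the list is empty.
fn clamped_index(loaded: usize, len: usize) -> Option<usize> {
    if len == 0 {
        None
    } else {
        Some(loaded.min(len.saturating_sub(1)))
    }
}

fn main() {
    // A stale cursor of 7 against a list shrunk to 3 entries clamps to 2.
    println!("{:?}", clamped_index(7, 3));
    println!("{:?}", clamped_index(0, 0));
}
```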

Don't treat UPDATE 0 as a successful heartbeat.

UPDATE ... WHERE pk_instance = $1 can legitimately affect 0 rows if delete_dead_instances() wins a
transient race. Returning Ok(()) here leaves a healthy scheduler running but permanently absent from
orchestrator membership until restart because the caller never learns it must re-register.
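The decision logic this comment asks for, separated from the sqlx call site, can be sketched as below. The error variant name follows the review's suggestion; the actual error type and how the caller re-registers are assumptions.

```rust
#[derive(Debug, PartialEq)]
enum HeartbeatError {
    /// UPDATE affected zero rows: the instance row was reaped by
    /// delete_dead_instances() and the caller must re-register.
    InstanceNotFound,
}

/// Interpret the rows_affected() count from the UPDATE_HEARTBEAT query:
/// zero rows is a failure the caller must react to, not a silent success.
fn check_heartbeat(rows_affected: u64) -> Result<(), HeartbeatError> {
    if rows_affected == 0 {
        Err(HeartbeatError::InstanceNotFound)
    } else {
        Ok(())
    }
}

fn main() {
    println!("{:?}", check_heartbeat(1));
    println!("{:?}", check_heartbeat(0)); // race lost: caller re-registers
}
```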
