Skip to content

Commit da23428

Browse files
committed
Merge PR dlorenc#342: Clean up acknowledged messages in routing loop
2 parents 2d7359e + b2aa96a commit da23428

2 files changed

Lines changed: 84 additions & 0 deletions

File tree

internal/daemon/daemon.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,14 @@ func (d *Daemon) routeMessages() {
419419

420420
d.logger.Info("Delivered message %s from %s to %s/%s", msg.ID, msg.From, repoName, agentName)
421421
}
422+
423+
// Clean up acknowledged messages to prevent pile-up
424+
count, err := msgMgr.DeleteAcked(repoName, agentName)
425+
if err != nil {
426+
d.logger.Error("Failed to clean up acked messages for %s/%s: %v", repoName, agentName, err)
427+
} else if count > 0 {
428+
d.logger.Debug("Cleaned up %d acked messages for %s/%s", count, repoName, agentName)
429+
}
422430
}
423431
}
424432
}

internal/daemon/daemon_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,6 +1182,82 @@ func TestMessageRoutingWithRealTmux(t *testing.T) {
11821182
}
11831183
}
11841184

1185+
func TestMessageRoutingCleansUpAckedMessages(t *testing.T) {
1186+
tmuxClient := tmux.NewClient()
1187+
if !tmuxClient.IsTmuxAvailable() {
1188+
t.Fatal("tmux is required for this test but not available")
1189+
}
1190+
1191+
d, cleanup := setupTestDaemon(t)
1192+
defer cleanup()
1193+
1194+
// Create a real tmux session
1195+
sessionName := "mc-test-cleanup"
1196+
if err := tmuxClient.CreateSession(context.Background(), sessionName, true); err != nil {
1197+
t.Fatalf("tmux is required for this test but cannot create sessions in this environment: %v", err)
1198+
}
1199+
defer tmuxClient.KillSession(context.Background(), sessionName)
1200+
1201+
// Create window for worker
1202+
if err := tmuxClient.CreateWindow(context.Background(), sessionName, "worker1"); err != nil {
1203+
t.Fatalf("Failed to create worker window: %v", err)
1204+
}
1205+
1206+
// Add repo and agent
1207+
repo := &state.Repository{
1208+
GithubURL: "https://github.com/test/repo",
1209+
TmuxSession: sessionName,
1210+
Agents: make(map[string]state.Agent),
1211+
}
1212+
if err := d.state.AddRepo("test-repo", repo); err != nil {
1213+
t.Fatalf("Failed to add repo: %v", err)
1214+
}
1215+
1216+
worker := state.Agent{
1217+
Type: state.AgentTypeWorker,
1218+
TmuxWindow: "worker1",
1219+
Task: "Test task",
1220+
CreatedAt: time.Now(),
1221+
}
1222+
if err := d.state.AddAgent("test-repo", "worker1", worker); err != nil {
1223+
t.Fatalf("Failed to add worker: %v", err)
1224+
}
1225+
1226+
// Create messages and immediately ack them
1227+
msgMgr := messages.NewManager(d.paths.MessagesDir)
1228+
for i := 0; i < 5; i++ {
1229+
msg, err := msgMgr.Send("test-repo", "supervisor", "worker1", "Test message")
1230+
if err != nil {
1231+
t.Fatalf("Failed to send message: %v", err)
1232+
}
1233+
// Mark as acked
1234+
if err := msgMgr.Ack("test-repo", "worker1", msg.ID); err != nil {
1235+
t.Fatalf("Failed to ack message: %v", err)
1236+
}
1237+
}
1238+
1239+
// Verify we have 5 acked messages
1240+
allMsgs, err := msgMgr.List("test-repo", "worker1")
1241+
if err != nil {
1242+
t.Fatalf("Failed to list messages: %v", err)
1243+
}
1244+
if len(allMsgs) != 5 {
1245+
t.Fatalf("Expected 5 messages, got %d", len(allMsgs))
1246+
}
1247+
1248+
// Trigger message routing which should clean up acked messages
1249+
d.TriggerMessageRouting()
1250+
1251+
// Verify acked messages were deleted
1252+
remainingMsgs, err := msgMgr.List("test-repo", "worker1")
1253+
if err != nil {
1254+
t.Fatalf("Failed to list messages after cleanup: %v", err)
1255+
}
1256+
if len(remainingMsgs) != 0 {
1257+
t.Errorf("Expected 0 messages after cleanup, got %d", len(remainingMsgs))
1258+
}
1259+
}
1260+
11851261
func TestWakeLoopUpdatesNudgeTime(t *testing.T) {
11861262
tmuxClient := tmux.NewClient()
11871263
if !tmuxClient.IsTmuxAvailable() {

0 commit comments

Comments
 (0)