diff --git a/.changeset/quiet-bikes-happen.md b/.changeset/quiet-bikes-happen.md
new file mode 100644
index 000000000..0b2f8a80d
--- /dev/null
+++ b/.changeset/quiet-bikes-happen.md
@@ -0,0 +1,5 @@
+---
+'@livekit/agents': patch
+---
+
+Fix AMD delay reporting when no speech end timestamp is available and preserve short-greeting timer semantics when transcripts arrive during the human-silence window.
diff --git a/agents/src/telemetry/trace_types.ts b/agents/src/telemetry/trace_types.ts
index 60c00aa10..535f4d591 100644
--- a/agents/src/telemetry/trace_types.ts
+++ b/agents/src/telemetry/trace_types.ts
@@ -74,6 +74,8 @@ export const ATTR_END_OF_TURN_DELAY = 'lk.end_of_turn_delay';
 export const ATTR_AMD_CATEGORY = 'lk.amd.category';
 export const ATTR_AMD_REASON = 'lk.amd.reason';
 export const ATTR_AMD_IS_MACHINE = 'lk.amd.is_machine';
+export const ATTR_AMD_SPEECH_DURATION = 'lk.amd.speech_duration';
+export const ATTR_AMD_DELAY = 'lk.amd.delay';
 export const ATTR_AMD_INTERRUPT_ON_MACHINE = 'lk.amd.interrupt_on_machine';
 
 // Adaptive Interruption attributes
diff --git a/agents/src/voice/amd.test.ts b/agents/src/voice/amd.test.ts
index 322a61aec..c3b612dcc 100644
--- a/agents/src/voice/amd.test.ts
+++ b/agents/src/voice/amd.test.ts
@@ -11,7 +11,7 @@ import type { ToolChoice, ToolContext } from '../llm/tool_context.js';
 import type { APIConnectOptions } from '../types.js';
 import type { AgentSession } from './agent_session.js';
 import { AMD, AMDCategory } from './amd.js';
-import { AgentSessionEventTypes } from './events.js';
+import { AgentSessionEventTypes, type UserState } from './events.js';
 
 class StaticLLM extends LLM {
   constructor(private readonly response: string | Error) {
@@ -50,6 +50,44 @@ class StaticLLM extends LLM {
   }
 }
 
+class ToolCallLLM extends LLM {
+  constructor(private readonly category: AMDCategory) {
+    super();
+  }
+
+  label(): string {
+    return 'tool-call-llm';
+  }
+
+  chat({}: {
+    chatCtx: ChatContext;
+    toolCtx?: ToolContext;
+    connOptions?: APIConnectOptions;
+    parallelToolCalls?: boolean;
+    toolChoice?: ToolChoice;
+    extraKwargs?: Record<string, unknown>;
+  }): LLMStream {
+    const category = this.category;
+    return {
+      async *[Symbol.asyncIterator](): AsyncGenerator {
+        yield {
+          id: 'tc',
+          delta: {
+            role: 'assistant',
+            toolCalls: [
+              new FunctionCall({
+                callId: 'call_1',
+                name: 'save_prediction',
+                args: JSON.stringify({ label: category }),
+              }),
+            ],
+          },
+        };
+      },
+    } as unknown as LLMStream;
+  }
+}
+
 class MockSession extends EventEmitter {
   llm?: LLM;
   pauseReplyAuthorization = vi.fn();
@@ -59,6 +97,46 @@ class MockSession extends EventEmitter {
 const asAgentSession = (session: MockSession): AgentSession =>
   session as unknown as AgentSession;
 
+type AMDInternals = {
+  silenceTimer: ReturnType<typeof setTimeout> | undefined;
+  silenceTimerTrigger: 'short_speech' | 'long_speech' | undefined;
+  machineSilenceReached: boolean;
+  speechEndedAt: number | undefined;
+};
+
+const amdInternals = (amd: AMD): AMDInternals => amd as unknown as AMDInternals;
+
+const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
+
+function emitUserStateChanged(
+  session: MockSession,
+  oldState: UserState,
+  newState: UserState,
+  createdAt: number = Date.now(),
+) {
+  session.emit(AgentSessionEventTypes.UserStateChanged, {
+    type: 'user_state_changed',
+    oldState,
+    newState,
+    createdAt,
+  });
+}
+
+function emitFinalTranscript(
+  session: MockSession,
+  transcript: string,
+  createdAt: number = Date.now(),
+) {
+  session.emit(AgentSessionEventTypes.UserInputTranscribed, {
+    type: 'user_input_transcribed',
+    transcript,
+    isFinal: true,
+    speakerId: null,
+    createdAt,
+    language: null,
+  });
+}
+
 describe('AMD', () => {
   it('should classify voicemail and interrupt queued speech', async () => {
     const session = new MockSession();
@@ -81,10 +159,14 @@
       language: null,
     });
 
-    await expect(promise).resolves.toMatchObject({
+    const result = await promise;
+    expect(result).toMatchObject({
       category: AMDCategory.MACHINE_VM,
       isMachine: true,
+      speechDurationMs: 0,
+      delayMs: 0,
     });
+    expect(Object.is(result.delayMs, -0)).toBe(false);
     expect(session.pauseReplyAuthorization).toHaveBeenCalledTimes(1);
     expect(session.resumeReplyAuthorization).toHaveBeenCalled();
     expect(session.interrupt).toHaveBeenCalledWith({ force: true });
@@ -151,37 +233,8 @@
   });
 
   it('should settle from a save_prediction tool call', async () => {
-    class ToolCallLLM extends LLM {
-      label(): string {
-        return 'tool-call-llm';
-      }
-      chat({}: {
-        chatCtx: ChatContext;
-        toolCtx?: ToolContext;
-        connOptions?: APIConnectOptions;
-      }): LLMStream {
-        return {
-          async *[Symbol.asyncIterator](): AsyncGenerator {
-            yield {
-              id: 'tc',
-              delta: {
-                role: 'assistant',
-                toolCalls: [
-                  new FunctionCall({
-                    callId: 'call_1',
-                    name: 'save_prediction',
-                    args: JSON.stringify({ label: AMDCategory.MACHINE_IVR }),
-                  }),
-                ],
-              },
-            };
-          },
-        } as unknown as LLMStream;
-      }
-    }
-
     const session = new MockSession();
-    const llm = new ToolCallLLM();
+    const llm = new ToolCallLLM(AMDCategory.MACHINE_IVR);
     llm.on('error', () => {});
 
     const amd = new AMD(asAgentSession(session), { llm, detectionTimeoutMs: 50 });
@@ -232,4 +285,262 @@
 
     await expect(promise).resolves.toMatchObject({ category: AMDCategory.HUMAN });
   });
+
+  it('short greeting no transcript emits pre-baked human', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 100,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(50);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('short_speech');
+    expect(amdInternals(amd).silenceTimer).toBeDefined();
+
+    const result = await promise;
+
+    expect(result.category).toBe(AMDCategory.HUMAN);
+    expect(result.reason).toBe('short_greeting');
+    expect(amdInternals(amd).silenceTimer).toBeUndefined();
+    expect(amdInternals(amd).silenceTimerTrigger).toBeUndefined();
+    expect(amdInternals(amd).machineSilenceReached).toBe(true);
+  });
+
+  it('push text cancels pre-baked human and flips trigger', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 100,
+      machineSilenceThresholdMs: 300,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    let settled = false;
+    void promise.then(
+      () => {
+        settled = true;
+      },
+      () => {
+        settled = true;
+      },
+    );
+
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(50);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('short_speech');
+
+    emitFinalTranscript(session, 'hello');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('long_speech');
+    expect(amdInternals(amd).silenceTimer).toBeDefined();
+
+    await sleep(180);
+    expect(settled).toBe(false);
+    expect(amdInternals(amd).machineSilenceReached).toBe(false);
+
+    await sleep(200);
+    expect(amdInternals(amd).machineSilenceReached).toBe(true);
+    expect(settled).toBe(false);
+
+    await amd.aclose();
+    await expect(promise).rejects.toThrow('AMD closed');
+  });
+
+  it('push text replacement timer preserves original deadline', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 50,
+      machineSilenceThresholdMs: 300,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(50);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    const speechEndedAt = amdInternals(amd).speechEndedAt;
+    expect(speechEndedAt).toBeDefined();
+
+    await sleep(40);
+    emitFinalTranscript(session, 'hello');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('long_speech');
+
+    const deadline = speechEndedAt! + 600;
+    while (!amdInternals(amd).machineSilenceReached && Date.now() < deadline) {
+      await sleep(10);
+    }
+
+    const firedAt = Date.now();
+    expect(amdInternals(amd).machineSilenceReached).toBe(true);
+    expect(firedAt - speechEndedAt!).toBeLessThan(450);
+
+    await amd.aclose();
+    await expect(promise).rejects.toThrow('AMD closed');
+  });
+
+  it('long speech push text does not replace timer', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSpeechThresholdMs: 100,
+      machineSilenceThresholdMs: 300,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(150);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('long_speech');
+    const handleBefore = amdInternals(amd).silenceTimer;
+    expect(handleBefore).toBeDefined();
+
+    emitFinalTranscript(session, 'hello world');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('long_speech');
+    expect(amdInternals(amd).silenceTimer).toBe(handleBefore);
+
+    await amd.aclose();
+    await expect(promise).rejects.toThrow('AMD closed');
+  });
+
+  it('short greeting with existing transcript uses long speech trigger', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 100,
+      machineSilenceThresholdMs: 300,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(50);
+    emitFinalTranscript(session, 'hi');
+    emitUserStateChanged(session, 'speaking', 'listening');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('long_speech');
+    const handleBefore = amdInternals(amd).silenceTimer;
+    expect(handleBefore).toBeDefined();
+
+    emitFinalTranscript(session, 'there');
+    expect(amdInternals(amd).silenceTimer).toBe(handleBefore);
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('long_speech');
+
+    await amd.aclose();
+    await expect(promise).rejects.toThrow('AMD closed');
+  });
+
+  it('user speech started clears trigger', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 1_000,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(50);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    expect(amdInternals(amd).silenceTimer).toBeDefined();
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('short_speech');
+
+    emitUserStateChanged(session, 'listening', 'speaking');
+    expect(amdInternals(amd).silenceTimer).toBeUndefined();
+    expect(amdInternals(amd).silenceTimerTrigger).toBeUndefined();
+
+    await amd.aclose();
+    await expect(promise).rejects.toThrow('AMD closed');
+  });
+
+  it('silence callback clears trigger on fire', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 50,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(20);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    expect(amdInternals(amd).silenceTimerTrigger).toBe('short_speech');
+
+    await expect(promise).resolves.toMatchObject({ category: AMDCategory.HUMAN });
+
+    expect(amdInternals(amd).silenceTimer).toBeUndefined();
+    expect(amdInternals(amd).silenceTimerTrigger).toBeUndefined();
+  });
+
+  it('short greeting transcript emits llm verdict', async () => {
+    const session = new MockSession();
+    const llm = new ToolCallLLM(AMDCategory.HUMAN);
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      humanSilenceThresholdMs: 100,
+      machineSilenceThresholdMs: 300,
+      noSpeechTimeoutMs: 5_000,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const promise = amd.execute();
+    emitUserStateChanged(session, 'listening', 'speaking');
+    await sleep(50);
+    emitUserStateChanged(session, 'speaking', 'listening');
+    emitFinalTranscript(session, 'hello');
+
+    const result = await promise;
+    expect(result.category).toBe(AMDCategory.HUMAN);
+    expect(result.reason).toBe('llm');
+    expect(result.transcript).toBe('hello');
+  });
+
+  it('returns positive zero delay when speech end is unavailable', async () => {
+    const session = new MockSession();
+    const llm = new StaticLLM('');
+    llm.on('error', () => {});
+    const amd = new AMD(asAgentSession(session), {
+      llm,
+      noSpeechTimeoutMs: 10,
+      detectionTimeoutMs: 5_000,
+    });
+
+    const result = await amd.execute();
+
+    expect(result).toMatchObject({
+      category: AMDCategory.MACHINE_UNAVAILABLE,
+      reason: 'no_speech_timeout',
+      speechDurationMs: 0,
+      delayMs: 0,
+    });
+    expect(Object.is(result.delayMs, -0)).toBe(false);
+  });
 });
diff --git a/agents/src/voice/amd.ts b/agents/src/voice/amd.ts
index b5802fecb..969f7d605 100644
--- a/agents/src/voice/amd.ts
+++ b/agents/src/voice/amd.ts
@@ -32,6 +32,10 @@ export interface AMDResult {
   reason: string;
   rawResponse: string;
   isMachine: boolean;
+  /** Duration of detected user speech in milliseconds. */
+  speechDurationMs: number;
+  /** Time from user speech end to AMD verdict in milliseconds. */
+  delayMs: number;
 }
 
 export interface AMDOptions {
@@ -73,6 +77,8 @@ const DEFAULT_MAX_TRANSCRIPT_TURNS = 2;
 const MAX_EXTENSIONS = 3;
 const MAX_EXTENSION_MS = 10_000;
 
+type SilenceTimerTrigger = 'short_speech' | 'long_speech';
+
 const EVALUATED_LLM_MODELS: ReadonlySet<string> = new Set([
   'google/gemini-3.1-flash-lite-preview',
   'google/gemini-3-flash-preview',
@@ -159,12 +165,14 @@
   private verdictResult: AMDResult | undefined;
   private machineSilenceReached = false;
   private speechStartedAt: number | undefined;
+  private speechEndedAt: number | undefined;
   private detectGeneration = 0;
   private extensionCount = 0;
 
   private noSpeechTimer: ReturnType<typeof setTimeout> | undefined;
   private detectionTimer: ReturnType<typeof setTimeout> | undefined;
   private silenceTimer: ReturnType<typeof setTimeout> | undefined;
+  private silenceTimerTrigger: SilenceTimerTrigger | undefined;
   private resolveRun: ((value: AMDResult) => void) | undefined;
   private rejectRun: ((reason?: unknown) => void) | undefined;
 
@@ -261,10 +269,12 @@
     this.verdictResult = undefined;
     this.machineSilenceReached = false;
     this.speechStartedAt = undefined;
+    this.speechEndedAt = undefined;
     this.detectGeneration = 0;
     this.extensionCount = 0;
     this.resolveRun = undefined;
     this.rejectRun = undefined;
+    this.silenceTimerTrigger = undefined;
   }
 
   private subscribe(): void {
@@ -293,6 +303,9 @@
       clearTimeout(this[key]);
       this[key] = undefined;
     }
+    if (name === 'silence') {
+      this.silenceTimerTrigger = undefined;
+    }
   }
 
   // ─── two-gate emit system (verdict + silence) ───────────────────────────────
@@ -338,7 +351,12 @@
    * threshold expires. Optionally provides a verdict (for no-speech / timeout /
    * short-greeting paths) and always opens the silence gate.
    */
-  private onSilenceTimerFired(category?: AMDCategory, reason?: string): void {
+  private onSilenceTimerFired(
+    category?: AMDCategory,
+    reason?: string,
+    speechDurationMs?: number,
+  ): void {
+    this.clearTimer('silence');
     if (category && reason && !this.verdictResult) {
       this.setVerdict({
         category,
@@ -346,6 +364,8 @@
         transcript: this.joinTranscript(),
         rawResponse: '',
         isMachine: isMachineCategory(category),
+        speechDurationMs: speechDurationMs ?? this.speechDurationMs(),
+        delayMs: this.delayMs(),
       });
     }
     this.machineSilenceReached = true;
@@ -385,7 +405,8 @@
       return;
     }
 
-    const speechDurationMs = ev.createdAt - (this.speechStartedAt ?? ev.createdAt);
+    this.speechEndedAt = ev.createdAt;
+    const speechDurationMs = this.speechEndedAt - (this.speechStartedAt ?? this.speechEndedAt);
 
     this.clearTimer('silence');
 
@@ -395,23 +416,26 @@
     if (speechDurationMs <= this.humanSpeechThresholdMs) {
       if (this.transcriptParts.length === 0) {
         this.silenceTimer = setTimeout(
-          () => this.onSilenceTimerFired(AMDCategory.HUMAN, 'short_greeting'),
+          () => this.onSilenceTimerFired(AMDCategory.HUMAN, 'short_greeting', speechDurationMs),
           this.humanSilenceThresholdMs,
         );
+        this.silenceTimerTrigger = 'short_speech';
       } else {
         this.silenceTimer = setTimeout(
-          () => this.onSilenceTimerFired(),
+          () => this.onSilenceTimerFired(undefined, undefined, speechDurationMs),
          this.machineSilenceThresholdMs,
        );
+        this.silenceTimerTrigger = 'long_speech';
      }
       return;
     }
 
     // Longer speech: open silence gate after machine_silence_threshold of quiet
     this.silenceTimer = setTimeout(
-      () => this.onSilenceTimerFired(),
+      () => this.onSilenceTimerFired(undefined, undefined, speechDurationMs),
       this.machineSilenceThresholdMs,
     );
+    this.silenceTimerTrigger = 'long_speech';
   };
 
   /**
@@ -427,6 +451,20 @@
       return;
     }
 
+    if (this.silenceTimer && this.silenceTimerTrigger === 'short_speech') {
+      const speechEndedAt = this.speechEndedAt;
+      if (speechEndedAt !== undefined) {
+        this.clearTimer('silence');
+        const remaining = speechEndedAt + this.machineSilenceThresholdMs - Date.now();
+        const speechDurationMs = this.speechDurationMs();
+        this.silenceTimer = setTimeout(
+          () => this.onSilenceTimerFired(undefined, undefined, speechDurationMs),
+          Math.max(0, remaining),
+        );
+        this.silenceTimerTrigger = 'long_speech';
+      }
+    }
+
     this.clearTimer('noSpeech');
     this.transcriptParts.push(transcript);
     this.scheduleLLMClassification();
@@ -483,10 +521,23 @@
     return this.transcriptParts.join('\n');
   }
 
+  private speechDurationMs(): number {
+    if (this.speechStartedAt === undefined) {
+      return 0;
+    }
+    return (this.speechEndedAt ?? Date.now()) - this.speechStartedAt;
+  }
+
+  private delayMs(): number {
+    return this.speechEndedAt !== undefined ? Date.now() - this.speechEndedAt : 0;
+  }
+
   private setSpanAttributes(result: AMDResult): void {
     this.span?.setAttribute(traceTypes.ATTR_AMD_CATEGORY, result.category);
     this.span?.setAttribute(traceTypes.ATTR_AMD_REASON, result.reason);
     this.span?.setAttribute(traceTypes.ATTR_AMD_IS_MACHINE, result.isMachine);
+    this.span?.setAttribute(traceTypes.ATTR_AMD_SPEECH_DURATION, result.speechDurationMs);
+    this.span?.setAttribute(traceTypes.ATTR_AMD_DELAY, result.delayMs);
     this.span?.setAttribute(traceTypes.ATTR_USER_TRANSCRIPT, result.transcript);
   }
 
@@ -531,6 +582,8 @@
             transcript,
             rawResponse: '',
            isMachine: isMachineCategory(normalized),
+            speechDurationMs: this.speechDurationMs(),
+            delayMs: this.delayMs(),
           };
         }
         return 'saved';
@@ -564,6 +617,7 @@
           this.scheduleLLMClassification();
           this.tryEmitResult();
         }, clampedMs);
+        this.silenceTimerTrigger = 'long_speech';
        return `waiting ${(clampedMs / 1000).toFixed(1)}s for more audio`;
       },
     });
@@ -629,6 +683,8 @@
       transcript,
       rawResponse,
       isMachine: isMachineCategory(parsed.category),
+      speechDurationMs: this.speechDurationMs(),
+      delayMs: this.delayMs(),
     };
   }
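
The behavioral core of the patch is one piece of deadline math: when a transcript arrives while the short_speech timer is pending, the replacement long_speech timer is scheduled for whatever remains of the window that opened at speechEndedAt, so the original deadline never moves. A minimal standalone sketch of that rule (illustrative only, not part of the patch; names mirror amd.ts):

    // Sketch: remaining wait for the replacement long_speech timer.
    // Anchoring the deadline to speechEndedAt rather than Date.now() means
    // a late transcript can never push the silence window further out.
    function remainingSilenceMs(speechEndedAt: number, machineSilenceThresholdMs: number): number {
      const deadline = speechEndedAt + machineSilenceThresholdMs;
      return Math.max(0, deadline - Date.now());
    }

The same guard shape covers the reporting fix: delayMs() returns Date.now() - speechEndedAt only when speechEndedAt was actually recorded and a literal 0 otherwise (the no-speech-timeout path), which is why the tests can assert the reported delay is a plain non-negative number and never -0.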