feat: add gemini/local speak skills, agentic gemini Go2 blueprint, an…#2250
grmkris wants to merge 4 commits into
…d capture-viewer tool Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Greptile Summary
This PR adds a Gemini-native TTS speak skill (
Confidence Score: 4/5
Safe to merge for non-production trials, but the GeminiTTSNode accumulates one daemon thread per utterance across the session lifetime, which will eventually exhaust thread resources on long-running robots.
The Gemini speak pipeline calls consume_text() on every utterance, creating a new _process_queue thread each time that is never stopped. In a long robot session with hundreds of speech events, this will exhaust thread resources. The completion signal also fires before audio is queued, so the audio lock can be released while the previous utterance is still playing. These are the same structural defects in OpenAITTSNode that were already identified; this PR replicates them in GeminiTTSNode without fixing them.
dimos/stream/audio/tts/node_gemini.py and dimos/agents/skills/gemini_speak_skill.py, specifically the consume_text() call site in _speak_blocking and the ordering of text_subject vs audio_subject emissions in _synthesize_speech.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Agent
participant GeminiSpeakSkill
participant GeminiTTSNode
participant SounddeviceAudioOutput
Agent->>GeminiSpeakSkill: "speak(text, blocking=True)"
GeminiSpeakSkill->>GeminiSpeakSkill: acquire _audio_lock
GeminiSpeakSkill->>GeminiTTSNode: consume_text(text_subject) ⚠️ spawns new thread each call
GeminiSpeakSkill->>GeminiTTSNode: emit_text().subscribe(set_as_complete)
GeminiSpeakSkill->>GeminiTTSNode: text_subject.on_next(text)
GeminiTTSNode->>GeminiTTSNode: _queue_text(text)
GeminiTTSNode->>GeminiTTSNode: _synthesize_speech(text) via thread
GeminiTTSNode->>GeminiTTSNode: text_subject.on_next(text) ⚠️ fires BEFORE audio queued
GeminiTTSNode-->>GeminiSpeakSkill: audio_complete.set()
GeminiTTSNode->>SounddeviceAudioOutput: audio_subject.on_next(audio_event)
GeminiSpeakSkill->>GeminiSpeakSkill: sleep(0.3) then release _audio_lock
GeminiSpeakSkill-->>Agent: "Spoke: {text}"
Reviews (4): Last reviewed commit: "feat(go2): pose on captures + global_cos..."
"""
Start consuming text from the observable source.

Args:
    text_observable: Observable source of text strings

Returns:
    Self for method chaining
"""
logger.info("Starting GeminiTTSNode")

# Start the processing thread
self.processing_thread = threading.Thread(target=self._process_queue, daemon=True)  # type: ignore[assignment]
self.processing_thread.start()  # type: ignore[attr-defined]

# Subscribe to the text observable
self.subscription = text_observable.subscribe(  # type: ignore[assignment]
    on_next=self._queue_text,
    on_error=lambda e: logger.error(f"Error in GeminiTTSNode: {e}"),
)
Thread accumulation on every speak() call
consume_text() unconditionally spawns a new _process_queue thread and creates a new subscription on every invocation, and self.processing_thread is overwritten rather than checked. The calling side — GeminiSpeakSkill._speak_blocking() — calls consume_text() once per utterance, so after N speech events there are N daemon threads all spinning on the shared text_queue. The same defect exists in OpenAITTSNode/SpeakSkill; this PR replicates it. A guard like if self.processing_thread and self.processing_thread.is_alive(): return self (or calling consume_text only once from GeminiSpeakSkill.start() and routing text via _queue_text directly) would prevent unbounded thread growth in long robot sessions.
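A minimal sketch of the suggested guard, assuming nothing about the real GeminiTTSNode beyond what the snippet above shows. `GuardedNode`, `threads_started`, `processed`, `stop()` and the `None` sentinel are hypothetical names introduced for illustration, and the reactive subscription is left out so the example runs with only the standard library. In the real node the subscription would presumably need the same treatment, since it is also recreated on every call.

```python
import queue
import threading


class GuardedNode:
    """Hypothetical stand-in for GeminiTTSNode; only the thread guard is shown."""

    def __init__(self):
        self.text_queue = queue.Queue()
        self.processed = []            # texts handled by the worker
        self.processing_thread = None
        self.threads_started = 0       # bookkeeping for this demo only
        self._start_lock = threading.Lock()

    def consume_text(self):
        # Reuse the worker if it is still alive instead of spawning another.
        with self._start_lock:
            if self.processing_thread and self.processing_thread.is_alive():
                return self
            self.processing_thread = threading.Thread(
                target=self._process_queue, daemon=True
            )
            self.processing_thread.start()
            self.threads_started += 1
        return self

    def _process_queue(self):
        # Drain queued text until a None sentinel arrives.
        while True:
            text = self.text_queue.get()
            if text is None:
                break
            self.processed.append(text)

    def stop(self):
        # Ask the worker to exit and wait briefly for it.
        self.text_queue.put(None)
        if self.processing_thread:
            self.processing_thread.join(timeout=2.0)


node = GuardedNode()
for utterance in ["hello", "world", "again"]:
    node.consume_text()  # called once per utterance, as _speak_blocking does
    node.text_queue.put(utterance)
node.stop()
```

With the guard in place, three calls to `consume_text()` start a single worker. The alternative named in the comment, calling `consume_text()` once from `GeminiSpeakSkill.start()`, would avoid the per-call check entirely.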
Replace the custom Go2FullRecorder with the stock go2-memory recorder for the capture demo (camera + lidar + odom is enough for the frames+trajectory viewer). Point the capture-viewer at recording_go2.db and ignore recording sidecars + the MuJoCo runtime log. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
self.text_subject.on_next(text)

# Gemini returns raw 16-bit PCM bytes (24 kHz, mono, little-endian).
pcm_bytes = response.candidates[0].content.parts[0].inline_data.data
audio_array = np.frombuffer(pcm_bytes, dtype=np.int16)

audio_event = AudioEvent(
    data=audio_array,
    sample_rate=_SAMPLE_RATE,
    timestamp=time.time(),
    channels=1,
)

self.audio_subject.on_next(audio_event)
Completion signal fires before audio is confirmed
text_subject.on_next(text) (line 179) triggers audio_complete in GeminiSpeakSkill._speak_blocking before the audio data is even extracted. If response.candidates[0].content.parts[0].inline_data.data raises (empty candidates, content filtered, API format change), the exception is swallowed by the outer try/except but the completion event has already been set — so _speak_blocking returns "Spoke: {text}" even though the user heard nothing. Move text_subject.on_next(text) to after self.audio_subject.on_next(audio_event) so the signal fires only when both synthesis and audio queueing have succeeded.
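The reordering can be sketched in isolation as follows. This is an illustration under stated assumptions, not the PR's code: `synthesize_and_emit`, `fetch_pcm`, `on_audio` and `on_text` are hypothetical stand-ins for `_synthesize_speech`, the Gemini response extraction, `audio_subject.on_next` and `text_subject.on_next` respectively, and plain callbacks replace the reactive subjects so the example needs only the standard library.

```python
import threading


def synthesize_and_emit(text, fetch_pcm, on_audio, on_text):
    """Hypothetical reordering: signal completion only after audio is emitted."""
    try:
        pcm_bytes = fetch_pcm(text)  # may raise on empty or filtered responses
        on_audio(pcm_bytes)          # hand the audio to the output first
    except Exception:
        return False                 # completion is never signalled on failure
    on_text(text)                    # fires only once audio has been queued
    return True


def failing_fetch(text):
    # Simulates response.candidates being empty.
    raise IndexError("no candidates")


audio_complete = threading.Event()
played = []


def mark_complete(_text):
    audio_complete.set()


# Failure path: no audio is emitted and the completion event stays unset.
failed_result = synthesize_and_emit("hi", failing_fetch, played.append, mark_complete)
set_after_failure = audio_complete.is_set()

# Success path: audio is emitted first, then completion is signalled.
ok_result = synthesize_and_emit("hi", lambda t: b"\x00\x01", played.append, mark_complete)
set_after_success = audio_complete.is_set()
```

One consequence of this ordering is that a caller waiting on the completion event no longer gets a false "Spoke: {text}" on failure, but it would then need a timeout or an error callback so it does not wait indefinitely.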
New TakePictureSkill subscribes to color_image, caches the latest frame, and on
take_picture() JPEG-encodes it and POSTs to robomoo's /api/robot/frame with a
shared bearer token (ROBOMOO_URL / ROBOT_INGEST_TOKEN from env). Wired into
unitree_go2_agentic_gemini and registered as take-picture-skill so the agent can
call it ("take a picture").
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
take_picture now attaches the robot's odom pose (poseX/poseY) + label so the web can pin captures on the map. New MapUploader subscribes global_costmap, renders it with turbo_image, and POSTs the PNG + grid metadata to robomoo /api/robot/map (throttled). Both wired into unitree_go2_agentic_gemini. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…d capture-viewer tool
Problem
Closes DIM-XXX
Solution
How to Test
Contributor License Agreement