diff --git a/bot.ps1 b/bot.ps1
new file mode 100644
index 0000000..a929bc8
--- /dev/null
+++ b/bot.ps1
@@ -0,0 +1,2 @@
+# SECURITY: never commit a real API key. The key that was hard-coded here is
+# now in git history and must be considered compromised — rotate it in the
+# xAI console. Supply the key from your local environment instead, e.g.:
+#   $env:XAI_API_KEY = "<your-xai-api-key>"
+if (-not $env:XAI_API_KEY) { Write-Error "XAI_API_KEY is not set"; exit 1 }
+go run ./cmd/voicebot --server localhost:9987 --nickname GrokBot --voice Ara
\ No newline at end of file
diff --git a/cmd/voicebot/main.go b/cmd/voicebot/main.go
new file mode 100644
index 0000000..029f21f
--- /dev/null
+++ b/cmd/voicebot/main.go
@@ -0,0 +1,258 @@
+package main
+
+import (
+	"flag"
+	"log"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"go-ts/pkg/ts3client"
+	"go-ts/pkg/xai"
+)
+
+// VoiceSession represents an active xAI voice session for a single
+// TeamSpeak user.
+type VoiceSession struct {
+	ClientID    uint16
+	Nickname    string
+	XAI         *xai.Client
+	AudioBuffer []int16       // Accumulates xAI audio samples (guarded by Bot.sessionsMu)
+	AudioQueue  chan []int16  // Queue of 960-sample frames sent with 20 ms pacing
+	done        chan struct{} // Closed to stop the audioSender goroutine
+}
+
+// Bot manages the TeamSpeak connection and the per-user xAI sessions.
+type Bot struct {
+	ts3    *ts3client.Client
+	apiKey string
+	voice  string
+	prompt string
+
+	selfID     uint16 // Our own ClientID, set on EventConnected
+	sessions   map[uint16]*VoiceSession
+	sessionsMu sync.RWMutex // Guards sessions and each session's AudioBuffer
+}
+
+func main() {
+	serverAddr := flag.String("server", "127.0.0.1:9987", "TeamSpeak 3 Server Address")
+	nickname := flag.String("nickname", "GrokBot", "Bot nickname")
+	voice := flag.String("voice", xai.VoiceAra, "xAI voice (Ara, Rex, Sal, Eve, Leo)")
+	flag.Parse()
+
+	apiKey := os.Getenv("XAI_API_KEY")
+	if apiKey == "" {
+		log.Fatal("XAI_API_KEY environment variable not set")
+	}
+
+	log.Println("=== xAI Voice Bot for TeamSpeak ===")
+	log.Printf("Server: %s", *serverAddr)
+	log.Printf("Nickname: %s", *nickname)
+	log.Printf("Voice: %s", *voice)
+
+	bot := &Bot{
+		apiKey:   apiKey,
+		voice:    *voice,
+		prompt:   "Eres Grok, un asistente de voz amigable y útil. Responde de forma concisa y natural.",
+		sessions: make(map[uint16]*VoiceSession),
+	}
+
+	// Create TeamSpeak client
+	bot.ts3 = ts3client.New(*serverAddr, ts3client.Config{
+		Nickname: *nickname,
+	})
+
+	// Register event handlers
+	bot.ts3.On(ts3client.EventConnected, func(e *ts3client.ConnectedEvent) {
+		bot.selfID = e.ClientID // Store our own ID
+		log.Printf("✓ Conectado a TeamSpeak! ClientID=%d, Server=%s", e.ClientID, e.ServerName)
+	})
+
+	bot.ts3.On(ts3client.EventChannelList, func(e *ts3client.ChannelListEvent) {
+		log.Printf("✓ %d canales disponibles", len(e.Channels))
+	})
+
+	bot.ts3.On(ts3client.EventClientEnter, func(e *ts3client.ClientEnterEvent) {
+		log.Printf("→ Usuario entró: %s (ID=%d)", e.Nickname, e.ClientID)
+
+		// Don't create session for ourselves (compare by ID, not nickname)
+		if e.ClientID == bot.selfID {
+			log.Printf("  (Soy yo, ignorando)")
+			return
+		}
+
+		// Create xAI session for this user
+		go bot.createSession(e.ClientID, e.Nickname)
+	})
+
+	bot.ts3.On(ts3client.EventClientLeft, func(e *ts3client.ClientLeftEvent) {
+		log.Printf("← Usuario salió: ID=%d (%s)", e.ClientID, e.Reason)
+
+		// Close xAI session for this user
+		bot.closeSession(e.ClientID)
+	})
+
+	bot.ts3.On(ts3client.EventAudio, func(e *ts3client.AudioEvent) {
+		// Forward audio from TeamSpeak to all xAI sessions.
+		// NOTE(review): this fans every speaker's audio out to every session;
+		// a real implementation should track who is speaking and forward only
+		// to that user's session.
+		bot.sessionsMu.RLock()
+		for _, session := range bot.sessions {
+			if session.XAI != nil && session.XAI.IsConnected() {
+				session.XAI.SendAudio(e.PCM)
+			}
+		}
+		bot.sessionsMu.RUnlock()
+	})
+
+	bot.ts3.On(ts3client.EventError, func(e *ts3client.ErrorEvent) {
+		if e.ID != "0" {
+			log.Printf("! Error del servidor: [%s] %s", e.ID, e.Message)
+		}
+	})
+
+	// Handle shutdown: tear down every xAI session before disconnecting.
+	go func() {
+		sigChan := make(chan os.Signal, 1)
+		signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+		<-sigChan
+		log.Println("Cerrando...")
+
+		// Close all xAI sessions
+		bot.sessionsMu.Lock()
+		for _, session := range bot.sessions {
+			if session.XAI != nil {
+				session.XAI.Close()
+			}
+			// BUGFIX: also stop the per-session audioSender goroutine;
+			// previously it was left ticking forever on shutdown.
+			close(session.done)
+		}
+		bot.sessionsMu.Unlock()
+
+		bot.ts3.Disconnect()
+		// os.Exit(0)
+	}()
+
+	// Connect to TeamSpeak (blocks until the connection ends)
+	if err := bot.ts3.Connect(); err != nil {
+		log.Fatalf("Error de conexión: %v", err)
+	}
+}
+
+// createSession creates a new xAI voice session for a user: it wires the
+// audio/transcript/interruption callbacks, connects to xAI, configures the
+// session, and registers it in b.sessions.
+func (b *Bot) createSession(clientID uint16, nickname string) {
+	log.Printf("[Session] Creando sesión xAI para %s...", nickname)
+
+	// Create session with audio queue
+	session := &VoiceSession{
+		ClientID:    clientID,
+		Nickname:    nickname,
+		AudioBuffer: make([]int16, 0, 960*10),
+		AudioQueue:  make(chan []int16, 500), // Buffer up to 500 frames (~10 sec)
+		done:        make(chan struct{}),
+	}
+
+	// Start audio sender goroutine with proper 20ms timing
+	go b.audioSender(session)
+
+	xaiClient := xai.New(b.apiKey)
+
+	// Set up audio callback - buffer and queue in 960-sample chunks
+	// (960 samples = 20 ms at 48 kHz mono, the frame size audioSender paces).
+	xaiClient.OnAudio(func(pcm []int16) {
+		b.sessionsMu.Lock()
+		session.AudioBuffer = append(session.AudioBuffer, pcm...)
+
+		// Queue complete 960-sample frames
+		for len(session.AudioBuffer) >= 960 {
+			frame := make([]int16, 960)
+			copy(frame, session.AudioBuffer[:960])
+			session.AudioBuffer = session.AudioBuffer[960:]
+
+			// Non-blocking send to queue
+			select {
+			case session.AudioQueue <- frame:
+			default:
+				// Queue full, drop frame
+			}
+		}
+		b.sessionsMu.Unlock()
+	})
+
+	// Set up transcript callback for logging
+	xaiClient.OnTranscript(func(text string) {
+		log.Printf("[Grok] %s", text)
+	})
+
+	// Clear audio queue when user starts speaking (interruption)
+	xaiClient.OnSpeechStarted(func() {
+		b.sessionsMu.Lock()
+		// Clear the buffer
+		session.AudioBuffer = session.AudioBuffer[:0]
+		// Drain the queue
+		for len(session.AudioQueue) > 0 {
+			<-session.AudioQueue
+		}
+		b.sessionsMu.Unlock()
+		log.Printf("[Session] Audio queue cleared (user interruption)")
+	})
+
+	// Connect to xAI
+	if err := xaiClient.Connect(); err != nil {
+		log.Printf("[Session] Error conectando a xAI: %v", err)
+		close(session.done) // stop the audioSender goroutine
+		return
+	}
+
+	// Configure the session
+	if err := xaiClient.ConfigureSession(b.voice, b.prompt); err != nil {
+		log.Printf("[Session] Error configurando sesión: %v", err)
+		xaiClient.Close()
+		close(session.done)
+		return
+	}
+
+	// Store the xAI client in session
+	session.XAI = xaiClient
+
+	b.sessionsMu.Lock()
+	// NOTE(review): if the server re-delivers ClientEnter for an ID that is
+	// already in the map, the old session is overwritten and leaks — confirm
+	// IDs are unique while a session is live.
+	b.sessions[clientID] = session
+	b.sessionsMu.Unlock()
+
+	log.Printf("[Session] ✓ Sesión xAI activa para %s", nickname)
+}
+
+// audioSender drains session.AudioQueue toward TeamSpeak at one frame per
+// 20 ms tick, until session.done is closed.
+func (b *Bot) audioSender(session *VoiceSession) {
+	ticker := time.NewTicker(20 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-session.done:
+			return
+		case <-ticker.C:
+			// Try to get a frame from the queue
+			select {
+			case frame := <-session.AudioQueue:
+				if err := b.ts3.SendAudio(frame); err != nil {
+					log.Printf("[Session] Error enviando audio: %v", err)
+				}
+			default:
+				// No frame available, that's ok
+			}
+		}
+	}
+}
+
+// closeSession
closes an xAI session for a user and stops its goroutines.
+func (b *Bot) closeSession(clientID uint16) {
+	b.sessionsMu.Lock()
+	defer b.sessionsMu.Unlock()
+
+	if session, ok := b.sessions[clientID]; ok {
+		log.Printf("[Session] Cerrando sesión xAI para %s", session.Nickname)
+		if session.XAI != nil {
+			session.XAI.Close()
+		}
+		// BUGFIX: signal the audioSender goroutine to exit; previously done
+		// was never closed here, leaking one ticker goroutine per departed
+		// user.
+		close(session.done)
+		delete(b.sessions, clientID)
+	}
+}
diff --git a/go.mod b/go.mod
index c415065..40e8583 100644
--- a/go.mod
+++ b/go.mod
@@ -10,3 +10,5 @@ require (
 )
 
 require gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302
+
+require github.com/gorilla/websocket v1.5.3
diff --git a/go.sum b/go.sum
index 822bcea..b45c987 100644
--- a/go.sum
+++ b/go.sum
@@ -2,5 +2,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
 filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
 github.com/dgryski/go-quicklz v0.0.0-20151014073603-d7042a82d57e h1:MhBotBstN1h/GeA7lx7xstbFB8avummjt+nzOi2cY7Y=
 github.com/dgryski/go-quicklz v0.0.0-20151014073603-d7042a82d57e/go.mod h1:XLmYwGWgVzMPLlMmcNcWt3b5ixRabPLstWnPVEDRhzc=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 h1:xeVptzkP8BuJhoIjNizd2bRHfq9KB9HfOLZu90T04XM=
 gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302/go.mod h1:/L5E7a21VWl8DeuCPKxQBdVG5cy+L0MRZ08B1wnqt7g=
diff --git a/pkg/xai/client.go b/pkg/xai/client.go
new file mode 100644
index 0000000..9d98973
--- /dev/null
+++ b/pkg/xai/client.go
@@ -0,0 +1,277 @@
+package xai
+
+import (
+	"encoding/base64"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"log"
+	"sync"
+
+	"github.com/gorilla/websocket"
+)
+
+const (
+	// RealtimeURL is the WebSocket endpoint for the xAI realtime API.
+	RealtimeURL = "wss://api.x.ai/v1/realtime"
+)
+
+// AudioHandler is called when audio is received from xAI
+type AudioHandler func(pcm []int16)
+
+// TranscriptHandler is called when transcript text is received
+type TranscriptHandler func(text string)
+
+// SpeechHandler is called when speech is detected (for interruptions)
+type SpeechHandler func()
+
+// Client manages a WebSocket connection to xAI Voice Agent API
+type Client struct {
+	apiKey string
+	conn   *websocket.Conn
+	mu     sync.Mutex // guards conn, connected and all socket writes
+
+	// Callbacks
+	onAudio         AudioHandler
+	onTranscript    TranscriptHandler
+	onSpeechStarted SpeechHandler
+
+	// State
+	connected bool          // BUGFIX: now read/written only under mu (was racy)
+	closeOnce sync.Once     // BUGFIX: makes Close idempotent (done closed once)
+	done      chan struct{}
+}
+
+// New creates a new xAI client
+func New(apiKey string) *Client {
+	return &Client{
+		apiKey: apiKey,
+		done:   make(chan struct{}),
+	}
+}
+
+// OnAudio sets the callback for received audio
+func (c *Client) OnAudio(handler AudioHandler) {
+	c.onAudio = handler
+}
+
+// OnTranscript sets the callback for received transcripts
+func (c *Client) OnTranscript(handler TranscriptHandler) {
+	c.onTranscript = handler
+}
+
+// OnSpeechStarted sets the callback for when user starts speaking (for interruptions)
+func (c *Client) OnSpeechStarted(handler SpeechHandler) {
+	c.onSpeechStarted = handler
+}
+
+// Connect establishes WebSocket connection to xAI
+func (c *Client) Connect() error {
+	header := make(map[string][]string)
+	header["Authorization"] = []string{"Bearer " + c.apiKey}
+
+	dialer := websocket.Dialer{}
+	conn, _, err := dialer.Dial(RealtimeURL, header)
+	if err != nil {
+		return fmt.Errorf("failed to connect to xAI: %w", err)
+	}
+
+	c.mu.Lock()
+	c.conn = conn
+	c.connected = true
+	c.mu.Unlock()
+
+	// Start message receiver
+	go c.receiveLoop()
+
+	log.Println("[xAI] Connected to Voice Agent API")
+	return nil
+}
+
+// ConfigureSession sets up the voice session: voice, system instructions,
+// server-side VAD, and 48 kHz PCM in/out to match the TeamSpeak audio path.
+func (c *Client) ConfigureSession(voice, instructions string) error {
+	msg := SessionUpdate{
+		Type: "session.update",
+		Session: Session{
+			Voice:        voice,
+			Instructions: instructions,
+			TurnDetection: &TurnDetection{
+				Type: "server_vad",
+			},
+			Audio: &AudioConfig{
+				Input: &AudioFormatConfig{
+					Format: AudioFormat{Type: "audio/pcm", Rate: 48000},
+				},
+				Output: &AudioFormatConfig{
+					Format: AudioFormat{Type: "audio/pcm", Rate: 48000},
+				},
+			},
+		},
+	}
+
+	return c.sendJSON(msg)
+}
+
+// SendAudio sends PCM audio data to xAI
+// pcm should be int16 samples at 48kHz mono
+func (c *Client) SendAudio(pcm []int16) error {
+	// Convert int16 slice to bytes (little endian)
+	buf := make([]byte, len(pcm)*2)
+	for i, sample := range pcm {
+		binary.LittleEndian.PutUint16(buf[i*2:], uint16(sample))
+	}
+
+	// Encode to base64
+	encoded := base64.StdEncoding.EncodeToString(buf)
+
+	msg := InputAudioBufferAppend{
+		Type:  "input_audio_buffer.append",
+		Audio: encoded,
+	}
+
+	return c.sendJSON(msg)
+}
+
+// Close closes the WebSocket connection. It is safe to call more than once
+// (both closeSession and the shutdown path may call it); previously a second
+// call panicked on close(c.done).
+func (c *Client) Close() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conn != nil {
+		c.closeOnce.Do(func() { close(c.done) })
+		c.conn.Close()
+		c.conn = nil
+		c.connected = false
+		log.Println("[xAI] Connection closed")
+	}
+}
+
+// IsConnected returns connection status
+func (c *Client) IsConnected() bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.connected
+}
+
+// sendJSON sends a JSON message over WebSocket. The mutex also serializes
+// writers, which gorilla/websocket requires (one concurrent writer).
+func (c *Client) sendJSON(v any) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conn == nil {
+		return fmt.Errorf("not connected")
+	}
+
+	data, err := json.Marshal(v)
+	if err != nil {
+		return err
+	}
+
+	return c.conn.WriteMessage(websocket.TextMessage, data)
+}
+
+// receiveLoop handles incoming messages from xAI until Close is called or
+// the socket errors.
+func (c *Client) receiveLoop() {
+	// BUGFIX: capture the conn once under the lock; Close may nil it out
+	// concurrently, which would otherwise race / nil-deref below.
+	c.mu.Lock()
+	conn := c.conn
+	c.mu.Unlock()
+	if conn == nil {
+		return
+	}
+
+	defer func() {
+		c.mu.Lock()
+		c.connected = false
+		c.mu.Unlock()
+	}()
+
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+
+		_, message, err := conn.ReadMessage()
+		if err != nil {
+			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
+				log.Println("[xAI] Connection closed normally")
+			} else {
+				log.Printf("[xAI] Read error: %v", err)
+			}
+			return
+		}
+
+		c.handleMessage(message)
+	}
+}
+
+// handleMessage dispatches an incoming WebSocket message by its "type" field.
+func (c *Client) handleMessage(data []byte) {
+	// Parse base message to get type
+	var base ServerMessage
+	if err := json.Unmarshal(data, &base); err != nil {
+		log.Printf("[xAI] Failed to parse message: %v", err)
+		return
+	}
+
+	switch base.Type {
+	case "session.updated":
+		log.Println("[xAI] Session configured successfully")
+
+	case "session.created":
+		log.Println("[xAI] Session created")
+
+	case "conversation.created":
+		log.Println("[xAI] Conversation created")
+
+	case "response.output_audio.delta":
+		var msg ResponseOutputAudioDelta
+		if err := json.Unmarshal(data, &msg); err != nil {
+			log.Printf("[xAI] Failed to parse audio delta: %v", err)
+			return
+		}
+		c.handleAudioDelta(msg.Delta)
+
+	case "response.output_audio.done":
+		// Audio stream complete for this response
+		log.Println("[xAI] Audio response complete")
+
+	case "response.output_audio_transcript.delta":
+		// Extract the incremental transcript text, if anyone is listening
+		var raw map[string]any
+		json.Unmarshal(data, &raw)
+		if delta, ok := raw["delta"].(string); ok && c.onTranscript != nil {
+			c.onTranscript(delta)
+		}
+
+	case "response.done":
+		log.Println("[xAI] Response complete")
+
+	case "input_audio_buffer.speech_started":
+		log.Println("[xAI] Speech started (VAD)")
+		if c.onSpeechStarted != nil {
+			c.onSpeechStarted()
+		}
+
+	case "input_audio_buffer.speech_stopped":
+		log.Println("[xAI] Speech stopped (VAD)")
+
+	case "error":
+		var msg ErrorMessage
+		if err := json.Unmarshal(data, &msg); err == nil {
+			log.Printf("[xAI] Error: %s - %s", msg.Error.Code, msg.Error.Message)
+		}
+
+	default:
+		// Log unhandled message types for debugging
+		log.Printf("[xAI] Received: %s", base.Type)
+	}
+}
+
+// handleAudioDelta decodes a base64 PCM chunk and forwards it to onAudio.
+func (c *Client) handleAudioDelta(base64Audio string) {
+	if c.onAudio == nil {
+		return
+	}
+
+	// Decode base64
+	audioBytes, err := base64.StdEncoding.DecodeString(base64Audio)
+	if err != nil {
+		log.Printf("[xAI] Failed to decode audio: %v", err)
+		return
+	}
+
+	// Convert bytes to int16 (little endian)
+	pcm := make([]int16, len(audioBytes)/2)
+	for i := 0; i < len(pcm); i++
{
+		pcm[i] = int16(binary.LittleEndian.Uint16(audioBytes[i*2:]))
+	}
+
+	// Hand the decoded samples to the registered callback.
+	c.onAudio(pcm)
+}
diff --git a/pkg/xai/types.go b/pkg/xai/types.go
new file mode 100644
index 0000000..490f248
--- /dev/null
+++ b/pkg/xai/types.go
@@ -0,0 +1,136 @@
+package xai
+
+// Message types for the xAI Voice Agent WebSocket API. Field names and JSON
+// tags mirror the wire protocol; do not rename without checking the API.
+
+// ClientMessage is the base for messages sent to the server
+type ClientMessage struct {
+	Type string `json:"type"`
+}
+
+// SessionUpdate configures the voice session
+type SessionUpdate struct {
+	Type    string  `json:"type"` // "session.update"
+	Session Session `json:"session"`
+}
+
+// Session holds the configurable properties of a voice session.
+type Session struct {
+	Voice         string         `json:"voice,omitempty"`
+	Instructions  string         `json:"instructions,omitempty"`
+	TurnDetection *TurnDetection `json:"turn_detection,omitempty"`
+	Audio         *AudioConfig   `json:"audio,omitempty"`
+}
+
+// TurnDetection selects how end-of-turn is decided.
+type TurnDetection struct {
+	Type string `json:"type"` // "server_vad" or null
+}
+
+// AudioConfig groups the input and output audio formats.
+type AudioConfig struct {
+	Input  *AudioFormatConfig `json:"input,omitempty"`
+	Output *AudioFormatConfig `json:"output,omitempty"`
+}
+
+// AudioFormatConfig wraps a single AudioFormat.
+type AudioFormatConfig struct {
+	Format AudioFormat `json:"format"`
+}
+
+// AudioFormat describes an audio encoding and sample rate.
+type AudioFormat struct {
+	Type string `json:"type"` // "audio/pcm", "audio/pcmu", "audio/pcma"
+	Rate int    `json:"rate"` // sample rate in Hz: 8000, 16000, 24000, 48000, etc.
+}
+
+// InputAudioBufferAppend sends audio data to the server
+type InputAudioBufferAppend struct {
+	Type  string `json:"type"`  // "input_audio_buffer.append"
+	Audio string `json:"audio"` // Base64 encoded PCM
+}
+
+// ResponseCreate requests a response from the model
+type ResponseCreate struct {
+	Type     string           `json:"type"` // "response.create"
+	Response ResponseSettings `json:"response"`
+}
+
+// ResponseSettings selects which modalities the response should carry.
+type ResponseSettings struct {
+	Modalities []string `json:"modalities"` // ["text", "audio"]
+}
+
+// ConversationItemCreate creates a new conversation item
+type ConversationItemCreate struct {
+	Type string           `json:"type"` // "conversation.item.create"
+	Item ConversationItem `json:"item"`
+}
+
+// ConversationItem is a single message in the conversation.
+type ConversationItem struct {
+	Type    string        `json:"type"` // "message"
+	Role    string        `json:"role"` // "user", "assistant"
+	Content []ItemContent `json:"content"`
+}
+
+// ItemContent is one content part of a conversation item.
+type ItemContent struct {
+	Type string `json:"type"` // "input_text", "input_audio"
+	Text string `json:"text,omitempty"`
+}
+
+// =============================================================================
+// Server Messages
+// =============================================================================
+
+// ServerMessage is the base for messages received from the server
+type ServerMessage struct {
+	Type    string `json:"type"`
+	EventID string `json:"event_id,omitempty"`
+}
+
+// SessionUpdated confirms session configuration
+type SessionUpdated struct {
+	Type    string  `json:"type"` // "session.updated"
+	EventID string  `json:"event_id"`
+	Session Session `json:"session"`
+}
+
+// ResponseOutputAudioDelta contains audio data from the model
+type ResponseOutputAudioDelta struct {
+	Type    string `json:"type"` // "response.output_audio.delta"
+	EventID string `json:"event_id"`
+	Delta   string `json:"delta"` // Base64 encoded PCM
+}
+
+// ResponseDone indicates the response is complete
+type ResponseDone struct {
+	Type    string `json:"type"` // "response.done"
+	EventID string `json:"event_id"`
+}
+
+// InputAudioBufferSpeechStarted indicates VAD detected speech start
+type InputAudioBufferSpeechStarted struct {
+	Type    string `json:"type"` // "input_audio_buffer.speech_started"
+	EventID string `json:"event_id"`
+}
+
+// InputAudioBufferSpeechStopped indicates VAD detected speech stop
+type InputAudioBufferSpeechStopped struct {
+	Type    string `json:"type"` // "input_audio_buffer.speech_stopped"
+	EventID string `json:"event_id"`
+}
+
+// ErrorMessage represents an error from the server
+type ErrorMessage struct {
+	Type    string    `json:"type"` // "error"
+	EventID string    `json:"event_id"`
+	Error   ErrorInfo `json:"error"`
+}
+
+// ErrorInfo carries the structured error payload.
+type ErrorInfo struct {
+	Type    string `json:"type"`
+	Code    string `json:"code"`
+	Message string `json:"message"`
+}
+
+// Available voices
+const (
+	VoiceAra = "Ara"
+	VoiceRex = "Rex"
+	VoiceSal = "Sal"
+	VoiceEve = "Eve"
+	VoiceLeo = "Leo"
+)
diff --git a/voicebot.exe b/voicebot.exe
new file mode 100644
index 0000000..78924cd
Binary files /dev/null and b/voicebot.exe differ