feat: unified audio buffer + fragment reassembly fix

This commit is contained in:
Jose Luis Montañes Ojados
2026-01-16 14:19:02 +01:00
parent fb17813dcb
commit 8ef80530f6
6 changed files with 392 additions and 250 deletions

View File

@@ -36,11 +36,10 @@ type Client struct {
Connected bool
ServerName string
// Fragment reassembly
FragmentBuffer []byte
FragmentStartPktID uint16
FragmentCompressed bool
Fragmenting bool
// Fragment reassembly (packet queue like ts3j)
CommandQueue map[uint16]*protocol.Packet // Packets waiting for reassembly
ExpectedCommandPID uint16 // Next expected packet ID
FragmentState bool // Toggle: true = collecting, false = ready
// Server Data
Channels map[uint64]*Channel
@@ -64,6 +63,8 @@ func NewClient(nickname string) *Client {
VoicePacketID: 1,
Channels: make(map[uint64]*Channel),
VoiceDecoders: make(map[uint16]*opus.Decoder),
CommandQueue: make(map[uint16]*protocol.Packet),
ExpectedCommandPID: 0,
done: make(chan struct{}),
}
}

View File

@@ -5,12 +5,35 @@ import (
"fmt"
"log"
"strings"
"unicode"
"go-ts/pkg/protocol"
"github.com/dgryski/go-quicklz"
)
// sanitizeForLog removes control characters that can corrupt terminal output
func sanitizeForLog(s string) string {
var result strings.Builder
result.Grow(len(s))
for _, r := range s {
if r >= 32 && r < 127 {
// Printable ASCII
result.WriteRune(r)
} else if unicode.IsPrint(r) && r < 256 {
// Printable extended ASCII
result.WriteRune(r)
} else if r == '\n' || r == '\r' || r == '\t' {
// Keep whitespace
result.WriteRune(r)
} else {
// Replace control characters with placeholder
result.WriteRune('.')
}
}
return result.String()
}
func (c *Client) handleCommand(pkt *protocol.Packet) error {
// Check if Encrypted
// PacketTypeCommand is usually encrypted.
@@ -75,71 +98,142 @@ func (c *Client) handleCommand(pkt *protocol.Packet) error {
c.Connected = true
}
// Fragment reassembly logic:
// - First fragment: Fragmented=true, optionally Compressed=true -> start buffer
// - Middle fragments: Fragmented=false, Compressed=false -> append to buffer
// - Last fragment: Fragmented=true -> append and process
isFragmented := pkt.Header.FlagFragmented()
// Queue-based fragment reassembly (like ts3j)
// Store packet in queue
c.CommandQueue[pkt.Header.PacketID] = &protocol.Packet{
Header: pkt.Header,
Data: append([]byte{}, data...), // Clone data (already decrypted)
}
if isFragmented && !c.Fragmenting {
// First fragment - start collecting
c.Fragmenting = true
c.FragmentBuffer = make([]byte, 0, 4096)
c.FragmentBuffer = append(c.FragmentBuffer, data...)
c.FragmentStartPktID = pkt.Header.PacketID
c.FragmentCompressed = pkt.Header.FlagCompressed()
log.Printf("Fragment start (PID=%d, Compressed=%v, Len=%d)", pkt.Header.PacketID, c.FragmentCompressed, len(data))
return nil // Wait for more fragments
} else if c.Fragmenting && !isFragmented {
// Middle fragment - append
c.FragmentBuffer = append(c.FragmentBuffer, data...)
log.Printf("Fragment continue (PID=%d, TotalLen=%d)", pkt.Header.PacketID, len(c.FragmentBuffer))
return nil // Wait for more fragments
} else if c.Fragmenting && isFragmented {
// Last fragment - complete reassembly
c.FragmentBuffer = append(c.FragmentBuffer, data...)
log.Printf("Fragment end (PID=%d, TotalLen=%d)", pkt.Header.PacketID, len(c.FragmentBuffer))
data = c.FragmentBuffer
// Try to process packets in order
for {
nextPkt, ok := c.CommandQueue[c.ExpectedCommandPID]
if !ok {
// Missing packet, wait for it
break
}
// Decompress if first fragment was compressed
if c.FragmentCompressed {
decompressed, err := quicklz.Decompress(data)
if err != nil {
log.Printf("QuickLZ decompression of fragmented data failed: %v", err)
// Fallback to raw data
isFragmented := nextPkt.Header.FlagFragmented()
if isFragmented {
// Toggle fragment state
c.FragmentState = !c.FragmentState
if c.FragmentState {
// Starting a new fragment sequence
// Don't process yet, wait for more
c.ExpectedCommandPID++
continue
} else {
log.Printf("Decompressed fragmented: %d -> %d bytes", len(data), len(decompressed))
data = decompressed
// Ending fragment sequence - reassemble all
reassembled, compressed := c.reassembleFragments()
if reassembled == nil {
log.Printf("Fragment reassembly failed")
break
}
data = reassembled
// Decompress if first packet was compressed
if compressed {
decompressed, err := quicklz.Decompress(data)
if err != nil {
log.Printf("QuickLZ decompression of fragmented data failed: %v", err)
} else {
log.Printf("Decompressed fragmented: %d -> %d bytes", len(data), len(decompressed))
data = decompressed
}
}
}
} else if c.FragmentState {
// Middle fragment - keep collecting
c.ExpectedCommandPID++
continue
} else {
// Non-fragmented packet - process normally
data = nextPkt.Data
// Decompress if needed
if nextPkt.Header.FlagCompressed() {
decompressed, err := quicklz.Decompress(data)
if err != nil {
log.Printf("QuickLZ decompression failed: %v (falling back to raw)", err)
} else {
log.Printf("Decompressed: %d -> %d bytes", len(data), len(decompressed))
data = decompressed
}
}
}
// Reset fragment state
c.Fragmenting = false
c.FragmentBuffer = nil
} else {
// Non-fragmented packet - decompress if needed
if pkt.Header.FlagCompressed() {
decompressed, err := quicklz.Decompress(data)
if err != nil {
log.Printf("QuickLZ decompression failed: %v (falling back to raw)", err)
// Fallback to raw data - might not be compressed despite flag
} else {
log.Printf("Decompressed: %d -> %d bytes", len(data), len(decompressed))
data = decompressed
}
// Remove processed packet from queue
delete(c.CommandQueue, c.ExpectedCommandPID)
c.ExpectedCommandPID++
// Process the command
if err := c.processCommand(data, nextPkt); err != nil {
log.Printf("Error processing command: %v", err)
}
}
return nil
}
// reassembleFragments collects all buffered fragments in order and returns reassembled data
func (c *Client) reassembleFragments() ([]byte, bool) {
var result []byte
compressed := false
// Find the start of the fragment sequence (scan backwards from current)
startPID := c.ExpectedCommandPID
for {
prevPID := startPID - 1
pkt, ok := c.CommandQueue[prevPID]
if !ok {
break
}
// Check if this is the start (has Fragmented flag)
if pkt.Header.FlagFragmented() {
startPID = prevPID
break
}
startPID = prevPID
}
// Now collect from startPID to ExpectedCommandPID (inclusive)
for pid := startPID; pid <= c.ExpectedCommandPID; pid++ {
pkt, ok := c.CommandQueue[pid]
if !ok {
log.Printf("Missing fragment PID=%d during reassembly", pid)
return nil, false
}
// First fragment may have compressed flag
if pid == startPID && pkt.Header.FlagCompressed() {
compressed = true
}
result = append(result, pkt.Data...)
delete(c.CommandQueue, pid)
}
log.Printf("Reassembled fragments PID %d-%d, total %d bytes, compressed=%v",
startPID, c.ExpectedCommandPID, len(result), compressed)
return result, compressed
}
// processCommand handles a single fully reassembled command
func (c *Client) processCommand(data []byte, pkt *protocol.Packet) error {
cmdStr := string(data)
// Debug: Log packet flags and raw command preview
// Debug: Log packet flags and raw command preview (sanitized)
log.Printf("Debug Packet: Compressed=%v, Fragmented=%v, RawLen=%d, Preview=%q",
pkt.Header.FlagCompressed(), pkt.Header.FlagFragmented(), len(data),
func() string {
if len(cmdStr) > 100 {
return cmdStr[:100]
preview := cmdStr
if len(preview) > 100 {
preview = preview[:100]
}
return cmdStr
return sanitizeForLog(preview)
}())
// Fix Garbage Headers (TS3 often sends binary garbage before command)
@@ -152,7 +246,7 @@ func (c *Client) handleCommand(pkt *protocol.Packet) error {
cmdStr = cmdStr[validStart:]
}
log.Printf("Command: %s", cmdStr)
log.Printf("Command: %s", sanitizeForLog(cmdStr))
// Parse Commands (possibly multiple piped items)
commands := protocol.ParseCommands([]byte(cmdStr))