fix: Resolve IP direction mismatch by latching metadata and robustifying tcpdump header parsing
This commit is contained in:
@@ -105,6 +105,7 @@ func (c *Capturer) processStream(r io.Reader) {
|
||||
scanner := bufio.NewScanner(r)
|
||||
var buffer strings.Builder
|
||||
inSIPMessage := false
|
||||
var msgNetInfo *NetInfo
|
||||
|
||||
for scanner.Scan() {
|
||||
c.mu.Lock()
|
||||
@@ -119,22 +120,34 @@ func (c *Capturer) processStream(r io.Reader) {
|
||||
// Check for tcpdump header
|
||||
if netInfo := parseTcpdumpHeader(line); netInfo != nil {
|
||||
c.currentNetInfo = netInfo
|
||||
// If we were parsing a message, this header likely means the previous message ended (or it's just info)
|
||||
// But tcpdump prints header BEFORE payload.
|
||||
// Proceed to next line
|
||||
continue
|
||||
}
|
||||
|
||||
// Detect start of SIP message
|
||||
if idx := findSIPStart(line); idx != -1 {
|
||||
// Latch the current network info for this message
|
||||
if c.currentNetInfo != nil {
|
||||
info := *c.currentNetInfo
|
||||
msgNetInfo = &info
|
||||
}
|
||||
|
||||
// Clean the line (remove prefix garbage)
|
||||
line = line[idx:]
|
||||
|
||||
// If we were building a message, parse it
|
||||
// If we were building a message, parse it with its OWN net info
|
||||
if buffer.Len() > 0 {
|
||||
c.parseAndEmit(buffer.String())
|
||||
c.parseAndEmit(buffer.String(), msgNetInfo)
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
// NOW update msgNetInfo for the new message
|
||||
if c.currentNetInfo != nil {
|
||||
info := *c.currentNetInfo
|
||||
msgNetInfo = &info
|
||||
} else {
|
||||
msgNetInfo = nil
|
||||
}
|
||||
|
||||
inSIPMessage = true
|
||||
}
|
||||
|
||||
@@ -149,7 +162,7 @@ func (c *Capturer) processStream(r io.Reader) {
|
||||
|
||||
// Parse remaining buffer
|
||||
if buffer.Len() > 0 {
|
||||
c.parseAndEmit(buffer.String())
|
||||
c.parseAndEmit(buffer.String(), msgNetInfo)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,7 +175,7 @@ func (c *Capturer) processErrors(r io.Reader) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Capturer) parseAndEmit(raw string) {
|
||||
func (c *Capturer) parseAndEmit(raw string, netInfo *NetInfo) {
|
||||
packet, err := sip.Parse(raw)
|
||||
if err != nil {
|
||||
if c.OnError != nil {
|
||||
@@ -172,14 +185,12 @@ func (c *Capturer) parseAndEmit(raw string) {
|
||||
}
|
||||
if packet != nil {
|
||||
// Attach network info if available
|
||||
if c.currentNetInfo != nil {
|
||||
// Use timestamp from packet header if possible, or keep what parser found?
|
||||
// Parser doesn't find time.
|
||||
packet.Timestamp = c.currentNetInfo.Timestamp
|
||||
packet.SourceIP = c.currentNetInfo.SourceIP
|
||||
packet.SourcePort = c.currentNetInfo.SourcePort
|
||||
packet.DestIP = c.currentNetInfo.DestIP
|
||||
packet.DestPort = c.currentNetInfo.DestPort
|
||||
if netInfo != nil {
|
||||
packet.Timestamp = netInfo.Timestamp
|
||||
packet.SourceIP = netInfo.SourceIP
|
||||
packet.SourcePort = netInfo.SourcePort
|
||||
packet.DestIP = netInfo.DestIP
|
||||
packet.DestPort = netInfo.DestPort
|
||||
}
|
||||
|
||||
if c.OnPacket != nil {
|
||||
@@ -233,40 +244,73 @@ type NetInfo struct {
|
||||
}
|
||||
|
||||
func parseTcpdumpHeader(line string) *NetInfo {
|
||||
// Format: 15:35:10.430328 IP 192.168.0.164.51416 > 192.168.0.158.5060: ...
|
||||
// Robust parsing
|
||||
// Look for " IP " or " IP6 "
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) < 5 || parts[1] != "IP" {
|
||||
// Need at least: Time IP Src > Dst:
|
||||
if len(parts) < 5 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse Timestamp
|
||||
// Using generic date + log time
|
||||
// Find the direction arrow ">"
|
||||
arrowIdx := -1
|
||||
for i, p := range parts {
|
||||
if p == ">" {
|
||||
arrowIdx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if arrowIdx == -1 || arrowIdx < 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify "IP" or "IP6" exists before the source (usually parts[1] or parts[2])
|
||||
// Example: 15:35... IP src > dst: ...
|
||||
// Example: 15:35... IP6 src > dst: ...
|
||||
// Example (verbose): 15:35... IP (tos 0x0...) src > dst: ...
|
||||
// We'll trust the arrow for now, but double check IPs.
|
||||
|
||||
// Assume SRC is before arrow, DST is after
|
||||
srcStr := parts[arrowIdx-1]
|
||||
dstStr := parts[arrowIdx+1]
|
||||
|
||||
// Find timestamp (usually first field)
|
||||
now := time.Now()
|
||||
t, err := time.Parse("15:04:05.000000", parts[0])
|
||||
if err == nil {
|
||||
t = time.Date(now.Year(), now.Month(), now.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), now.Location())
|
||||
} else {
|
||||
// Try without microseconds
|
||||
t, err = time.Parse("15:04:05", parts[0])
|
||||
if err == nil {
|
||||
t = time.Date(now.Year(), now.Month(), now.Day(), t.Hour(), t.Minute(), t.Second(), 0, now.Location())
|
||||
} else {
|
||||
t = now
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to extract IP and Port
|
||||
parseIPPort := func(s string) (string, int) {
|
||||
lastDot := strings.LastIndex(s, ".")
|
||||
if lastDot == -1 {
|
||||
// IPv6 might use different separation or just be IP
|
||||
return s, 0
|
||||
}
|
||||
ip := s[:lastDot]
|
||||
portStr := s[lastDot+1:]
|
||||
// Remove trailing colon if present (dest)
|
||||
portStr = strings.TrimSuffix(portStr, ":")
|
||||
// Remove trailing comma if present
|
||||
portStr = strings.TrimSuffix(portStr, ",")
|
||||
|
||||
var port int
|
||||
fmt.Sscanf(portStr, "%d", &port)
|
||||
return ip, port
|
||||
}
|
||||
|
||||
srcIP, srcPort := parseIPPort(parts[2])
|
||||
dstIP, dstPort := parseIPPort(parts[4])
|
||||
srcIP, srcPort := parseIPPort(srcStr)
|
||||
dstIP, dstPort := parseIPPort(dstStr)
|
||||
|
||||
return &NetInfo{
|
||||
Timestamp: t,
|
||||
|
||||
@@ -127,6 +127,7 @@ func (c *LocalCapturer) processStream(r io.Reader) {
|
||||
scanner := bufio.NewScanner(r)
|
||||
var buffer strings.Builder
|
||||
inSIPMessage := false
|
||||
var msgNetInfo *NetInfo
|
||||
|
||||
for scanner.Scan() {
|
||||
c.mu.Lock()
|
||||
@@ -137,26 +138,60 @@ func (c *LocalCapturer) processStream(r io.Reader) {
|
||||
}
|
||||
|
||||
line := scanner.Text()
|
||||
// logger.Debug("Stdout: %s", line) // Commented out to reduce noise, enable if needed
|
||||
|
||||
// Check for tcpdump header
|
||||
if netInfo := parseTcpdumpHeader(line); netInfo != nil {
|
||||
c.currentNetInfo = netInfo
|
||||
// Proceed to next line
|
||||
continue
|
||||
}
|
||||
|
||||
// Detect start of SIP message
|
||||
if idx := findSIPStart(line); idx != -1 {
|
||||
logger.Debug("SIP Start detected: %s", line)
|
||||
// Latch the current network info for this message
|
||||
if c.currentNetInfo != nil {
|
||||
// Make a copy
|
||||
info := *c.currentNetInfo
|
||||
msgNetInfo = &info
|
||||
}
|
||||
|
||||
// Clean the line (remove prefix garbage)
|
||||
line = line[idx:]
|
||||
|
||||
// If we were building a message, parse it
|
||||
// If we were building a message, parse it with its OWN net info (which was latched previously)
|
||||
// Note: This edge case (buffer > 0 but new start) means previous message ended implicitly.
|
||||
// But wait, the msgNetInfo we just latched is for the NEW message.
|
||||
// The OLD message should have already been emitted or we are in a weird state.
|
||||
// Use the PREVIOUS msgNetInfo for the existing buffer if any.
|
||||
// Actually, single buffer logic is simple: emit what we have.
|
||||
if buffer.Len() > 0 {
|
||||
c.parseAndEmit(buffer.String())
|
||||
// We need to pass the net info that belongs to the buffered content.
|
||||
// But we just overwrote msgNetInfo.
|
||||
// Realistically, we should emit before latching new info.
|
||||
// But tcpdump header comes BEFORE the message.
|
||||
// So c.currentNetInfo is already the NEW info.
|
||||
// And the buffer contains the OLD message.
|
||||
// So when we started the OLD message, we latched OLD info.
|
||||
// We should persist that OLD info until emit.
|
||||
// This implies we need `pendingNetInfo` vs `currentNetInfo`.
|
||||
|
||||
// Simplified approach: msgNetInfo stores the info for the message currently being built in buffer.
|
||||
// When we start a NEW message, the buffer contains the PREVIOUS message.
|
||||
// So we emit the buffer with the OLD msgNetInfo.
|
||||
// THEN we start the new message and update msgNetInfo to the NEW c.currentNetInfo.
|
||||
|
||||
c.parseAndEmit(buffer.String(), msgNetInfo)
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
// NOW update msgNetInfo for the new message
|
||||
if c.currentNetInfo != nil {
|
||||
info := *c.currentNetInfo
|
||||
msgNetInfo = &info
|
||||
} else {
|
||||
msgNetInfo = nil
|
||||
}
|
||||
|
||||
inSIPMessage = true
|
||||
}
|
||||
|
||||
@@ -168,7 +203,7 @@ func (c *LocalCapturer) processStream(r io.Reader) {
|
||||
|
||||
// Parse remaining buffer
|
||||
if buffer.Len() > 0 {
|
||||
c.parseAndEmit(buffer.String())
|
||||
c.parseAndEmit(buffer.String(), msgNetInfo)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,10 +223,11 @@ func (c *LocalCapturer) processErrors(r io.Reader) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LocalCapturer) parseAndEmit(raw string) {
|
||||
func (c *LocalCapturer) parseAndEmit(raw string, netInfo *NetInfo) {
|
||||
packet, err := sip.Parse(raw)
|
||||
if err != nil {
|
||||
logger.Error("Error parsing SIP packet: %v", err)
|
||||
// Suppress verbose error logging for partial packets unless debug
|
||||
// logger.Error("Error parsing SIP packet: %v", err)
|
||||
if c.OnError != nil {
|
||||
c.OnError(err)
|
||||
}
|
||||
@@ -199,12 +235,12 @@ func (c *LocalCapturer) parseAndEmit(raw string) {
|
||||
}
|
||||
if packet != nil {
|
||||
// Attach network info if available
|
||||
if c.currentNetInfo != nil {
|
||||
packet.Timestamp = c.currentNetInfo.Timestamp
|
||||
packet.SourceIP = c.currentNetInfo.SourceIP
|
||||
packet.SourcePort = c.currentNetInfo.SourcePort
|
||||
packet.DestIP = c.currentNetInfo.DestIP
|
||||
packet.DestPort = c.currentNetInfo.DestPort
|
||||
if netInfo != nil {
|
||||
packet.Timestamp = netInfo.Timestamp
|
||||
packet.SourceIP = netInfo.SourceIP
|
||||
packet.SourcePort = netInfo.SourcePort
|
||||
packet.DestIP = netInfo.DestIP
|
||||
packet.DestPort = netInfo.DestPort
|
||||
}
|
||||
logger.Debug("Packet parsed: %s %s -> %s", packet.Method, packet.SourceIP, packet.DestIP)
|
||||
if c.OnPacket != nil {
|
||||
|
||||
Reference in New Issue
Block a user