kvm/internal/audio/ipc_input.go
Alex P 1d1658db15 refactor(audio): replace GetConfig() calls with direct Config access
This change replaces all instances of GetConfig() function calls with direct access to the Config variable throughout the audio package. The modification improves performance by eliminating function call overhead and simplifies the codebase by removing unnecessary indirection.

The commit also includes minor optimizations in validation logic and connection handling, while maintaining all existing functionality. Error handling remains robust with appropriate fallbacks when config values are not available.

Additional improvements include:
- Enhanced connection health monitoring in UnifiedAudioClient
- Optimized validation functions using cached config values
- Reduced memory allocations in hot paths
- Improved error recovery during quality changes
2025-09-08 17:30:49 +00:00

1383 lines
42 KiB
Go

package audio
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Component name constants for logging.
// They are passed as the "component" field when loggers are built in this file.
const (
// AudioInputServerComponent is the logger component label for the audio input server.
AudioInputServerComponent = "audio-input-server"
// AudioInputClientComponent is the logger component label for the audio input client.
AudioInputClientComponent = "audio-input-client"
)
// Constants are now defined in unified_ipc.go
//
// The variables below copy two Config values once, when package-level
// variables are initialized.
// NOTE(review): later changes to Config.MaxFrameSize or Config.MessagePoolSize
// are not reflected in these copies — confirm that is intended.
var (
maxFrameSize = Config.MaxFrameSize // Maximum Opus frame size; upper bound checked by readMessage and SendFrame
messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size; also the capacity of globalMessagePool.pool
)
// Legacy aliases for backward compatibility.
// Both are true type aliases (declared with "="), so values are interchangeable
// with the unified types without conversion.
type InputMessageType = UnifiedMessageType
type InputIPCMessage = UnifiedIPCMessage
// Legacy constants for backward compatibility.
// Each name is bound to the corresponding unified MessageType* constant;
// processMessage dispatches on these values.
const (
InputMessageTypeOpusFrame = MessageTypeOpusFrame
InputMessageTypeConfig = MessageTypeConfig
InputMessageTypeOpusConfig = MessageTypeOpusConfig
InputMessageTypeStop = MessageTypeStop
InputMessageTypeHeartbeat = MessageTypeHeartbeat
InputMessageTypeAck = MessageTypeAck
)
// Methods are now inherited from UnifiedIPCMessage
// OptimizedIPCMessage bundles an InputIPCMessage with reusable header and
// payload buffers so that MessagePool can recycle them between reads.
type OptimizedIPCMessage struct {
header [17]byte // Fixed-size header scratch buffer (headerSize = 17); readMessage parses magic, type, length and timestamp from it
data []byte // Reusable payload buffer; truncated to length 0 by MessagePool.Get and MessagePool.Put
msg InputIPCMessage // Message value (a named field, not a Go embedded field) populated by readMessage
}
// MessagePool manages a pool of reusable messages to reduce allocations.
// Messages are served first from the mutex-guarded preallocated slice and then
// from the buffered channel; when both are empty Get allocates a new message.
type MessagePool struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
hitCount int64 // Pool hit counter (atomic); incremented by Get when a pooled message is reused
missCount int64 // Pool miss counter (atomic); incremented by Get when a new message is allocated
// Other fields
pool chan *OptimizedIPCMessage // Buffered channel used as the second-level pool
// Memory optimization fields
preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use; used as a stack by Get and Put
preallocSize int // Number of pre-allocated messages; Put refills preallocated up to this length
maxPoolSize int // Maximum pool size to prevent memory bloat; set by initializeMessagePool, not read by Get or Put
mutex sync.RWMutex // Protects preallocated slice
}
// Global message pool instance.
// Only the channel is created here; the pre-allocated slice and the channel
// contents are filled lazily by initializeMessagePool on the first Get.
var globalMessagePool = &MessagePool{
pool: make(chan *OptimizedIPCMessage, messagePoolSize),
}
// messagePoolInitOnce guards the one-time population of globalMessagePool.
var messagePoolInitOnce sync.Once
// initializeMessagePool populates globalMessagePool exactly once.
// A quarter of messagePoolSize goes into the mutex-guarded preallocated slice
// and the remainder is pushed into the buffered channel. Every message is
// created with an empty payload buffer of capacity maxFrameSize.
func initializeMessagePool() {
	messagePoolInitOnce.Do(func() {
		newMessage := func() *OptimizedIPCMessage {
			return &OptimizedIPCMessage{data: make([]byte, 0, maxFrameSize)}
		}

		fastCount := messagePoolSize / 4
		globalMessagePool.preallocSize = fastCount
		// The growth ceiling is derived from the configured multiplier.
		globalMessagePool.maxPoolSize = messagePoolSize * Config.PoolGrowthMultiplier

		// Fast-path messages served directly from the slice.
		fast := make([]*OptimizedIPCMessage, fastCount)
		for idx := range fast {
			fast[idx] = newMessage()
		}
		globalMessagePool.preallocated = fast

		// The remaining messages live in the channel-backed pool.
		for remaining := messagePoolSize - fastCount; remaining > 0; remaining-- {
			globalMessagePool.pool <- newMessage()
		}
	})
}
// Get returns a message ready for reuse, with an empty payload and a zeroed
// InputIPCMessage. Sources are tried in order: the preallocated slice, the
// channel pool, and finally a fresh allocation. The first two count as hits,
// the last as a miss.
func (mp *MessagePool) Get() *OptimizedIPCMessage {
	initializeMessagePool()

	// Fast path: pop the last entry of the preallocated slice under the lock.
	var (
		reused *OptimizedIPCMessage
		popped bool
	)
	mp.mutex.Lock()
	if last := len(mp.preallocated) - 1; last >= 0 {
		reused = mp.preallocated[last]
		mp.preallocated = mp.preallocated[:last]
		popped = true
	}
	mp.mutex.Unlock()

	if popped {
		atomic.AddInt64(&mp.hitCount, 1)
		reused.data = reused.data[:0]
		reused.msg = InputIPCMessage{}
		return reused
	}

	// Second level: non-blocking receive from the channel pool.
	select {
	case pooled := <-mp.pool:
		atomic.AddInt64(&mp.hitCount, 1)
		pooled.data = pooled.data[:0]
		pooled.msg = InputIPCMessage{}
		// Replace the buffer if it cannot hold a maximum-size frame.
		if cap(pooled.data) < maxFrameSize {
			pooled.data = make([]byte, 0, maxFrameSize)
		}
		return pooled
	default:
	}

	// Both levels are empty: allocate a new message.
	atomic.AddInt64(&mp.missCount, 1)
	return &OptimizedIPCMessage{data: make([]byte, 0, maxFrameSize)}
}
// Put hands a message back for reuse. Nil messages and messages whose payload
// capacity lies outside [maxFrameSize/2, maxFrameSize*2] are discarded. The
// message is reset, then stored in the preallocated slice if it has room,
// otherwise offered to the channel pool without blocking.
func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
	if msg == nil {
		return
	}

	// Outlier buffers are left for the garbage collector.
	if bufCap := cap(msg.data); bufCap < maxFrameSize/2 || bufCap > maxFrameSize*2 {
		return
	}

	msg.data = msg.data[:0]
	msg.msg = InputIPCMessage{}

	// Prefer the slice-backed fast path.
	mp.mutex.Lock()
	stored := len(mp.preallocated) < mp.preallocSize
	if stored {
		mp.preallocated = append(mp.preallocated, msg)
	}
	mp.mutex.Unlock()
	if stored {
		return
	}

	// Otherwise try the channel; when it is full the message is dropped.
	select {
	case mp.pool <- msg:
	default:
	}
}
// Legacy aliases for backward compatibility.
// InputIPCConfig is the argument of SendConfig; InputIPCOpusConfig is the
// argument of SendOpusConfig and the value decoded by processOpusConfig.
type InputIPCConfig = UnifiedIPCConfig
type InputIPCOpusConfig = UnifiedIPCOpusConfig
// AudioInputServer handles IPC communication for audio input processing.
// It listens on a unix socket, keeps a single active connection, and moves
// received messages through messageChan and processChan.
type AudioInputServer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
bufferSize int64 // Current buffer size (atomic)
processingTime int64 // Most recent per-message processing duration in nanoseconds (atomic); stored by the processor goroutine
droppedFrames int64 // Dropped frames counter (atomic)
totalFrames int64 // Total frames counter (atomic)
listener net.Listener // Unix socket listener created by NewAudioInputServer
conn net.Conn // Current client connection; replaced by acceptConnections
mtx sync.Mutex // Held by Start, Stop, sendAck and around updates of conn
running bool // Set by Start, cleared by Stop
// Triple-goroutine architecture
messageChan chan *InputIPCMessage // Buffered channel for incoming messages
processChan chan *InputIPCMessage // Buffered channel for processing queue
stopChan chan struct{} // Stop signal for all goroutines; closed by Stop
wg sync.WaitGroup // Wait group for goroutine coordination
// Channel resizing support
channelMutex sync.RWMutex // Protects channel recreation
lastBufferSize int64 // Last known buffer size for change detection
// Socket buffer configuration
socketBufferConfig SocketBufferConfig // Applied to each accepted connection via ConfigureSocketBuffers
}
// NewAudioInputServer builds a server listening on the input unix socket.
// Listening is attempted up to three times; before every attempt any stale
// socket file is removed, and retries are preceded by a 10ms pause. The message
// and processing channels are sized from the adaptive buffer manager, with a
// floor of 50 entries. An error is returned when all listen attempts fail.
func NewAudioInputServer() (*AudioInputServer, error) {
	const maxAttempts = 3
	socketPath := getInputSocketPath()

	var (
		listener net.Listener
		err      error
	)
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// Clear a leftover socket file from a previous run, if any.
		os.Remove(socketPath)
		if attempt > 1 {
			time.Sleep(10 * time.Millisecond)
		}

		if listener, err = net.Listen("unix", socketPath); err == nil {
			break
		}

		// The final failure is reported through the returned error instead.
		if attempt < maxAttempts {
			logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
			logger.Warn().Err(err).Int("attempt", attempt).Msg("Failed to create unix socket, retrying")
		}
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
	}

	// Channel capacity comes from the adaptive manager, never below 50 frames
	// so short bursts do not overflow immediately.
	queueSize := int64(GetAdaptiveBufferManager().GetInputBufferSize())
	if floor := int64(50); queueSize < floor {
		queueSize = floor
	}

	bufferConfig := DefaultSocketBufferConfig()

	server := &AudioInputServer{
		listener:           listener,
		messageChan:        make(chan *InputIPCMessage, queueSize),
		processChan:        make(chan *InputIPCMessage, queueSize),
		stopChan:           make(chan struct{}),
		bufferSize:         queueSize,
		lastBufferSize:     queueSize,
		socketBufferConfig: bufferConfig,
	}
	return server, nil
}
// Start marks the server as running, zeroes its counters, launches the reader,
// processor and monitor tasks, and schedules the connection acceptor.
// It returns an error if the server is already running.
func (ais *AudioInputServer) Start() error {
	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	if ais.running {
		return fmt.Errorf("server already running")
	}
	ais.running = true

	// Fresh statistics for this run.
	for _, counter := range []*int64{&ais.totalFrames, &ais.droppedFrames, &ais.processingTime} {
		atomic.StoreInt64(counter, 0)
	}

	// Reader, processor and monitor, in that order.
	for _, launch := range []func(){
		ais.startReaderGoroutine,
		ais.startProcessorGoroutine,
		ais.startMonitorGoroutine,
	} {
		launch()
	}

	// Prefer the reader pool for the acceptor; fall back to a plain goroutine
	// when the pool rejects the task.
	if submitted := SubmitAudioReaderTask(ais.acceptConnections); !submitted {
		go ais.acceptConnections()
	}
	return nil
}
// Stop shuts the server down. It is a no-op when the server is not running.
//
// Shutdown order matters:
//  1. Under mtx: clear running, close stopChan, and close the active connection
//     and the listener so that a goroutine blocked in a read or in Accept
//     returns with an error.
//  2. Release mtx, then wait on the wait group. The reader goroutine locks mtx
//     on every loop iteration, so waiting while holding mtx (as was done
//     previously) could deadlock, and a reader blocked on the still-open
//     connection would never finish.
//  3. Remove the socket file to prevent restart issues.
func (ais *AudioInputServer) Stop() {
	ais.mtx.Lock()
	if !ais.running {
		ais.mtx.Unlock()
		return
	}
	ais.running = false

	// Signal all goroutines to stop.
	close(ais.stopChan)

	// Close I/O endpoints before waiting so blocked calls are released.
	if ais.conn != nil {
		ais.conn.Close()
		ais.conn = nil
	}
	if ais.listener != nil {
		ais.listener.Close()
		ais.listener = nil
	}
	ais.mtx.Unlock()

	// Wait without holding mtx; worker goroutines need it to observe shutdown.
	ais.wg.Wait()

	// Remove socket file to prevent restart issues.
	os.Remove(getInputSocketPath())
}
// Close stops the server and then removes the socket file.
func (ais *AudioInputServer) Close() {
	ais.Stop()

	// Stop removes the file when it actually ran; this removal also covers the
	// case where Stop returned early.
	socketPath := getInputSocketPath()
	os.Remove(socketPath)
}
// acceptConnections accepts client connections for as long as the server is
// running. Each accepted connection has its socket buffers configured, replaces
// the previously active connection (which is closed), and is handed to
// handleConnection through the reader pool or, failing that, a new goroutine.
//
// The listener is read under mtx because Stop sets it to nil; a nil listener
// ends the loop. After a failed Accept the loop pauses briefly before retrying
// so that a persistent error cannot turn into a busy loop.
func (ais *AudioInputServer) acceptConnections() {
	const acceptRetryDelay = 10 * time.Millisecond

	for ais.running {
		ais.mtx.Lock()
		listener := ais.listener
		ais.mtx.Unlock()
		if listener == nil {
			return
		}

		conn, err := listener.Accept()
		if err != nil {
			if !ais.running {
				return
			}
			// Log error and continue accepting after a short pause.
			logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
			logger.Warn().Err(err).Msg("failed to accept connection, retrying")
			time.Sleep(acceptRetryDelay)
			continue
		}

		// Configure socket buffers for optimal performance.
		if err := ConfigureSocketBuffers(conn, ais.socketBufferConfig); err != nil {
			// Log warning but don't fail - socket buffer optimization is not critical.
			logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
			logger.Warn().Err(err).Msg("failed to configure socket buffers, using defaults")
		} else {
			// Record socket buffer metrics for monitoring.
			RecordSocketBufferMetrics(conn, "audio-input")
		}

		ais.mtx.Lock()
		// Close existing connection if any to prevent resource leaks.
		if ais.conn != nil {
			ais.conn.Close()
		}
		ais.conn = conn
		ais.mtx.Unlock()

		// Handle this connection using the goroutine pool.
		if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) {
			// If the pool is full or shutting down, fall back to direct goroutine creation.
			go ais.handleConnection(conn)
		}
	}
}
// handleConnection owns the lifetime of one accepted connection. Reading is
// done by the reader goroutine; this function only waits and closes conn when
// it returns.
//
// It returns when the stop channel is closed or when conn is no longer the
// server's active connection (cleared, or replaced by a newer one). The active
// connection is read under mtx because acceptConnections and the reader
// goroutine modify it under that mutex. Checking for replacement, not just for
// nil, keeps handlers of superseded connections from polling until shutdown.
func (ais *AudioInputServer) handleConnection(conn net.Conn) {
	defer conn.Close()

	for {
		select {
		case <-ais.stopChan:
			return
		default:
		}

		ais.mtx.Lock()
		current := ais.conn
		ais.mtx.Unlock()

		// conn is non-nil here, so this also covers the "no connection" case.
		if current != conn {
			return
		}
		time.Sleep(Config.DefaultSleepDuration)
	}
}
// readMessage reads a message from the connection using optimized pooled buffers with validation.
//
// Validation Rules:
// - Magic number must match InputMagicNumber ("JKMI" - JetKVM Microphone Input)
// - Message length must not exceed MaxFrameSize (default: 4096 bytes)
// - Header size is fixed at 17 bytes (4+1+4+8: Magic+Type+Length+Timestamp)
// - Data length validation prevents buffer overflow attacks
//
// Message Format:
// - Magic (4 bytes): Identifies valid JetKVM audio messages
// - Type (1 byte): InputMessageType (OpusFrame, Config, Stop, Heartbeat, Ack)
// - Length (4 bytes): Data payload size in bytes
// - Timestamp (8 bytes): Message timestamp for latency tracking
// - Data (variable): Message payload up to MaxFrameSize
//
// Error Conditions:
// - Invalid magic number: Rejects non-JetKVM messages
// - Message too large: Prevents memory exhaustion
// - Connection errors: Network/socket failures
// - Incomplete reads: Partial message reception
//
// The function uses pooled buffers for efficient memory management and
// ensures all messages conform to the JetKVM audio protocol specification.
func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) {
	// Borrow scratch buffers from the pool for the duration of this read.
	pooled := globalMessagePool.Get()
	defer globalMessagePool.Put(pooled)

	// Fixed-size header first.
	hdr := pooled.header[:]
	if _, err := io.ReadFull(conn, hdr); err != nil {
		return nil, err
	}

	// Little-endian layout: magic[0:4] type[4] length[5:9] timestamp[9:17].
	parsed := &pooled.msg
	parsed.Magic = binary.LittleEndian.Uint32(hdr[0:4])
	parsed.Type = InputMessageType(hdr[4])
	parsed.Length = binary.LittleEndian.Uint32(hdr[5:9])
	parsed.Timestamp = int64(binary.LittleEndian.Uint64(hdr[9:17]))

	// Reject foreign or oversized messages before touching the payload.
	switch {
	case parsed.Magic != inputMagicNumber:
		return nil, fmt.Errorf("invalid magic number: got 0x%x, expected 0x%x", parsed.Magic, inputMagicNumber)
	case parsed.Length > uint32(maxFrameSize):
		return nil, fmt.Errorf("message too large: got %d bytes, maximum allowed %d bytes", parsed.Length, maxFrameSize)
	}

	hasPayload := parsed.Length > 0
	if hasPayload {
		// Reuse the pooled buffer when it is large enough, otherwise replace it.
		if wanted := int(parsed.Length); cap(pooled.data) >= wanted {
			pooled.data = pooled.data[:parsed.Length]
		} else {
			pooled.data = make([]byte, parsed.Length)
		}
		if _, err := io.ReadFull(conn, pooled.data); err != nil {
			return nil, err
		}
		parsed.Data = pooled.data
	}

	// The pooled message goes back to the pool on return, so the caller gets
	// an independent message with its own copy of the payload.
	out := &InputIPCMessage{
		Magic:     parsed.Magic,
		Type:      parsed.Type,
		Length:    parsed.Length,
		Timestamp: parsed.Timestamp,
	}
	if hasPayload {
		out.Data = make([]byte, parsed.Length)
		copy(out.Data, parsed.Data)
	}
	return out, nil
}
// processMessage dispatches a received message by type. Frame, config and Opus
// config messages go to their handlers with the payload; a heartbeat is
// answered with an ack; a stop message and any unknown type yield an error.
func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
	switch kind := msg.Type; kind {
	case InputMessageTypeHeartbeat:
		return ais.sendAck()
	case InputMessageTypeStop:
		return fmt.Errorf("stop message received")
	case InputMessageTypeOpusFrame:
		return ais.processOpusFrame(msg.Data)
	case InputMessageTypeOpusConfig:
		return ais.processOpusConfig(msg.Data)
	case InputMessageTypeConfig:
		return ais.processConfig(msg.Data)
	default:
		return fmt.Errorf("unknown message type: %d", kind)
	}
}
// processOpusFrame decodes one Opus frame through CGOAudioDecodeWrite using a
// pooled PCM buffer. Empty input is ignored; input longer than
// cachedMaxFrameSize returns ErrFrameDataTooLarge.
func (ais *AudioInputServer) processOpusFrame(data []byte) error {
	switch size := len(data); {
	case size == 0:
		// Nothing to decode.
		return nil
	case size > cachedMaxFrameSize:
		return ErrFrameDataTooLarge
	}

	// Borrow a PCM buffer sized from the current configuration.
	pcm := GetBufferFromPool(Config.MaxPCMBufferSize)
	defer ReturnBufferToPool(pcm)

	// Only the error is relevant to the caller.
	_, decodeErr := CGOAudioDecodeWrite(data, pcm)
	return decodeErr
}
// processConfig checks a configuration payload and acknowledges it. An empty
// payload, or one whose size fails ValidateBufferSize, returns an error; the
// latter is also logged.
func (ais *AudioInputServer) processConfig(data []byte) error {
	size := len(data)
	if size == 0 {
		return fmt.Errorf("empty configuration data")
	}

	validationErr := ValidateBufferSize(size)
	if validationErr == nil {
		// Payload accepted: confirm receipt to the client.
		return ais.sendAck()
	}

	logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
	logger.Error().Err(validationErr).Msg("Configuration buffer validation failed")
	return fmt.Errorf("configuration validation failed: %w", validationErr)
}
// processOpusConfig decodes a 36-byte payload of nine little-endian 32-bit
// values into an InputIPCOpusConfig, applies the encoder-related fields via
// CGOUpdateOpusEncoderParams, and acknowledges on success. A payload of any
// other size, or a failure to apply the parameters, returns an error.
func (ais *AudioInputServer) processOpusConfig(data []byte) error {
	logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()

	// Nine 4-byte fields are expected.
	if got := len(data); got != 36 {
		return fmt.Errorf("invalid Opus configuration data size: expected 36 bytes, got %d", got)
	}

	// field returns the index-th 32-bit little-endian value as an int.
	field := func(index int) int {
		offset := index * 4
		return int(binary.LittleEndian.Uint32(data[offset : offset+4]))
	}
	config := InputIPCOpusConfig{
		SampleRate: field(0),
		Channels:   field(1),
		FrameSize:  field(2),
		Bitrate:    field(3),
		Complexity: field(4),
		VBR:        field(5),
		SignalType: field(6),
		Bandwidth:  field(7),
		DTX:        field(8),
	}
	logger.Info().Interface("config", config).Msg("applying dynamic Opus encoder configuration")

	// The fourth argument (VBR constraint) is always passed as 0.
	if err := CGOUpdateOpusEncoderParams(
		config.Bitrate,
		config.Complexity,
		config.VBR,
		0,
		config.SignalType,
		config.Bandwidth,
		config.DTX,
	); err != nil {
		logger.Error().Err(err).Msg("failed to apply Opus encoder configuration")
		return fmt.Errorf("failed to apply Opus configuration: %w", err)
	}

	logger.Info().Msg("Opus encoder configuration applied successfully")
	return ais.sendAck()
}
// sendAck writes an empty Ack message, stamped with the current time, to the
// active connection while holding mtx. It returns an error when there is no
// active connection.
func (ais *AudioInputServer) sendAck() error {
	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	target := ais.conn
	if target == nil {
		return fmt.Errorf("no connection")
	}

	ack := &InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeAck,
		Length:    0,
		Timestamp: time.Now().UnixNano(),
	}
	return ais.writeMessage(target, ack)
}
// Global shared message pool for input IPC server.
// It is created with messagePoolSize and passed to WriteIPCMessage by the
// server's writeMessage.
var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize)
// writeMessage sends msg on conn by delegating to the shared WriteIPCMessage
// helper, passing the server's message pool and its dropped-frame counter.
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error {
	dropCounter := &ais.droppedFrames
	return WriteIPCMessage(conn, msg, globalInputServerMessagePool, dropCounter)
}
// AudioInputClient handles IPC communication from the main process.
// All Send* methods, Connect and Disconnect serialize on mtx.
type AudioInputClient struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
droppedFrames int64 // Atomic counter for dropped frames; passed to WriteIPCMessage
totalFrames int64 // Atomic counter for total frames; incremented once per writeMessage call
conn net.Conn // Connection to the server socket; nil when disconnected
mtx sync.Mutex // Guards conn and running
running bool // True between a successful Connect and Disconnect
}
// NewAudioInputClient returns a zero-valued, disconnected client; call Connect
// before sending.
func NewAudioInputClient() *AudioInputClient {
	client := new(AudioInputClient)
	return client
}
// Connect dials the audio input server's unix socket, retrying up to ten times
// because the server might not be ready yet. It returns nil immediately when
// the client is already running. On success the frame counters are reset.
//
// Between attempts it sleeps for Config.BackoffStart doubled every three
// attempts and capped at Config.MaxRetryDelay; there is no sleep after the
// final attempt. The mutex is held for the whole call, including the sleeps.
// When every attempt fails, the returned error wraps the last dial error.
func (aic *AudioInputClient) Connect() error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if aic.running {
		return nil // Already connected
	}

	// Ensure clean state before connecting.
	if aic.conn != nil {
		aic.conn.Close()
		aic.conn = nil
	}

	const maxAttempts = 10
	socketPath := getInputSocketPath()

	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		conn, err := net.Dial("unix", socketPath)
		if err == nil {
			aic.conn = conn
			aic.running = true
			// Reset frame counters on successful connection.
			atomic.StoreInt64(&aic.totalFrames, 0)
			atomic.StoreInt64(&aic.droppedFrames, 0)
			return nil
		}
		lastErr = err

		// No point in waiting when no further attempt follows.
		if i == maxAttempts-1 {
			break
		}

		// Exponential backoff starting from config.
		backoffStart := Config.BackoffStart
		delay := time.Duration(backoffStart.Nanoseconds()*(1<<uint(i/3))) * time.Nanosecond
		if maxDelay := Config.MaxRetryDelay; delay > maxDelay {
			delay = maxDelay
		}
		time.Sleep(delay)
	}

	// Ensure clean state on connection failure.
	aic.conn = nil
	aic.running = false
	return fmt.Errorf("failed to connect to audio input server after %d attempts: %w", maxAttempts, lastErr)
}
// Disconnect marks the client as stopped and, when a connection exists, sends
// a best-effort Stop message before closing it. It does nothing when the
// client is not running.
func (aic *AudioInputClient) Disconnect() {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if !aic.running {
		return
	}
	aic.running = false

	if aic.conn == nil {
		return
	}

	stop := &InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeStop,
		Length:    0,
		Timestamp: time.Now().UnixNano(),
	}
	// A failed write is irrelevant here: the connection is closed right after.
	_ = aic.writeMessage(stop)

	aic.conn.Close()
	aic.conn = nil
}
// SendFrame sends one Opus frame to the server. It returns an error when the
// client is not connected, silently ignores an empty frame, and returns
// ErrFrameDataTooLarge for frames longer than maxFrameSize.
func (aic *AudioInputClient) SendFrame(frame []byte) error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected to audio input server")
	}

	size := len(frame)
	switch {
	case size == 0:
		// Nothing to send.
		return nil
	case size > maxFrameSize:
		return ErrFrameDataTooLarge
	}

	return aic.writeMessage(&InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeOpusFrame,
		Length:    uint32(size),
		Timestamp: time.Now().UnixNano(),
		Data:      frame,
	})
}
// SendFrameZeroCopy sends the contents of a ZeroCopyAudioFrame as an Opus
// frame message, using the slice returned by frame.Data() as the payload. It
// returns an error when the client is not connected, ignores nil and empty
// frames, and returns ErrFrameDataTooLarge when the frame length exceeds
// maxFrameSize.
func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected to audio input server")
	}
	if frame == nil {
		return nil
	}

	size := frame.Length()
	switch {
	case size == 0:
		// Nothing to send.
		return nil
	case size > maxFrameSize:
		return ErrFrameDataTooLarge
	}

	return aic.writeMessage(&InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeOpusFrame,
		Length:    uint32(size),
		Timestamp: time.Now().UnixNano(),
		Data:      frame.Data(),
	})
}
// SendConfig validates config and sends it as a Config message whose payload
// is three little-endian 32-bit values: sample rate, channels, frame size.
// It returns an error when the client is not connected or validation fails;
// a validation failure is also logged.
func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected to audio input server")
	}

	if validationErr := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); validationErr != nil {
		logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
		logger.Error().Err(validationErr).Msg("Configuration validation failed")
		return fmt.Errorf("input configuration validation failed: %w", validationErr)
	}

	// 12-byte payload: 3 fields of 4 bytes each.
	payload := make([]byte, 12)
	order := binary.LittleEndian
	order.PutUint32(payload[0:4], uint32(config.SampleRate))
	order.PutUint32(payload[4:8], uint32(config.Channels))
	order.PutUint32(payload[8:12], uint32(config.FrameSize))

	return aic.writeMessage(&InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeConfig,
		Length:    uint32(len(payload)),
		Timestamp: time.Now().UnixNano(),
		Data:      payload,
	})
}
// SendOpusConfig sends the full Opus encoder configuration as an OpusConfig
// message whose payload is nine little-endian 32-bit values, in the order
// decoded by the server: sample rate, channels, frame size, bitrate,
// complexity, VBR, signal type, bandwidth, DTX. It returns an error when the
// client is not connected or when sample rate, channels, frame size or bitrate
// is not positive.
func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected to audio input server")
	}

	valid := config.SampleRate > 0 && config.Channels > 0 && config.FrameSize > 0 && config.Bitrate > 0
	if !valid {
		return fmt.Errorf("invalid Opus configuration: SampleRate=%d, Channels=%d, FrameSize=%d, Bitrate=%d",
			config.SampleRate, config.Channels, config.FrameSize, config.Bitrate)
	}

	// 36-byte payload: 9 fields of 4 bytes each, written in wire order.
	fields := [...]int{
		config.SampleRate,
		config.Channels,
		config.FrameSize,
		config.Bitrate,
		config.Complexity,
		config.VBR,
		config.SignalType,
		config.Bandwidth,
		config.DTX,
	}
	payload := make([]byte, 36)
	for idx, value := range fields {
		offset := idx * 4
		binary.LittleEndian.PutUint32(payload[offset:offset+4], uint32(value))
	}

	return aic.writeMessage(&InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeOpusConfig,
		Length:    uint32(len(payload)),
		Timestamp: time.Now().UnixNano(),
		Data:      payload,
	})
}
// SendHeartbeat sends an empty Heartbeat message stamped with the current
// time. It returns an error when the client is not connected.
func (aic *AudioInputClient) SendHeartbeat() error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected to audio input server")
	}

	heartbeat := &InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeHeartbeat,
		Length:    0,
		Timestamp: time.Now().UnixNano(),
	}
	return aic.writeMessage(heartbeat)
}
// Global shared message pool for input IPC clients
var globalInputMessagePool = NewGenericMessagePool(messagePoolSize)

// writeMessage counts the message in totalFrames and then sends it on the
// client's connection through the shared WriteIPCMessage helper, passing the
// client pool and the client's dropped-frame counter.
func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error {
	// Every write attempt is counted, whatever its outcome.
	atomic.AddInt64(&aic.totalFrames, 1)

	dropCounter := &aic.droppedFrames
	return WriteIPCMessage(aic.conn, msg, globalInputMessagePool, dropCounter)
}
// IsConnected reports whether the client is marked running and holds a
// connection. The check is made under the client's mutex.
func (aic *AudioInputClient) IsConnected() bool {
	aic.mtx.Lock()
	connected := aic.running && aic.conn != nil
	aic.mtx.Unlock()
	return connected
}
// GetFrameStats returns the client's total and dropped frame counts as read by
// the package-level GetFrameStats helper.
func (aic *AudioInputClient) GetFrameStats() (total, dropped int64) {
	snapshot := GetFrameStats(&aic.totalFrames, &aic.droppedFrames)
	total, dropped = snapshot.Total, snapshot.Dropped
	return total, dropped
}
// GetDropRate returns the frame drop rate computed by CalculateDropRate from
// the client's current frame counters.
func (aic *AudioInputClient) GetDropRate() float64 {
	return CalculateDropRate(GetFrameStats(&aic.totalFrames, &aic.droppedFrames))
}
// ResetStats resets the client's frame statistics by passing its total and
// dropped counters to the package-level ResetFrameStats helper.
func (aic *AudioInputClient) ResetStats() {
ResetFrameStats(&aic.totalFrames, &aic.droppedFrames)
}
// ResetServerStats atomically zeroes the server's total and dropped frame
// counters. The processing-time metric is left untouched.
func (ais *AudioInputServer) ResetServerStats() {
	for _, counter := range [...]*int64{&ais.totalFrames, &ais.droppedFrames} {
		atomic.StoreInt64(counter, 0)
	}
}
// RecoverFromDroppedFrames checks the server's drop statistics and, when more
// than 100 frames were counted and over half of them were dropped, logs a
// warning, resets the statistics and calls UpdateBufferSize.
func (ais *AudioInputServer) RecoverFromDroppedFrames() {
	total := atomic.LoadInt64(&ais.totalFrames)
	dropped := atomic.LoadInt64(&ais.droppedFrames)

	// Too few samples, or a drop rate of at most 50%: nothing to do.
	if total <= 100 || dropped <= total/2 {
		return
	}

	logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
	logger.Warn().Int64("total", total).Int64("dropped", dropped).Msg("high drop rate detected, attempting recovery")

	// Start counting afresh and pick up the current buffer size.
	ais.ResetServerStats()
	ais.UpdateBufferSize()
}
// startReaderGoroutine starts the message reader using the goroutine pool.
//
// The reader task loops while ais.running is true: it takes the current
// connection under mtx, reads one message with readMessage, and offers it to
// messageChan without blocking. Read errors are counted; repeated errors cause
// a growing sleep and, at Config.MaxConsecutiveErrors, the connection is closed
// and cleared. If the pool rejects the task, the wait group is decremented and
// no reader runs.
//
// NOTE(review): ais.running is read here without holding mtx, while Start and
// Stop write it under mtx — confirm whether this needs synchronization.
// NOTE(review): both loggers below use AudioInputClientComponent although this
// is a server method — confirm the intended component label.
func (ais *AudioInputServer) startReaderGoroutine() {
ais.wg.Add(1)
// Create a reader task that will run in the goroutine pool
readerTask := func() {
defer ais.wg.Done()
// Enhanced error tracking and recovery
var consecutiveErrors int
var lastErrorTime time.Time
maxConsecutiveErrors := Config.MaxConsecutiveErrors
errorResetWindow := Config.RestartWindow // Use existing restart window
baseBackoffDelay := Config.RetryDelay
maxBackoffDelay := Config.MaxRetryDelay
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
for ais.running {
// Snapshot the active connection under the mutex
ais.mtx.Lock()
conn := ais.conn
ais.mtx.Unlock()
if conn == nil {
// No client connected yet; poll again shortly
time.Sleep(10 * time.Millisecond)
continue
}
msg, err := ais.readMessage(conn)
if err != nil {
if ais.running {
// Enhanced error handling with progressive backoff
now := time.Now()
// Reset error counter if enough time has passed
if now.Sub(lastErrorTime) > errorResetWindow {
consecutiveErrors = 0
}
consecutiveErrors++
lastErrorTime = now
// Skip logging in hotpath for performance - only log critical errors
// Progressive backoff based on error count: linear in the number of
// consecutive errors, capped at maxBackoffDelay; the first error does not sleep
if consecutiveErrors > 1 {
backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
time.Sleep(backoffDelay)
}
// If too many consecutive errors, close connection to force reconnect
if consecutiveErrors >= maxConsecutiveErrors {
// Only log critical errors to reduce hotpath overhead
if logger.GetLevel() <= zerolog.ErrorLevel {
logger.Error().
Int("consecutive_errors", consecutiveErrors).
Msg("Too many consecutive read errors, closing connection")
}
ais.mtx.Lock()
if ais.conn != nil {
ais.conn.Close()
ais.conn = nil
}
ais.mtx.Unlock()
consecutiveErrors = 0 // Reset for next connection
}
}
// Re-evaluate the loop condition; this ends the task once running is false
continue
}
// Reset error counter on successful read
if consecutiveErrors > 0 {
consecutiveErrors = 0
// Only log recovery when the logger level is Info or lower, to reduce overhead
if logger.GetLevel() <= zerolog.InfoLevel {
logger.Info().Msg("Input connection recovered")
}
}
// Send to message channel with non-blocking write (use read lock for channel access)
ais.channelMutex.RLock()
messageChan := ais.messageChan
ais.channelMutex.RUnlock()
select {
case messageChan <- msg:
atomic.AddInt64(&ais.totalFrames, 1)
default:
// Channel full, drop message
atomic.AddInt64(&ais.droppedFrames, 1)
// Avoid sampling logic in critical path - only log if warn level enabled
if logger.GetLevel() <= zerolog.WarnLevel {
droppedCount := atomic.LoadInt64(&ais.droppedFrames)
logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame")
}
}
}
}
// Submit the reader task to the audio reader pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioReaderTaskWithBackpressure(readerTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio reader task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
}
// startProcessorGoroutine starts the message processor using the goroutine
// pool.
//
// The processor task receives messages from the message channel and forwards
// each one with processMessageWithRecovery until stopChan is closed. Forwarding
// failures are counted; once Config.MaxConsecutiveErrors failures accumulate
// within Config.RestartWindow of each other, the processing queue is drained
// (each drained frame counts as dropped) and the error counter restarts. After
// a successful forward the elapsed time is stored in processingTime.
//
// If the pool rejects the task, the wait group is decremented and no processor
// runs.
func (ais *AudioInputServer) startProcessorGoroutine() {
	ais.wg.Add(1)

	// Create a processor task that will run in the goroutine pool.
	processorTask := func() {
		// Only lock the OS thread for high-load configurations. This reduces
		// interference with input processing threads.
		config := Config
		useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
		if useThreadOptimizations {
			runtime.LockOSThread()
			defer runtime.UnlockOSThread()
			// Priority scheduler not implemented - using default thread priority.
		}

		// Create logger for this goroutine.
		logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()

		// Error tracking for processing.
		var processingErrors int
		var lastProcessingError time.Time
		maxProcessingErrors := config.MaxConsecutiveErrors
		errorResetWindow := config.RestartWindow

		defer ais.wg.Done()

		for {
			select {
			case <-ais.stopChan:
				return
			case msg := <-ais.getMessageChan():
				start := time.Now()
				err := ais.processMessageWithRecovery(msg, logger)
				processingTime := time.Since(start)

				if err != nil {
					// Track processing errors; errors further apart than the
					// reset window start a new streak.
					now := time.Now()
					if now.Sub(lastProcessingError) > errorResetWindow {
						processingErrors = 0
					}
					processingErrors++
					lastProcessingError = now

					// Skip logging in hotpath for performance.
					// After too many errors, clear the processing queue to recover.
					if processingErrors >= maxProcessingErrors {
						processChan := ais.getProcessChan()
					drain:
						for len(processChan) > 0 {
							select {
							case <-processChan:
								atomic.AddInt64(&ais.droppedFrames, 1)
							default:
								// Another receiver emptied the queue first. The
								// label is required: a bare break would only
								// leave the select statement.
								break drain
							}
						}
						processingErrors = 0 // Reset after clearing queue
					}
					continue
				}

				// Reset error counter on successful processing.
				processingErrors = 0

				// Update processing time metrics.
				atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds())
			}
		}
	}

	// Submit the processor task to the audio processor pool with backpressure.
	// The server component label is used, matching the task's own logger.
	logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
	if !SubmitAudioProcessorTaskWithBackpressure(processorTask) {
		// Task was dropped due to backpressure - this is expected under high load.
		// Log at debug level to avoid spam.
		logger.Debug().Msg("Audio processor task dropped due to backpressure")
		// Don't fall back to unlimited goroutine creation;
		// instead, let the system recover naturally.
		ais.wg.Done() // Decrement the wait group since we're not starting the task
	}
}
// processMessageWithRecovery forwards msg to the processing queue without
// blocking and returns an error when the message had to be dropped.
//
// For Opus frames it first looks at the queue depth: once the queue holds more
// than three quarters of the configured buffer size, the oldest queued frame is
// discarded (and counted in droppedFrames) so that newer audio is favoured.
//
// The hand-off itself never waits. If the queue has no free slot the message is
// counted as dropped and a "processing queue full" error is returned.
func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, logger zerolog.Logger) error {
	if msg.Type == InputMessageTypeOpusFrame {
		processChan := ais.getProcessChan()
		bufferSize := int(atomic.LoadInt64(&ais.bufferSize))
		if len(processChan) > bufferSize*3/4 {
			// Make room by evicting the oldest frame, if one is still queued.
			select {
			case <-processChan:
				atomic.AddInt64(&ais.droppedFrames, 1)
				logger.Debug().Msg("Dropped oldest frame to make room")
			default:
			}
		}
	}

	// Non-blocking send. A timeout case placed next to a default case can never
	// be selected, so no timer is armed here; that also avoids allocating one
	// timer per message on this hot path.
	select {
	case ais.getProcessChan() <- msg:
		return nil
	default:
		atomic.AddInt64(&ais.droppedFrames, 1)
		return fmt.Errorf("processing queue full")
	}
}
// startMonitorGoroutine submits the performance monitor to the audio processor
// pool. On every DefaultTickerInterval tick the monitor drains the processing
// queue, passes each message to processMessage, folds the observed latency into
// the running average and reports it to the adaptive buffer manager. After each
// drain it applies a buffer-size update if the BufferUpdateInterval ticker has
// fired.
//
// The wait group is incremented up front and released either by the task when
// it returns or right here when the pool rejects the task.
func (ais *AudioInputServer) startMonitorGoroutine() {
	ais.wg.Add(1)

	monitorTask := func() {
		// Only pin the OS thread when more than eight processor workers are
		// configured; no priority scheduling is applied.
		if Config.MaxAudioProcessorWorkers > 8 {
			runtime.LockOSThread()
			defer runtime.UnlockOSThread()
		}
		defer ais.wg.Done()

		ticker := time.NewTicker(Config.DefaultTickerInterval)
		defer ticker.Stop()

		bufferUpdateTicker := time.NewTicker(Config.BufferUpdateInterval)
		defer bufferUpdateTicker.Stop()

		for {
			select {
			case <-ais.stopChan:
				return
			case <-ticker.C:
			}

			// Drain everything that is currently queued. The channel is looked
			// up again for every receive, so a swapped channel is picked up.
		drain:
			for {
				select {
				case msg := <-ais.getProcessChan():
					start := time.Now()
					err := ais.processMessage(msg)
					processingTime := time.Since(start)

					var latency time.Duration
					if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 {
						// End-to-end latency from the message timestamp, smoothed
						// with 90% history and 10% current sample.
						latency = time.Since(time.Unix(0, msg.Timestamp))
						currentAvg := atomic.LoadInt64(&ais.processingTime)
						atomic.StoreInt64(&ais.processingTime, (currentAvg*9+latency.Nanoseconds())/10)
					} else {
						// No usable timestamp: fall back to the processing time,
						// averaged 50/50 with the previous value.
						latency = processingTime
						currentAvg := atomic.LoadInt64(&ais.processingTime)
						atomic.StoreInt64(&ais.processingTime, (currentAvg+processingTime.Nanoseconds())/2)
					}

					// Latency is reported even when processing failed.
					ais.ReportLatency(latency)
					if err != nil {
						atomic.AddInt64(&ais.droppedFrames, 1)
					}
				default:
					// Queue is empty for now.
					break drain
				}
			}

			// Apply a buffer-size update only if its ticker has fired.
			select {
			case <-bufferUpdateTicker.C:
				ais.UpdateBufferSize()
			default:
			}
		}
	}

	// Submit with backpressure. A rejected task is not retried and no fallback
	// goroutine is spawned; only the wait group is balanced again.
	if !SubmitAudioProcessorTaskWithBackpressure(monitorTask) {
		// This code runs inside the input server, so log under the server component.
		logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
		logger.Debug().Msg("Audio monitor task dropped due to backpressure")
		ais.wg.Done()
	}
}
// GetServerStats reports the server's performance counters: total frames,
// dropped frames, the tracked processing time and the current buffer size.
// Each counter is read with its own atomic load.
func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessingTime time.Duration, bufferSize int64) {
	total = atomic.LoadInt64(&ais.totalFrames)
	dropped = atomic.LoadInt64(&ais.droppedFrames)
	avgProcessingTime = time.Duration(atomic.LoadInt64(&ais.processingTime))
	bufferSize = atomic.LoadInt64(&ais.bufferSize)
	return total, dropped, avgProcessingTime, bufferSize
}
// UpdateBufferSize pulls the input buffer size from the adaptive buffer manager
// and applies it when it differs from the current size by at least 25%.
//
// A non-positive size is ignored: a zero-capacity queue would make every
// non-blocking hand-off fail, and a negative capacity would panic in make.
// When the server is running the message channels are recreated with the new
// capacity; otherwise only the stored size changes.
func (ais *AudioInputServer) UpdateBufferSize() {
	newSize := int64(GetAdaptiveBufferManager().GetInputBufferSize())
	if newSize <= 0 {
		return
	}

	oldSize := atomic.LoadInt64(&ais.bufferSize)
	if oldSize > 0 {
		// Relative change; anything strictly inside ±25% is not worth the
		// cost of rebuilding the channels.
		diff := float64(newSize-oldSize) / float64(oldSize)
		if diff < 0.25 && diff > -0.25 {
			return
		}
	}

	atomic.StoreInt64(&ais.bufferSize, newSize)

	if ais.running {
		ais.recreateChannels(int(newSize))
	}
}
// recreateChannels swaps the message and process channels for fresh ones with
// capacity newSize while holding the channel write lock. Messages still queued
// in the old channels are moved over first; whatever does not fit is dropped by
// drainAndTransferChannel. lastBufferSize records the capacity that was applied.
//
// NOTE(review): the swap itself does not wake a receiver that is already
// blocked on one of the old channels; confirm callers cope with that.
func (ais *AudioInputServer) recreateChannels(newSize int) {
	ais.channelMutex.Lock()
	defer ais.channelMutex.Unlock()

	// Message channel first, then the process channel.
	freshMessages := make(chan *InputIPCMessage, newSize)
	ais.drainAndTransferChannel(ais.messageChan, freshMessages)

	freshProcess := make(chan *InputIPCMessage, newSize)
	ais.drainAndTransferChannel(ais.processChan, freshProcess)

	// Publish both replacements under the same lock hold.
	ais.messageChan, ais.processChan = freshMessages, freshProcess
	ais.lastBufferSize = int64(newSize)
}
// drainAndTransferChannel empties oldChan without blocking and moves each
// message into newChan in the same order. A message that finds newChan full is
// discarded and counted in droppedFrames. It returns once oldChan has nothing
// ready to receive.
func (ais *AudioInputServer) drainAndTransferChannel(oldChan, newChan chan *InputIPCMessage) {
	for {
		var pending *InputIPCMessage

		// Step 1: take the next queued message, or stop when none is waiting.
		select {
		case pending = <-oldChan:
		default:
			return
		}

		// Step 2: hand it to the replacement channel if there is room.
		select {
		case newChan <- pending:
		default:
			atomic.AddInt64(&ais.droppedFrames, 1)
		}
	}
}
// ReportLatency forwards a measured processing latency to the adaptive buffer
// manager via its UpdateLatency method.
func (ais *AudioInputServer) ReportLatency(latency time.Duration) {
	GetAdaptiveBufferManager().UpdateLatency(latency)
}
// GetMessagePoolStats takes a snapshot of the pool's occupancy and hit/miss
// counters. The preallocated list is measured under the read lock, the counters
// are loaded atomically, and the hit rate is scaled by
// Config.PercentageMultiplier (it stays 0 while no requests were recorded).
func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats {
	mp.mutex.RLock()
	preallocated := len(mp.preallocated)
	mp.mutex.RUnlock()

	hits := atomic.LoadInt64(&mp.hitCount)
	misses := atomic.LoadInt64(&mp.missCount)

	stats := MessagePoolStats{
		MaxPoolSize:       mp.maxPoolSize,
		ChannelPoolSize:   len(mp.pool),
		PreallocatedCount: int64(preallocated),
		PreallocatedMax:   int64(mp.preallocSize),
		HitCount:          hits,
		MissCount:         misses,
	}
	if requests := hits + misses; requests > 0 {
		stats.HitRate = float64(hits) / float64(requests) * Config.PercentageMultiplier
	}
	return stats
}
// MessagePoolStats is the snapshot of message pool statistics produced by
// MessagePool.GetMessagePoolStats.
type MessagePoolStats struct {
	// MaxPoolSize is copied from the pool's maxPoolSize setting.
	MaxPoolSize int
	// ChannelPoolSize is the number of messages sitting in the pool channel
	// when the snapshot was taken.
	ChannelPoolSize int
	// PreallocatedCount is the length of the preallocated list at snapshot time.
	PreallocatedCount int64
	// PreallocatedMax is copied from the pool's preallocSize setting.
	PreallocatedMax int64
	// HitCount is the value of the pool's hit counter.
	HitCount int64
	// MissCount is the value of the pool's miss counter.
	MissCount int64
	// HitRate is HitCount / (HitCount + MissCount) scaled by
	// Config.PercentageMultiplier; it is 0 when both counters are 0.
	HitRate float64 // Percentage
}
// GetGlobalMessagePoolStats is a convenience accessor that returns the
// statistics snapshot of the package-level globalMessagePool.
func GetGlobalMessagePoolStats() MessagePoolStats {
	stats := globalMessagePool.GetMessagePoolStats()
	return stats
}
// getMessageChan returns the message channel that is current at the time of the
// call, read under the channel read lock.
func (ais *AudioInputServer) getMessageChan() chan *InputIPCMessage {
	ais.channelMutex.RLock()
	current := ais.messageChan
	ais.channelMutex.RUnlock()
	return current
}
// getProcessChan returns the process channel that is current at the time of the
// call, read under the channel read lock.
func (ais *AudioInputServer) getProcessChan() chan *InputIPCMessage {
	ais.channelMutex.RLock()
	current := ais.processChan
	ais.channelMutex.RUnlock()
	return current
}
// Helper functions
// getInputSocketPath is now defined in unified_ipc.go