mirror of
https://github.com/jetkvm/kvm.git
synced 2025-09-16 08:38:14 +00:00
This change replaces all instances of GetConfig() function calls with direct access to the Config variable throughout the audio package. The modification improves performance by eliminating function call overhead and simplifies the codebase by removing unnecessary indirection. The commit also includes minor optimizations in validation logic and connection handling, while maintaining all existing functionality. Error handling remains robust with appropriate fallbacks when config values are not available. Additional improvements include: - Enhanced connection health monitoring in UnifiedAudioClient - Optimized validation functions using cached config values - Reduced memory allocations in hot paths - Improved error recovery during quality changes
416 lines
12 KiB
Go
416 lines
12 KiB
Go
//go:build cgo
|
|
|
|
package audio
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// BatchZeroCopyProcessor handles batch operations on zero-copy audio frames
|
|
// with optimized reference counting and memory management
|
|
type BatchZeroCopyProcessor struct {
|
|
// Configuration
|
|
maxBatchSize int
|
|
batchTimeout time.Duration
|
|
processingDelay time.Duration
|
|
adaptiveThreshold float64
|
|
|
|
// Processing queues
|
|
readEncodeQueue chan *batchZeroCopyRequest
|
|
decodeWriteQueue chan *batchZeroCopyRequest
|
|
|
|
// Worker management
|
|
workerPool chan struct{}
|
|
running int32
|
|
wg sync.WaitGroup
|
|
|
|
// Statistics
|
|
batchedFrames int64
|
|
singleFrames int64
|
|
batchSavings int64
|
|
processingTimeUs int64
|
|
adaptiveHits int64
|
|
adaptiveMisses int64
|
|
}
|
|
|
|
type batchZeroCopyRequest struct {
|
|
frames []*ZeroCopyAudioFrame
|
|
operation batchZeroCopyOperation
|
|
resultCh chan batchZeroCopyResult
|
|
timestamp time.Time
|
|
}
|
|
|
|
type batchZeroCopyOperation int
|
|
|
|
const (
|
|
batchOpReadEncode batchZeroCopyOperation = iota
|
|
batchOpDecodeWrite
|
|
batchOpMixed
|
|
)
|
|
|
|
type batchZeroCopyResult struct {
|
|
encodedData [][]byte // For read-encode operations
|
|
processedCount int // Number of successfully processed frames
|
|
err error
|
|
}
|
|
|
|
// Global batch zero-copy processor
|
|
var (
|
|
globalBatchZeroCopyProcessor *BatchZeroCopyProcessor
|
|
batchZeroCopyOnce sync.Once
|
|
)
|
|
|
|
// GetBatchZeroCopyProcessor returns the global batch zero-copy processor
|
|
func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
|
|
batchZeroCopyOnce.Do(func() {
|
|
globalBatchZeroCopyProcessor = NewBatchZeroCopyProcessor()
|
|
globalBatchZeroCopyProcessor.Start()
|
|
})
|
|
return globalBatchZeroCopyProcessor
|
|
}
|
|
|
|
// NewBatchZeroCopyProcessor creates a new batch zero-copy processor
|
|
func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
|
|
cache := Config
|
|
return &BatchZeroCopyProcessor{
|
|
maxBatchSize: cache.BatchProcessorFramesPerBatch,
|
|
batchTimeout: cache.BatchProcessorTimeout,
|
|
processingDelay: cache.BatchProcessingDelay,
|
|
adaptiveThreshold: cache.BatchProcessorAdaptiveThreshold,
|
|
readEncodeQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize),
|
|
decodeWriteQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize),
|
|
workerPool: make(chan struct{}, 4), // 4 workers for parallel processing
|
|
}
|
|
}
|
|
|
|
// Start starts the batch zero-copy processor workers
|
|
func (bzcp *BatchZeroCopyProcessor) Start() {
|
|
if !atomic.CompareAndSwapInt32(&bzcp.running, 0, 1) {
|
|
return // Already running
|
|
}
|
|
|
|
// Start worker goroutines for read-encode operations
|
|
for i := 0; i < cap(bzcp.workerPool)/2; i++ {
|
|
bzcp.wg.Add(1)
|
|
go bzcp.readEncodeWorker()
|
|
}
|
|
|
|
// Start worker goroutines for decode-write operations
|
|
for i := 0; i < cap(bzcp.workerPool)/2; i++ {
|
|
bzcp.wg.Add(1)
|
|
go bzcp.decodeWriteWorker()
|
|
}
|
|
}
|
|
|
|
// Stop stops the batch zero-copy processor
|
|
func (bzcp *BatchZeroCopyProcessor) Stop() {
|
|
if !atomic.CompareAndSwapInt32(&bzcp.running, 1, 0) {
|
|
return // Already stopped
|
|
}
|
|
|
|
close(bzcp.readEncodeQueue)
|
|
close(bzcp.decodeWriteQueue)
|
|
bzcp.wg.Wait()
|
|
}
|
|
|
|
// readEncodeWorker processes batch read-encode operations
|
|
func (bzcp *BatchZeroCopyProcessor) readEncodeWorker() {
|
|
defer bzcp.wg.Done()
|
|
|
|
for req := range bzcp.readEncodeQueue {
|
|
bzcp.processBatchReadEncode(req)
|
|
}
|
|
}
|
|
|
|
// decodeWriteWorker processes batch decode-write operations
|
|
func (bzcp *BatchZeroCopyProcessor) decodeWriteWorker() {
|
|
defer bzcp.wg.Done()
|
|
|
|
for req := range bzcp.decodeWriteQueue {
|
|
bzcp.processBatchDecodeWrite(req)
|
|
}
|
|
}
|
|
|
|
// processBatchReadEncode processes a batch of read-encode operations
|
|
func (bzcp *BatchZeroCopyProcessor) processBatchReadEncode(req *batchZeroCopyRequest) {
|
|
startTime := time.Now()
|
|
result := batchZeroCopyResult{}
|
|
|
|
// Batch AddRef all frames first
|
|
err := BatchAddRefFrames(req.frames)
|
|
if err != nil {
|
|
result.err = err
|
|
if req.resultCh != nil {
|
|
req.resultCh <- result
|
|
close(req.resultCh)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Process frames using existing batch read-encode logic
|
|
encodedData, err := BatchReadEncode(len(req.frames))
|
|
if err != nil {
|
|
// Batch release frames on error
|
|
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
|
|
// Log release error but preserve original error
|
|
_ = releaseErr
|
|
}
|
|
result.err = err
|
|
} else {
|
|
result.encodedData = encodedData
|
|
result.processedCount = len(encodedData)
|
|
// Batch release frames after successful processing
|
|
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
|
|
// Log release error but don't fail the operation
|
|
_ = releaseErr
|
|
}
|
|
}
|
|
|
|
// Update statistics
|
|
atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames)))
|
|
atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1))
|
|
atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds())
|
|
|
|
// Send result back
|
|
if req.resultCh != nil {
|
|
req.resultCh <- result
|
|
close(req.resultCh)
|
|
}
|
|
}
|
|
|
|
// processBatchDecodeWrite processes a batch of decode-write operations
|
|
func (bzcp *BatchZeroCopyProcessor) processBatchDecodeWrite(req *batchZeroCopyRequest) {
|
|
startTime := time.Now()
|
|
result := batchZeroCopyResult{}
|
|
|
|
// Batch AddRef all frames first
|
|
err := BatchAddRefFrames(req.frames)
|
|
if err != nil {
|
|
result.err = err
|
|
if req.resultCh != nil {
|
|
req.resultCh <- result
|
|
close(req.resultCh)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Extract data from zero-copy frames for batch processing
|
|
frameData := make([][]byte, len(req.frames))
|
|
for i, frame := range req.frames {
|
|
if frame != nil {
|
|
// Get data from zero-copy frame
|
|
frameData[i] = frame.Data()[:frame.Length()]
|
|
}
|
|
}
|
|
|
|
// Process frames using existing batch decode-write logic
|
|
err = BatchDecodeWrite(frameData)
|
|
if err != nil {
|
|
result.err = err
|
|
} else {
|
|
result.processedCount = len(req.frames)
|
|
}
|
|
|
|
// Batch release frames
|
|
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
|
|
// Log release error but don't override processing error
|
|
_ = releaseErr
|
|
}
|
|
|
|
// Update statistics
|
|
atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames)))
|
|
atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1))
|
|
atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds())
|
|
|
|
// Send result back
|
|
if req.resultCh != nil {
|
|
req.resultCh <- result
|
|
close(req.resultCh)
|
|
}
|
|
}
|
|
|
|
// BatchReadEncodeZeroCopy performs batch read-encode on zero-copy frames
|
|
func (bzcp *BatchZeroCopyProcessor) BatchReadEncodeZeroCopy(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
|
|
if len(frames) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// For small batches, use direct operations to avoid overhead
|
|
if len(frames) <= 2 {
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleReadEncode(frames)
|
|
}
|
|
|
|
// Use adaptive threshold to determine batch vs single processing
|
|
batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames)
|
|
singleFrames := atomic.LoadInt64(&bzcp.singleFrames)
|
|
totalFrames := batchedFrames + singleFrames
|
|
|
|
if totalFrames > 100 { // Only apply adaptive logic after some samples
|
|
batchRatio := float64(batchedFrames) / float64(totalFrames)
|
|
if batchRatio < bzcp.adaptiveThreshold {
|
|
// Batch processing not effective, use single processing
|
|
atomic.AddInt64(&bzcp.adaptiveMisses, 1)
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleReadEncode(frames)
|
|
}
|
|
atomic.AddInt64(&bzcp.adaptiveHits, 1)
|
|
}
|
|
|
|
// Use batch processing
|
|
if atomic.LoadInt32(&bzcp.running) == 0 {
|
|
// Fallback to single processing if batch processor not running
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleReadEncode(frames)
|
|
}
|
|
|
|
resultCh := make(chan batchZeroCopyResult, 1)
|
|
req := &batchZeroCopyRequest{
|
|
frames: frames,
|
|
operation: batchOpReadEncode,
|
|
resultCh: resultCh,
|
|
timestamp: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case bzcp.readEncodeQueue <- req:
|
|
// Wait for completion
|
|
result := <-resultCh
|
|
return result.encodedData, result.err
|
|
default:
|
|
// Queue full, fallback to single processing
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleReadEncode(frames)
|
|
}
|
|
}
|
|
|
|
// BatchDecodeWriteZeroCopy performs batch decode-write on zero-copy frames
|
|
func (bzcp *BatchZeroCopyProcessor) BatchDecodeWriteZeroCopy(frames []*ZeroCopyAudioFrame) error {
|
|
if len(frames) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// For small batches, use direct operations
|
|
if len(frames) <= 2 {
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleDecodeWrite(frames)
|
|
}
|
|
|
|
// Use adaptive threshold
|
|
batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames)
|
|
singleFrames := atomic.LoadInt64(&bzcp.singleFrames)
|
|
totalFrames := batchedFrames + singleFrames
|
|
|
|
if totalFrames > 100 {
|
|
batchRatio := float64(batchedFrames) / float64(totalFrames)
|
|
if batchRatio < bzcp.adaptiveThreshold {
|
|
atomic.AddInt64(&bzcp.adaptiveMisses, 1)
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleDecodeWrite(frames)
|
|
}
|
|
atomic.AddInt64(&bzcp.adaptiveHits, 1)
|
|
}
|
|
|
|
// Use batch processing
|
|
if atomic.LoadInt32(&bzcp.running) == 0 {
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleDecodeWrite(frames)
|
|
}
|
|
|
|
resultCh := make(chan batchZeroCopyResult, 1)
|
|
req := &batchZeroCopyRequest{
|
|
frames: frames,
|
|
operation: batchOpDecodeWrite,
|
|
resultCh: resultCh,
|
|
timestamp: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case bzcp.decodeWriteQueue <- req:
|
|
// Wait for completion
|
|
result := <-resultCh
|
|
return result.err
|
|
default:
|
|
// Queue full, fallback to single processing
|
|
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
|
|
return bzcp.processSingleDecodeWrite(frames)
|
|
}
|
|
}
|
|
|
|
// processSingleReadEncode processes frames individually for read-encode
|
|
func (bzcp *BatchZeroCopyProcessor) processSingleReadEncode(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
|
|
// Extract data and use existing batch processing
|
|
frameData := make([][]byte, 0, len(frames))
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.AddRef()
|
|
frameData = append(frameData, frame.Data()[:frame.Length()])
|
|
}
|
|
}
|
|
|
|
// Use existing batch read-encode
|
|
result, err := BatchReadEncode(len(frameData))
|
|
|
|
// Release frames
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.Release()
|
|
}
|
|
}
|
|
|
|
return result, err
|
|
}
|
|
|
|
// processSingleDecodeWrite processes frames individually for decode-write
|
|
func (bzcp *BatchZeroCopyProcessor) processSingleDecodeWrite(frames []*ZeroCopyAudioFrame) error {
|
|
// Extract data and use existing batch processing
|
|
frameData := make([][]byte, 0, len(frames))
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.AddRef()
|
|
frameData = append(frameData, frame.Data()[:frame.Length()])
|
|
}
|
|
}
|
|
|
|
// Use existing batch decode-write
|
|
err := BatchDecodeWrite(frameData)
|
|
|
|
// Release frames
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.Release()
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// GetBatchZeroCopyStats returns batch zero-copy processing statistics
|
|
func (bzcp *BatchZeroCopyProcessor) GetBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
|
|
return atomic.LoadInt64(&bzcp.batchedFrames),
|
|
atomic.LoadInt64(&bzcp.singleFrames),
|
|
atomic.LoadInt64(&bzcp.batchSavings),
|
|
atomic.LoadInt64(&bzcp.processingTimeUs),
|
|
atomic.LoadInt64(&bzcp.adaptiveHits),
|
|
atomic.LoadInt64(&bzcp.adaptiveMisses)
|
|
}
|
|
|
|
// Convenience functions for global batch zero-copy processor
|
|
|
|
// BatchReadEncodeZeroCopyFrames performs batch read-encode on zero-copy frames
|
|
func BatchReadEncodeZeroCopyFrames(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
|
|
return GetBatchZeroCopyProcessor().BatchReadEncodeZeroCopy(frames)
|
|
}
|
|
|
|
// BatchDecodeWriteZeroCopyFrames performs batch decode-write on zero-copy frames
|
|
func BatchDecodeWriteZeroCopyFrames(frames []*ZeroCopyAudioFrame) error {
|
|
return GetBatchZeroCopyProcessor().BatchDecodeWriteZeroCopy(frames)
|
|
}
|
|
|
|
// GetGlobalBatchZeroCopyStats returns global batch zero-copy processing statistics
|
|
func GetGlobalBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
|
|
return GetBatchZeroCopyProcessor().GetBatchZeroCopyStats()
|
|
}
|