Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 81 additions & 87 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
mon: mon,
externalIP: opts.IP,
mediaTimeout: mediaTimeout,
timeoutResetTick: make(chan time.Duration, 1),
timeoutKick: make(chan struct{}, 1),
jitterEnabled: opts.EnableJitterBuffer,
logSignalChanges: opts.LogSignalChanges,
port: newUDPConn(log, conn, opts.SymmetricRTP),
Expand All @@ -366,7 +366,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
}
p.timeoutInitial.Store(&opts.MediaTimeoutInitial)
p.timeoutGeneral.Store(&opts.MediaTimeout)
go p.timeoutLoop(tid, func() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the tid. It helps find stuck media loops for specific calls.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see tid actually being used in the old code. Do we still need it? 🤔

go p.timeoutLoop(func() {
close(mediaTimeout)
})
p.log.Debugw("listening for media on UDP", "port", p.Port())
Expand All @@ -383,9 +383,10 @@ type MediaPort struct {
port *udpConn
mediaReceived core.Fuse
packetCount atomic.Uint64
lastPacketTime atomic.Int64 // UnixNano of last RTP packet, 0 if none
mediaTimeout <-chan struct{}
timeoutKick chan struct{} // wakes timeoutLoop when the deadline may have changed
timeoutStart atomic.Pointer[time.Time]
timeoutResetTick chan time.Duration
timeoutInitial atomic.Pointer[time.Duration]
timeoutGeneral atomic.Pointer[time.Duration]
closed core.Fuse
Expand Down Expand Up @@ -416,9 +417,17 @@ func (p *MediaPort) EnableOut() {
p.audioOut.Enable()
}

// kickTimeoutLoop posts a wake-up on timeoutKick without ever blocking the
// caller. The channel is created with capacity 1, so when a wake-up is already
// queued the send cannot proceed and this extra signal is simply dropped.
func (p *MediaPort) kickTimeoutLoop() {
	select {
	default:
		// Send would block: a wake-up is already pending, nothing more to do.
	case p.timeoutKick <- struct{}{}:
	}
}

// disableTimeout turns media timeout tracking off: it logs the change, clears
// timeoutStart, and then signals the timeout loop.
func (p *MediaPort) disableTimeout() {
p.log.Debugw("media timeout disabled")
// A nil timeoutStart is what timeoutLoop reads as "timeout disabled".
p.timeoutStart.Store(nil)
// Kick only after the store so the woken loop observes the cleared start time.
p.kickTimeoutLoop()
}

func (p *MediaPort) enableTimeout(initial, general time.Duration) {
Expand All @@ -433,17 +442,14 @@ func (p *MediaPort) enableTimeout(initial, general time.Duration) {
}
p.timeoutInitial.Store(&initial)
p.timeoutGeneral.Store(&general)
select {
case p.timeoutResetTick <- general:
default:
}
now := time.Now()
p.timeoutStart.Store(&now)
p.log.Debugw("media timeout enabled",
"packets", p.packetCount.Load(),
"initial", initial,
"timeout", general,
)
p.kickTimeoutLoop()
}

func (p *MediaPort) EnableTimeout(enabled bool) {
Expand All @@ -458,107 +464,94 @@ func (p *MediaPort) SetTimeout(initial, general time.Duration) {
p.enableTimeout(initial, general)
}

func (p *MediaPort) timeoutLoop(tid traceid.ID, timeoutCallback func()) {
func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
defer p.log.Infow("media timeout loop stopped")
ticker := time.NewTicker(p.opts.MediaTimeout)
defer ticker.Stop()

var (
lastPackets uint64
startPackets uint64
lastTime time.Time
lastLog = time.Now()
)
const disabledPark = time.Hour
timer := time.NewTimer(disabledPark)
defer timer.Stop()

lastLog := time.Now()
for {
select {
case <-p.closed.Watch():
return
case tick := <-p.timeoutResetTick:
ticker.Reset(tick)
startPackets = p.packetCount.Load()
lastTime = time.Now()
lastLog = lastTime
p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick)
case <-ticker.C:
log := p.log
curPackets := p.packetCount.Load()
startPtr := p.timeoutStart.Load()
var startTime time.Time
if startPtr != nil {
startTime = *startPtr
}
verbose := false
if now := time.Now(); now.Sub(lastLog) > time.Hour {
verbose = true
lastLog = now
log = log.WithValues(
"startPackets", startPackets,
"packets", curPackets,
"lastPackets", lastPackets,
"sinceLast", time.Since(lastTime),
"sinceStart", time.Since(startTime),
)
if curPackets == startPackets {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better keep this one, it helped find quite a few random deadlocks on the media path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now determine isInitial using lastPacketTime.Before(startTime), which should be sufficient? The logs are also preserved, at lines 520 to 525.

log.Warnw("media timout is idle for a long time", nil)
} else {
log.Infow("media timeout stats")
}
case <-p.timeoutKick:
case <-timer.C:
}

verbose := false
if now := time.Now(); now.Sub(lastLog) > time.Hour {
verbose = true
lastLog = now
}

startPtr := p.timeoutStart.Load()
if startPtr == nil {
if verbose {
p.log.Infow("media timeout disabled", "packets", p.packetCount.Load())
}
if curPackets != lastPackets {
lastPackets = curPackets
lastTime = time.Now()
if verbose {
log.Infow("got a new packet")
}
continue // wait for the next tick
timer.Reset(disabledPark)
continue
}
startTime := *startPtr

var lastPacketTime time.Time
if nano := p.lastPacketTime.Load(); nano > 0 {
lastPacketTime = time.Unix(0, nano)
}

// First timeout could be different. Usually it's longer to allow for a call setup.
// In some cases it could be shorter (e.g. when we notice an issue with signaling and suspect media will fail).
// Initial mode: no packet has arrived since the timeout was last enabled or reset.
isInitial := lastPacketTime.Before(startTime)
var (
deadline time.Time
timeout time.Duration
)
if isInitial {
timeout = p.opts.MediaTimeoutInitial
if ptr := p.timeoutInitial.Load(); ptr != nil {
timeout = *ptr
}
if startPtr == nil {
if verbose {
log.Infow("timeout is disabled")
}
continue // timeout disabled
deadline = startTime.Add(timeout)
} else {
timeout = p.opts.MediaTimeout
if ptr := p.timeoutGeneral.Load(); ptr != nil {
timeout = *ptr
}
isInitial := lastPackets == startPackets
sinceStart := time.Since(*startPtr)
sinceLast := time.Since(lastTime)
var (
since time.Duration
timeout time.Duration
deadline = lastPacketTime.Add(timeout)
}
remaining := time.Until(deadline)

if verbose {
log := p.log.WithValues(
"packets", p.packetCount.Load(),
"sinceStart", time.Since(startTime),
"sinceLast", time.Since(lastPacketTime),
"remaining", remaining,
"timeout", timeout,
"isInitial", isInitial,
)
// First timeout could be different. Usually it's longer to allow for a call setup.
// In some cases it could be shorter (e.g. when we notice an issue with signaling and suspect media will fail).
if isInitial {
since = sinceStart
timeout = p.opts.MediaTimeoutInitial
if ptr := p.timeoutInitial.Load(); ptr != nil {
timeout = *ptr
}
log.Warnw("media timeout is idle for a long time", nil)
} else {
since = sinceLast
timeout = p.opts.MediaTimeout
if ptr := p.timeoutGeneral.Load(); ptr != nil {
timeout = *ptr
}
log.Infow("media timeout stats")
}
}

// Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet.
if since+timeout/10 < timeout {
if verbose {
log.Infow("too early to trigger", "since", since, "timeout", timeout)
}
continue
}
if remaining <= 0 {
p.log.Infow("triggering media timeout",
"packets", lastPackets,
"startPackets", startPackets,
"sinceStart", sinceStart,
"sinceLast", sinceLast,
"packets", p.packetCount.Load(),
"sinceStart", time.Since(startTime),
"sinceLast", time.Since(lastPacketTime),
"timeout", timeout,
"isInitial", isInitial,
)
timeoutCallback()
return
}
timer.Reset(remaining)
}
}

Expand Down Expand Up @@ -752,6 +745,7 @@ func (p *MediaPort) rtpReadLoop(tid traceid.ID, log logger.Logger, r rtp.ReadStr
return
}
p.packetCount.Add(1)
p.lastPacketTime.Store(time.Now().UnixNano())
p.stats.Packets.Add(1)
if n > rtp.MTUSize {
if !overflow {
Expand Down
Loading