-
Notifications
You must be signed in to change notification settings - Fork 186
TEL-562: make media timeout detection more deterministic #675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -353,7 +353,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, | |
| mon: mon, | ||
| externalIP: opts.IP, | ||
| mediaTimeout: mediaTimeout, | ||
| timeoutResetTick: make(chan time.Duration, 1), | ||
| timeoutKick: make(chan struct{}, 1), | ||
| jitterEnabled: opts.EnableJitterBuffer, | ||
| logSignalChanges: opts.LogSignalChanges, | ||
| port: newUDPConn(log, conn, opts.SymmetricRTP), | ||
|
|
@@ -366,7 +366,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, | |
| } | ||
| p.timeoutInitial.Store(&opts.MediaTimeoutInitial) | ||
| p.timeoutGeneral.Store(&opts.MediaTimeout) | ||
| go p.timeoutLoop(tid, func() { | ||
| go p.timeoutLoop(func() { | ||
| close(mediaTimeout) | ||
| }) | ||
| p.log.Debugw("listening for media on UDP", "port", p.Port()) | ||
|
|
@@ -383,9 +383,10 @@ type MediaPort struct { | |
| port *udpConn | ||
| mediaReceived core.Fuse | ||
| packetCount atomic.Uint64 | ||
| lastPacketTime atomic.Int64 // UnixNano of last RTP packet, 0 if none | ||
| mediaTimeout <-chan struct{} | ||
| timeoutKick chan struct{} // wakes timeoutLoop when the deadline may have changed | ||
| timeoutStart atomic.Pointer[time.Time] | ||
| timeoutResetTick chan time.Duration | ||
| timeoutInitial atomic.Pointer[time.Duration] | ||
| timeoutGeneral atomic.Pointer[time.Duration] | ||
| closed core.Fuse | ||
|
|
@@ -416,9 +417,17 @@ func (p *MediaPort) EnableOut() { | |
| p.audioOut.Enable() | ||
| } | ||
|
|
||
| func (p *MediaPort) kickTimeoutLoop() { | ||
| select { | ||
| case p.timeoutKick <- struct{}{}: | ||
| default: // already pending | ||
| } | ||
| } | ||
|
|
||
| func (p *MediaPort) disableTimeout() { | ||
| p.log.Debugw("media timeout disabled") | ||
| p.timeoutStart.Store(nil) | ||
| p.kickTimeoutLoop() | ||
| } | ||
|
|
||
| func (p *MediaPort) enableTimeout(initial, general time.Duration) { | ||
|
|
@@ -433,17 +442,14 @@ func (p *MediaPort) enableTimeout(initial, general time.Duration) { | |
| } | ||
| p.timeoutInitial.Store(&initial) | ||
| p.timeoutGeneral.Store(&general) | ||
| select { | ||
| case p.timeoutResetTick <- general: | ||
| default: | ||
| } | ||
| now := time.Now() | ||
| p.timeoutStart.Store(&now) | ||
| p.log.Debugw("media timeout enabled", | ||
| "packets", p.packetCount.Load(), | ||
| "initial", initial, | ||
| "timeout", general, | ||
| ) | ||
| p.kickTimeoutLoop() | ||
| } | ||
|
|
||
| func (p *MediaPort) EnableTimeout(enabled bool) { | ||
|
|
@@ -458,107 +464,94 @@ func (p *MediaPort) SetTimeout(initial, general time.Duration) { | |
| p.enableTimeout(initial, general) | ||
| } | ||
|
|
||
| func (p *MediaPort) timeoutLoop(tid traceid.ID, timeoutCallback func()) { | ||
| func (p *MediaPort) timeoutLoop(timeoutCallback func()) { | ||
| defer p.log.Infow("media timeout loop stopped") | ||
| ticker := time.NewTicker(p.opts.MediaTimeout) | ||
| defer ticker.Stop() | ||
|
|
||
| var ( | ||
| lastPackets uint64 | ||
| startPackets uint64 | ||
| lastTime time.Time | ||
| lastLog = time.Now() | ||
| ) | ||
| const disabledPark = time.Hour | ||
| timer := time.NewTimer(disabledPark) | ||
| defer timer.Stop() | ||
|
|
||
| lastLog := time.Now() | ||
| for { | ||
| select { | ||
| case <-p.closed.Watch(): | ||
| return | ||
| case tick := <-p.timeoutResetTick: | ||
| ticker.Reset(tick) | ||
| startPackets = p.packetCount.Load() | ||
| lastTime = time.Now() | ||
| lastLog = lastTime | ||
| p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick) | ||
| case <-ticker.C: | ||
| log := p.log | ||
| curPackets := p.packetCount.Load() | ||
| startPtr := p.timeoutStart.Load() | ||
| var startTime time.Time | ||
| if startPtr != nil { | ||
| startTime = *startPtr | ||
| } | ||
| verbose := false | ||
| if now := time.Now(); now.Sub(lastLog) > time.Hour { | ||
| verbose = true | ||
| lastLog = now | ||
| log = log.WithValues( | ||
| "startPackets", startPackets, | ||
| "packets", curPackets, | ||
| "lastPackets", lastPackets, | ||
| "sinceLast", time.Since(lastTime), | ||
| "sinceStart", time.Since(startTime), | ||
| ) | ||
| if curPackets == startPackets { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better keep this one, it helped find quite a few random deadlocks on the media path.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we now check whether it's |
||
| log.Warnw("media timout is idle for a long time", nil) | ||
| } else { | ||
| log.Infow("media timeout stats") | ||
| } | ||
| case <-p.timeoutKick: | ||
| case <-timer.C: | ||
| } | ||
|
|
||
| verbose := false | ||
| if now := time.Now(); now.Sub(lastLog) > time.Hour { | ||
| verbose = true | ||
| lastLog = now | ||
| } | ||
|
|
||
| startPtr := p.timeoutStart.Load() | ||
| if startPtr == nil { | ||
| if verbose { | ||
| p.log.Infow("media timeout disabled", "packets", p.packetCount.Load()) | ||
| } | ||
| if curPackets != lastPackets { | ||
| lastPackets = curPackets | ||
| lastTime = time.Now() | ||
| if verbose { | ||
| log.Infow("got a new packet") | ||
| } | ||
| continue // wait for the next tick | ||
| timer.Reset(disabledPark) | ||
| continue | ||
| } | ||
| startTime := *startPtr | ||
|
|
||
| var lastPacketTime time.Time | ||
| if nano := p.lastPacketTime.Load(); nano > 0 { | ||
| lastPacketTime = time.Unix(0, nano) | ||
| } | ||
|
|
||
| // First timeout could be different. Usually it's longer to allow for a call setup. | ||
| // In some cases it could be shorter (e.g. when we notice an issue with signaling and suspect media will fail). | ||
| // Initial mode: no packet has arrived since the timeout was last enabled or reset. | ||
| isInitial := lastPacketTime.Before(startTime) | ||
| var ( | ||
| deadline time.Time | ||
| timeout time.Duration | ||
| ) | ||
| if isInitial { | ||
| timeout = p.opts.MediaTimeoutInitial | ||
| if ptr := p.timeoutInitial.Load(); ptr != nil { | ||
| timeout = *ptr | ||
| } | ||
| if startPtr == nil { | ||
| if verbose { | ||
| log.Infow("timeout is disabled") | ||
| } | ||
| continue // timeout disabled | ||
| deadline = startTime.Add(timeout) | ||
| } else { | ||
| timeout = p.opts.MediaTimeout | ||
| if ptr := p.timeoutGeneral.Load(); ptr != nil { | ||
| timeout = *ptr | ||
| } | ||
| isInitial := lastPackets == startPackets | ||
| sinceStart := time.Since(*startPtr) | ||
| sinceLast := time.Since(lastTime) | ||
| var ( | ||
| since time.Duration | ||
| timeout time.Duration | ||
| deadline = lastPacketTime.Add(timeout) | ||
| } | ||
| remaining := time.Until(deadline) | ||
|
|
||
| if verbose { | ||
| log := p.log.WithValues( | ||
| "packets", p.packetCount.Load(), | ||
| "sinceStart", time.Since(startTime), | ||
| "sinceLast", time.Since(lastPacketTime), | ||
| "remaining", remaining, | ||
| "timeout", timeout, | ||
| "isInitial", isInitial, | ||
| ) | ||
| // First timeout could be different. Usually it's longer to allow for a call setup. | ||
| // In some cases it could be shorter (e.g. when we notice an issue with signaling and suspect media will fail). | ||
| if isInitial { | ||
| since = sinceStart | ||
| timeout = p.opts.MediaTimeoutInitial | ||
| if ptr := p.timeoutInitial.Load(); ptr != nil { | ||
| timeout = *ptr | ||
| } | ||
| log.Warnw("media timeout is idle for a long time", nil) | ||
| } else { | ||
| since = sinceLast | ||
| timeout = p.opts.MediaTimeout | ||
| if ptr := p.timeoutGeneral.Load(); ptr != nil { | ||
| timeout = *ptr | ||
| } | ||
| log.Infow("media timeout stats") | ||
| } | ||
| } | ||
|
|
||
| // Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet. | ||
| if since+timeout/10 < timeout { | ||
| if verbose { | ||
| log.Infow("too early to trigger", "since", since, "timeout", timeout) | ||
| } | ||
| continue | ||
| } | ||
| if remaining <= 0 { | ||
| p.log.Infow("triggering media timeout", | ||
| "packets", lastPackets, | ||
| "startPackets", startPackets, | ||
| "sinceStart", sinceStart, | ||
| "sinceLast", sinceLast, | ||
| "packets", p.packetCount.Load(), | ||
| "sinceStart", time.Since(startTime), | ||
| "sinceLast", time.Since(lastPacketTime), | ||
| "timeout", timeout, | ||
| "isInitial", isInitial, | ||
| ) | ||
| timeoutCallback() | ||
| return | ||
| } | ||
| timer.Reset(remaining) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -752,6 +745,7 @@ func (p *MediaPort) rtpReadLoop(tid traceid.ID, log logger.Logger, r rtp.ReadStr | |
| return | ||
| } | ||
| p.packetCount.Add(1) | ||
| p.lastPacketTime.Store(time.Now().UnixNano()) | ||
| p.stats.Packets.Add(1) | ||
| if n > rtp.MTUSize { | ||
| if !overflow { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please keep the
tid. It helps find stuck media loops for specific calls.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see
tidis actually used in the old code. Do we still need it 🤔