Commit c00a2199 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Move PLI rate-limiting into the reader loop.

parent 7665067a
...@@ -319,16 +319,11 @@ func (down *rtpDownConnection) flushICECandidates() error { ...@@ -319,16 +319,11 @@ func (down *rtpDownConnection) flushICECandidates() error {
return err return err
} }
type upTrackAtomics struct {
lastPLI uint64
}
type rtpUpTrack struct { type rtpUpTrack struct {
track *webrtc.TrackRemote track *webrtc.TrackRemote
rate *estimator.Estimator rate *estimator.Estimator
cache *packetcache.Cache cache *packetcache.Cache
jitter *jitter.Estimator jitter *jitter.Estimator
atomics *upTrackAtomics
cname atomic.Value cname atomic.Value
localCh chan trackAction localCh chan trackAction
...@@ -597,7 +592,6 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon ...@@ -597,7 +592,6 @@ func newUpConn(c group.Client, id string, label string, offer string) (*rtpUpCon
cache: packetcache.New(minPacketCache(remote)), cache: packetcache.New(minPacketCache(remote)),
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate), jitter: jitter.New(remote.Codec().ClockRate),
atomics: &upTrackAtomics{},
localCh: make(chan trackAction, 2), localCh: make(chan trackAction, 2),
readerDone: make(chan struct{}), readerDone: make(chan struct{}),
} }
...@@ -626,12 +620,6 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error { ...@@ -626,12 +620,6 @@ func (up *rtpUpConnection) sendPLI(track *rtpUpTrack) error {
if !track.hasRtcpFb("nack", "pli") { if !track.hasRtcpFb("nack", "pli") {
return ErrUnsupportedFeedback return ErrUnsupportedFeedback
} }
last := atomic.LoadUint64(&track.atomics.lastPLI)
now := rtptime.Jiffies()
if now >= last && now-last < rtptime.JiffiesPerSec/2 {
return ErrRateLimited
}
atomic.StoreUint64(&track.atomics.lastPLI, now)
return sendPLI(up.pc, track.track.SSRC()) return sendPLI(up.pc, track.track.SSRC())
} }
......
...@@ -3,6 +3,7 @@ package rtpconn ...@@ -3,6 +3,7 @@ package rtpconn
import ( import (
"io" "io"
"log" "log"
"time"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
...@@ -21,10 +22,39 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { ...@@ -21,10 +22,39 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) {
isvideo := track.track.Kind() == webrtc.RTPCodecTypeVideo isvideo := track.track.Kind() == webrtc.RTPCodecTypeVideo
codec := track.track.Codec() codec := track.track.Codec()
sendNACK := track.hasRtcpFb("nack", "") sendNACK := track.hasRtcpFb("nack", "")
sendPLI := track.hasRtcpFb("nack", "pli")
var kfNeeded bool var kfNeeded bool
var kfRequested time.Time
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
for { for {
inner:
for {
select {
case action := <-track.localCh:
switch action.action {
case trackActionAdd, trackActionDel:
err := writers.add(
action.track,
action.action == trackActionAdd,
)
if err != nil {
log.Printf(
"add/remove track: %v",
err,
)
}
case trackActionKeyframe:
kfNeeded = true
default:
log.Printf("Unknown action")
}
default:
break inner
}
}
bytes, _, err := track.track.Read(buf) bytes, _, err := track.track.Read(buf)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
...@@ -103,31 +133,18 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) { ...@@ -103,31 +133,18 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) {
writers.write(packet.SequenceNumber, index, delay, writers.write(packet.SequenceNumber, index, delay,
isvideo, packet.Marker) isvideo, packet.Marker)
select { now := time.Now()
case action := <-track.localCh: if kfNeeded && now.Sub(kfRequested) > time.Second/2 {
switch action.action { if sendPLI {
case trackActionAdd, trackActionDel: err := conn.sendPLI(track)
err := writers.add(
action.track,
action.action == trackActionAdd,
)
if err != nil { if err != nil {
log.Printf("add/remove track: %v", err) log.Printf("sendPLI: %v", err)
kfNeeded = false
} }
case trackActionKeyframe: } else {
kfNeeded = true
default:
log.Printf("Unknown action %v", action.action)
}
default:
}
if kfNeeded {
err := conn.sendPLI(track)
if err != nil && err != ErrRateLimited {
log.Printf("sendPLI: %v", err)
kfNeeded = false kfNeeded = false
} }
kfRequested = now
} }
} }
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment