Commit f0dcd0b1 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Rework locking of tracks.

We now add tracks after the stream has been pushed, so we need a lock
on down streams.  Also rework sendUpRTCP to reduce locking.
parent 9a5c8b6b
...@@ -139,10 +139,20 @@ type rtpDownConnection struct { ...@@ -139,10 +139,20 @@ type rtpDownConnection struct {
id string id string
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
remote conn.Up remote conn.Up
tracks []*rtpDownTrack
maxREMBBitrate *bitrate maxREMBBitrate *bitrate
iceCandidates []*webrtc.ICECandidateInit iceCandidates []*webrtc.ICECandidateInit
negotiationNeeded int negotiationNeeded int
mu sync.Mutex
tracks []*rtpDownTrack
}
func (down *rtpDownConnection) getTracks() []*rtpDownTrack {
down.mu.Lock()
defer down.mu.Unlock()
tracks := make([]*rtpDownTrack, len(down.tracks))
copy(tracks, down.tracks)
return tracks
} }
func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) { func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, error) {
...@@ -169,7 +179,8 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, ...@@ -169,7 +179,8 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection,
func (down *rtpDownConnection) GetMaxBitrate(now uint64) uint64 { func (down *rtpDownConnection) GetMaxBitrate(now uint64) uint64 {
rate := down.maxREMBBitrate.Get(now) rate := down.maxREMBBitrate.Get(now)
var trackRate uint64 var trackRate uint64
for _, t := range down.tracks { tracks := down.getTracks()
for _, t := range tracks {
r := t.maxBitrate.Get(now) r := t.maxBitrate.Get(now)
if r == ^uint64(0) { if r == ^uint64(0) {
if t.track.Kind() == webrtc.RTPCodecTypeAudio { if t.track.Kind() == webrtc.RTPCodecTypeAudio {
...@@ -779,8 +790,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei ...@@ -779,8 +790,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
} }
func sendUpRTCP(conn *rtpUpConnection) error { func sendUpRTCP(conn *rtpUpConnection) error {
conn.mu.Lock() tracks := conn.getTracks()
defer conn.mu.Unlock()
if len(conn.tracks) == 0 { if len(conn.tracks) == 0 {
state := conn.pc.ConnectionState() state := conn.pc.ConnectionState()
...@@ -793,7 +803,7 @@ func sendUpRTCP(conn *rtpUpConnection) error { ...@@ -793,7 +803,7 @@ func sendUpRTCP(conn *rtpUpConnection) error {
now := rtptime.Jiffies() now := rtptime.Jiffies()
reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks)) reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
for _, t := range conn.tracks { for _, t := range tracks {
updateUpTrack(t) updateUpTrack(t)
expected, lost, totalLost, eseqno := t.cache.GetStats(true) expected, lost, totalLost, eseqno := t.cache.GetStats(true)
if expected == 0 { if expected == 0 {
...@@ -832,18 +842,21 @@ func sendUpRTCP(conn *rtpUpConnection) error { ...@@ -832,18 +842,21 @@ func sendUpRTCP(conn *rtpUpConnection) error {
} }
rate := ^uint64(0) rate := ^uint64(0)
for _, l := range conn.local {
local := conn.getLocal()
for _, l := range local {
r := l.GetMaxBitrate(now) r := l.GetMaxBitrate(now)
if r < rate { if r < rate {
rate = r rate = r
} }
} }
if rate < group.MinBitrate { if rate < group.MinBitrate {
rate = group.MinBitrate rate = group.MinBitrate
} }
var ssrcs []uint32 var ssrcs []uint32
for _, t := range conn.tracks { for _, t := range tracks {
if t.hasRtcpFb("goog-remb", "") { if t.hasRtcpFb("goog-remb", "") {
continue continue
} }
...@@ -869,21 +882,21 @@ func rtcpUpSender(conn *rtpUpConnection) { ...@@ -869,21 +882,21 @@ func rtcpUpSender(conn *rtpUpConnection) {
if err == io.EOF || err == io.ErrClosedPipe { if err == io.EOF || err == io.ErrClosedPipe {
return return
} }
log.Printf("sendRR: %v", err) log.Printf("sendUpRTCP: %v", err)
} }
} }
} }
func sendSR(conn *rtpDownConnection) error { func sendSR(conn *rtpDownConnection) error {
// since this is only called after all tracks have been created, tracks := conn.getTracks()
// there is no need for locking.
packets := make([]rtcp.Packet, 0, len(conn.tracks)) packets := make([]rtcp.Packet, 0, len(tracks))
now := time.Now() now := time.Now()
nowNTP := rtptime.TimeToNTP(now) nowNTP := rtptime.TimeToNTP(now)
jiffies := rtptime.TimeToJiffies(now) jiffies := rtptime.TimeToJiffies(now)
for _, t := range conn.tracks { for _, t := range tracks {
clockrate := t.track.Codec().ClockRate clockrate := t.track.Codec().ClockRate
var nowRTP uint32 var nowRTP uint32
......
...@@ -419,7 +419,10 @@ func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrac ...@@ -419,7 +419,10 @@ func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrac
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
atomics: &downTrackAtomics{}, atomics: &downTrackAtomics{},
} }
conn.mu.Lock()
conn.tracks = append(conn.tracks, track) conn.tracks = append(conn.tracks, track)
conn.mu.Unlock()
go rtcpDownListener(conn, track, sender) go rtcpDownListener(conn, track, sender)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment