Commit 5916028e authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Split the main up loop into two threads.

The reader and the writer now communicate through a channel and the packet
cache.  If the writer thread drops behind, we drop packets after inserting in
the packet cache, which avoids building a backlog.
parent a6b09c91
......@@ -372,7 +372,7 @@ func addUpConn(c *client, id string) (*upConnection, error) {
}
}
go upLoop(conn, track)
go readLoop(conn, track)
go rtcpUpListener(conn, track, receiver)
})
......@@ -380,18 +380,19 @@ func addUpConn(c *client, id string) (*upConnection, error) {
return conn, nil
}
func upLoop(conn *upConnection, track *upTrack) {
type packetIndex struct {
seqno uint16
index uint16
}
func readLoop(conn *upConnection, track *upTrack) {
ch := make(chan packetIndex, 32)
defer close(ch)
go writeLoop(conn, track, ch)
buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet
var local []*downTrack
var localTime uint64
for {
now := mono.Microseconds()
if now < localTime || now > localTime+500000 {
local = track.getLocal()
localTime = now
}
bytes, err := track.track.Read(buf)
if err != nil {
if err != io.EOF {
......@@ -409,7 +410,8 @@ func upLoop(conn *upConnection, track *upTrack) {
track.jitter.Accumulate(packet.Timestamp)
first, _ := track.cache.Store(packet.SequenceNumber, buf[:bytes])
first, index :=
track.cache.Store(packet.SequenceNumber, buf[:bytes])
if packet.SequenceNumber-first > 24 {
found, first, bitmap := track.cache.BitmapGet()
if found {
......@@ -420,6 +422,45 @@ func upLoop(conn *upConnection, track *upTrack) {
}
}
select {
case ch <- packetIndex{packet.SequenceNumber, index}:
default:
// The writer is congested. Drop the packet, and
// leave it to NACK recovery if possible.
}
}
}
func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) {
var localTime uint64
var local []*downTrack
buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet
for {
now := mono.Microseconds()
if now < localTime || now > localTime+500000 {
local = track.getLocal()
localTime = now
}
pi, ok := <-ch
if !ok {
return
}
bytes := track.cache.GetAt(pi.seqno, pi.index, buf)
if bytes == 0 {
continue
}
err := packet.Unmarshal(buf[:bytes])
if err != nil {
log.Printf("%v", err)
continue
}
for _, l := range local {
err := l.track.WriteRTP(&packet)
if err != nil && err != io.ErrClosedPipe {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment