Commit 81dfabbe authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Implement sending NACKs on the upstream connection.

parent e5dae16d
......@@ -17,6 +17,7 @@ import (
"time"
"sfu/packetlist"
"sfu/packetwindow"
"github.com/gorilla/websocket"
"github.com/pion/rtcp"
......@@ -259,6 +260,20 @@ func addUpConn(c *client, id string) (*upConnection, error) {
return nil, err
}
conn := &upConnection{id: id, pc: pc}
c.mu.Lock()
defer c.mu.Unlock()
if c.up == nil {
c.up = make(map[string]*upConnection)
}
if c.up[id] != nil {
conn.pc.Close()
return nil, errors.New("Adding duplicate connection")
}
c.up[id] = conn
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
sendICE(c, id, candidate)
})
......@@ -294,6 +309,7 @@ func addUpConn(c *client, id string) (*upConnection, error) {
var packet rtp.Packet
var local []*downTrack
var localTime time.Time
window := packetwindow.New()
for {
now := time.Now()
if now.Sub(localTime) > time.Second/2 {
......@@ -315,6 +331,17 @@ func addUpConn(c *client, id string) (*upConnection, error) {
continue
}
window.Set(packet.SequenceNumber)
if packet.SequenceNumber-window.First() > 24 {
first, bitmap := window.Get17()
if bitmap != ^uint16(0) {
err := conn.sendNACK(track, first, ^bitmap)
if err != nil {
log.Printf("%v", err)
}
}
}
list.Store(packet.SequenceNumber, buf[:i])
for _, l := range local {
......@@ -330,19 +357,6 @@ func addUpConn(c *client, id string) (*upConnection, error) {
}()
})
conn := &upConnection{id: id, pc: pc}
c.mu.Lock()
defer c.mu.Unlock()
if c.up == nil {
c.up = make(map[string]*upConnection)
}
if c.up[id] != nil {
conn.pc.Close()
return nil, errors.New("Adding duplicate connection")
}
c.up[id] = conn
return conn, nil
}
......@@ -567,7 +581,7 @@ func updateUpBitrate(up *upConnection) {
for _, l := range local {
ms := atomic.LoadUint64(&l.maxBitrate.timestamp)
bitrate := atomic.LoadUint64(&l.maxBitrate.bitrate)
if now < ms || now > ms + 5000 || bitrate == 0 {
if now < ms || now > ms+5000 || bitrate == 0 {
l.setMuted(false)
continue
}
......@@ -588,7 +602,7 @@ func updateUpBitrate(up *upConnection) {
func (up *upConnection) sendPLI(track *upTrack) error {
last := atomic.LoadUint64(&track.lastPLI)
now := msSinceEpoch()
if now >= last && now - last < 200 {
if now >= last && now-last < 200 {
return nil
}
atomic.StoreUint64(&track.lastPLI, now)
......@@ -610,6 +624,25 @@ func sendREMB(pc *webrtc.PeerConnection, ssrc uint32, bitrate uint64) error {
})
}
func (up *upConnection) sendNACK(track *upTrack, first uint16, bitmap uint16) error {
return sendNACK(up.pc, track.track.SSRC(), first, bitmap)
}
func sendNACK(pc *webrtc.PeerConnection, ssrc uint32, first uint16, bitmap uint16) error {
packet := rtcp.Packet(
&rtcp.TransportLayerNack{
MediaSSRC: ssrc,
Nacks: []rtcp.NackPair{
rtcp.NackPair{
first,
rtcp.PacketBitmap(bitmap),
},
},
},
)
return pc.WriteRTCP([]rtcp.Packet{packet})
}
func sendRecovery(p *rtcp.TransportLayerNack, track *downTrack) {
var packet rtp.Packet
for _, nack := range p.Nacks {
......
package packetwindow
import (
"fmt"
)
type Window struct {
first uint16
bitmap uint32
}
func New() *Window {
return &Window{}
}
func (w *Window) String() string {
buf := make([]byte, 32)
for i := 0; i < 32; i++ {
if (w.bitmap & (1 << i)) != 0 {
buf[i] = '1'
} else {
buf[i] = '0'
}
}
return fmt.Sprintf("[%04x %s]", w.first, buf)
}
func (w *Window) First() uint16 {
return w.first
}
func (w *Window) Reset() {
w.bitmap = 0
}
func (w *Window) Set(seqno uint16) {
if w.bitmap == 0 {
w.first = seqno
w.bitmap = 1
return
}
if ((seqno - w.first) & 0x8000) != 0 {
return
}
if seqno == w.first {
w.bitmap >>= 1
w.first += 1
for (w.bitmap & 1) == 1 {
w.bitmap >>= 1
w.first += 1
}
return
}
if seqno - w.first < 32 {
w.bitmap |= (1 << uint16(seqno - w.first))
return
}
shift := seqno - w.first - 31
w.bitmap >>= shift
w.first += shift
w.bitmap |= (1 << uint16(seqno - w.first))
return
}
func (w *Window) Get17() (uint16, uint16) {
first := w.first
bitmap := uint16((w.bitmap >> 1) & 0xFFFF)
w.bitmap >>= 17
w.first += 17
return first, bitmap
}
package packetwindow
import (
"testing"
"github.com/pion/rtcp"
)
func TestWindow(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
value >>= uint16(w.first - 42)
if uint32(value) != w.bitmap {
t.Errorf("Got %b, expected %b", w.bitmap, value)
}
}
func TestWindowWrap(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
w.Set(0x7000)
w.Set(0xA000)
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
value >>= uint16(w.first - 42)
if uint32(value) != w.bitmap {
t.Errorf("Got %b, expected %b", w.bitmap, value)
}
}
func TestWindowGet(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
pos := uint16(42)
for w.bitmap != 0 {
first, bitmap := w.Get17()
if first < pos || first >= pos+64 {
t.Errorf("First is %v, pos is %v", first, pos)
}
value >>= (first - pos)
pos = first
if (value & 1) != 0 {
t.Errorf("Value is odd")
}
value >>= 1
pos += 1
if bitmap != uint16(value&0xFFFF) {
t.Errorf("Got %b, expected %b", bitmap, (value & 0xFFFF))
}
value >>= 16
pos += 16
}
if value != 0 {
t.Errorf("Value is %v", value)
}
}
func TestWindowPacket(t *testing.T) {
value := uint64(0xcdd58f1e035379c0)
w := New()
for i := 0; i < 64; i++ {
if (value & (1 << i)) != 0 {
w.Set(uint16(42 + i))
}
}
first, bitmap := w.Get17()
p := rtcp.NackPair{first, rtcp.PacketBitmap(^bitmap)}
list := p.PacketList()
for _, s := range list {
if s < 42 || s >= 42 + 64 {
if (value & (1 << (s - 42))) != 0 {
t.Errorf("Bit %v unexpectedly set", s - 42)
}
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment