Commit 6f9d7fc3 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Scalable video coding (SVC).

parent 7590588a
...@@ -40,5 +40,5 @@ type DownTrack interface { ...@@ -40,5 +40,5 @@ type DownTrack interface {
Write(buf []byte) (int, error) Write(buf []byte) (int, error)
SetTimeOffset(ntp uint64, rtp uint32) SetTimeOffset(ntp uint64, rtp uint32)
SetCname(string) SetCname(string)
GetMaxBitrate() uint64 GetMaxBitrate() (uint64, int)
} }
...@@ -618,6 +618,6 @@ func (conn *diskConn) initWriter(width, height uint32) error { ...@@ -618,6 +618,6 @@ func (conn *diskConn) initWriter(width, height uint32) error {
return nil return nil
} }
func (t *diskTrack) GetMaxBitrate() uint64 { func (t *diskTrack) GetMaxBitrate() (uint64, int) {
return ^uint64(0) return ^uint64(0), -1
} }
...@@ -7,7 +7,7 @@ require ( ...@@ -7,7 +7,7 @@ require (
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/pion/ice/v2 v2.1.7 github.com/pion/ice/v2 v2.1.7
github.com/pion/rtcp v1.2.6 github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.2 github.com/pion/rtp v1.6.6-0.20210512022946-4e87540a7fe6
github.com/pion/sdp/v3 v3.0.4 github.com/pion/sdp/v3 v3.0.4
github.com/pion/turn/v2 v2.0.5 github.com/pion/turn/v2 v2.0.5
github.com/pion/webrtc/v3 v3.0.27 github.com/pion/webrtc/v3 v3.0.27
......
...@@ -51,8 +51,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= ...@@ -51,8 +51,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo= github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.6.6-0.20210512022946-4e87540a7fe6 h1:xAaGxAEYiL96TRp4DhhrrH7JRLBuLM+nGqhOXnWzTBs=
github.com/pion/rtp v1.6.6-0.20210512022946-4e87540a7fe6/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sctp v1.7.12 h1:GsatLufywVruXbZZT1CKg+Jr8ZTkwiPnmUC/oO9+uuY= github.com/pion/sctp v1.7.12 h1:GsatLufywVruXbZZT1CKg+Jr8ZTkwiPnmUC/oO9+uuY=
github.com/pion/sctp v1.7.12/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= github.com/pion/sctp v1.7.12/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s=
......
...@@ -61,8 +61,8 @@ type ChatHistoryEntry struct { ...@@ -61,8 +61,8 @@ type ChatHistoryEntry struct {
} }
const ( const (
LowBitrate = 100000 LowBitrate = 100 * 1024
MinBitrate = 2 * LowBitrate MinBitrate = LowBitrate * 2
) )
type Group struct { type Group struct {
......
// Package packetmap implements remapping of sequence numbers and picture ids.
package packetmap
import (
"sync"
)
const maxEntries = 128
type Map struct {
mu sync.Mutex
next uint16
nextPid uint16
delta uint16
pidDelta uint16
lastEntry uint16
entries []entry
}
type entry struct {
first, count uint16
delta uint16
pidDelta uint16
}
// Map maps a seqno, adding the mapping if required. It returns whether
// the seqno could be mapped, the target seqno, and the pid delta to apply.
func (m *Map) Map(seqno uint16, pid uint16) (bool, uint16, uint16) {
m.mu.Lock()
defer m.mu.Unlock()
if m.delta == 0 && m.entries == nil {
m.next = seqno + 1
m.nextPid = pid
return true, seqno, 0
}
if compare(m.next, seqno) <= 0 {
if uint16(seqno-m.next) > 8*1024 {
m.reset()
m.next = seqno + 1
m.nextPid = pid
return true, seqno, 0
}
addMapping(m, seqno, pid, m.delta, m.pidDelta)
m.next = seqno + 1
m.nextPid = pid
return true, seqno + m.delta, m.pidDelta
}
if uint16(m.next-seqno) > 8*1024 {
m.reset()
m.next = seqno + 1
m.nextPid = pid
return true, seqno, 0
}
return m.direct(seqno)
}
func (m *Map) reset() {
m.next = 0
m.nextPid = 0
m.delta = 0
m.pidDelta = 0
m.lastEntry = 0
m.entries = nil
}
func addMapping(m *Map, seqno, pid uint16, delta, pidDelta uint16) {
if len(m.entries) == 0 {
m.entries = []entry{
entry{
first: seqno,
count: 1,
delta: delta,
pidDelta: pidDelta,
},
}
return
}
i := m.lastEntry
if delta == m.entries[i].delta && pidDelta == m.entries[i].pidDelta {
m.entries[m.lastEntry].count = seqno - m.entries[i].first + 1
return
}
e := entry{
first: seqno,
count: 1,
delta: delta,
pidDelta: pidDelta,
}
if len(m.entries) < maxEntries {
m.entries = append(m.entries, e)
m.lastEntry = uint16(len(m.entries) - 1)
return
}
j := (m.lastEntry + 1) % maxEntries
m.entries[j] = e
m.lastEntry = j
}
// direct maps a seqno to a target seqno. It returns true if the seqno
// could be mapped, the target seqno, and the pid delta to apply.
// Called with the m.mu taken.
func (m *Map) direct(seqno uint16) (bool, uint16, uint16) {
if len(m.entries) == 0 {
return false, 0, 0
}
i := m.lastEntry
for {
f := m.entries[i].first
if seqno >= f {
if seqno < f+m.entries[i].count {
return true,
seqno + m.entries[i].delta,
m.entries[i].pidDelta
}
return false, 0, 0
}
if i > 0 {
i--
} else {
i = uint16(len(m.entries) - 1)
}
if i == m.lastEntry {
break
}
}
return false, 0, 0
}
// Reverse maps a target seqno to the original seqno. It returns true if
// the seqno could be mapped, the original seqno, and the pid delta to
// apply in reverse.
func (m *Map) Reverse(seqno uint16) (bool, uint16, uint16) {
m.mu.Lock()
defer m.mu.Unlock()
if m.delta == 0 && m.entries == nil {
return true, seqno, 0
}
if m.entries == nil {
if m.delta == 0 {
return true, seqno, 0
}
return false, 0, 0
}
i := m.lastEntry
for {
f := m.entries[i].first + m.entries[i].delta
if seqno >= f {
if seqno < f+m.entries[i].count {
return true,
seqno - m.entries[i].delta,
m.entries[i].pidDelta
}
return false, 0, 0
}
if i > 0 {
i--
} else {
i = uint16(len(m.entries) - 1)
}
if i == m.lastEntry {
break
}
}
return false, 0, 0
}
// Drop attempts to record a dropped packet. It returns true if the
// packet is safe to drop.
func (m *Map) Drop(seqno uint16, pid uint16) bool {
m.mu.Lock()
defer m.mu.Unlock()
if seqno != m.next {
return false
}
m.pidDelta += pid - m.nextPid
m.nextPid = pid
m.delta--
m.next = seqno + 1
return true
}
// compare performs comparison modulo 2^16.
func compare(s1, s2 uint16) int {
if s1 == s2 {
return 0
}
if ((s2 - s1) & 0x8000) != 0 {
return 1
}
return -1
}
package packetmap
import (
"testing"
)
func TestNoDrops(t *testing.T) {
m := Map{}
ok, s, p := m.Map(42, 1001)
if !ok || s != 42 || p != 0 {
t.Errorf("Expected 42, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(43, 1001)
if !ok || s != 43 || p != 0 {
t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(44, 1002)
if !ok || s != 44 || p != 0 {
t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(40, 1000)
if !ok || s != 40 || p != 0 {
t.Errorf("Expected 40, 0, got %v, %v, %v", ok, s, p)
}
if len(m.entries) > 0 || m.delta != 0 || m.pidDelta != 0 {
t.Errorf("Expected 0, got %v %v %v",
len(m.entries), m.delta, m.pidDelta)
}
}
func TestDrop(t *testing.T) {
m := Map{}
ok, s, p := m.Map(42, 1001)
if !ok || s != 42 || p != 0 {
t.Errorf("Expected 42, 0, got %v, %v, %v", ok, s, p)
}
ok = m.Drop(43, 1001)
if !ok || m.pidDelta != 0 {
t.Errorf("Expected 0, got %v, %v", ok, m.pidDelta)
}
ok, s, p = m.Map(44, 1001)
if !ok || s != 43 || p != 0 {
t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(45, 1002)
if !ok || s != 44 || p != 0 {
t.Errorf("Expected 44, 0, got %v, %v, %v", ok, s, p)
}
ok = m.Drop(46, 1003)
if !ok || m.pidDelta != 1 {
t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta)
}
ok, s, p = m.Map(47, 1003)
if !ok || s != 45 || p != 1 {
t.Errorf("Expected 45, 1, got %v, %v, %v", ok, s, p)
}
ok = m.Drop(48, 1003)
if !ok || m.pidDelta != 1 {
t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta)
}
ok, s, p = m.Map(49, 1003)
if !ok || s != 46 || p != 1 {
t.Errorf("Expected 45, 1, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(60, 1007)
if !ok || s != 57 || p != 1 {
t.Errorf("Expected 57, 1, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(13, 1000)
if ok {
t.Errorf("Expected not ok")
}
ok, s, p = m.Map(44, 1001)
if !ok || s != 43 || p != 0 {
t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(45, 1002)
if !ok || s != 44 || p != 0 {
t.Errorf("Expected 44, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(48, 3)
if ok {
t.Errorf("Expected not ok")
}
ok, s, p = m.direct(1000)
if ok {
t.Errorf("Expected not ok")
}
ok, s, p = m.direct(13)
if ok {
t.Errorf("Expected not ok")
}
ok, s, p = m.Reverse(44)
if !ok || s != 45 || p != 0 {
t.Errorf("Expected 45, 0, got %v %v %v", ok, s, p)
}
}
func TestWraparound(t *testing.T) {
m := Map{}
ok, s, p := m.Map(0, 0)
if !ok || s != 0 || p != 0 {
t.Errorf("Expected 0, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(1, 0)
if !ok || s != 1 || p != 0 {
t.Errorf("Expected 1, 0, got %v, %v, %v", ok, s, p)
}
ok = m.Drop(2, 1)
if !ok || m.pidDelta != 1 {
t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta)
}
ok = m.Drop(3, 1)
if !ok || m.pidDelta != 1 {
t.Errorf("Expected 1, got %v, %v", ok, m.pidDelta)
}
for i := 4; i < 256000; i++ {
ok, s, p = m.Map(uint16(i), uint16((i/2) & 0x7FFF))
if !ok || s != uint16(i-2) || p != 1 {
t.Errorf("Expected %v, %v, got %v, %v, %v",
uint16(i-2), 1, ok, s, p)
}
}
}
func TestReset(t *testing.T) {
m := Map{}
ok, s, p := m.Map(42, 1001)
if !ok || s != 42 || p != 0 {
t.Errorf("Expected 42, 0, got %v, %v, %v", ok, s, p)
}
ok = m.Drop(43, 1001)
if !ok || m.pidDelta != 0 {
t.Errorf("Expected 0, got %v, %v", ok, m.pidDelta)
}
ok, s, p = m.Map(44, 1001)
if !ok || s != 43 || p != 0 {
t.Errorf("Expected 43, 0, got %v, %v, %v", ok, s, p)
}
ok, s, p = m.Map(40000, 2001)
if !ok || s != 40000 || p != 0 {
t.Errorf("Expected 32000, 0, got %v, %v, %v", ok, s, p)
}
if m.delta != 0 || m.entries != nil {
t.Errorf("Expected reset")
}
ok, s, p = m.Map(40001, 2001)
if !ok || s != 40001 || p != 0 {
t.Errorf("Expected 32001, 0, got %v, %v, %v", ok, s, p)
}
}
package rtpconn package rtpconn
import ( import (
"errors"
"strings" "strings"
"github.com/pion/rtp" "github.com/pion/rtp"
...@@ -108,3 +109,101 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) { ...@@ -108,3 +109,101 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) {
return false, false return false, false
} }
} }
var errTruncated = errors.New("truncated packet")
var errUnsupportedCodec = errors.New("unsupported codec")
func packetFlags(codec string, buf []byte) (seqno uint16, start bool, pid uint16, tid uint8, sid uint8, layersync bool, discardable bool, err error) {
if len(buf) < 12 {
err = errTruncated
return
}
seqno = (uint16(buf[2]) << 8) | uint16(buf[3])
if strings.EqualFold(codec, "video/vp8") {
var packet rtp.Packet
err = packet.Unmarshal(buf)
if err != nil {
return
}
var vp8 codecs.VP8Packet
_, err = vp8.Unmarshal(packet.Payload)
if err != nil {
return
}
start = vp8.S == 1 && vp8.PID == 0
pid = vp8.PictureID
tid = vp8.TID
layersync = vp8.Y == 1
discardable = vp8.N == 1
return
} else if strings.EqualFold(codec, "video/vp9") {
var packet rtp.Packet
err = packet.Unmarshal(buf)
if err != nil {
return
}
var vp9 codecs.VP9Packet
_, err = vp9.Unmarshal(packet.Payload)
if err != nil {
return
}
start = vp9.B
tid = vp9.TID
sid = vp9.SID
layersync = vp9.U
return
}
return
}
func rewritePacket(codec string, data []byte, seqno uint16, delta uint16) error {
if len(data) < 12 {
return errTruncated
}
data[2] = uint8(seqno >> 8)
data[3] = uint8(seqno)
if delta == 0 {
return nil
}
offset := 12
offset += int(data[0]&0x0F) * 4
if len(data) < offset+4 {
return errTruncated
}
if (data[0] & 0x10) != 0 {
length := uint16(data[offset+2])<<8 | uint16(data[offset+3])
offset += 4 + int(length)*4
if len(data) < offset+4 {
return errTruncated
}
}
if strings.EqualFold(codec, "video/vp8") {
x := (data[offset] & 0x80) != 0
if !x {
return nil
}
i := (data[offset+1] & 0x80) != 0
if !i {
return nil
}
m := (data[offset+2] & 0x80) != 0
if m {
pid := (uint16(data[offset+2]&0x7F) << 8) |
uint16(data[offset+3])
pid = (pid + delta) & 0x7FFF
data[offset+2] = 0x80 | byte((pid>>8)&0x7F)
data[offset+3] = byte(pid & 0xFF)
} else {
data[offset+2] = (data[offset+2] + uint8(delta)) & 0x7F
}
return nil
}
return errUnsupportedCodec
}
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"github.com/jech/galene/ice" "github.com/jech/galene/ice"
"github.com/jech/galene/jitter" "github.com/jech/galene/jitter"
"github.com/jech/galene/packetcache" "github.com/jech/galene/packetcache"
"github.com/jech/galene/packetmap"
"github.com/jech/galene/rtptime" "github.com/jech/galene/rtptime"
) )
...@@ -74,6 +75,7 @@ type downTrackAtomics struct { ...@@ -74,6 +75,7 @@ type downTrackAtomics struct {
srNTP uint64 srNTP uint64
remoteNTP uint64 remoteNTP uint64
remoteRTP uint32 remoteRTP uint32
layerInfo uint32
} }
type rtpDownTrack struct { type rtpDownTrack struct {
...@@ -81,6 +83,7 @@ type rtpDownTrack struct { ...@@ -81,6 +83,7 @@ type rtpDownTrack struct {
sender *webrtc.RTPSender sender *webrtc.RTPSender
remote conn.UpTrack remote conn.UpTrack
ssrc webrtc.SSRC ssrc webrtc.SSRC
packetmap packetmap.Map
maxBitrate *bitrate maxBitrate *bitrate
maxREMBBitrate *bitrate maxREMBBitrate *bitrate
rate *estimator.Estimator rate *estimator.Estimator
...@@ -89,14 +92,6 @@ type rtpDownTrack struct { ...@@ -89,14 +92,6 @@ type rtpDownTrack struct {
cname atomic.Value cname atomic.Value
} }
func (down *rtpDownTrack) Write(buf []byte) (int, error) {
n, err := down.track.Write(buf)
if err == nil {
down.rate.Accumulate(uint32(n))
}
return n, err
}
func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) { func (down *rtpDownTrack) SetTimeOffset(ntp uint64, rtp uint32) {
atomic.StoreUint64(&down.atomics.remoteNTP, ntp) atomic.StoreUint64(&down.atomics.remoteNTP, ntp)
atomic.StoreUint32(&down.atomics.remoteRTP, rtp) atomic.StoreUint32(&down.atomics.remoteRTP, rtp)
...@@ -131,6 +126,17 @@ func (down *rtpDownTrack) SetCname(cname string) { ...@@ -131,6 +126,17 @@ func (down *rtpDownTrack) SetCname(cname string) {
down.cname.Store(cname) down.cname.Store(cname)
} }
func (down *rtpDownTrack) getLayerInfo() (uint8, uint8, uint8) {
info := atomic.LoadUint32(&down.atomics.layerInfo)
return uint8(info >> 16), uint8(info >> 8), uint8(info)
}
func (down *rtpDownTrack) setLayerInfo(layer, wanted, max uint8) {
atomic.StoreUint32(&down.atomics.layerInfo,
(uint32(layer)<<16)|(uint32(wanted)<<8)|uint32(max),
)
}
const ( const (
negotiationUnneeded = iota negotiationUnneeded = iota
negotiationNeeded negotiationNeeded
...@@ -179,17 +185,108 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection, ...@@ -179,17 +185,108 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection,
return conn, nil return conn, nil
} }
func (t *rtpDownTrack) GetMaxBitrate() uint64 { var packetBufPool = sync.Pool{
New: func() interface{} {
return make([]byte, packetcache.BufSize)
},
}
func (down *rtpDownTrack) Write(buf []byte) (int, error) {
codec := down.remote.Codec().MimeType
seqno, start, pid, tid, _, u, _, err := packetFlags(codec, buf)
if err != nil {
return 0, err
}
layer, wantedLayer, maxLayer := down.getLayerInfo()
if tid > maxLayer {
if layer == maxLayer {
wantedLayer = tid
layer = tid
}
maxLayer = tid
if wantedLayer > maxLayer {
wantedLayer = maxLayer
}
down.setLayerInfo(layer, wantedLayer, maxLayer)
down.adjustLayer()
}
if start && layer != wantedLayer {
if u || wantedLayer < layer {
layer = wantedLayer
down.setLayerInfo(layer, wantedLayer, maxLayer)
}
}
if tid > layer {
ok := down.packetmap.Drop(seqno, pid)
if ok {
return 0, nil
}
}
ok, newseqno, piddelta := down.packetmap.Map(seqno, pid)
if !ok {
return 0, nil
}
if newseqno == seqno && piddelta == 0 {
return down.write(buf)
}
ibuf2 := packetBufPool.Get()
defer packetBufPool.Put(ibuf2)
buf2 := ibuf2.([]byte)
n := copy(buf2, buf)
err = rewritePacket(codec, buf2[:n], newseqno, piddelta)
if err != nil {
return 0, err
}
return down.write(buf2[:n])
}
func (down *rtpDownTrack) write(buf []byte) (int, error) {
n, err := down.track.Write(buf)
if err == nil {
down.rate.Accumulate(uint32(n))
}
return n, err
}
func (t *rtpDownTrack) GetMaxBitrate() (uint64, int) {
now := rtptime.Jiffies() now := rtptime.Jiffies()
layer, _, _ := t.getLayerInfo()
r := t.maxBitrate.Get(now) r := t.maxBitrate.Get(now)
if r == ^uint64(0) { if r == ^uint64(0) {
r = 512 * 1024 r = 512 * 1024
} }
rr := t.maxREMBBitrate.Get(now) rr := t.maxREMBBitrate.Get(now)
if rr == 0 || r < rr { if rr == 0 || r < rr {
return r return r, int(layer)
}
return rr, int(layer)
}
func (t *rtpDownTrack) adjustLayer() {
max, _ := t.GetMaxBitrate()
r, _ := t.rate.Estimate()
rate := uint64(r) * 8
if rate < max*7/8 {
layer, wanted, max := t.getLayerInfo()
if layer < max {
wanted = layer + 1
t.setLayerInfo(layer, wanted, max)
}
} else if rate > max*3/2 {
layer, wanted, max := t.getLayerInfo()
if layer > 0 {
wanted = layer - 1
t.setLayerInfo(layer, wanted, max)
}
} }
return rr
} }
func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
...@@ -240,6 +337,7 @@ type rtpUpTrack struct { ...@@ -240,6 +337,7 @@ type rtpUpTrack struct {
srTime uint64 srTime uint64
srNTPTime uint64 srNTPTime uint64
srRTPTime uint32 srRTPTime uint32
maxLayer uint8
local []conn.DownTrack local []conn.DownTrack
bufferedNACKs []uint16 bufferedNACKs []uint16
} }
...@@ -598,7 +696,11 @@ func gotNACK(conn *rtpDownConnection, track *rtpDownTrack, p *rtcp.TransportLaye ...@@ -598,7 +696,11 @@ func gotNACK(conn *rtpDownConnection, track *rtpDownTrack, p *rtcp.TransportLaye
var packet rtp.Packet var packet rtp.Packet
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
for _, nack := range p.Nacks { for _, nack := range p.Nacks {
nack.Range(func(seqno uint16) bool { nack.Range(func(s uint16) bool {
ok, seqno, _ := track.packetmap.Reverse(s)
if !ok {
return true
}
l := track.remote.GetRTP(seqno, buf) l := track.remote.GetRTP(seqno, buf)
if l == 0 { if l == 0 {
unhandled = append(unhandled, seqno) unhandled = append(unhandled, seqno)
...@@ -785,28 +887,44 @@ func sendUpRTCP(up *rtpUpConnection) error { ...@@ -785,28 +887,44 @@ func sendUpRTCP(up *rtpUpConnection) error {
continue continue
} }
ssrcs = append(ssrcs, uint32(t.track.SSRC())) ssrcs = append(ssrcs, uint32(t.track.SSRC()))
var r uint64
if t.Kind() == webrtc.RTPCodecTypeAudio { if t.Kind() == webrtc.RTPCodecTypeAudio {
r = 100 * 1024 rate += 100 * 1024
} else if t.Label() == "l" { } else if t.Label() == "l" {
r = group.LowBitrate rate += group.LowBitrate
} else { } else {
minrate := ^uint64(0)
maxrate := uint64(group.MinBitrate)
maxlayer := 0
local := t.getLocal() local := t.getLocal()
r = ^uint64(0)
for _, down := range local { for _, down := range local {
rr := down.GetMaxBitrate() r, l := down.GetMaxBitrate()
if rr < group.MinBitrate { if maxlayer < l {
rr = group.MinBitrate maxlayer = l
} }
if r > rr { if r < group.MinBitrate {
r = rr r = group.MinBitrate
} }
if minrate > r {
minrate = r
} }
if r == ^uint64(0) { if maxrate < r {
r = 512 * 1024 maxrate = r
}
}
// assume that each layer takes two times less
// throughput than the higher one. Then we've
// got enough slack for a factor of 2^(layers-1).
for i := 0; i < maxlayer; i++ {
if minrate < ^uint64(0)/2 {
minrate *= 2
}
}
if minrate < maxrate {
rate += minrate
} else {
rate += maxrate
} }
} }
rate += r
} }
if rate < ^uint64(0) && len(ssrcs) > 0 { if rate < ^uint64(0) && len(ssrcs) > 0 {
...@@ -968,6 +1086,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT ...@@ -968,6 +1086,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT
continue continue
} }
adjust := false
jiffies := rtptime.Jiffies() jiffies := rtptime.Jiffies()
for _, p := range ps { for _, p := range ps {
...@@ -994,10 +1113,12 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT ...@@ -994,10 +1113,12 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT
} }
case *rtcp.ReceiverEstimatedMaximumBitrate: case *rtcp.ReceiverEstimatedMaximumBitrate:
track.maxREMBBitrate.Set(p.Bitrate, jiffies) track.maxREMBBitrate.Set(p.Bitrate, jiffies)
adjust = true
case *rtcp.ReceiverReport: case *rtcp.ReceiverReport:
for _, r := range p.Reports { for _, r := range p.Reports {
if r.SSRC == uint32(track.ssrc) { if r.SSRC == uint32(track.ssrc) {
handleReport(track, r, jiffies) handleReport(track, r, jiffies)
adjust = true
} }
} }
case *rtcp.SenderReport: case *rtcp.SenderReport:
...@@ -1010,6 +1131,9 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT ...@@ -1010,6 +1131,9 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT
gotNACK(conn, track, p) gotNACK(conn, track, p)
} }
} }
if adjust {
track.adjustLayer()
}
} }
} }
......
...@@ -49,6 +49,8 @@ func (c *webClient) GetStats() *stats.Client { ...@@ -49,6 +49,8 @@ func (c *webClient) GetStats() *stats.Client {
Id: down.id, Id: down.id,
} }
for _, t := range down.tracks { for _, t := range down.tracks {
l, _, _ := t.getLayerInfo()
layer := int(l)
rate, _ := t.rate.Estimate() rate, _ := t.rate.Estimate()
rtt := rtptime.ToDuration(t.getRTT(), rtt := rtptime.ToDuration(t.getRTT(),
rtptime.JiffiesPerSec) rtptime.JiffiesPerSec)
...@@ -56,6 +58,7 @@ func (c *webClient) GetStats() *stats.Client { ...@@ -56,6 +58,7 @@ func (c *webClient) GetStats() *stats.Client {
j := time.Duration(jitter) * time.Second / j := time.Duration(jitter) * time.Second /
time.Duration(t.track.Codec().ClockRate) time.Duration(t.track.Codec().ClockRate)
conns.Tracks = append(conns.Tracks, stats.Track{ conns.Tracks = append(conns.Tracks, stats.Track{
Layer: &layer,
Bitrate: uint64(rate) * 8, Bitrate: uint64(rate) * 8,
MaxBitrate: t.maxBitrate.Get(jiffies), MaxBitrate: t.maxBitrate.Get(jiffies),
Loss: float64(loss) / 256.0, Loss: float64(loss) / 256.0,
......
...@@ -1270,6 +1270,9 @@ func handleClientMessage(c *webClient, m clientMessage) error { ...@@ -1270,6 +1270,9 @@ func handleClientMessage(c *webClient, m clientMessage) error {
return closeDownConn(c, m.Id, message) return closeDownConn(c, m.Id, message)
} }
down := getDownConn(c, m.Id) down := getDownConn(c, m.Id)
if down == nil {
return ErrUnknownId
}
if down.negotiationNeeded > negotiationUnneeded { if down.negotiationNeeded > negotiationUnneeded {
err := negotiate( err := negotiate(
c, down, c, down,
......
...@@ -95,23 +95,26 @@ function formatTrack(table, track) { ...@@ -95,23 +95,26 @@ function formatTrack(table, track) {
tr.appendChild(document.createElement('td')); tr.appendChild(document.createElement('td'));
tr.appendChild(document.createElement('td')); tr.appendChild(document.createElement('td'));
let td = document.createElement('td'); let td = document.createElement('td');
if(track.maxBitrate) td.textContent = track.layer;
td.textContent = `${track.bitrate||0}/${track.maxBitrate}`;
else
td.textContent = `${track.bitrate||0}`;
tr.appendChild(td); tr.appendChild(td);
let td2 = document.createElement('td'); let td2 = document.createElement('td');
td2.textContent = `${Math.round(track.loss * 100)}%`; if(track.maxBitrate)
td2.textContent = `${track.bitrate||0}/${track.maxBitrate}`;
else
td2.textContent = `${track.bitrate||0}`;
tr.appendChild(td2); tr.appendChild(td2);
let td3 = document.createElement('td'); let td3 = document.createElement('td');
td3.textContent = `${Math.round(track.loss * 100)}%`;
tr.appendChild(td3);
let td4 = document.createElement('td');
let text = ''; let text = '';
if(track.rtt) { if(track.rtt) {
text = text + `${Math.round(track.rtt * 1000) / 1000}ms`; text = text + `${Math.round(track.rtt * 1000) / 1000}ms`;
} }
if(track.jitter) if(track.jitter)
text = text + ${Math.round(track.jitter * 1000) / 1000}ms`; text = text + ${Math.round(track.jitter * 1000) / 1000}ms`;
td3.textContent = text; td4.textContent = text;
tr.appendChild(td3); tr.appendChild(td4);
table.appendChild(tr); table.appendChild(tr);
} }
......
...@@ -47,6 +47,7 @@ func (d *Duration) UnmarshalJSON(buf []byte) error { ...@@ -47,6 +47,7 @@ func (d *Duration) UnmarshalJSON(buf []byte) error {
} }
type Track struct { type Track struct {
Layer *int `json:"layer,omitempty"`
Bitrate uint64 `json:"bitrate"` Bitrate uint64 `json:"bitrate"`
MaxBitrate uint64 `json:"maxBitrate,omitempty"` MaxBitrate uint64 `json:"maxBitrate,omitempty"`
Loss float64 `json:"loss"` Loss float64 `json:"loss"`
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment