Commit 22585e9d authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Handle spatial scalability.

Maintain spatial layer information, and drop lower layers when
possible.  Yields a 20% saving with VP9.
parent 781bdf8c
...@@ -40,5 +40,5 @@ type DownTrack interface { ...@@ -40,5 +40,5 @@ type DownTrack interface {
Write(buf []byte) (int, error) Write(buf []byte) (int, error)
SetTimeOffset(ntp uint64, rtp uint32) SetTimeOffset(ntp uint64, rtp uint32)
SetCname(string) SetCname(string)
GetMaxBitrate() (uint64, int) GetMaxBitrate() (uint64, int, int)
} }
...@@ -616,6 +616,6 @@ func (conn *diskConn) initWriter(width, height uint32) error { ...@@ -616,6 +616,6 @@ func (conn *diskConn) initWriter(width, height uint32) error {
return nil return nil
} }
func (t *diskTrack) GetMaxBitrate() (uint64, int) { func (t *diskTrack) GetMaxBitrate() (uint64, int, int) {
return ^uint64(0), -1 return ^uint64(0), -1, -1
} }
...@@ -64,7 +64,7 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) { ...@@ -64,7 +64,7 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) {
return nil, offset, offset > 0 return nil, offset, offset > 0
} }
l := data[offset] l := data[offset]
length |= int(l & 0x7f) << (offset * 7) length |= int(l&0x7f) << (offset * 7)
offset++ offset++
if (l & 0x80) == 0 { if (l & 0x80) == 0 {
break break
...@@ -182,50 +182,66 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) { ...@@ -182,50 +182,66 @@ func isKeyframe(codec string, packet *rtp.Packet) (bool, bool) {
var errTruncated = errors.New("truncated packet") var errTruncated = errors.New("truncated packet")
var errUnsupportedCodec = errors.New("unsupported codec") var errUnsupportedCodec = errors.New("unsupported codec")
func packetFlags(codec string, buf []byte) (seqno uint16, start bool, pid uint16, tid uint8, sid uint8, layersync bool, discardable bool, err error) { type packetFlags struct {
seqno uint16
start bool
pid uint16 // only if it needs rewriting
tid uint8
sid uint8
tidupsync bool
sidsync bool
sidnonreference bool
discardable bool
}
func getPacketFlags(codec string, buf []byte) (packetFlags, error) {
if len(buf) < 12 { if len(buf) < 12 {
err = errTruncated return packetFlags{}, errTruncated
return
} }
seqno = (uint16(buf[2]) << 8) | uint16(buf[3]) var flags packetFlags
flags.seqno = (uint16(buf[2]) << 8) | uint16(buf[3])
if strings.EqualFold(codec, "video/vp8") { if strings.EqualFold(codec, "video/vp8") {
var packet rtp.Packet var packet rtp.Packet
err = packet.Unmarshal(buf) err := packet.Unmarshal(buf)
if err != nil { if err != nil {
return return flags, err
} }
var vp8 codecs.VP8Packet var vp8 codecs.VP8Packet
_, err = vp8.Unmarshal(packet.Payload) _, err = vp8.Unmarshal(packet.Payload)
if err != nil { if err != nil {
return return flags, err
} }
start = vp8.S == 1 && vp8.PID == 0 flags.start = vp8.S == 1 && vp8.PID == 0
pid = vp8.PictureID flags.pid = vp8.PictureID
tid = vp8.TID flags.tid = vp8.TID
layersync = vp8.Y == 1 flags.tidupsync = vp8.Y == 1
discardable = vp8.N == 1 flags.discardable = vp8.N == 1
return return flags, nil
} else if strings.EqualFold(codec, "video/vp9") { } else if strings.EqualFold(codec, "video/vp9") {
var packet rtp.Packet var packet rtp.Packet
err = packet.Unmarshal(buf) err := packet.Unmarshal(buf)
if err != nil { if err != nil {
return return flags, err
} }
var vp9 codecs.VP9Packet var vp9 codecs.VP9Packet
_, err = vp9.Unmarshal(packet.Payload) _, err = vp9.Unmarshal(packet.Payload)
if err != nil { if err != nil {
return return flags, err
} }
start = vp9.B flags.start = vp9.B
tid = vp9.TID flags.tid = vp9.TID
sid = vp9.SID flags.sid = vp9.SID
layersync = vp9.U flags.tidupsync = vp9.U
return flags.sidsync = vp9.P
} // not yet in pion/rtp
return flags.sidnonreference = (packet.Payload[0] & 0x01) != 0
return flags, nil
}
return flags, nil
} }
func rewritePacket(codec string, data []byte, seqno uint16, delta uint16) error { func rewritePacket(codec string, data []byte, seqno uint16, delta uint16) error {
......
...@@ -16,12 +16,13 @@ var vp8 = []byte{ ...@@ -16,12 +16,13 @@ var vp8 = []byte{
func TestPacketFlags(t *testing.T) { func TestPacketFlags(t *testing.T) {
buf := append([]byte{}, vp8...) buf := append([]byte{}, vp8...)
seqno, start, pid, tid, sid, layersync, discardable, err := flags, err := getPacketFlags("video/vp8", buf)
packetFlags("video/vp8", buf) if flags.seqno != 42 || !flags.start || flags.pid != 57 ||
if seqno != 42 || !start || pid != 57 || sid != 0 || tid != 0 || flags.sid != 0 || flags.tid != 0 ||
layersync || discardable || err != nil { flags.tidupsync || flags.discardable || err != nil {
t.Errorf("Got %v, %v, %v, %v, %v, %v (%v)", t.Errorf("Got %v, %v, %v, %v, %v, %v (%v)",
seqno, start, pid, sid, layersync, discardable, err, flags.seqno, flags.start, flags.pid, flags.sid,
flags.tidupsync, flags.discardable, err,
) )
} }
} }
...@@ -34,10 +35,12 @@ func TestRewrite(t *testing.T) { ...@@ -34,10 +35,12 @@ func TestRewrite(t *testing.T) {
t.Errorf("rewrite: %v", err) t.Errorf("rewrite: %v", err)
continue continue
} }
seqno, _, pid, _, _, _, _, err := packetFlags("video/vp8", buf) flags, err := getPacketFlags("video/vp8", buf)
if err != nil || seqno != i || pid != (57 + i) & 0x7FFF { if err != nil || flags.seqno != i ||
flags.pid != (57 + i) & 0x7FFF {
t.Errorf("Expected %v %v, got %v %v (%v)", t.Errorf("Expected %v %v, got %v %v (%v)",
i, (57 + i) & 0x7FFF, seqno, pid, err) i, (57 + i) & 0x7FFF,
flags.seqno, flags.pid, err)
} }
} }
} }
...@@ -126,14 +126,31 @@ func (down *rtpDownTrack) SetCname(cname string) { ...@@ -126,14 +126,31 @@ func (down *rtpDownTrack) SetCname(cname string) {
down.cname.Store(cname) down.cname.Store(cname)
} }
func (down *rtpDownTrack) getLayerInfo() (uint8, uint8, uint8) { type layerInfo struct {
sid, wantedSid, maxSid uint8
tid, wantedTid, maxTid uint8
}
func (down *rtpDownTrack) getLayerInfo() layerInfo {
info := atomic.LoadUint32(&down.atomics.layerInfo) info := atomic.LoadUint32(&down.atomics.layerInfo)
return uint8(info >> 16), uint8(info >> 8), uint8(info) return layerInfo{
sid: uint8((info & 0xF)),
wantedSid: uint8((info >> 4) & 0xF),
maxSid: uint8((info >> 8) & 0xF),
tid: uint8((info >> 16) & 0xF),
wantedTid: uint8((info >> 20) & 0xF),
maxTid: uint8((info >> 24) & 0xF),
}
} }
func (down *rtpDownTrack) setLayerInfo(layer, wanted, max uint8) { func (down *rtpDownTrack) setLayerInfo(info layerInfo) {
atomic.StoreUint32(&down.atomics.layerInfo, atomic.StoreUint32(&down.atomics.layerInfo,
(uint32(layer)<<16)|(uint32(wanted)<<8)|uint32(max), uint32(info.sid&0xF)|
uint32(info.wantedSid&0xF)<<4|
uint32(info.maxSid&0xF)<<8|
uint32(info.tid&0xF)<<16|
uint32(info.wantedTid&0xF)<<20|
uint32(info.maxTid&0xF)<<24,
) )
} }
...@@ -195,45 +212,59 @@ var packetBufPool = sync.Pool{ ...@@ -195,45 +212,59 @@ var packetBufPool = sync.Pool{
func (down *rtpDownTrack) Write(buf []byte) (int, error) { func (down *rtpDownTrack) Write(buf []byte) (int, error) {
codec := down.remote.Codec().MimeType codec := down.remote.Codec().MimeType
seqno, start, pid, tid, _, u, _, err := packetFlags(codec, buf) flags, err := getPacketFlags(codec, buf)
if err != nil { if err != nil {
return 0, err return 0, err
} }
layer, wantedLayer, maxLayer := down.getLayerInfo() layer := down.getLayerInfo()
if tid > maxLayer { if flags.tid > layer.maxTid || flags.sid > layer.maxSid {
if layer == maxLayer { if flags.tid > layer.maxTid {
wantedLayer = tid if layer.tid == layer.maxTid {
layer = tid layer.wantedTid = flags.tid
layer.tid = flags.tid
}
layer.maxTid = flags.tid
}
if flags.sid > layer.maxSid {
if layer.sid == layer.maxSid {
layer.wantedSid = flags.sid
layer.sid = flags.sid
} }
maxLayer = tid layer.maxSid = flags.sid
if wantedLayer > maxLayer {
wantedLayer = maxLayer
} }
down.setLayerInfo(layer, wantedLayer, maxLayer) down.setLayerInfo(layer)
down.adjustLayer() down.adjustLayer()
} }
if start && layer != wantedLayer { if flags.start && (layer.tid != layer.wantedTid) {
if u || wantedLayer < layer { if layer.wantedTid < layer.tid || flags.tidupsync {
layer = wantedLayer layer.tid = layer.wantedTid
down.setLayerInfo(layer, wantedLayer, maxLayer) down.setLayerInfo(layer)
} }
} }
if tid > layer { if flags.start && (layer.sid != layer.wantedSid) {
ok := down.packetmap.Drop(seqno, pid) if flags.sidsync {
layer.sid = layer.wantedTid
down.setLayerInfo(layer)
}
}
if flags.tid > layer.tid || flags.sid > layer.sid ||
(flags.sid < layer.sid && flags.sidnonreference) {
ok := down.packetmap.Drop(flags.seqno, flags.pid)
if ok { if ok {
return 0, nil return 0, nil
} }
} }
ok, newseqno, piddelta := down.packetmap.Map(seqno, pid) ok, newseqno, piddelta := down.packetmap.Map(flags.seqno, flags.pid)
if !ok { if !ok {
return 0, nil return 0, nil
} }
if newseqno == seqno && piddelta == 0 { if newseqno == flags.seqno && piddelta == 0 {
return down.write(buf) return down.write(buf)
} }
...@@ -257,35 +288,35 @@ func (down *rtpDownTrack) write(buf []byte) (int, error) { ...@@ -257,35 +288,35 @@ func (down *rtpDownTrack) write(buf []byte) (int, error) {
return n, err return n, err
} }
func (t *rtpDownTrack) GetMaxBitrate() (uint64, int) { func (t *rtpDownTrack) GetMaxBitrate() (uint64, int, int) {
now := rtptime.Jiffies() now := rtptime.Jiffies()
layer, _, _ := t.getLayerInfo() layer := t.getLayerInfo()
r := t.maxBitrate.Get(now) r := t.maxBitrate.Get(now)
if r == ^uint64(0) { if r == ^uint64(0) {
r = 512 * 1024 r = 512 * 1024
} }
rr := t.maxREMBBitrate.Get(now) rr := t.maxREMBBitrate.Get(now)
if rr == 0 || r < rr { if rr != 0 && rr < r {
return r, int(layer) r = rr
} }
return rr, int(layer) return r, int(layer.sid), int(layer.tid)
} }
func (t *rtpDownTrack) adjustLayer() { func (t *rtpDownTrack) adjustLayer() {
max, _ := t.GetMaxBitrate() max, _, _ := t.GetMaxBitrate()
r, _ := t.rate.Estimate() r, _ := t.rate.Estimate()
rate := uint64(r) * 8 rate := uint64(r) * 8
if rate < max*7/8 { if rate < max*7/8 {
layer, wanted, max := t.getLayerInfo() layer := t.getLayerInfo()
if layer < max { if layer.tid < layer.maxTid {
wanted = layer + 1 layer.wantedTid = layer.tid + 1
t.setLayerInfo(layer, wanted, max) t.setLayerInfo(layer)
} }
} else if rate > max*3/2 { } else if rate > max*3/2 {
layer, wanted, max := t.getLayerInfo() layer := t.getLayerInfo()
if layer > 0 { if layer.tid > 0 {
wanted = layer - 1 layer.wantedTid = layer.tid - 1
t.setLayerInfo(layer, wanted, max) t.setLayerInfo(layer)
} }
} }
} }
...@@ -881,12 +912,16 @@ func sendUpRTCP(up *rtpUpConnection) error { ...@@ -881,12 +912,16 @@ func sendUpRTCP(up *rtpUpConnection) error {
} else { } else {
minrate := ^uint64(0) minrate := ^uint64(0)
maxrate := uint64(group.MinBitrate) maxrate := uint64(group.MinBitrate)
maxlayer := 0 maxsid := 0
maxtid := 0
local := t.getLocal() local := t.getLocal()
for _, down := range local { for _, down := range local {
r, l := down.GetMaxBitrate() r, sid, tid := down.GetMaxBitrate()
if maxlayer < l { if maxsid < sid {
maxlayer = l maxsid = sid
}
if maxtid < tid {
maxtid = tid
} }
if r < group.MinBitrate { if r < group.MinBitrate {
r = group.MinBitrate r = group.MinBitrate
...@@ -898,10 +933,15 @@ func sendUpRTCP(up *rtpUpConnection) error { ...@@ -898,10 +933,15 @@ func sendUpRTCP(up *rtpUpConnection) error {
maxrate = r maxrate = r
} }
} }
// assume that lower spatial layers take up 1/5 of
// the throughput
if maxsid > 0 {
maxrate = maxrate * 5 / 4
}
// assume that each layer takes two times less // assume that each layer takes two times less
// throughput than the higher one. Then we've // throughput than the higher one. Then we've
// got enough slack for a factor of 2^(layers-1). // got enough slack for a factor of 2^(layers-1).
for i := 0; i < maxlayer; i++ { for i := 0; i < maxtid; i++ {
if minrate < ^uint64(0)/2 { if minrate < ^uint64(0)/2 {
minrate *= 2 minrate *= 2
} }
......
...@@ -18,19 +18,23 @@ func TestDownTrackAtomics(t *testing.T) { ...@@ -18,19 +18,23 @@ func TestDownTrackAtomics(t *testing.T) {
down.setSRTime(4, 5) down.setSRTime(4, 5)
down.maxBitrate.Set(6, rtptime.Jiffies()) down.maxBitrate.Set(6, rtptime.Jiffies())
down.maxREMBBitrate.Set(7, rtptime.Jiffies()) down.maxREMBBitrate.Set(7, rtptime.Jiffies())
down.setLayerInfo(8, 9, 10) info := layerInfo{8, 9, 10, 11, 12, 13}
down.setLayerInfo(info)
ntp, rtp := down.getTimeOffset() ntp, rtp := down.getTimeOffset()
rtt := down.getRTT() rtt := down.getRTT()
sr, srntp := down.getSRTime() sr, srntp := down.getSRTime()
br, lbr := down.GetMaxBitrate() br, sbr, tbr := down.GetMaxBitrate()
l, w, m := down.getLayerInfo() info2 := down.getLayerInfo()
if ntp != 1 || rtp != 2 || rtt != 3 || sr != 4 || srntp != 5 || if ntp != 1 || rtp != 2 || rtt != 3 || sr != 4 || srntp != 5 ||
br != 6 || lbr != 8 || l != 8 || w != 9 || m != 10 { br != 6 || sbr != 8 || tbr != 11 {
t.Errorf( t.Errorf(
"Expected 1 2 3 4 5 6 8 8 9 10, "+ "Expected 1 2 3 4 5 6 8 11, "+
"got %v %v %v %v %v %v %v %v %v %v", "got %v %v %v %v %v %v %v %v",
ntp, rtp, rtt, sr, srntp, br, lbr, l, w, m, ntp, rtp, rtt, sr, srntp, br, sbr, tbr,
) )
} }
if info2 != info {
t.Errorf("Expected %v, got %v", info, info2)
}
} }
...@@ -49,7 +49,11 @@ func (c *webClient) GetStats() *stats.Client { ...@@ -49,7 +49,11 @@ func (c *webClient) GetStats() *stats.Client {
Id: down.id, Id: down.id,
} }
for _, t := range down.tracks { for _, t := range down.tracks {
l, _, ml := t.getLayerInfo() layer := t.getLayerInfo()
sid := layer.sid
maxSid := layer.maxSid
tid := layer.tid
maxTid := layer.maxTid
rate, _ := t.rate.Estimate() rate, _ := t.rate.Estimate()
rtt := rtptime.ToDuration(t.getRTT(), rtt := rtptime.ToDuration(t.getRTT(),
rtptime.JiffiesPerSec) rtptime.JiffiesPerSec)
...@@ -57,8 +61,10 @@ func (c *webClient) GetStats() *stats.Client { ...@@ -57,8 +61,10 @@ func (c *webClient) GetStats() *stats.Client {
j := time.Duration(jitter) * time.Second / j := time.Duration(jitter) * time.Second /
time.Duration(t.track.Codec().ClockRate) time.Duration(t.track.Codec().ClockRate)
conns.Tracks = append(conns.Tracks, stats.Track{ conns.Tracks = append(conns.Tracks, stats.Track{
Layer: &l, Tid: &tid,
MaxLayer: &ml, MaxTid: &maxTid,
Sid: &sid,
MaxSid: &maxSid,
Bitrate: uint64(rate) * 8, Bitrate: uint64(rate) * 8,
MaxBitrate: t.maxBitrate.Get(jiffies), MaxBitrate: t.maxBitrate.Get(jiffies),
Loss: float64(loss) / 256.0, Loss: float64(loss) / 256.0,
......
...@@ -99,8 +99,15 @@ function formatTrack(table, track) { ...@@ -99,8 +99,15 @@ function formatTrack(table, track) {
tr.appendChild(document.createElement('td')); tr.appendChild(document.createElement('td'));
tr.appendChild(document.createElement('td')); tr.appendChild(document.createElement('td'));
let td = document.createElement('td'); let td = document.createElement('td');
if(track.layer && track.maxLayer) let layer = '';
td.textContent = `${track.layer}/${track.maxLayer}`; if(track.sid || track.maxSid)
layer = layer + `s${track.sid}/${track.maxSid}`;
if(track.tid || track.maxTid) {
if(layer !== '')
layer = layer + '+';
layer = layer + `t${track.tid}/${track.maxTid}`;
}
td.textContent = layer;
tr.appendChild(td); tr.appendChild(td);
let td2 = document.createElement('td'); let td2 = document.createElement('td');
if(track.maxBitrate) if(track.maxBitrate)
......
...@@ -47,8 +47,10 @@ func (d *Duration) UnmarshalJSON(buf []byte) error { ...@@ -47,8 +47,10 @@ func (d *Duration) UnmarshalJSON(buf []byte) error {
} }
type Track struct { type Track struct {
Layer *uint8 `json:"layer,omitempty"` Sid *uint8 `json:"sid,omitempty"`
MaxLayer *uint8 `json:"maxLayer,omitempty"` MaxSid *uint8 `json:"maxSid,omitempty"`
Tid *uint8 `json:"tid,omitempty"`
MaxTid *uint8 `json:"maxTid,omitempty"`
Bitrate uint64 `json:"bitrate"` Bitrate uint64 `json:"bitrate"`
MaxBitrate uint64 `json:"maxBitrate,omitempty"` MaxBitrate uint64 `json:"maxBitrate,omitempty"`
Loss float64 `json:"loss"` Loss float64 `json:"loss"`
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment