Commit 9093339b authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Rework handling of buffered keyframes.

Instead of buffering the last keyframe, we merely keep track of its
seqno, and use the main cache for recovering.  We also send the whole
sequence of packets rather than just the keyframe itself.
parent f4aa86a4
...@@ -38,13 +38,6 @@ type bitmap struct { ...@@ -38,13 +38,6 @@ type bitmap struct {
bitmap uint32 bitmap uint32
} }
// frame is used for storing the last keyframe
type frame struct {
timestamp uint32
complete bool
entries []entry
}
type Cache struct { type Cache struct {
mu sync.Mutex mu sync.Mutex
//stats //stats
...@@ -55,10 +48,11 @@ type Cache struct { ...@@ -55,10 +48,11 @@ type Cache struct {
totalExpected uint32 totalExpected uint32
received uint32 received uint32
totalReceived uint32 totalReceived uint32
// last seen keyframe
keyframe uint16
keyframeValid bool
// bitmap // bitmap
bitmap bitmap bitmap bitmap
// buffered keyframe
keyframe frame
// the actual cache // the actual cache
tail uint16 tail uint16
entries []entry entries []entry
...@@ -162,94 +156,6 @@ func (bitmap *bitmap) get(next uint16) (bool, uint16, uint16) { ...@@ -162,94 +156,6 @@ func (bitmap *bitmap) get(next uint16) (bool, uint16, uint16) {
return true, first, uint16(bm >> 1) return true, first, uint16(bm >> 1)
} }
// insert inserts a packet into a frame.
func (frame *frame) insert(seqno uint16, timestamp uint32, marker bool, data []byte) bool {
n := len(frame.entries)
i := 0
if n == 0 || seqno > frame.entries[n-1].seqno {
// fast path
i = n
} else {
for i < n {
if frame.entries[i].seqno >= seqno {
break
}
i++
}
if i < n && frame.entries[i].seqno == seqno {
// duplicate
return false
}
}
if n >= maxFrame {
// overflow
return false
}
lam := uint16(len(data))
if marker {
lam |= 0x8000
}
e := entry{
seqno: seqno,
lengthAndMarker: lam,
timestamp: timestamp,
}
copy(e.buf[:], data)
if i >= n {
frame.entries = append(frame.entries, e)
return true
}
frame.entries = append(frame.entries, entry{})
copy(frame.entries[i+1:], frame.entries[i:])
frame.entries[i] = e
return true
}
// store checks whether a packet is part of the current keyframe and, if
// so, inserts it.
func (frame *frame) store(seqno uint16, timestamp uint32, first bool, marker bool, data []byte) bool {
if first {
if frame.timestamp != timestamp {
frame.timestamp = timestamp
frame.complete = false
frame.entries = frame.entries[:0]
}
} else if len(frame.entries) > 0 {
if frame.timestamp != timestamp {
delta := seqno - frame.entries[0].seqno
if (delta&0x8000) == 0 && delta > 0x4000 {
frame.complete = false
frame.entries = frame.entries[:0]
}
return false
}
} else {
return false
}
done := frame.insert(seqno, timestamp, marker, data)
if done && !frame.complete {
marker := false
fst := frame.entries[0].seqno
for i := 1; i < len(frame.entries); i++ {
if frame.entries[i].seqno != fst+uint16(i) {
return done
}
if frame.entries[i].marker() {
marker = true
}
}
if marker {
frame.complete = true
}
}
return done
}
// Store stores a packet in the cache. It returns the first seqno in the // Store stores a packet in the cache. It returns the first seqno in the
// bitmap, and the index at which the packet was stored. // bitmap, and the index at which the packet was stored.
func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker bool, buf []byte) (uint16, uint16) { func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker bool, buf []byte) (uint16, uint16) {
...@@ -270,6 +176,10 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker ...@@ -270,6 +176,10 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker
cache.cycle++ cache.cycle++
} }
cache.last = seqno cache.last = seqno
if cache.keyframeValid &&
compare(cache.keyframe, seqno) > 0 {
cache.keyframeValid = false
}
} else if cmp > 0 { } else if cmp > 0 {
if cache.received < cache.expected { if cache.received < cache.expected {
cache.received++ cache.received++
...@@ -278,9 +188,9 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker ...@@ -278,9 +188,9 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker
} }
cache.bitmap.set(seqno) cache.bitmap.set(seqno)
done := cache.keyframe.store(seqno, timestamp, keyframe, marker, buf) if keyframe {
if done && !cache.keyframe.complete { cache.keyframe = seqno
completeKeyframe(cache) cache.keyframeValid = true
} }
i := cache.tail i := cache.tail
...@@ -297,58 +207,6 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker ...@@ -297,58 +207,6 @@ func (cache *Cache) Store(seqno uint16, timestamp uint32, keyframe bool, marker
return cache.bitmap.first, i return cache.bitmap.first, i
} }
// completeKeyFrame attempts to complete the current keyframe.
func completeKeyframe(cache *Cache) {
l := len(cache.keyframe.entries)
if l == 0 {
return
}
first := cache.keyframe.entries[0].seqno
last := cache.keyframe.entries[l-1].seqno
count := (last - first) // may wrap around
if count > 0x4000 {
// this shouldn't happen
return
}
var buf []byte
if count > 1 {
if buf == nil {
buf = make([]byte, BufSize)
}
for i := uint16(1); i < count; i++ {
n, ts, marker := get(first+i, cache.entries, buf)
if n > 0 {
cache.keyframe.store(
first+i, ts, false, marker, buf,
)
}
}
}
if !cache.keyframe.complete {
// Try to find packets after the last one.
for {
l := len(cache.keyframe.entries)
if cache.keyframe.entries[l-1].marker() {
break
}
if buf == nil {
buf = make([]byte, BufSize)
}
seqno := cache.keyframe.entries[l-1].seqno + 1
n, ts, marker := get(seqno, cache.entries, buf)
if n <= 0 {
break
}
done := cache.keyframe.store(
seqno, ts, false, marker, buf,
)
if !done || marker {
break
}
}
}
}
// Expect records that we expect n additional packets. // Expect records that we expect n additional packets.
func (cache *Cache) Expect(n int) { func (cache *Cache) Expect(n int) {
if n <= 0 { if n <= 0 {
...@@ -384,12 +242,7 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 { ...@@ -384,12 +242,7 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
n, _, _ := get(seqno, cache.keyframe.entries, result) n, _, _ := get(seqno, cache.entries, result)
if n > 0 {
return n
}
n, _, _ = get(seqno, cache.entries, result)
if n > 0 { if n > 0 {
return n return n
} }
...@@ -397,17 +250,13 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 { ...@@ -397,17 +250,13 @@ func (cache *Cache) Get(seqno uint16, result []byte) uint16 {
return 0 return 0
} }
func (cache *Cache) Last() (bool, uint16, uint32) { func (cache *Cache) Last() (uint16, bool) {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
if !cache.lastValid { if !cache.lastValid {
return false, 0, 0 return 0, false
} }
len, ts, _ := get(cache.last, cache.entries, nil) return cache.last, true
if len == 0 {
return false, 0, 0
}
return true, cache.last, ts
} }
// GetAt retrieves a packet from the cache assuming it is at the given index. // GetAt retrieves a packet from the cache assuming it is at the given index.
...@@ -427,32 +276,15 @@ func (cache *Cache) GetAt(seqno uint16, index uint16, result []byte) uint16 { ...@@ -427,32 +276,15 @@ func (cache *Cache) GetAt(seqno uint16, index uint16, result []byte) uint16 {
) )
} }
// Keyframe returns the last buffered keyframe. It returns the frame's // Keyframe returns the seqno of the last seen keyframe
// timestamp and a boolean indicating if the frame is complete. func (cache *Cache) Keyframe() (uint16, bool) {
func (cache *Cache) Keyframe() (uint32, bool, []uint16) {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
if len(cache.keyframe.entries) == 0 { if !cache.keyframeValid {
return 0, false, nil return 0, false
} }
return cache.keyframe, true
seqnos := make([]uint16, len(cache.keyframe.entries))
for i := range cache.keyframe.entries {
seqnos[i] = cache.keyframe.entries[i].seqno
}
return cache.keyframe.timestamp, cache.keyframe.complete, seqnos
}
func (cache *Cache) KeyframeSeqno() (bool, uint16, uint32) {
cache.mu.Lock()
defer cache.mu.Unlock()
if len(cache.keyframe.entries) == 0 {
return false, 0, 0
}
return true, cache.keyframe.entries[0].seqno, cache.keyframe.timestamp
} }
func (cache *Cache) resize(capacity int) { func (cache *Cache) resize(capacity int) {
......
...@@ -22,7 +22,7 @@ func TestCache(t *testing.T) { ...@@ -22,7 +22,7 @@ func TestCache(t *testing.T) {
buf2 := randomBuf() buf2 := randomBuf()
cache := New(16) cache := New(16)
found, _, _ := cache.Last() _, found := cache.Last()
if found { if found {
t.Errorf("Found in empty cache") t.Errorf("Found in empty cache")
} }
...@@ -30,13 +30,12 @@ func TestCache(t *testing.T) { ...@@ -30,13 +30,12 @@ func TestCache(t *testing.T) {
_, i1 := cache.Store(13, 42, false, false, buf1) _, i1 := cache.Store(13, 42, false, false, buf1)
_, i2 := cache.Store(17, 42, false, false, buf2) _, i2 := cache.Store(17, 42, false, false, buf2)
found, seqno, ts := cache.Last() seqno, found := cache.Last()
if !found { if !found {
t.Errorf("Not found") t.Errorf("Not found")
} }
if seqno != 17 || ts != 42 { if seqno != 17 {
t.Errorf("Expected %v, %v, got %v, %v", t.Errorf("Expected %v, got %v", 17, seqno)
17, 42, seqno, ts)
} }
buf := make([]byte, BufSize) buf := make([]byte, BufSize)
...@@ -170,84 +169,6 @@ func TestCacheGrowCond(t *testing.T) { ...@@ -170,84 +169,6 @@ func TestCacheGrowCond(t *testing.T) {
} }
} }
func TestKeyframe(t *testing.T) {
cache := New(16)
packet := make([]byte, 1)
buf := make([]byte, BufSize)
found, _, _ := cache.KeyframeSeqno()
if found {
t.Errorf("Found keyframe in empty cache")
}
cache.Store(7, 57, true, false, packet)
if cache.keyframe.complete {
t.Errorf("Expected false, got true")
}
cache.Store(8, 57, false, true, packet)
if !cache.keyframe.complete {
t.Errorf("Expected true, got false")
}
ts, c, kf := cache.Keyframe()
if ts != 57 || !c || len(kf) != 2 {
t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2)
}
found, seqno, ts := cache.KeyframeSeqno()
if !found || seqno != 7 || ts != 57 {
t.Errorf("Got %v %v %v, expected %v %v", found, seqno, ts, 7, 57)
}
for _, i := range kf {
l := cache.Get(i, buf)
if int(l) != len(packet) {
t.Errorf("Couldn't get %v", i)
}
}
for i := 0; i < 32; i++ {
cache.Store(uint16(9+i), uint32(58+i), false, false, packet)
}
ts, c, kf = cache.Keyframe()
if ts != 57 || !c || len(kf) != 2 {
t.Errorf("Got %v %v %v, expected %v %v", ts, c, len(kf), 57, 2)
}
for _, i := range kf {
l := cache.Get(i, buf)
if int(l) != len(packet) {
t.Errorf("Couldn't get %v", i)
}
}
}
func TestKeyframeUnsorted(t *testing.T) {
cache := New(16)
packet := make([]byte, 1)
cache.Store(7, 57, false, false, packet)
cache.Store(9, 57, false, false, packet)
cache.Store(10, 57, false, true, packet)
cache.Store(6, 57, true, false, packet)
_, c, kf := cache.Keyframe()
if len(kf) != 2 || c {
t.Errorf("Got %v %v, expected 2", c, kf)
}
cache.Store(8, 57, false, false, packet)
_, c, kf = cache.Keyframe()
if len(kf) != 5 || !c {
t.Errorf("Got %v %v, expected 5", c, kf)
}
for i, v := range kf {
if v != uint16(i+6) {
t.Errorf("Position %v, expected %v, got %v\n",
i, i+6, v)
}
}
}
func TestBitmap(t *testing.T) { func TestBitmap(t *testing.T) {
value := uint64(0xcdd58f1e035379c0) value := uint64(0xcdd58f1e035379c0)
packet := make([]byte, 1) packet := make([]byte, 1)
......
...@@ -206,9 +206,10 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error { ...@@ -206,9 +206,10 @@ func (writer *rtpWriter) add(track conn.DownTrack, add bool, max int) error {
} }
} }
func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) { func sendSequence(kf, last uint16, track conn.DownTrack, cache *packetcache.Cache) {
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
for _, seqno := range kf { seqno := kf
for ((last - seqno) & 0x8000) == 0 {
bytes := cache.Get(seqno, buf) bytes := cache.Get(seqno, buf)
if bytes == 0 { if bytes == 0 {
return return
...@@ -218,6 +219,7 @@ func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) { ...@@ -218,6 +219,7 @@ func sendKeyframe(kf []uint16, track conn.DownTrack, cache *packetcache.Cache) {
if err != nil { if err != nil {
return return
} }
seqno++
} }
} }
...@@ -253,14 +255,12 @@ func rtpWriterLoop(writer *rtpWriter, track *rtpUpTrack) { ...@@ -253,14 +255,12 @@ func rtpWriterLoop(writer *rtpWriter, track *rtpUpTrack) {
action.track.SetCname(cname) action.track.SetCname(cname)
} }
found, _, lts := track.cache.Last() last, foundLast := track.cache.Last()
kts, _, kf := track.cache.Keyframe() kf, foundKf := track.cache.Keyframe()
if found && len(kf) > 0 { if foundLast && foundKf {
if ((lts-kts)&0x80000000) != 0 || if last-kf < 40 { // modulo 2^16
lts-kts < 2*90000 { go sendSequence(
// we got a recent keyframe kf, last,
go sendKeyframe(
kf,
action.track, action.track,
track.cache, track.cache,
) )
...@@ -331,11 +331,11 @@ func nackWriter(track *rtpUpTrack) { ...@@ -331,11 +331,11 @@ func nackWriter(track *rtpUpTrack) {
// drop any nacks before the last keyframe // drop any nacks before the last keyframe
var cutoff uint16 var cutoff uint16
found, seqno, _ := track.cache.KeyframeSeqno() seqno, found := track.cache.Keyframe()
if found { if found {
cutoff = seqno cutoff = seqno
} else { } else {
last, lastSeqno, _ := track.cache.Last() lastSeqno, last := track.cache.Last()
if !last { if !last {
// NACK on a fresh track? Give up. // NACK on a fresh track? Give up.
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment