Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
sfu
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Alain Takoudjou
sfu
Commits
2f632dbc
Commit
2f632dbc
authored
Apr 29, 2020
by
Juliusz Chroboczek
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Maintain reception statistics, send reiver reports.
parent
e2d89c7c
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
129 additions
and
10 deletions
+129
-10
client.go
client.go
+78
-2
group.go
group.go
+5
-4
packetcache/packetcache.go
packetcache/packetcache.go
+46
-4
No files found.
client.go
View file @
2f632dbc
...
...
@@ -277,6 +277,8 @@ func addUpConn(c *client, id string) (*upConnection, error) {
sendICE
(
c
,
id
,
candidate
)
})
go
rtcpUpSender
(
c
,
conn
)
pc
.
OnTrack
(
func
(
remote
*
webrtc
.
Track
,
receiver
*
webrtc
.
RTPReceiver
)
{
c
.
mu
.
Lock
()
u
,
ok
:=
c
.
up
[
id
]
...
...
@@ -303,6 +305,8 @@ func addUpConn(c *client, id string) (*upConnection, error) {
}
go
upLoop
(
conn
,
track
)
go
rtcpUpListener
(
conn
,
track
,
receiver
)
})
return
conn
,
nil
...
...
@@ -357,6 +361,78 @@ func upLoop(conn *upConnection, track *upTrack) {
}
}
func
rtcpUpListener
(
conn
*
upConnection
,
track
*
upTrack
,
r
*
webrtc
.
RTPReceiver
)
{
for
{
ps
,
err
:=
r
.
ReadRTCP
()
if
err
!=
nil
{
if
err
!=
io
.
EOF
{
log
.
Printf
(
"ReadRTCP: %v"
,
err
)
}
return
}
for
_
,
p
:=
range
ps
{
switch
p
:=
p
.
(
type
)
{
case
*
rtcp
.
SenderReport
:
atomic
.
StoreUint32
(
&
track
.
lastSenderReport
,
uint32
(
p
.
NTPTime
>>
16
))
case
*
rtcp
.
SourceDescription
:
default
:
log
.
Printf
(
"RTCP: %T"
,
p
)
}
}
}
}
func
sendRR
(
c
*
client
,
conn
*
upConnection
)
error
{
c
.
mu
.
Lock
()
if
len
(
conn
.
tracks
)
==
0
{
c
.
mu
.
Unlock
()
return
nil
}
ssrc
:=
conn
.
tracks
[
0
]
.
track
.
SSRC
()
reports
:=
make
([]
rtcp
.
ReceptionReport
,
0
,
len
(
conn
.
tracks
))
for
_
,
t
:=
range
conn
.
tracks
{
expected
,
lost
,
eseqno
:=
t
.
cache
.
GetStats
(
true
)
if
expected
==
0
{
expected
=
1
}
if
lost
>=
expected
{
lost
=
expected
-
1
}
reports
=
append
(
reports
,
rtcp
.
ReceptionReport
{
SSRC
:
t
.
track
.
SSRC
(),
LastSenderReport
:
atomic
.
LoadUint32
(
&
t
.
lastSenderReport
),
FractionLost
:
uint8
((
lost
*
256
)
/
expected
),
TotalLost
:
lost
,
LastSequenceNumber
:
eseqno
,
})
}
c
.
mu
.
Unlock
()
return
conn
.
pc
.
WriteRTCP
([]
rtcp
.
Packet
{
&
rtcp
.
ReceiverReport
{
SSRC
:
ssrc
,
Reports
:
reports
,
},
})
}
func
rtcpUpSender
(
c
*
client
,
conn
*
upConnection
)
{
for
{
time
.
Sleep
(
time
.
Second
)
err
:=
sendRR
(
c
,
conn
)
if
err
!=
nil
{
if
err
==
io
.
EOF
||
err
==
io
.
ErrClosedPipe
{
return
}
log
.
Printf
(
"WriteRTCP: %v"
,
err
)
}
}
}
func
delUpConn
(
c
*
client
,
id
string
)
{
c
.
mu
.
Lock
()
defer
c
.
mu
.
Unlock
()
...
...
@@ -498,7 +574,7 @@ func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConn
conn
.
tracks
=
append
(
conn
.
tracks
,
track
)
remoteTrack
.
addLocal
(
track
)
go
rtcpListener
(
c
.
group
,
conn
,
track
,
s
)
go
rtcp
Down
Listener
(
c
.
group
,
conn
,
track
,
s
)
return
conn
,
s
,
nil
}
...
...
@@ -509,7 +585,7 @@ func msSinceEpoch() uint64 {
return
uint64
(
time
.
Since
(
epoch
)
/
time
.
Millisecond
)
}
func
rtcpListener
(
g
*
group
,
conn
*
downConnection
,
track
*
downTrack
,
s
*
webrtc
.
RTPSender
)
{
func
rtcp
Down
Listener
(
g
*
group
,
conn
*
downConnection
,
track
*
downTrack
,
s
*
webrtc
.
RTPSender
)
{
for
{
ps
,
err
:=
s
.
ReadRTCP
()
if
err
!=
nil
{
...
...
group.go
View file @
2f632dbc
...
...
@@ -21,10 +21,11 @@ import (
)
type
upTrack
struct
{
track
*
webrtc
.
Track
cache
*
packetcache
.
Cache
maxBitrate
uint64
lastPLI
uint64
track
*
webrtc
.
Track
cache
*
packetcache
.
Cache
maxBitrate
uint64
lastPLI
uint64
lastSenderReport
uint32
mu
sync
.
Mutex
local
[]
*
downTrack
...
...
packetcache/packetcache.go
View file @
2f632dbc
...
...
@@ -13,10 +13,18 @@ type entry struct {
}
type
Cache
struct
{
mu
sync
.
Mutex
first
uint16
// the first seqno
bitmap
uint32
tail
int
// the next entry to be rewritten
mu
sync
.
Mutex
//stats
last
uint16
cycle
uint16
lastValid
bool
expected
uint32
lost
uint32
// bitmap
first
uint16
bitmap
uint32
// packet cache
tail
int
entries
[]
entry
}
...
...
@@ -65,6 +73,25 @@ func (cache *Cache) Store(seqno uint16, buf []byte) uint16 {
cache
.
mu
.
Lock
()
defer
cache
.
mu
.
Unlock
()
if
!
cache
.
lastValid
{
cache
.
last
=
seqno
cache
.
lastValid
=
true
cache
.
expected
++
}
else
{
if
((
cache
.
last
-
seqno
)
&
0x8000
)
!=
0
{
cache
.
expected
+=
uint32
(
seqno
-
cache
.
last
)
cache
.
lost
+=
uint32
(
seqno
-
cache
.
last
-
1
)
if
seqno
<
cache
.
last
{
cache
.
cycle
++
}
cache
.
last
=
seqno
}
else
{
if
cache
.
lost
>
0
{
cache
.
lost
--
}
}
}
cache
.
set
(
seqno
)
cache
.
entries
[
cache
.
tail
]
.
seqno
=
seqno
...
...
@@ -102,3 +129,18 @@ func (cache *Cache) BitmapGet() (uint16, uint16) {
cache
.
first
+=
17
return
first
,
bitmap
}
func
(
cache
*
Cache
)
GetStats
(
reset
bool
)
(
uint32
,
uint32
,
uint32
)
{
cache
.
mu
.
Lock
()
defer
cache
.
mu
.
Unlock
()
expected
:=
cache
.
expected
lost
:=
cache
.
lost
eseqno
:=
uint32
(
cache
.
cycle
)
<<
16
|
uint32
(
cache
.
last
)
if
reset
{
cache
.
expected
=
0
cache
.
lost
=
0
}
return
expected
,
lost
,
eseqno
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment