Commit 795a40ce authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Simulcast.

parent f1a15f07
......@@ -135,8 +135,17 @@ A peer must explicitly request the streams that it wants to receive.
```
The field `request` is a dictionary that maps the labels of requested
streams to a list containing either 'audio', 'video' or both. An entry
with an empty key `''` serves as default.
streams to a list containing either 'audio', or one of 'video' or
'video-low'. The empty key `''` serves as default. For example:
```javascript
{
type: 'request',
request: {
camera: ['audio', 'video-low'],
'': ['audio', 'video']
}
}
## Pushing streams
......@@ -157,16 +166,22 @@ A stream is created by the sender with the `offer` message:
If a stream with the same id exists, then this is a renegotation;
otherwise this message creates a new stream. If the field `replace` is
not empty, then this request additionally requests that an existing stream
with the given id should be closed, and the new stream should replace it.
with the given id should be closed, and the new stream should replace it;
this is used most notably when changing the simulcast envelope.
The field `label` is one of `camera`, `screenshare` or `video`, as in the
`request` message.
The field `label` is one of `camera`, `screenshare` or `video`, and will
be matched against the keys sent by the receiver in their `request` message.
The field `sdp` contains the raw SDP string (i.e. the `sdp` field of
a JSEP session description). Galène will interpret the `nack`,
`nack pli`, `ccm fir` and `goog-remb` RTCP feedback types, and act
accordingly.
The sender may either send a single stream per media section in the SDP,
or use rid-based simulcasting. In the latter case, it should send two
video streams, one with rid 'h' and high throughput, and one with rid 'l'
and throughput limited to roughly 100kbit/s.
The receiver may either abort the stream immediately (see below), or send
an answer.
......
......@@ -25,6 +25,7 @@ type UpTrack interface {
AddLocal(DownTrack) error
DelLocal(DownTrack) bool
Kind() webrtc.RTPCodecType
Label() string
Codec() webrtc.RTPCodecCapability
// get a recent packet. Returns 0 if the packet is not in cache.
GetRTP(seqno uint16, result []byte) uint16
......@@ -33,7 +34,6 @@ type UpTrack interface {
// Type Down represents a connection in the server to client direction.
type Down interface {
GetMaxBitrate(now uint64) uint64
}
// Type DownTrack represents a track in the server to client direction.
......@@ -42,4 +42,5 @@ type DownTrack interface {
Accumulate(bytes uint32)
SetTimeOffset(ntp uint64, rtp uint32)
SetCname(string)
GetMaxBitrate() uint64
}
......@@ -600,7 +600,7 @@ func (conn *diskConn) initWriter(width, height uint32) error {
return nil
}
func (down *diskConn) GetMaxBitrate(now uint64) uint64 {
func (t *diskTrack) GetMaxBitrate() uint64 {
return ^uint64(0)
}
......
......@@ -13,6 +13,7 @@ import (
"time"
"github.com/pion/ice/v2"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
)
......@@ -60,7 +61,8 @@ type ChatHistoryEntry struct {
}
const (
MinBitrate = 200000
LowBitrate = 100000
MinBitrate = 2 * LowBitrate
)
type Group struct {
......@@ -252,6 +254,12 @@ func APIFromCodecs(codecs []webrtc.RTPCodecCapability) (*webrtc.API, error) {
if UDPMin > 0 && UDPMax > 0 {
s.SetEphemeralUDPPortRange(UDPMin, UDPMax)
}
m.RegisterHeaderExtension(
webrtc.RTPHeaderExtensionCapability{sdp.SDESMidURI},
webrtc.RTPCodecTypeVideo)
m.RegisterHeaderExtension(
webrtc.RTPHeaderExtensionCapability{sdp.SDESRTPStreamIDURI},
webrtc.RTPCodecTypeVideo)
return webrtc.NewAPI(
webrtc.WithSettingEngine(s),
......
......@@ -77,15 +77,16 @@ type downTrackAtomics struct {
}
type rtpDownTrack struct {
track *webrtc.TrackLocalStaticRTP
sender *webrtc.RTPSender
remote conn.UpTrack
ssrc webrtc.SSRC
maxBitrate *bitrate
rate *estimator.Estimator
stats *receiverStats
atomics *downTrackAtomics
cname atomic.Value
track *webrtc.TrackLocalStaticRTP
sender *webrtc.RTPSender
remote conn.UpTrack
ssrc webrtc.SSRC
maxBitrate *bitrate
maxREMBBitrate *bitrate
rate *estimator.Estimator
stats *receiverStats
atomics *downTrackAtomics
cname atomic.Value
}
func (down *rtpDownTrack) WriteRTP(packet *rtp.Packet) error {
......@@ -140,7 +141,6 @@ type rtpDownConnection struct {
id string
pc *webrtc.PeerConnection
remote conn.Up
maxREMBBitrate *bitrate
iceCandidates []*webrtc.ICECandidateInit
negotiationNeeded int
......@@ -174,31 +174,22 @@ func newDownConn(c group.Client, id string, remote conn.Up) (*rtpDownConnection,
id: id,
pc: pc,
remote: remote,
maxREMBBitrate: new(bitrate),
}
return conn, nil
}
func (down *rtpDownConnection) GetMaxBitrate(now uint64) uint64 {
rate := down.maxREMBBitrate.Get(now)
var trackRate uint64
tracks := down.getTracks()
for _, t := range tracks {
r := t.maxBitrate.Get(now)
if r == ^uint64(0) {
if t.track.Kind() == webrtc.RTPCodecTypeAudio {
r = 128 * 1024
} else {
r = 512 * 1024
}
}
trackRate += r
func (t *rtpDownTrack) GetMaxBitrate() uint64 {
now := rtptime.Jiffies()
r := t.maxBitrate.Get(now)
if r == ^uint64(0) {
r = 512 * 1024
}
if trackRate < rate {
return trackRate
rr := t.maxREMBBitrate.Get(now)
if rr == 0 || r < rr {
return r
}
return rate
return rr
}
func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
......@@ -311,6 +302,10 @@ func (up *rtpUpTrack) GetRTP(seqno uint16, result []byte) uint16 {
return up.cache.Get(seqno, result)
}
func (up *rtpUpTrack) Label() string {
return up.track.RID()
}
func (up *rtpUpTrack) Kind() webrtc.RTPCodecType {
return up.track.Kind()
}
......@@ -687,7 +682,7 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
for {
firstSR := false
n, _, err := r.Read(buf)
n, _, err := r.ReadSimulcast(buf, track.track.RID())
if err != nil {
if err != io.EOF && err != io.ErrClosedPipe {
log.Printf("Read RTCP: %v", err)
......@@ -752,11 +747,11 @@ func rtcpUpListener(conn *rtpUpConnection, track *rtpUpTrack, r *webrtc.RTPRecei
}
}
func sendUpRTCP(conn *rtpUpConnection) error {
tracks := conn.getTracks()
func sendUpRTCP(up *rtpUpConnection) error {
tracks := up.getTracks()
if len(conn.tracks) == 0 {
state := conn.pc.ConnectionState()
if len(up.tracks) == 0 {
state := up.pc.ConnectionState()
if state == webrtc.PeerConnectionStateClosed {
return io.ErrClosedPipe
}
......@@ -765,7 +760,7 @@ func sendUpRTCP(conn *rtpUpConnection) error {
now := rtptime.Jiffies()
reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
reports := make([]rtcp.ReceptionReport, 0, len(up.tracks))
for _, t := range tracks {
updateUpTrack(t)
stats := t.cache.GetStats(true)
......@@ -810,29 +805,38 @@ func sendUpRTCP(conn *rtpUpConnection) error {
},
}
rate := ^uint64(0)
local := conn.getLocal()
for _, l := range local {
r := l.GetMaxBitrate(now)
if r < rate {
rate = r
}
}
if rate < group.MinBitrate {
rate = group.MinBitrate
}
var ssrcs []uint32
var rate uint64
for _, t := range tracks {
if !t.hasRtcpFb("goog-remb", "") {
continue
}
ssrcs = append(ssrcs, uint32(t.track.SSRC()))
var r uint64
if t.Kind() == webrtc.RTPCodecTypeAudio {
r = 100 * 1024
} else if t.Label() == "l" {
r = group.LowBitrate
} else {
local := t.getLocal()
r = ^uint64(0)
for _, down := range local {
rr := down.GetMaxBitrate()
if rr < group.MinBitrate {
rr = group.MinBitrate
}
if r > rr {
r = rr
}
}
if r == ^uint64(0) {
r = 512 * 1024
}
}
rate += r
}
if len(ssrcs) > 0 {
if rate < ^uint64(0) && len(ssrcs) > 0 {
packets = append(packets,
&rtcp.ReceiverEstimatedMaximumBitrate{
Bitrate: rate,
......@@ -840,7 +844,7 @@ func sendUpRTCP(conn *rtpUpConnection) error {
},
)
}
return conn.pc.WriteRTCP(packets)
return up.pc.WriteRTCP(packets)
}
func rtcpUpSender(conn *rtpUpConnection) {
......@@ -1049,7 +1053,7 @@ func rtcpDownListener(conn *rtpDownConnection, track *rtpDownTrack, s *webrtc.RT
log.Printf("sendFIR: %v", err)
}
case *rtcp.ReceiverEstimatedMaximumBitrate:
conn.maxREMBBitrate.Set(p.Bitrate, jiffies)
track.maxREMBBitrate.Set(p.Bitrate, jiffies)
case *rtcp.ReceiverReport:
for _, r := range p.Reports {
if r.SSRC == uint32(track.ssrc) {
......
......@@ -149,6 +149,16 @@ func readLoop(conn *rtpUpConnection, track *rtpUpTrack) {
kf, _ := isKeyframe(codec.MimeType, &packet)
if packet.Extension {
packet.Extension = false
packet.Extensions = nil
bytes, err = packet.MarshalTo(buf)
if err != nil {
log.Printf("%v", err)
continue
}
}
first, index := track.cache.Store(
packet.SequenceNumber, packet.Timestamp,
kf, packet.Marker, buf[:bytes],
......
......@@ -47,7 +47,6 @@ func (c *webClient) GetStats() *stats.Client {
for _, down := range c.down {
conns := stats.Conn{
Id: down.id,
MaxBitrate: down.GetMaxBitrate(jiffies),
}
for _, t := range down.tracks {
rate, _ := t.rate.Estimate()
......
......@@ -380,14 +380,15 @@ func addDownTrackUnlocked(conn *rtpDownConnection, remoteTrack *rtpUpTrack, remo
}
track := &rtpDownTrack{
track: local,
sender: sender,
ssrc: parms.Encodings[0].SSRC,
remote: remoteTrack,
maxBitrate: new(bitrate),
stats: new(receiverStats),
rate: estimator.New(time.Second),
atomics: &downTrackAtomics{},
track: local,
sender: sender,
ssrc: parms.Encodings[0].SSRC,
remote: remoteTrack,
maxBitrate: new(bitrate),
maxREMBBitrate: new(bitrate),
stats: new(receiverStats),
rate: estimator.New(time.Second),
atomics: &downTrackAtomics{},
}
conn.tracks = append(conn.tracks, track)
......@@ -646,33 +647,60 @@ func requestedTracks(c *webClient, up conn.Up, tracks []conn.UpTrack) []conn.UpT
return nil
}
var audio, video bool
var audio, video, videoLow bool
for _, s := range r {
switch s {
case "audio":
audio = true
case "video":
video = true
case "video-low":
videoLow = true
default:
log.Printf("client requested unknown value %v", s)
}
}
var ts []conn.UpTrack
if audio {
find := func(kind webrtc.RTPCodecType, labels ...string) conn.UpTrack {
for _, l := range labels {
for _, t := range tracks {
if t.Kind() != kind {
continue
}
if t.Label() == l {
return t
}
}
}
for _, t := range tracks {
if t.Kind() == webrtc.RTPCodecTypeAudio {
ts = append(ts, t)
break
if t.Kind() != kind {
continue
}
return t
}
return nil
}
var ts []conn.UpTrack
if audio {
t := find(webrtc.RTPCodecTypeAudio)
if t != nil {
ts = append(ts, t)
}
}
if video {
for _, t := range tracks {
if t.Kind() == webrtc.RTPCodecTypeVideo {
ts = append(ts, t)
break
}
t := find(
webrtc.RTPCodecTypeVideo, "h", "m", "video",
)
if t != nil {
ts = append(ts, t)
}
} else if videoLow {
t := find(
webrtc.RTPCodecTypeVideo, "l", "m", "video",
)
if t != nil {
ts = append(ts, t)
}
}
......
......@@ -213,7 +213,9 @@
<select id="requestselect" class="select select-inline">
<option value="">nothing</option>
<option value="audio">audio only</option>
<option value="screenshare-low">screen share (low)</option>
<option value="screenshare">screen share</option>
<option value="everything-low">everything (low)</option>
<option value="everything" selected>everything</option>
</select>
</form>
......
......@@ -78,6 +78,7 @@ function getUserPass() {
* @property {boolean} [localMute]
* @property {string} [video]
* @property {string} [audio]
* @property {boolean} [simulcast]
* @property {string} [send]
* @property {string} [request]
* @property {boolean} [activityDetection]
......@@ -550,9 +551,15 @@ function mapRequest(what) {
case 'audio':
return {'': ['audio']};
break;
case 'screenshare-low':
return {screenshare: ['audio','video-low'], '': ['audio']};
break;
case 'screenshare':
return {screenshare: ['audio','video'], '': ['audio']};
break;
case 'everything-low':
return {'': ['audio','video-low']};
break;
case 'everything':
return {'': ['audio','video']}
break;
......@@ -611,20 +618,25 @@ getInputElement('fileinput').onchange = function(e) {
function gotUpStats(stats) {
let c = this;
let text = '';
let values = [];
c.pc.getSenders().forEach(s => {
let tid = s.track && s.track.id;
let stats = tid && c.stats[tid];
let rate = stats && stats['outbound-rtp'] && stats['outbound-rtp'].rate;
if(typeof rate === 'number') {
if(text)
text = text + ' + ';
text = text + Math.round(rate / 1000) + 'kbps';
for(let id in stats) {
if(stats[id] && stats[id]['outbound-rtp']) {
let rate = stats[id]['outbound-rtp'].rate;
if(typeof rate === 'number') {
values.push(rate);
}
}
});
}
setLabel(c, text);
if(values.length === 0) {
setLabel(c, '');
} else {
values.sort((x,y) => x - y);
setLabel(c, values
.map(x => Math.round(x / 1000).toString())
.reduce((x, y) => x + '+' + y));
}
}
/**
......@@ -800,6 +812,7 @@ function newUpStream(localId) {
* @param {number} [bps]
*/
async function setMaxVideoThroughput(c, bps) {
let simulcast = doSimulcast();
let senders = c.pc.getSenders();
for(let i = 0; i < senders.length; i++) {
let s = senders[i];
......@@ -808,17 +821,17 @@ async function setMaxVideoThroughput(c, bps) {
let p = s.getParameters();
if(!p.encodings)
p.encodings = [{}];
if((!simulcast && p.encodings.length != 1) ||
(simulcast && p.encodings.length != 2)) {
// change the simulcast envelope
await replaceUpStream(c);
return;
}
p.encodings.forEach(e => {
if(bps > 0)
e.maxBitrate = bps;
else
delete e.maxBitrate;
if(!e.rid || e.rid === 'h')
e.maxBitrate = bps || unlimitedRate;
});
try {
await s.setParameters(p);
} catch(e) {
console.error(e);
}
await s.setParameters(p);
}
}
......@@ -1022,6 +1035,19 @@ function isSafari() {
return ua.indexOf('safari') >= 0 && ua.indexOf('chrome') < 0;
}
const unlimitedRate = 1000000000;
const simulcastRate = 100000;
/**
* @returns {boolean}
*/
function doSimulcast() {
if(!getSettings().simulcast)
return false;
let bps = getMaxVideoThroughput();
return bps <= 0 || bps >= 2 * simulcastRate;
}
/**
* Sets up c to send the given stream. Some extra parameters are stored
* in c.userdata.
......@@ -1029,6 +1055,7 @@ function isSafari() {
* @param {Stream} c
* @param {MediaStream} stream
*/
function setUpStream(c, stream) {
if(c.stream != null)
throw new Error("Setting nonempty stream");
......@@ -1073,11 +1100,20 @@ function setUpStream(c, stream) {
c.close();
};
let encodings = [{}];
let encodings = [];
if(t.kind === 'video') {
let simulcast = doSimulcast();
let bps = getMaxVideoThroughput();
if(bps > 0)
encodings[0].maxBitrate = bps;
encodings.push({
rid: 'h',
maxBitrate: bps || unlimitedRate,
});
if(simulcast)
encodings.push({
rid: 'l',
scaleResolutionDownBy: 2,
maxBitrate: simulcastRate,
});
}
c.pc.addTransceiver(t, {
direction: 'sendonly',
......
......@@ -1246,17 +1246,20 @@ Stream.prototype.updateStats = async function() {
if(report) {
for(let r of report.values()) {
if(stid && r.type === 'outbound-rtp') {
let id = stid;
if(r.rid)
id = id + '-' + r.rid
if(!('bytesSent' in r))
continue;
if(!stats[stid])
stats[stid] = {};
stats[stid][r.type] = {};
stats[stid][r.type].timestamp = r.timestamp;
stats[stid][r.type].bytesSent = r.bytesSent;
if(old[stid] && old[stid][r.type])
stats[stid][r.type].rate =
((r.bytesSent - old[stid][r.type].bytesSent) * 1000 /
(r.timestamp - old[stid][r.type].timestamp)) * 8;
if(!stats[id])
stats[id] = {};
stats[id][r.type] = {};
stats[id][r.type].timestamp = r.timestamp;
stats[id][r.type].bytesSent = r.bytesSent;
if(old[id] && old[id][r.type])
stats[id][r.type].rate =
((r.bytesSent - old[id][r.type].bytesSent) * 1000 /
(r.timestamp - old[id][r.type].timestamp)) * 8;
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment