Commit 14a43036 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Rework connection replacement.

We used to signal connection replacement by reusing the same connection
id.  This turned out to be racy, as we couldn't reliably discard old
answers after a connection id was refused.

We now use a new id for every new connection, and explicitly signal
stream replacement in the offer message.  This requires maintaining a
local id on the client side.
parent 9d9db1a9
...@@ -114,11 +114,7 @@ this point, you may set up an `audio` or `video` component straight away, ...@@ -114,11 +114,7 @@ this point, you may set up an `audio` or `video` component straight away,
or you may choose to wait until the `ondowntrack` callback is called. or you may choose to wait until the `ondowntrack` callback is called.
After a new stream is created, `ondowntrack` will be called whenever After a new stream is created, `ondowntrack` will be called whenever
a track is added. If the `MediaStream` passed to `ondowntrack` differs a track is added.
from the one previously received, then the stream has been torn down and
recreated, and you must drop all previously received tracks; in practice,
it is enough to set the `srcObject` property of the video component to the
new stream.
The `onstatus` callback is invoked whenever the client library detects The `onstatus` callback is invoked whenever the client library detects
a change in the status of the stream; states `connected` and `complete` a change in the status of the stream; states `connected` and `complete`
...@@ -126,7 +122,10 @@ indicate a functioning stream; other states indicate that the stream is ...@@ -126,7 +122,10 @@ indicate a functioning stream; other states indicate that the stream is
not working right now but might recover in the future. not working right now but might recover in the future.
The `onclose` callback is called when the stream is destroyed, either by The `onclose` callback is called when the stream is destroyed, either by
the server or in response to a call to the `close` method. the server or in response to a call to the `close` method. The optional
parameter is true when the stream is being replaced by a new stream; in
that case, the call to `onclose` will be followed with a call to
`onstream` with the same `localId` value.
## Pushing outgoing video streams ## Pushing outgoing video streams
...@@ -145,6 +144,11 @@ localStream.getTracks().forEach(t => { ...@@ -145,6 +144,11 @@ localStream.getTracks().forEach(t => {
}); });
``` ```
The `newUpStream` method takes an optional parameter. If this is set to
the `localId` property of an existing stream, then the existing stream
will be closed and the server will be informed that the new stream
replaces the existing stream.
See above for information about setting up the `labels` dictionary. See above for information about setting up the `labels` dictionary.
## Stream statistics ## Stream statistics
......
...@@ -142,8 +142,8 @@ A stream is created by the sender with the `offer` message: ...@@ -142,8 +142,8 @@ A stream is created by the sender with the `offer` message:
```javascript ```javascript
{ {
type: 'offer', type: 'offer',
kind: '' or 'renegotiate'
id: id, id: id,
replace: id,
source: source-id, source: source-id,
username: username, username: username,
sdp: sdp, sdp: sdp,
...@@ -151,12 +151,10 @@ A stream is created by the sender with the `offer` message: ...@@ -151,12 +151,10 @@ A stream is created by the sender with the `offer` message:
} }
``` ```
If kind is the empty string, then this is a new offer that might or might If a stream with the same id exists, then this is a renegotation;
not replace an existing stream; if a stream with the same id exists, it otherwise this message creates a new stream. If the field `replace` is
must be torn down before the new stream is created. If kind is not empty, then this request additionally requests that an existing stream
`renegotiate`, then a stream with the given id already exists, and the with the given id should be closed, and the new stream should replace it.
receiving peer may either tear down the existing stream or merely perform
a renegotiation.
The field `sdp` contains the raw SDP string (i.e. the `sdp` field of The field `sdp` contains the raw SDP string (i.e. the `sdp` field of
a JSEP session description). Galène will interpret the `nack`, a JSEP session description). Galène will interpret the `nack`,
......
...@@ -93,7 +93,7 @@ func (client *Client) Kick(id, user, message string) error { ...@@ -93,7 +93,7 @@ func (client *Client) Kick(id, user, message string) error {
return err return err
} }
func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack) error { func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, replace string) error {
if client.group != g { if client.group != g {
return nil return nil
} }
...@@ -105,6 +105,14 @@ func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []c ...@@ -105,6 +105,14 @@ func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []c
return errors.New("disk client is closed") return errors.New("disk client is closed")
} }
rp := client.down[replace]
if rp != nil {
rp.Close()
delete(client.down, replace)
} else {
log.Printf("Replacing unknown connection")
}
old := client.down[id] old := client.down[id]
if old != nil { if old != nil {
old.Close() old.Close()
......
...@@ -99,7 +99,7 @@ type Client interface { ...@@ -99,7 +99,7 @@ type Client interface {
Permissions() ClientPermissions Permissions() ClientPermissions
SetPermissions(ClientPermissions) SetPermissions(ClientPermissions)
OverridePermissions(*Group) bool OverridePermissions(*Group) bool
PushConn(g *Group, id string, conn conn.Up, tracks []conn.UpTrack) error PushConn(g *Group, id string, conn conn.Up, tracks []conn.UpTrack, replace string) error
PushClient(id, username string, add bool) error PushClient(id, username string, add bool) error
} }
......
...@@ -335,6 +335,7 @@ type rtpUpConnection struct { ...@@ -335,6 +335,7 @@ type rtpUpConnection struct {
mu sync.Mutex mu sync.Mutex
pushed bool pushed bool
replace string
tracks []*rtpUpTrack tracks []*rtpUpTrack
local []conn.Down local []conn.Down
} }
...@@ -347,6 +348,16 @@ func (up *rtpUpConnection) getTracks() []*rtpUpTrack { ...@@ -347,6 +348,16 @@ func (up *rtpUpConnection) getTracks() []*rtpUpTrack {
return tracks return tracks
} }
func (up *rtpUpConnection) getReplace(reset bool) string {
up.mu.Lock()
defer up.mu.Unlock()
replace := up.replace
if reset {
up.replace = ""
}
return replace
}
func (up *rtpUpConnection) Id() string { func (up *rtpUpConnection) Id() string {
return up.id return up.id
} }
...@@ -443,6 +454,8 @@ func (up *rtpUpConnection) complete() bool { ...@@ -443,6 +454,8 @@ func (up *rtpUpConnection) complete() bool {
func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) { func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) {
up.mu.Lock() up.mu.Lock()
up.pushed = true up.pushed = true
replace := up.replace
up.replace = ""
tracks := make([]conn.UpTrack, len(up.tracks)) tracks := make([]conn.UpTrack, len(up.tracks))
for i, t := range up.tracks { for i, t := range up.tracks {
tracks[i] = t tracks[i] = t
...@@ -450,7 +463,7 @@ func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) { ...@@ -450,7 +463,7 @@ func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) {
up.mu.Unlock() up.mu.Unlock()
for _, c := range cs { for _, c := range cs {
c.PushConn(g, up.id, up, tracks) c.PushConn(g, up.id, up, tracks, replace)
} }
} }
......
...@@ -169,6 +169,7 @@ type clientMessage struct { ...@@ -169,6 +169,7 @@ type clientMessage struct {
Type string `json:"type"` Type string `json:"type"`
Kind string `json:"kind,omitempty"` Kind string `json:"kind,omitempty"`
Id string `json:"id,omitempty"` Id string `json:"id,omitempty"`
Replace string `json:"replace,omitempty"`
Source string `json:"source,omitempty"` Source string `json:"source,omitempty"`
Dest string `json:"dest,omitempty"` Dest string `json:"dest,omitempty"`
Username string `json:"username,omitempty"` Username string `json:"username,omitempty"`
...@@ -246,17 +247,26 @@ func addUpConn(c *webClient, id string, labels map[string]string, offer string) ...@@ -246,17 +247,26 @@ func addUpConn(c *webClient, id string, labels map[string]string, offer string)
return conn, true, nil return conn, true, nil
} }
func delUpConn(c *webClient, id string) bool { var ErrUserMismatch = errors.New("user id mismatch")
func delUpConn(c *webClient, id string, userId string) (string, error) {
c.mu.Lock() c.mu.Lock()
if c.up == nil { if c.up == nil {
c.mu.Unlock() c.mu.Unlock()
return false return "", os.ErrNotExist
} }
conn := c.up[id] conn := c.up[id]
if conn == nil { if conn == nil {
c.mu.Unlock() c.mu.Unlock()
return false return "", os.ErrNotExist
}
if userId != "" && conn.userId != userId {
c.mu.Unlock()
return "", ErrUserMismatch
} }
replace := conn.getReplace(true)
delete(c.up, id) delete(c.up, id)
c.mu.Unlock() c.mu.Unlock()
...@@ -264,7 +274,7 @@ func delUpConn(c *webClient, id string) bool { ...@@ -264,7 +274,7 @@ func delUpConn(c *webClient, id string) bool {
if g != nil { if g != nil {
go func(clients []group.Client) { go func(clients []group.Client) {
for _, c := range clients { for _, c := range clients {
err := c.PushConn(g, conn.id, nil, nil) err := c.PushConn(g, conn.id, nil, nil, replace)
if err != nil { if err != nil {
log.Printf("PushConn: %v", err) log.Printf("PushConn: %v", err)
} }
...@@ -275,7 +285,7 @@ func delUpConn(c *webClient, id string) bool { ...@@ -275,7 +285,7 @@ func delUpConn(c *webClient, id string) bool {
} }
conn.pc.Close() conn.pc.Close()
return true return conn.replace, nil
} }
func getDownConn(c *webClient, id string) *rtpDownConnection { func getDownConn(c *webClient, id string) *rtpDownConnection {
...@@ -355,13 +365,13 @@ func addDownConnHelper(c *webClient, conn *rtpDownConnection, remote conn.Up) er ...@@ -355,13 +365,13 @@ func addDownConnHelper(c *webClient, conn *rtpDownConnection, remote conn.Up) er
return nil return nil
} }
func delDownConn(c *webClient, id string) bool { func delDownConn(c *webClient, id string) error {
conn := delDownConnHelper(c, id) conn := delDownConnHelper(c, id)
if conn != nil { if conn != nil {
conn.pc.Close() conn.pc.Close()
return true return nil
} }
return false return os.ErrNotExist
} }
func delDownConnHelper(c *webClient, id string) *rtpDownConnection { func delDownConnHelper(c *webClient, id string) *rtpDownConnection {
...@@ -429,7 +439,7 @@ func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrac ...@@ -429,7 +439,7 @@ func addDownTrack(c *webClient, conn *rtpDownConnection, remoteTrack conn.UpTrac
return sender, nil return sender, nil
} }
func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bool) error { func negotiate(c *webClient, down *rtpDownConnection, restartIce bool, replace string) error {
if down.pc.SignalingState() == webrtc.SignalingStateHaveLocalOffer { if down.pc.SignalingState() == webrtc.SignalingStateHaveLocalOffer {
// avoid sending multiple offers back-to-back // avoid sending multiple offers back-to-back
if restartIce { if restartIce {
...@@ -470,17 +480,12 @@ func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bo ...@@ -470,17 +480,12 @@ func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bo
} }
} }
kind := ""
if renegotiate {
kind = "renegotiate"
}
source, username := down.remote.User() source, username := down.remote.User()
return c.write(clientMessage{ return c.write(clientMessage{
Type: "offer", Type: "offer",
Kind: kind,
Id: down.id, Id: down.id,
Replace: replace,
Source: source, Source: source,
Username: username, Username: username,
SDP: down.pc.LocalDescription().SDP, SDP: down.pc.LocalDescription().SDP,
...@@ -500,31 +505,21 @@ func sendICE(c *webClient, id string, candidate *webrtc.ICECandidate) error { ...@@ -500,31 +505,21 @@ func sendICE(c *webClient, id string, candidate *webrtc.ICECandidate) error {
}) })
} }
func gotOffer(c *webClient, id string, sdp string, renegotiate bool, labels map[string]string) error { func gotOffer(c *webClient, id string, sdp string, labels map[string]string, replace string) error {
if !renegotiate { up, _, err := addUpConn(c, id, labels, sdp)
// unless the client indicates that this is a compatible
// renegotiation, tear down the existing connection.
delUpConn(c, id)
}
up, isnew, err := addUpConn(c, id, labels, sdp)
if err != nil { if err != nil {
return err return err
} }
up.userId = c.Id() up.userId = c.Id()
up.username = c.Username() up.username = c.Username()
up.replace = replace
err = up.pc.SetRemoteDescription(webrtc.SessionDescription{ err = up.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer, Type: webrtc.SDPTypeOffer,
SDP: sdp, SDP: sdp,
}) })
if err != nil { if err != nil {
if renegotiate && !isnew {
// create a new PC from scratch
log.Printf("SetRemoteDescription(offer): %v", err)
return gotOffer(c, id, sdp, false, labels)
}
return err return err
} }
...@@ -679,8 +674,8 @@ func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rt ...@@ -679,8 +674,8 @@ func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rt
return down, nil return down, nil
} }
func (c *webClient) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack) error { func (c *webClient) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, replace string) error {
err := c.action(pushConnAction{g, id, up, tracks}) err := c.action(pushConnAction{g, id, up, tracks, replace})
if err != nil { if err != nil {
return err return err
} }
...@@ -750,6 +745,7 @@ type pushConnAction struct { ...@@ -750,6 +745,7 @@ type pushConnAction struct {
id string id string
conn conn.Up conn conn.Up
tracks []conn.UpTrack tracks []conn.UpTrack
replace string
} }
type pushConnsAction struct { type pushConnsAction struct {
...@@ -811,15 +807,23 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { ...@@ -811,15 +807,23 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
return nil return nil
} }
if a.conn == nil { if a.conn == nil {
found := delDownConn(c, a.id) if a.replace != "" {
if found { err := delDownConn(
c, a.replace,
)
if err == nil {
c.write(clientMessage{
Type: "close",
Id: a.replace,
})
}
}
err := delDownConn(c, a.id)
if err == nil {
c.write(clientMessage{ c.write(clientMessage{
Type: "close", Type: "close",
Id: a.id, Id: a.id,
}) })
} else {
log.Printf("Deleting unknown " +
"down connection")
} }
continue continue
} }
...@@ -830,7 +834,9 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { ...@@ -830,7 +834,9 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
return err return err
} }
if down != nil { if down != nil {
err = negotiate(c, down, false, false) err = negotiate(
c, down, false, a.replace,
)
if err != nil { if err != nil {
log.Printf( log.Printf(
"Negotiation failed: %v", "Negotiation failed: %v",
...@@ -852,13 +858,17 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { ...@@ -852,13 +858,17 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
continue continue
} }
tracks := u.getTracks() tracks := u.getTracks()
replace := u.getReplace(false)
ts := make([]conn.UpTrack, len(tracks)) ts := make([]conn.UpTrack, len(tracks))
for i, t := range tracks { for i, t := range tracks {
ts[i] = t ts[i] = t
} }
go func(u *rtpUpConnection, ts []conn.UpTrack) { go func(u *rtpUpConnection,
ts []conn.UpTrack,
replace string) {
err := a.client.PushConn( err := a.client.PushConn(
g, u.id, u, ts, g, u.id, u, ts, replace,
) )
if err != nil { if err != nil {
log.Printf( log.Printf(
...@@ -866,11 +876,11 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { ...@@ -866,11 +876,11 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
err, err,
) )
} }
}(u, ts) }(u, ts, replace)
} }
case connectionFailedAction: case connectionFailedAction:
if down := getDownConn(c, a.id); down != nil { if down := getDownConn(c, a.id); down != nil {
err := negotiate(c, down, true, true) err := negotiate(c, down, true, "")
if err != nil { if err != nil {
return err return err
} }
...@@ -883,7 +893,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { ...@@ -883,7 +893,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
go c.PushConn( go c.PushConn(
c.group, c.group,
down.remote.Id(), down.remote, down.remote.Id(), down.remote,
tracks, tracks, "",
) )
} else if up := getUpConn(c, a.id); up != nil { } else if up := getUpConn(c, a.id); up != nil {
c.write(clientMessage{ c.write(clientMessage{
...@@ -912,8 +922,12 @@ func clientLoop(c *webClient, ws *websocket.Conn) error { ...@@ -912,8 +922,12 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
if !c.permissions.Present { if !c.permissions.Present {
up := getUpConns(c) up := getUpConns(c)
for _, u := range up { for _, u := range up {
found := delUpConn(c, u.id) replace, err :=
if found { delUpConn(c, u.id, c.id)
if err == nil {
if replace != "" {
delUpConn(c, replace, c.id)
}
failUpConnection( failUpConnection(
c, u.id, c, u.id,
"permission denied", "permission denied",
...@@ -975,7 +989,7 @@ func leaveGroup(c *webClient) { ...@@ -975,7 +989,7 @@ func leaveGroup(c *webClient) {
c.setRequested(map[string]uint32{}) c.setRequested(map[string]uint32{})
if c.up != nil { if c.up != nil {
for id := range c.up { for id := range c.up {
delUpConn(c, id) delUpConn(c, id, c.id)
} }
} }
...@@ -1158,15 +1172,16 @@ func handleClientMessage(c *webClient, m clientMessage) error { ...@@ -1158,15 +1172,16 @@ func handleClientMessage(c *webClient, m clientMessage) error {
return c.setRequested(m.Request) return c.setRequested(m.Request)
case "offer": case "offer":
if !c.permissions.Present { if !c.permissions.Present {
if m.Replace != "" {
delUpConn(c, m.Replace, c.id)
}
c.write(clientMessage{ c.write(clientMessage{
Type: "abort", Type: "abort",
Id: m.Id, Id: m.Id,
}) })
return c.error(group.UserError("not authorised")) return c.error(group.UserError("not authorised"))
} }
err := gotOffer( err := gotOffer(c, m.Id, m.SDP, m.Labels, m.Replace)
c, m.Id, m.SDP, m.Kind == "renegotiate", m.Labels,
)
if err != nil { if err != nil {
log.Printf("gotOffer: %v", err) log.Printf("gotOffer: %v", err)
return failUpConnection(c, m.Id, "negotiation failed") return failUpConnection(c, m.Id, "negotiation failed")
...@@ -1184,8 +1199,9 @@ func handleClientMessage(c *webClient, m clientMessage) error { ...@@ -1184,8 +1199,9 @@ func handleClientMessage(c *webClient, m clientMessage) error {
down := getDownConn(c, m.Id) down := getDownConn(c, m.Id)
if down.negotiationNeeded > negotiationUnneeded { if down.negotiationNeeded > negotiationUnneeded {
err := negotiate( err := negotiate(
c, down, true, c, down,
down.negotiationNeeded == negotiationRestartIce, down.negotiationNeeded == negotiationRestartIce,
"",
) )
if err != nil { if err != nil {
return failDownConnection( return failDownConnection(
...@@ -1196,7 +1212,7 @@ func handleClientMessage(c *webClient, m clientMessage) error { ...@@ -1196,7 +1212,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
case "renegotiate": case "renegotiate":
down := getDownConn(c, m.Id) down := getDownConn(c, m.Id)
if down != nil { if down != nil {
err := negotiate(c, down, true, true) err := negotiate(c, down, true, "")
if err != nil { if err != nil {
return failDownConnection( return failDownConnection(
c, m.Id, "renegotiation failed", c, m.Id, "renegotiation failed",
...@@ -1206,14 +1222,22 @@ func handleClientMessage(c *webClient, m clientMessage) error { ...@@ -1206,14 +1222,22 @@ func handleClientMessage(c *webClient, m clientMessage) error {
log.Printf("Trying to renegotiate unknown connection") log.Printf("Trying to renegotiate unknown connection")
} }
case "close": case "close":
found := delUpConn(c, m.Id) replace, err := delUpConn(c, m.Id, c.id)
if !found { if err != nil {
log.Printf("Deleting unknown up connection %v", m.Id) log.Printf("Deleting up connection %v: %v",
m.Id, err)
return nil
}
if replace != "" {
_, err := delUpConn(c, replace, c.id)
if err != nil && !os.IsNotExist(err) {
log.Printf("Replace up connection: %v", err)
}
} }
case "abort": case "abort":
found := delDownConn(c, m.Id) err := delDownConn(c, m.Id)
if !found { if err != nil {
log.Printf("Attempted to abort unknown connection") log.Printf("Abort: %v", err)
} }
c.write(clientMessage{ c.write(clientMessage{
Type: "close", Type: "close",
......
...@@ -340,7 +340,7 @@ function gotClose(code, reason) { ...@@ -340,7 +340,7 @@ function gotClose(code, reason) {
function gotDownStream(c) { function gotDownStream(c) {
c.onclose = function(replace) { c.onclose = function(replace) {
if(!replace) if(!replace)
delMedia(c.id); delMedia(c.localId);
}; };
c.onerror = function(e) { c.onerror = function(e) {
console.error(e); console.error(e);
...@@ -397,10 +397,9 @@ getButtonElement('unpresentbutton').onclick = function(e) { ...@@ -397,10 +397,9 @@ getButtonElement('unpresentbutton').onclick = function(e) {
}; };
function changePresentation() { function changePresentation() {
let id = findUpMedia('local'); let c = findUpMedia('local');
if(id) { if(c)
addLocalMedia(id); addLocalMedia(c.localId);
}
} }
/** /**
...@@ -624,7 +623,7 @@ function gotUpStats(stats) { ...@@ -624,7 +623,7 @@ function gotUpStats(stats) {
* @param {boolean} value * @param {boolean} value
*/ */
function setActive(c, value) { function setActive(c, value) {
let peer = document.getElementById('peer-' + c.id); let peer = document.getElementById('peer-' + c.localId);
if(value) if(value)
peer.classList.add('peer-active'); peer.classList.add('peer-active');
else else
...@@ -773,10 +772,10 @@ async function setMediaChoices(done) { ...@@ -773,10 +772,10 @@ async function setMediaChoices(done) {
/** /**
* @param {string} [id] * @param {string} [localId]
*/ */
function newUpStream(id) { function newUpStream(localId) {
let c = serverConnection.newUpStream(id); let c = serverConnection.newUpStream(localId);
c.onstatus = function(status) { c.onstatus = function(status) {
setMediaStatus(c); setMediaStatus(c);
}; };
...@@ -1012,9 +1011,9 @@ function isSafari() { ...@@ -1012,9 +1011,9 @@ function isSafari() {
} }
/** /**
* @param {string} [id] * @param {string} [localId]
*/ */
async function addLocalMedia(id) { async function addLocalMedia(localId) {
let settings = getSettings(); let settings = getSettings();
let audio = settings.audio ? {deviceId: settings.audio} : false; let audio = settings.audio ? {deviceId: settings.audio} : false;
...@@ -1039,13 +1038,7 @@ async function addLocalMedia(id) { ...@@ -1039,13 +1038,7 @@ async function addLocalMedia(id) {
} }
} }
let old = id && serverConnection.up[id]; let old = serverConnection.findByLocalId(localId);
if(!audio && !video) {
if(old)
old.close();
return;
}
if(old && old.onclose) { if(old && old.onclose) {
// make sure that the camera is released before we try to reopen it // make sure that the camera is released before we try to reopen it
old.onclose.call(old, true); old.onclose.call(old, true);
...@@ -1063,7 +1056,7 @@ async function addLocalMedia(id) { ...@@ -1063,7 +1056,7 @@ async function addLocalMedia(id) {
setMediaChoices(true); setMediaChoices(true);
let c = newUpStream(id); let c = newUpStream(localId);
c.kind = 'local'; c.kind = 'local';
c.stream = stream; c.stream = stream;
...@@ -1076,21 +1069,21 @@ async function addLocalMedia(id) { ...@@ -1076,21 +1069,21 @@ async function addLocalMedia(id) {
stopStream(stream); stopStream(stream);
setFilter(c, null); setFilter(c, null);
if(!replace) if(!replace)
delMedia(c.id); delMedia(c.localId);
} }
} catch(e) { } catch(e) {
displayWarning(e); displayWarning(e);
c.onclose = replace => { c.onclose = replace => {
stopStream(c.stream); stopStream(c.stream);
if(!replace) if(!replace)
delMedia(c.id); delMedia(c.localId);
} }
} }
} else { } else {
c.onclose = replace => { c.onclose = replace => {
stopStream(c.stream); stopStream(c.stream);
if(!replace) if(!replace)
delMedia(c.id); delMedia(c.localId);
} }
} }
...@@ -1144,7 +1137,7 @@ async function addShareMedia() { ...@@ -1144,7 +1137,7 @@ async function addShareMedia() {
c.onclose = replace => { c.onclose = replace => {
stopStream(stream); stopStream(stream);
if(!replace) if(!replace)
delMedia(c.id); delMedia(c.localId);
} }
stream.getTracks().forEach(t => { stream.getTracks().forEach(t => {
c.pc.addTrack(t, stream); c.pc.addTrack(t, stream);
...@@ -1180,13 +1173,13 @@ async function addFileMedia(file) { ...@@ -1180,13 +1173,13 @@ async function addFileMedia(file) {
c.onclose = function(replace) { c.onclose = function(replace) {
stopStream(c.stream); stopStream(c.stream);
let media = /** @type{HTMLVideoElement} */ let media = /** @type{HTMLVideoElement} */
(document.getElementById('media-' + this.id)); (document.getElementById('media-' + this.localId));
if(media && media.src) { if(media && media.src) {
URL.revokeObjectURL(media.src); URL.revokeObjectURL(media.src);
media.src = null; media.src = null;
} }
if(!replace) if(!replace)
delMedia(c.id); delMedia(c.localId);
}; };
stream.onaddtrack = function(e) { stream.onaddtrack = function(e) {
...@@ -1261,11 +1254,13 @@ function closeUpMediaKind(kind) { ...@@ -1261,11 +1254,13 @@ function closeUpMediaKind(kind) {
/** /**
* @param {string} kind * @param {string} kind
* @returns {Stream}
*/ */
function findUpMedia(kind) { function findUpMedia(kind) {
for(let id in serverConnection.up) { for(let id in serverConnection.up) {
if(serverConnection.up[id].kind === kind) let c = serverConnection.up[id]
return id; if(c.kind === kind)
return c;
} }
return null; return null;
} }
...@@ -1304,16 +1299,16 @@ function muteLocalTracks(mute) { ...@@ -1304,16 +1299,16 @@ function muteLocalTracks(mute) {
async function setMedia(c, isUp, mirror, video) { async function setMedia(c, isUp, mirror, video) {
let peersdiv = document.getElementById('peers'); let peersdiv = document.getElementById('peers');
let div = document.getElementById('peer-' + c.id); let div = document.getElementById('peer-' + c.localId);
if(!div) { if(!div) {
div = document.createElement('div'); div = document.createElement('div');
div.id = 'peer-' + c.id; div.id = 'peer-' + c.localId;
div.classList.add('peer'); div.classList.add('peer');
peersdiv.appendChild(div); peersdiv.appendChild(div);
} }
let media = /** @type {HTMLVideoElement} */ let media = /** @type {HTMLVideoElement} */
(document.getElementById('media-' + c.id)); (document.getElementById('media-' + c.localId));
if(media) { if(media) {
if(video) { if(video) {
throw new Error("Duplicate video"); throw new Error("Duplicate video");
...@@ -1331,21 +1326,24 @@ async function setMedia(c, isUp, mirror, video) { ...@@ -1331,21 +1326,24 @@ async function setMedia(c, isUp, mirror, video) {
media.autoplay = true; media.autoplay = true;
/** @ts-ignore */ /** @ts-ignore */
media.playsinline = true; media.playsinline = true;
media.id = 'media-' + c.id; media.id = 'media-' + c.localId;
div.appendChild(media); div.appendChild(media);
if(!video) if(!video)
addCustomControls(media, div, c); addCustomControls(media, div, c);
}
if(mirror) if(mirror)
media.classList.add('mirror'); media.classList.add('mirror');
} else
media.classList.remove('mirror');
if(!video && media.srcObject !== c.stream) if(!video && media.srcObject !== c.stream)
media.srcObject = c.stream; media.srcObject = c.stream;
let label = document.getElementById('label-' + c.id); let label = document.getElementById('label-' + c.localId);
if(!label) { if(!label) {
label = document.createElement('div'); label = document.createElement('div');
label.id = 'label-' + c.id; label.id = 'label-' + c.localId;
label.classList.add('label'); label.classList.add('label');
div.appendChild(label); div.appendChild(label);
} }
...@@ -1382,7 +1380,7 @@ function cloneHTMLElement(elt) { ...@@ -1382,7 +1380,7 @@ function cloneHTMLElement(elt) {
*/ */
function addCustomControls(media, container, c) { function addCustomControls(media, container, c) {
media.controls = false; media.controls = false;
let controls = document.getElementById('controls-' + c.id); let controls = document.getElementById('controls-' + c.localId);
if(controls) { if(controls) {
console.warn('Attempted to add duplicate controls'); console.warn('Attempted to add duplicate controls');
return; return;
...@@ -1391,7 +1389,7 @@ function addCustomControls(media, container, c) { ...@@ -1391,7 +1389,7 @@ function addCustomControls(media, container, c) {
let template = let template =
document.getElementById('videocontrols-template').firstElementChild; document.getElementById('videocontrols-template').firstElementChild;
controls = cloneHTMLElement(template); controls = cloneHTMLElement(template);
controls.id = 'controls-' + c.id; controls.id = 'controls-' + c.localId;
let volume = getVideoButton(controls, 'volume'); let volume = getVideoButton(controls, 'volume');
if(c.kind === 'local') { if(c.kind === 'local') {
...@@ -1508,16 +1506,16 @@ function registerControlHandlers(media, container) { ...@@ -1508,16 +1506,16 @@ function registerControlHandlers(media, container) {
} }
/** /**
* @param {string} id * @param {string} localId
*/ */
function delMedia(id) { function delMedia(localId) {
let mediadiv = document.getElementById('peers'); let mediadiv = document.getElementById('peers');
let peer = document.getElementById('peer-' + id); let peer = document.getElementById('peer-' + localId);
if(!peer) if(!peer)
throw new Error('Removing unknown media'); throw new Error('Removing unknown media');
let media = /** @type{HTMLVideoElement} */ let media = /** @type{HTMLVideoElement} */
(document.getElementById('media-' + id)); (document.getElementById('media-' + localId));
media.srcObject = null; media.srcObject = null;
mediadiv.removeChild(peer); mediadiv.removeChild(peer);
...@@ -1534,7 +1532,7 @@ function setMediaStatus(c) { ...@@ -1534,7 +1532,7 @@ function setMediaStatus(c) {
let state = c && c.pc && c.pc.iceConnectionState; let state = c && c.pc && c.pc.iceConnectionState;
let good = state === 'connected' || state === 'completed'; let good = state === 'connected' || state === 'completed';
let media = document.getElementById('media-' + c.id); let media = document.getElementById('media-' + c.localId);
if(!media) { if(!media) {
console.warn('Setting status of unknown media.'); console.warn('Setting status of unknown media.');
return; return;
...@@ -1560,7 +1558,7 @@ function setMediaStatus(c) { ...@@ -1560,7 +1558,7 @@ function setMediaStatus(c) {
* @param {string} [fallback] * @param {string} [fallback]
*/ */
function setLabel(c, fallback) { function setLabel(c, fallback) {
let label = document.getElementById('label-' + c.id); let label = document.getElementById('label-' + c.localId);
if(!label) if(!label)
return; return;
let l = c.username; let l = c.username;
......
...@@ -36,15 +36,30 @@ function toHex(array) { ...@@ -36,15 +36,30 @@ function toHex(array) {
return a.reduce((x, y) => x + hex(y), ''); return a.reduce((x, y) => x + hex(y), '');
} }
/** randomid returns a random string of 32 hex digits (16 bytes). /**
* newRandomId returns a random string of 32 hex digits (16 bytes).
*
* @returns {string} * @returns {string}
*/ */
function randomid() { function newRandomId() {
let a = new Uint8Array(16); let a = new Uint8Array(16);
crypto.getRandomValues(a); crypto.getRandomValues(a);
return toHex(a); return toHex(a);
} }
let localIdCounter = 0;
/**
* newLocalId returns a string that is unique in this session.
*
* @returns {string}
*/
function newLocalId() {
let id = `${localIdCounter}`
localIdCounter++;
return id;
}
/** /**
* ServerConnection encapsulates a websocket connection to the server and * ServerConnection encapsulates a websocket connection to the server and
* all the associated streams. * all the associated streams.
...@@ -57,7 +72,7 @@ function ServerConnection() { ...@@ -57,7 +72,7 @@ function ServerConnection() {
* @type {string} * @type {string}
* @const * @const
*/ */
this.id = randomid(); this.id = newRandomId();
/** /**
* The group that we have joined, or null if we haven't joined yet. * The group that we have joined, or null if we haven't joined yet.
* *
...@@ -168,6 +183,7 @@ function ServerConnection() { ...@@ -168,6 +183,7 @@ function ServerConnection() {
* @property {string} type * @property {string} type
* @property {string} [kind] * @property {string} [kind]
* @property {string} [id] * @property {string} [id]
* @property {string} [replace]
* @property {string} [source] * @property {string} [source]
* @property {string} [dest] * @property {string} [dest]
* @property {string} [username] * @property {string} [username]
...@@ -259,7 +275,7 @@ ServerConnection.prototype.connect = async function(url) { ...@@ -259,7 +275,7 @@ ServerConnection.prototype.connect = async function(url) {
break; break;
case 'offer': case 'offer':
sc.gotOffer(m.id, m.labels, m.source, m.username, sc.gotOffer(m.id, m.labels, m.source, m.username,
m.sdp, m.kind === 'renegotiate'); m.sdp, m.replace);
break; break;
case 'answer': case 'answer':
sc.gotAnswer(m.id, m.sdp); sc.gotAnswer(m.id, m.sdp);
...@@ -390,26 +406,52 @@ ServerConnection.prototype.request = function(what) { ...@@ -390,26 +406,52 @@ ServerConnection.prototype.request = function(what) {
}); });
}; };
/**
* @param {string} localId
* @returns {Stream}
*/
ServerConnection.prototype.findByLocalId = function(localId) {
if(!localId)
return null;
for(let id in serverConnection.up) {
let s = serverConnection.up[id];
if(s.localId == localId)
return s;
}
return null;
}
/** /**
* newUpStream requests the creation of a new up stream. * newUpStream requests the creation of a new up stream.
* *
* @param {string} [id] - The id of the stream to create. * @param {string} [localId]
* - The local id of the stream to create. If a stream already exists with
* the same local id, it is replaced with the new stream.
* @returns {Stream} * @returns {Stream}
*/ */
ServerConnection.prototype.newUpStream = function(id) { ServerConnection.prototype.newUpStream = function(localId) {
let sc = this; let sc = this;
if(!id) { let id = newRandomId();
id = randomid();
if(sc.up[id]) if(sc.up[id])
throw new Error('Eek!'); throw new Error('Eek!');
}
let pc = new RTCPeerConnection(sc.rtcConfiguration); let pc = new RTCPeerConnection(sc.rtcConfiguration);
if(!pc) if(!pc)
throw new Error("Couldn't create peer connection"); throw new Error("Couldn't create peer connection");
if(sc.up[id])
sc.up[id].close();
let c = new Stream(this, id, pc, true); let oldId = null;
if(localId) {
let old = sc.findByLocalId(localId);
oldId = old && old.id;
if(old)
old.close(true);
}
let c = new Stream(this, id, localId || newLocalId(), pc, true);
if(oldId)
c.replace = oldId;
sc.up[id] = c; sc.up[id] = c;
pc.onnegotiationneeded = async e => { pc.onnegotiationneeded = async e => {
...@@ -518,26 +560,32 @@ ServerConnection.prototype.groupAction = function(kind, message) { ...@@ -518,26 +560,32 @@ ServerConnection.prototype.groupAction = function(kind, message) {
* @param {string} source * @param {string} source
* @param {string} username * @param {string} username
* @param {string} sdp * @param {string} sdp
* @param {boolean} renegotiate * @param {string} replace
* @function * @function
*/ */
ServerConnection.prototype.gotOffer = async function(id, labels, source, username, sdp, renegotiate) { ServerConnection.prototype.gotOffer = async function(id, labels, source, username, sdp, replace) {
let sc = this; let sc = this;
let c = sc.down[id];
if(c && !renegotiate) {
// SDP is rather inflexible as to what can be renegotiated.
// Unless the server indicates that this is a renegotiation with
// all parameters unchanged, tear down the existing connection.
c.close(true);
c = null;
}
if(sc.up[id]) if(sc.up[id])
throw new Error('Duplicate connection id'); throw new Error('Duplicate connection id');
let oldLocalId = null;
if(replace) {
let old = sc.down[replace];
if(old) {
oldLocalId = old.localId;
old.close(true);
} else
console.error("Replacing unknown stream");
}
let c = sc.down[id];
if(c && oldLocalId)
console.error("Replacing duplicate stream");
if(!c) { if(!c) {
let pc = new RTCPeerConnection(sc.rtcConfiguration); let pc = new RTCPeerConnection(sc.rtcConfiguration);
c = new Stream(this, id, pc, false); c = new Stream(this, id, oldLocalId || newLocalId(), pc, false);
sc.down[id] = c; sc.down[id] = c;
c.pc.onicecandidate = function(e) { c.pc.onicecandidate = function(e) {
...@@ -709,11 +757,12 @@ ServerConnection.prototype.gotRemoteIce = async function(id, candidate) { ...@@ -709,11 +757,12 @@ ServerConnection.prototype.gotRemoteIce = async function(id, candidate) {
* *
* @param {ServerConnection} sc * @param {ServerConnection} sc
* @param {string} id * @param {string} id
* @param {string} localId
* @param {RTCPeerConnection} pc * @param {RTCPeerConnection} pc
* *
* @constructor * @constructor
*/ */
function Stream(sc, id, pc, up) { function Stream(sc, id, localId, pc, up) {
/** /**
* The associated ServerConnection. * The associated ServerConnection.
* *
...@@ -728,6 +777,13 @@ function Stream(sc, id, pc, up) { ...@@ -728,6 +777,13 @@ function Stream(sc, id, pc, up) {
* @const * @const
*/ */
this.id = id; this.id = id;
/**
* The local id of this stream.
*
* @type {string}
* @const
*/
this.localId = localId;
/** /**
* Indicates whether the stream is in the client->server direction. * Indicates whether the stream is in the client->server direction.
* *
...@@ -779,6 +835,12 @@ function Stream(sc, id, pc, up) { ...@@ -779,6 +835,12 @@ function Stream(sc, id, pc, up) {
* @type {Object<string,string>} * @type {Object<string,string>}
*/ */
this.labelsByMid = {}; this.labelsByMid = {};
/**
* The id of the stream that we are currently replacing.
*
* @type {string}
*/
this.replace = null;
/** /**
* Indicates whether we have already sent a local description. * Indicates whether we have already sent a local description.
* *
...@@ -899,7 +961,7 @@ Stream.prototype.close = function(replace) { ...@@ -899,7 +961,7 @@ Stream.prototype.close = function(replace) {
c.pc.close(); c.pc.close();
if(c.up && c.localDescriptionSent) { if(c.up && !replace && c.localDescriptionSent) {
try { try {
c.sc.send({ c.sc.send({
type: 'close', type: 'close',
...@@ -1034,10 +1096,12 @@ Stream.prototype.negotiate = async function (restartIce) { ...@@ -1034,10 +1096,12 @@ Stream.prototype.negotiate = async function (restartIce) {
username: c.sc.username, username: c.sc.username,
kind: this.localDescriptionSent ? 'renegotiate' : '', kind: this.localDescriptionSent ? 'renegotiate' : '',
id: c.id, id: c.id,
replace: this.replace,
labels: c.labelsByMid, labels: c.labelsByMid,
sdp: c.pc.localDescription.sdp, sdp: c.pc.localDescription.sdp,
}); });
this.localDescriptionSent = true; this.localDescriptionSent = true;
this.replace = null;
c.flushLocalIceCandidates(); c.flushLocalIceCandidates();
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment