Commit f53276b8 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Simplify the protocol and the protocol interface.

Split the id field into id and source, where source indicates the sender
of the message and id the entity being sent.  Remove the label request,
just use the offerer's username.  Maintain the username within the
ServerConnection, this removes a parameter from some methods.
parent 05633561
......@@ -105,7 +105,6 @@ serverConnection.ondownstream = function(stream) {
stream.onclose = ...;
stream.onerror = ...;
stream.ondowntrack = ...;
stream.onlabel = ...;
stream.onstatus = ...;
}
```
......@@ -115,10 +114,6 @@ The `stream.labels` dictionary maps each track's id to one of `audio`,
this point, you may set up an `audio` or `video` component straight away,
or you may choose to wait until the `ondowntrack` callback is called.
The server will usually invoke the `onlabel` callback in order to set
a user-readable label on the stream; this is currently just the
originating client's username.
After a new stream is created, `ondowntrack` will be called whenever
a track is added. If the `MediaStream` passed to `ondowntrack` differs
from the one previously received, then the stream has been torn down and
......
......@@ -16,7 +16,7 @@ type Up interface {
AddLocal(Down) error
DelLocal(Down) bool
Id() string
Label() string
User() (string, string)
Codecs() []webrtc.RTPCodecCapability
}
......
......@@ -89,7 +89,7 @@ func (client *Client) Kick(id, user, message string) error {
return err
}
func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, label string) error {
func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack) error {
if client.group != g {
return nil
}
......@@ -122,7 +122,7 @@ func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []c
client.down = make(map[string]*diskConn)
}
down, err := newDiskConn(client, directory, label, up, tracks)
down, err := newDiskConn(client, directory, up, tracks)
if err != nil {
g.WallOps("Write to disk: " + err.Error())
return err
......@@ -135,7 +135,7 @@ func (client *Client) PushConn(g *group.Group, id string, up conn.Up, tracks []c
type diskConn struct {
client *Client
directory string
label string
username string
hasVideo bool
mu sync.Mutex
......@@ -167,7 +167,7 @@ func (conn *diskConn) reopen() error {
}
conn.file = nil
file, err := openDiskFile(conn.directory, conn.label)
file, err := openDiskFile(conn.directory, conn.username)
if err != nil {
return err
}
......@@ -196,15 +196,15 @@ func (conn *diskConn) Close() error {
return nil
}
func openDiskFile(directory, label string) (*os.File, error) {
func openDiskFile(directory, username string) (*os.File, error) {
filenameFormat := "2006-01-02T15:04:05.000"
if runtime.GOOS == "windows" {
filenameFormat = "2006-01-02T15-04-05-000"
}
filename := time.Now().Format(filenameFormat)
if label != "" {
filename = filename + "-" + label
if username != "" {
filename = filename + "-" + username
}
for counter := 0; counter < 100; counter++ {
var fn string
......@@ -240,11 +240,12 @@ type diskTrack struct {
lastKf uint32
}
func newDiskConn(client *Client, directory, label string, up conn.Up, remoteTracks []conn.UpTrack) (*diskConn, error) {
func newDiskConn(client *Client, directory string, up conn.Up, remoteTracks []conn.UpTrack) (*diskConn, error) {
_, username := up.User()
conn := diskConn{
client: client,
directory: directory,
label: label,
username: username,
tracks: make([]*diskTrack, 0, len(remoteTracks)),
remote: up,
}
......
......@@ -98,7 +98,7 @@ type Client interface {
Challengeable
SetPermissions(ClientPermissions)
OverridePermissions(*Group) bool
PushConn(g *Group, id string, conn conn.Up, tracks []conn.UpTrack, label string) error
PushConn(g *Group, id string, conn conn.Up, tracks []conn.UpTrack) error
PushClient(id, username string, add bool) error
}
......
......@@ -308,7 +308,8 @@ func (up *rtpUpTrack) hasRtcpFb(tpe, parameter string) bool {
type rtpUpConnection struct {
id string
label string
userId string
username string
pc *webrtc.PeerConnection
labels map[string]string
iceCandidates []*webrtc.ICECandidateInit
......@@ -331,8 +332,8 @@ func (up *rtpUpConnection) Id() string {
return up.id
}
func (up *rtpUpConnection) Label() string {
return up.label
func (up *rtpUpConnection) User() (string, string) {
return up.userId, up.username
}
func (up *rtpUpConnection) Codecs() []webrtc.RTPCodecCapability {
......@@ -430,7 +431,7 @@ func pushConnNow(up *rtpUpConnection, g *group.Group, cs []group.Client) {
up.mu.Unlock()
for _, c := range cs {
c.PushConn(g, up.id, up, tracks, up.label)
c.PushConn(g, up.id, up, tracks)
}
}
......
......@@ -165,6 +165,7 @@ type clientMessage struct {
Type string `json:"type"`
Kind string `json:"kind,omitempty"`
Id string `json:"id,omitempty"`
Source string `json:"source,omitempty"`
Dest string `json:"dest,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
......@@ -259,7 +260,7 @@ func delUpConn(c *webClient, id string) bool {
if g != nil {
go func(clients []group.Client) {
for _, c := range clients {
err := c.PushConn(g, conn.id, nil, nil, "")
err := c.PushConn(g, conn.id, nil, nil)
if err != nil {
log.Printf("PushConn: %v", err)
}
......@@ -455,12 +456,16 @@ func negotiate(c *webClient, down *rtpDownConnection, renegotiate, restartIce bo
kind = "renegotiate"
}
source, username := down.remote.User()
return c.write(clientMessage{
Type: "offer",
Kind: kind,
Id: down.id,
Offer: &offer,
Labels: labels,
Type: "offer",
Kind: kind,
Id: down.id,
Source: source,
Username: username,
Offer: &offer,
Labels: labels,
})
}
......@@ -488,9 +493,9 @@ func gotOffer(c *webClient, id string, offer webrtc.SessionDescription, renegoti
return err
}
if u := c.Username(); u != "" {
up.label = u
}
up.userId = c.Id()
up.username = c.Username()
err = up.pc.SetRemoteDescription(offer)
if err != nil {
if renegotiate && !isnew {
......@@ -644,17 +649,11 @@ func addDownConnTracks(c *webClient, remote conn.Up, tracks []conn.UpTrack) (*rt
return down, nil
}
func (c *webClient) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack, label string) error {
func (c *webClient) PushConn(g *group.Group, id string, up conn.Up, tracks []conn.UpTrack) error {
err := c.action(pushConnAction{g, id, up, tracks})
if err != nil {
return err
}
if up != nil && label != "" {
err := c.action(addLabelAction{up.Id(), up.Label()})
if err != nil {
return err
}
}
return nil
}
......@@ -723,11 +722,6 @@ type pushConnAction struct {
tracks []conn.UpTrack
}
type addLabelAction struct {
id string
label string
}
type pushConnsAction struct {
group *group.Group
client group.Client
......@@ -756,6 +750,13 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
err := c.write(clientMessage{
Type: "handshake",
})
if err != nil {
return err
}
for {
select {
case m, ok := <-read:
......@@ -808,12 +809,6 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
continue
}
}
case addLabelAction:
c.write(clientMessage{
Type: "label",
Id: a.id,
Value: a.label,
})
case pushConnsAction:
g := c.group
if g == nil || a.group != g {
......@@ -830,7 +825,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
}
go func(u *rtpUpConnection, ts []conn.UpTrack) {
err := a.client.PushConn(
g, u.id, u, ts, u.label,
g, u.id, u, ts,
)
if err != nil {
log.Printf(
......@@ -855,7 +850,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
go c.PushConn(
c.group,
down.remote.Id(), down.remote,
tracks, down.remote.Label(),
tracks,
)
} else if up := getUpConn(c, a.id); up != nil {
c.write(clientMessage{
......@@ -877,6 +872,7 @@ func clientLoop(c *webClient, ws *websocket.Conn) error {
Type: "joined",
Kind: "change",
Group: g.Name(),
Username: c.username,
Permissions: &perms,
RTCConfiguration: ice.ICEConfiguration(),
})
......@@ -1020,6 +1016,20 @@ func kickClient(g *group.Group, id, user, dest string, message string) error {
}
func handleClientMessage(c *webClient, m clientMessage) error {
if m.Source != "" {
if m.Source != c.Id() {
return group.ProtocolError("spoofed client id")
}
}
if m.Type != "join" {
if m.Username != "" {
if m.Username != c.Username() {
return group.ProtocolError("spoofed username")
}
}
}
switch m.Type {
case "join":
if m.Kind == "leave" {
......@@ -1032,6 +1042,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
Type: "joined",
Kind: "leave",
Group: m.Group,
Username: c.username,
Permissions: &perms,
})
}
......@@ -1063,6 +1074,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
Type: "joined",
Kind: "fail",
Group: m.Group,
Username: c.username,
Permissions: &group.ClientPermissions{},
Value: s,
})
......@@ -1074,6 +1086,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
Type: "joined",
Kind: "redirect",
Group: m.Group,
Username: c.username,
Permissions: &group.ClientPermissions{},
Value: redirect,
})
......@@ -1084,6 +1097,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
Type: "joined",
Kind: "join",
Group: m.Group,
Username: c.username,
Permissions: &perms,
RTCConfiguration: ice.ICEConfiguration(),
})
......@@ -1171,12 +1185,6 @@ func handleClientMessage(c *webClient, m clientMessage) error {
log.Printf("ICE: %v", err)
}
case "chat", "usermessage":
if m.Id != c.id {
return group.ProtocolError("wrong sender id")
}
if m.Username != "" && m.Username != c.username {
return group.ProtocolError("wrong sender username")
}
g := c.group
if g == nil {
return c.error(group.UserError("join a group first"))
......@@ -1187,13 +1195,13 @@ func handleClientMessage(c *webClient, m clientMessage) error {
if m.Type == "chat" {
if m.Dest == "" {
g.AddToChatHistory(
m.Id, m.Username, tm, m.Kind, m.Value,
m.Source, m.Username, tm, m.Kind, m.Value,
)
}
}
mm := clientMessage{
Type: m.Type,
Id: m.Id,
Source: m.Source,
Dest: m.Dest,
Username: m.Username,
Privileged: c.permissions.Op,
......@@ -1221,12 +1229,6 @@ func handleClientMessage(c *webClient, m clientMessage) error {
ccc.write(mm)
}
case "groupaction":
if m.Id != c.id {
return group.ProtocolError("wrong sender id")
}
if m.Username != "" && m.Username != c.username {
return group.ProtocolError("wrong sender username")
}
g := c.group
if g == nil {
return c.error(group.UserError("join a group first"))
......@@ -1304,12 +1306,6 @@ func handleClientMessage(c *webClient, m clientMessage) error {
return group.ProtocolError("unknown group action")
}
case "useraction":
if m.Id != c.id {
return group.ProtocolError("wrong sender id")
}
if m.Username != "" && m.Username != c.username {
return group.ProtocolError("wrong sender username")
}
g := c.group
if g == nil {
return c.error(group.UserError("join a group first"))
......@@ -1332,7 +1328,7 @@ func handleClientMessage(c *webClient, m clientMessage) error {
if ok {
message = v
}
err := kickClient(g, m.Id, m.Username, m.Dest, message)
err := kickClient(g, m.Source, m.Username, m.Dest, message)
if err != nil {
return c.error(err)
}
......
......@@ -73,18 +73,6 @@ function getUserPass() {
return userpass || null;
}
/**
* Return null if the user hasn't logged in yet.
*
* @returns {string}
*/
function getUsername() {
let userpass = getUserPass();
if(!userpass)
return null;
return userpass.username;
}
/**
* @typedef {Object} settings
* @property {boolean} [localMute]
......@@ -332,9 +320,6 @@ function gotDownStream(c) {
c.ondowntrack = function(track, transceiver, label, stream) {
setMedia(c, false);
}
c.onlabel = function(label) {
setLabel(c);
}
c.onstatus = function(status) {
setMediaStatus(c);
}
......@@ -1324,7 +1309,7 @@ function setLabel(c, fallback) {
let label = document.getElementById('label-' + c.id);
if(!label)
return;
let l = c.label;
let l = c.username;
if(l) {
label.textContent = l;
label.classList.remove('label-fallback');
......@@ -1856,7 +1841,7 @@ commands.clear = {
predicate: operatorPredicate,
description: 'clear the chat history',
f: (c, r) => {
serverConnection.groupAction(getUsername(), 'clearchat');
serverConnection.groupAction('clearchat');
}
};
......@@ -1865,7 +1850,7 @@ commands.lock = {
description: 'lock this group',
parameters: '[message]',
f: (c, r) => {
serverConnection.groupAction(getUsername(), 'lock', r);
serverConnection.groupAction('lock', r);
}
};
......@@ -1873,7 +1858,7 @@ commands.unlock = {
predicate: operatorPredicate,
description: 'unlock this group, revert the effect of /lock',
f: (c, r) => {
serverConnection.groupAction(getUsername(), 'unlock');
serverConnection.groupAction('unlock');
}
};
......@@ -1881,7 +1866,7 @@ commands.record = {
predicate: recordingPredicate,
description: 'start recording',
f: (c, r) => {
serverConnection.groupAction(getUsername(), 'record');
serverConnection.groupAction('record');
}
};
......@@ -1889,7 +1874,7 @@ commands.unrecord = {
predicate: recordingPredicate,
description: 'stop recording',
f: (c, r) => {
serverConnection.groupAction(getUsername(), 'unrecord');
serverConnection.groupAction('unrecord');
}
};
......@@ -1897,7 +1882,7 @@ commands.subgroups = {
predicate: operatorPredicate,
description: 'list subgroups',
f: (c, r) => {
serverConnection.groupAction(getUsername(), 'subgroups');
serverConnection.groupAction('subgroups');
}
};
......@@ -1969,9 +1954,8 @@ commands.msg = {
let id = findUserId(p[0]);
if(!id)
throw new Error(`Unknown user ${p[0]}`);
let username = getUsername();
serverConnection.chat(username, '', id, p[1]);
addToChatbox(serverConnection.id, id, username,
serverConnection.chat('', id, p[1]);
addToChatbox(serverConnection.id, id, serverConnection.username,
Date.now(), false, '', p[1]);
}
};
......@@ -1987,7 +1971,7 @@ function userCommand(c, r) {
let id = findUserId(p[0]);
if(!id)
throw new Error(`Unknown user ${p[0]}`);
serverConnection.userAction(getUsername(), c, id, p[1]);
serverConnection.userAction(c, id, p[1]);
}
function userMessage(c, r) {
......@@ -1997,7 +1981,7 @@ function userMessage(c, r) {
let id = findUserId(p[0]);
if(!id)
throw new Error(`Unknown user ${p[0]}`);
serverConnection.userMessage(getUsername(), c, id, p[1]);
serverConnection.userMessage(c, id, p[1]);
}
commands.kick = {
......@@ -2058,7 +2042,7 @@ commands.wall = {
f: (c, r) => {
if(!r)
throw new Error('empty message');
serverConnection.userMessage(getUsername(), 'warning', '', r);
serverConnection.userMessage('warning', '', r);
},
};
......@@ -2122,9 +2106,8 @@ function handleInput() {
return;
}
let username = getUsername();
try {
serverConnection.chat(username, me ? 'me' : '', '', message);
serverConnection.chat(me ? 'me' : '', '', message);
} catch(e) {
console.error(e);
displayError(e);
......
......@@ -59,11 +59,15 @@ function ServerConnection() {
*/
this.id = randomid();
/**
* The group that we have joined, or nil if we haven't joined yet.
* The group that we have joined, or null if we haven't joined yet.
*
* @type {string}
*/
this.group = null;
/**
* The username we joined as.
*/
this.username = null;
/**
* The underlying websocket.
*
......@@ -171,6 +175,7 @@ function ServerConnection() {
* @property {string} type
* @property {string} [kind]
* @property {string} [id]
* @property {string} [source]
* @property {string} [dest]
* @property {string} [username]
* @property {string} [password]
......@@ -248,6 +253,7 @@ ServerConnection.prototype.connect = async function(url) {
if(sc.group && sc.onjoined)
sc.onjoined.call(sc, 'leave', sc.group, {}, '');
sc.group = null;
sc.username = null;
if(sc.onclose)
sc.onclose.call(sc, e.code, e.reason);
reject(new Error('websocket close ' + e.code + ' ' + e.reason));
......@@ -255,8 +261,11 @@ ServerConnection.prototype.connect = async function(url) {
this.socket.onmessage = function(e) {
let m = JSON.parse(e.data);
switch(m.type) {
case 'handshake':
break;
case 'offer':
sc.gotOffer(m.id, m.labels, m.offer, m.kind === 'renegotiate');
sc.gotOffer(m.id, m.labels, m.source, m.username,
m.offer, m.kind === 'renegotiate');
break;
case 'answer':
sc.gotAnswer(m.id, m.answer);
......@@ -273,9 +282,6 @@ ServerConnection.prototype.connect = async function(url) {
case 'ice':
sc.gotRemoteIce(m.id, m.candidate);
break;
case 'label':
sc.gotLabel(m.id, m.value);
break;
case 'joined':
if(sc.group) {
if(m.group !== sc.group) {
......@@ -284,6 +290,7 @@ ServerConnection.prototype.connect = async function(url) {
} else {
sc.group = m.group;
}
sc.username = m.username;
sc.permissions = m.permissions || [];
sc.rtcConfiguration = m.rtcConfiguration || null;
if(sc.onjoined)
......@@ -298,14 +305,14 @@ ServerConnection.prototype.connect = async function(url) {
case 'chat':
if(sc.onchat)
sc.onchat.call(
sc, m.id, m.dest, m.username, m.time,
sc, m.source, m.dest, m.username, m.time,
m.privileged, m.kind, m.value,
);
break;
case 'usermessage':
if(sc.onusermessage)
sc.onusermessage.call(
sc, m.id, m.dest, m.username, m.time,
sc, m.source, m.dest, m.username, m.time,
m.privileged, m.kind, m.value,
);
break;
......@@ -441,18 +448,17 @@ ServerConnection.prototype.newUpStream = function(id) {
* chat sends a chat message to the server. The server will normally echo
* the message back to the client.
*
* @param {string} username - The sender's username.
* @param {string} kind
* - The kind of message, either '', 'me' or an application-specific type.
* @param {string} dest - The id to send the message to, empty for broadcast.
* @param {string} value - The text of the message.
*/
ServerConnection.prototype.chat = function(username, kind, dest, value) {
ServerConnection.prototype.chat = function(kind, dest, value) {
this.send({
type: 'chat',
id: this.id,
source: this.id,
dest: dest,
username: username,
username: this.username,
kind: kind,
value: value,
});
......@@ -461,17 +467,16 @@ ServerConnection.prototype.chat = function(username, kind, dest, value) {
/**
* userAction sends a request to act on a user.
*
* @param {string} username - The sender's username.
* @param {string} kind - One of "op", "unop", "kick", "present", "unpresent".
* @param {string} dest - The id of the user to act upon.
* @param {string} [value] - An optional user-readable message.
*/
ServerConnection.prototype.userAction = function(username, kind, dest, value) {
ServerConnection.prototype.userAction = function(kind, dest, value) {
this.send({
type: 'useraction',
id: this.id,
source: this.id,
dest: dest,
username: username,
username: this.username,
kind: kind,
value: value,
});
......@@ -481,17 +486,16 @@ ServerConnection.prototype.userAction = function(username, kind, dest, value) {
* userMessage sends an application-specific message to a user.
* This is similar to a chat message, but is not saved in the chat history.
*
* @param {string} username - The sender's username.
* @param {string} kind - The kind of application-specific message.
* @param {string} dest - The id to send the message to, empty for broadcast.
* @param {string} [value] - An optional parameter.
*/
ServerConnection.prototype.userMessage = function(username, kind, dest, value) {
ServerConnection.prototype.userMessage = function(kind, dest, value) {
this.send({
type: 'usermessage',
id: this.id,
source: this.id,
dest: dest,
username: username,
username: this.username,
kind: kind,
value: value,
});
......@@ -500,17 +504,16 @@ ServerConnection.prototype.userMessage = function(username, kind, dest, value) {
/**
* groupAction sends a request to act on the current group.
*
* @param {string} username - The sender's username.
* @param {string} kind
* - One of 'clearchat', 'lock', 'unlock', 'record' or 'unrecord'.
* @param {string} [message] - An optional user-readable message.
*/
ServerConnection.prototype.groupAction = function(username, kind, message) {
ServerConnection.prototype.groupAction = function(kind, message) {
this.send({
type: 'groupaction',
id: this.id,
source: this.id,
kind: kind,
username: username,
username: this.username,
value: message,
});
};
......@@ -520,11 +523,13 @@ ServerConnection.prototype.groupAction = function(username, kind, message) {
*
* @param {string} id
* @param {Object<string, string>} labels
* @param {string} source
* @param {string} username
* @param {RTCSessionDescriptionInit} offer
* @param {boolean} renegotiate
* @function
*/
ServerConnection.prototype.gotOffer = async function(id, labels, offer, renegotiate) {
ServerConnection.prototype.gotOffer = async function(id, labels, source, username, offer, renegotiate) {
let sc = this;
let c = sc.down[id];
if(c && !renegotiate) {
......@@ -586,6 +591,8 @@ ServerConnection.prototype.gotOffer = async function(id, labels, offer, renegoti
}
c.labelsByMid = labels;
c.source = source;
c.username = username;
if(sc.ondownstream)
sc.ondownstream.call(sc, c);
......@@ -620,22 +627,6 @@ ServerConnection.prototype.gotOffer = async function(id, labels, offer, renegoti
c.onnegotiationcompleted.call(c);
};
/**
* Called when we receive a stream label from the server. Don't call this.
*
* @param {string} id
* @param {string} label
*/
ServerConnection.prototype.gotLabel = function(id, label) {
let c = this.down[id];
if(!c)
throw new Error('Got label for unknown id');
c.label = label;
if(c.onlabel)
c.onlabel.call(c, label);
};
/**
* Called when we receive an answer from the server. Don't call this.
*
......@@ -765,13 +756,19 @@ function Stream(sc, id, pc, up) {
*/
this.kind = null;
/**
* For down streams, a user-readable label.
* For down streams, the id of the client that created the stream.
*
* @type {string}
*/
this.source = null;
/**
* For down streams, the username of the client who created the stream.
*
* @type {string}
*/
this.label = null;
this.username = null;
/**
* The associated RTCPeerConnectoin. This is null before the stream
* The associated RTCPeerConnection. This is null before the stream
* is connected, and may change over time.
*
* @type {RTCPeerConnection}
......@@ -1036,6 +1033,8 @@ Stream.prototype.negotiate = async function (restartIce) {
c.sc.send({
type: 'offer',
source: c.sc.id,
username: c.sc.username,
kind: this.localDescriptionSent ? 'renegotiate' : '',
id: c.id,
labels: c.labelsByMid,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment