Commit 0ba427a6 authored by linquize's avatar linquize Committed by Matt Holt

websocket: Enhancements, message types, and tests (#2359)

* websocket: Should reset respawn parameter when processing next config entry

* websocket: add message types: lines, text, binary

* websocket: Add unit test

* Add websocket sample files
parent 7fab1b15
:2015 {
websocket /cat-lines/ cat {
type lines
}
websocket /cat-lines-32/ cat {
type lines
bufsize 32
}
websocket /cat-text/ cat {
type text
}
websocket /cat-text-32/ cat {
type text
bufsize 32
}
websocket /cat-binary/ cat {
type binary
}
websocket /cat-binary-32/ cat {
type binary
bufsize 32
}
websocket /curl-lines/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\" 2>&1'" {
type lines
}
websocket /curl-text/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\" 2>&1'" {
type text
}
websocket /curl-binary/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\"'" {
type binary
}
websocket /curl-binary-32/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\"'" {
type binary
bufsize 32
}
}
This diff is collapsed.
......@@ -15,6 +15,8 @@
package websocket
import (
"strconv"
"github.com/caddyserver/caddy"
"github.com/caddyserver/caddy/caddyhttp/httpserver"
)
......@@ -45,21 +47,38 @@ func setup(c *caddy.Controller) error {
func webSocketParse(c *caddy.Controller) ([]Config, error) {
var websocks []Config
var respawn bool
optionalBlock := func() (hadBlock bool, err error) {
for c.NextBlock() {
hadBlock = true
if c.Val() == "respawn" {
respawn = true
} else {
return true, c.Err("Expected websocket configuration parameter in block")
for c.Next() {
var respawn bool
var wsType string
var bufSize int
optionalBlock := func() (hadBlock bool, err error) {
for c.NextBlock() {
hadBlock = true
if c.Val() == "respawn" {
respawn = true
} else if c.Val() == "type" {
arg := c.RemainingArgs()
if len(arg) > 0 {
wsType = arg[0]
}
} else if c.Val() == "bufsize" {
arg := c.RemainingArgs()
if len(arg) > 0 {
var err error
bufSize, err = strconv.Atoi(arg[0])
if (bufSize < 0) || (err != nil) {
bufSize = 0
}
}
} else {
return true, c.Err("Expected websocket configuration parameter in block")
}
}
return
}
return
}
for c.Next() {
var val, path, command string
// Path or command; not sure which yet
......@@ -97,11 +116,17 @@ func webSocketParse(c *caddy.Controller) ([]Config, error) {
return nil, err
}
if wsType == "" {
wsType = "lines"
}
websocks = append(websocks, Config{
Path: path,
Command: cmd,
Arguments: args,
Respawn: respawn, // TODO: This isn't used currently
Type: wsType,
BufSize: bufSize,
})
}
......
......@@ -28,6 +28,7 @@ import (
"os/exec"
"strings"
"time"
"unicode/utf8"
"github.com/caddyserver/caddy/caddyhttp/httpserver"
"github.com/gorilla/websocket"
......@@ -76,6 +77,27 @@ type (
Command string
Arguments []string
Respawn bool // TODO: Not used, but parser supports it until we decide on it
Type string
BufSize int
}
wsGetUpgrader interface {
GetUpgrader() wsUpgrader
}
wsUpgrader interface {
Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (wsConn, error)
}
wsConn interface {
Close() error
ReadMessage() (messageType int, p []byte, err error)
SetPongHandler(h func(appData string) error)
SetReadDeadline(t time.Time) error
SetReadLimit(limit int64)
SetWriteDeadline(t time.Time) error
WriteControl(messageType int, data []byte, deadline time.Time) error
WriteMessage(messageType int, data []byte) error
}
)
......@@ -94,12 +116,19 @@ func (ws WebSocket) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, erro
// serveWS is used for setting and upgrading the HTTP connection to a websocket connection.
// It also spawns the child process that is associated with matched HTTP path/url.
func serveWS(w http.ResponseWriter, r *http.Request, config *Config) (int, error) {
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
gu, castok := w.(wsGetUpgrader)
var u wsUpgrader
if gu != nil && castok {
u = gu.GetUpgrader()
} else {
u = &realWsUpgrader{o: &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}}
}
conn, err := upgrader.Upgrade(w, r, nil)
conn, err := u.Upgrade(w, r, nil)
if err != nil {
// the connection has been "handled" -- WriteHeader was called with Upgrade,
// so don't return an error status code; just return an error
......@@ -133,8 +162,8 @@ func serveWS(w http.ResponseWriter, r *http.Request, config *Config) (int, error
}
done := make(chan struct{})
go pumpStdout(conn, stdout, done)
pumpStdin(conn, stdin)
go pumpStdout(conn, stdout, done, config)
pumpStdin(conn, stdin, config)
_ = stdin.Close() // close stdin to end the process
......@@ -220,7 +249,7 @@ func buildEnv(cmdPath string, r *http.Request) (metavars []string, err error) {
// pumpStdin handles reading data from the websocket connection and writing
// it to stdin of the process.
func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) {
func pumpStdin(conn wsConn, stdin io.WriteCloser, config *Config) {
// Setup our connection's websocket ping/pong handlers from our const values.
defer conn.Close()
conn.SetReadLimit(maxMessageSize)
......@@ -238,7 +267,10 @@ func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) {
if err != nil {
break
}
message = append(message, '\n')
if config.Type == "lines" {
// no '\n' from client, so append '\n' to spawned process
message = append(message, '\n')
}
if _, err := stdin.Write(message); err != nil {
break
}
......@@ -247,32 +279,102 @@ func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) {
// pumpStdout handles reading data from stdout of the process and writing
// it to websocket connection.
func pumpStdout(conn *websocket.Conn, stdout io.Reader, done chan struct{}) {
func pumpStdout(conn wsConn, stdout io.Reader, done chan struct{}, config *Config) {
go pinger(conn, done)
defer func() {
_ = conn.Close()
close(done) // make sure to close the pinger when we are done.
}()
s := bufio.NewScanner(stdout)
for s.Scan() {
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Println("[ERROR] failed to set write deadline: ", err)
if config.Type == "lines" {
// message must end with '\n'
s := bufio.NewScanner(stdout)
if config.BufSize > 0 {
s.Buffer(make([]byte, config.BufSize), config.BufSize)
}
if err := conn.WriteMessage(websocket.TextMessage, bytes.TrimSpace(s.Bytes())); err != nil {
break
for s.Scan() {
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Println("[ERROR] failed to set write deadline: ", err)
}
if err := conn.WriteMessage(websocket.TextMessage, bytes.TrimSpace(s.Bytes())); err != nil {
break
}
}
}
if s.Err() != nil {
err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, s.Err().Error()), time.Time{})
if err != nil {
log.Println("[ERROR] WriteControl failed: ", err)
if s.Err() != nil {
err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, s.Err().Error()), time.Time{})
if err != nil {
log.Println("[ERROR] WriteControl failed: ", err)
}
}
} else if config.Type == "text" {
// handle UTF-8 text message, newline is not required
r := bufio.NewReader(stdout)
var err1 error
var len int
remainBuf := make([]byte, utf8.UTFMax)
remainLen := 0
bufSize := config.BufSize
if bufSize <= 0 {
bufSize = 2048
}
for {
out := make([]byte, bufSize)
copy(out[:remainLen], remainBuf[:remainLen])
len, err1 = r.Read(out[remainLen:])
if err1 != nil {
break
}
len += remainLen
remainLen = findIncompleteRuneLength(out, len)
if remainLen > 0 {
remainBuf = out[len-remainLen : len]
}
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Println("[ERROR] failed to set write deadline: ", err)
}
if err := conn.WriteMessage(websocket.TextMessage, out[0:len-remainLen]); err != nil {
break
}
}
if err1 != nil {
err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, err1.Error()), time.Time{})
if err != nil {
log.Println("[ERROR] WriteControl failed: ", err)
}
}
} else if config.Type == "binary" {
// treat message as binary data
r := bufio.NewReader(stdout)
var err1 error
var len int
bufSize := config.BufSize
if bufSize <= 0 {
bufSize = 2048
}
for {
out := make([]byte, bufSize)
len, err1 = r.Read(out)
if err1 != nil {
break
}
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Println("[ERROR] failed to set write deadline: ", err)
}
if err := conn.WriteMessage(websocket.BinaryMessage, out[0:len]); err != nil {
break
}
}
if err1 != nil {
err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, err1.Error()), time.Time{})
if err != nil {
log.Println("[ERROR] WriteControl failed: ", err)
}
}
}
}
// pinger simulates the websocket to keep it alive with ping messages.
func pinger(conn *websocket.Conn, done chan struct{}) {
func pinger(conn wsConn, done chan struct{}) {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
......@@ -291,3 +393,90 @@ func pinger(conn *websocket.Conn, done chan struct{}) {
}
}
}
type realWsUpgrader struct {
o *websocket.Upgrader
}
type realWsConn struct {
o *websocket.Conn
}
func (u *realWsUpgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (wsConn, error) {
a, b := u.o.Upgrade(w, r, responseHeader)
return &realWsConn{o: a}, b
}
func (c *realWsConn) Close() error {
return c.o.Close()
}
func (c *realWsConn) ReadMessage() (messageType int, p []byte, err error) {
return c.o.ReadMessage()
}
func (c *realWsConn) SetPongHandler(h func(appData string) error) {
c.o.SetPongHandler(h)
}
func (c *realWsConn) SetReadDeadline(t time.Time) error {
return c.o.SetReadDeadline(t)
}
func (c *realWsConn) SetReadLimit(limit int64) {
c.o.SetReadLimit(limit)
}
func (c *realWsConn) SetWriteDeadline(t time.Time) error {
return c.o.SetWriteDeadline(t)
}
func (c *realWsConn) WriteControl(messageType int, data []byte, deadline time.Time) error {
return c.o.WriteControl(messageType, data, deadline)
}
func (c *realWsConn) WriteMessage(messageType int, data []byte) error {
return c.o.WriteMessage(messageType, data)
}
func findIncompleteRuneLength(p []byte, length int) int {
if length == 0 {
return 0
}
if rune(p[length-1]) < utf8.RuneSelf {
// ASCII 7-bit always complete
return 0
}
lowest := length - utf8.UTFMax
if lowest < 0 {
lowest = 0
}
for start := length - 1; start >= lowest; start-- {
if (p[start] >> 5) == 0x06 {
// 2-byte utf-8 start byte
if length-start >= 2 {
// enough bytes
return 0
}
// 1 byte outstanding
return 1
}
if (p[start] >> 4) == 0x0E {
// 3-byte utf-8 start byte
if length-start >= 3 {
// enough bytes
return 0
}
// some bytes outstanding
return length - start
}
if (p[start] >> 3) == 0x1E {
// 4-byte utf-8 start byte
if length-start >= 4 {
// enough bytes
return 0
}
// some bytes outstanding
return length - start
}
}
// No utf-8 start byte
return 0
}
......@@ -15,8 +15,16 @@
package websocket
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
)
func TestBuildEnv(t *testing.T) {
......@@ -34,3 +42,295 @@ func TestBuildEnv(t *testing.T) {
t.Fatalf("Expected non-empty environment; got %#v", env)
}
}
func TestWebSocketCatOneLineLines(t *testing.T) {
r := httptest.NewRequest("GET", "/cat", nil)
p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "lines"}}}
readCount := 0
waitClose := make(chan bool)
inputStr := "123456"
expectedStr := inputStr
outputStr := ""
conn := &dummyWsConn{
close: func() {},
readMessage: func() (messageType int, buf []byte, err error) {
rc := readCount
readCount++
if rc == 0 {
return websocket.TextMessage, []byte(inputStr), nil
}
<-waitClose
return websocket.CloseMessage, nil, errors.New("EOF")
},
writeControl: func(messageType int, buf []byte) {},
writeMessage: func(messageType int, buf []byte) {
outputStr += string(buf)
waitClose <- true
},
}
w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}}
p.ServeHTTP(w, r)
if outputStr != expectedStr {
t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr)
}
}
func TestWebSocketCatTwoLinesLines(t *testing.T) {
r := httptest.NewRequest("GET", "/cat", nil)
p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "lines"}}}
readCount := 0
waitClose := make(chan bool)
inputStr1 := "Hello World!"
inputStr2 := "This is golang."
expectedStr := inputStr1 + inputStr2
outputStr := ""
outputCount := 0
conn := &dummyWsConn{
close: func() {},
readMessage: func() (messageType int, buf []byte, err error) {
rc := readCount
readCount++
if rc == 0 {
return websocket.TextMessage, []byte(inputStr1), nil
}
if rc == 1 {
return websocket.TextMessage, []byte(inputStr2), nil
}
<-waitClose
return websocket.CloseMessage, nil, errors.New("EOF")
},
writeControl: func(messageType int, buf []byte) {},
writeMessage: func(messageType int, buf []byte) {
outputStr += string(buf)
outputCount++
if outputCount >= 2 {
waitClose <- true
}
},
}
w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}}
p.ServeHTTP(w, r)
if outputStr != expectedStr {
t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr)
}
}
func TestWebSocketCatOneLineText(t *testing.T) {
r := httptest.NewRequest("GET", "/cat", nil)
p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "text"}}}
readCount := 0
waitClose := make(chan bool)
inputStr := "123456\n"
expectedStr := inputStr
outputStr := ""
conn := &dummyWsConn{
close: func() {},
readMessage: func() (messageType int, buf []byte, err error) {
rc := readCount
readCount++
if rc == 0 {
return websocket.TextMessage, []byte(inputStr), nil
}
<-waitClose
return websocket.CloseMessage, nil, errors.New("EOF")
},
writeControl: func(messageType int, buf []byte) {},
writeMessage: func(messageType int, buf []byte) {
outputStr += string(buf)
if strings.Count(outputStr, "\n") >= 1 {
waitClose <- true
}
},
}
w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}}
p.ServeHTTP(w, r)
if outputStr != expectedStr {
t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr)
}
}
func TestWebSocketCatTwoLinesText(t *testing.T) {
r := httptest.NewRequest("GET", "/cat", nil)
p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "text"}}}
readCount := 0
waitClose := make(chan bool)
inputStr1 := "Hello World!\n"
inputStr2 := "This is golang.\n"
expectedStr := inputStr1 + inputStr2
outputStr := ""
conn := &dummyWsConn{
close: func() {},
readMessage: func() (messageType int, buf []byte, err error) {
rc := readCount
readCount++
if rc == 0 {
return websocket.TextMessage, []byte(inputStr1), nil
}
if rc == 1 {
return websocket.TextMessage, []byte(inputStr2), nil
}
<-waitClose
return websocket.CloseMessage, nil, errors.New("EOF")
},
writeControl: func(messageType int, buf []byte) {},
writeMessage: func(messageType int, buf []byte) {
outputStr += string(buf)
if strings.Count(outputStr, "\n") >= 2 {
waitClose <- true
}
},
}
w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}}
p.ServeHTTP(w, r)
if outputStr != expectedStr {
t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr)
}
}
func TestWebSocketCatLongLinesText(t *testing.T) {
r := httptest.NewRequest("GET", "/cat", nil)
p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "text"}}}
readCount := 0
waitClose := make(chan bool)
inputStr1 := "Hello World!\n"
inputStr2 := ""
for i := 0; i < 100000; i++ {
inputStr2 += fmt.Sprintf("No newline %v.", i)
}
inputStr2 += "\n"
inputStr3 := "End of message.\n"
expectedStr := inputStr1 + inputStr2 + inputStr3
outputStr := ""
conn := &dummyWsConn{
close: func() {},
readMessage: func() (messageType int, buf []byte, err error) {
rc := readCount
readCount++
if rc == 0 {
return websocket.TextMessage, []byte(inputStr1), nil
}
if rc == 1 {
return websocket.TextMessage, []byte(inputStr2), nil
}
if rc == 2 {
return websocket.TextMessage, []byte(inputStr3), nil
}
<-waitClose
return websocket.CloseMessage, nil, errors.New("EOF")
},
writeControl: func(messageType int, buf []byte) {},
writeMessage: func(messageType int, buf []byte) {
outputStr += string(buf)
if strings.Count(outputStr, "\n") >= 3 {
waitClose <- true
}
},
}
w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}}
p.ServeHTTP(w, r)
if outputStr != expectedStr {
t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr)
}
}
func TestWebSocketCatBinary(t *testing.T) {
r := httptest.NewRequest("GET", "/cat", nil)
p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "binary"}}}
readCount := 0
waitClose := make(chan bool)
inputArr1 := []byte("Hello World!")
inputArr2 := []byte("End of message.")
expectedArr := make([]byte, 0)
expectedArr = append(expectedArr, inputArr1...)
expectedArr = append(expectedArr, inputArr2...)
outputArr := make([]byte, 0)
conn := &dummyWsConn{
close: func() {},
readMessage: func() (messageType int, buf []byte, err error) {
rc := readCount
readCount++
if rc == 0 {
return websocket.BinaryMessage, inputArr1, nil
}
if rc == 1 {
return websocket.BinaryMessage, inputArr2, nil
}
<-waitClose
return websocket.CloseMessage, nil, errors.New("EOF")
},
writeControl: func(messageType int, buf []byte) {},
writeMessage: func(messageType int, buf []byte) {
outputArr = append(outputArr, buf...)
if len(outputArr) >= len(expectedArr) {
waitClose <- true
}
},
}
w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}}
p.ServeHTTP(w, r)
if !bytes.Equal(outputArr, expectedArr) {
t.Errorf("Received Websocket response %v != %v", outputArr, expectedArr)
}
}
type myResponseRecorder struct {
o *httptest.ResponseRecorder
u *dummyWsUpgrader
}
func (t *myResponseRecorder) Header() http.Header {
return t.o.Header()
}
func (t *myResponseRecorder) Write(buf []byte) (int, error) {
return t.o.Write(buf)
}
func (t *myResponseRecorder) WriteHeader(code int) {
t.o.WriteHeader(code)
}
func (t *myResponseRecorder) Result() *http.Response {
return t.o.Result()
}
func (t *myResponseRecorder) GetUpgrader() wsUpgrader {
return t.u
}
type dummyWsUpgrader struct {
c *dummyWsConn
}
func (t *dummyWsUpgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (wsConn, error) {
return t.c, nil
}
type dummyWsConn struct {
close func()
readMessage func() (messageType int, buf []byte, err error)
writeControl func(messageType int, buf []byte)
writeMessage func(messageType int, buf []byte)
}
func (c *dummyWsConn) Close() error {
c.close()
return nil
}
func (c *dummyWsConn) ReadMessage() (messageType int, p []byte, err error) {
return c.readMessage()
}
func (c *dummyWsConn) SetPongHandler(h func(appData string) error) {
}
func (c *dummyWsConn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *dummyWsConn) SetReadLimit(limit int64) {
}
func (c *dummyWsConn) SetWriteDeadline(t time.Time) error {
return nil
}
func (c *dummyWsConn) WriteControl(messageType int, data []byte, deadline time.Time) error {
c.writeControl(messageType, data)
return nil
}
func (c *dummyWsConn) WriteMessage(messageType int, data []byte) error {
c.writeMessage(messageType, data)
return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment