Commit 4f85ca04 authored by indexzero's avatar indexzero

[api doc test] node-http-proxy now emits `websocket:*` on important WebSocket...

[api doc test] node-http-proxy now emits `websocket:*` on important WebSocket events. Added tests for these features and updated some code docs
parent fcfe8462
...@@ -553,17 +553,29 @@ HttpProxy.prototype._forwardRequest = function (req) { ...@@ -553,17 +553,29 @@ HttpProxy.prototype._forwardRequest = function (req) {
}; };
HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options) { HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options) {
var self = this, outgoing, errState = false, CRLF = '\r\n'; var self = this,
listeners = {},
errState = false,
CRLF = '\r\n',
outgoing;
// WebSocket requests has method = GET //
// WebSocket requests must have the `GET` method and
// the `upgrade:websocket` header
//
if (req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket') { if (req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket') {
//
// This request is not WebSocket request // This request is not WebSocket request
//
return; return;
} }
// Turn of all bufferings //
// For server set KeepAlive // Helper function for setting appropriate socket values:
// For client set encoding // 1. Turn of all bufferings
// 2. For server set KeepAlive
// 3. For client set encoding
//
function _socket(socket, keepAlive) { function _socket(socket, keepAlive) {
socket.setTimeout(0); socket.setTimeout(0);
socket.setNoDelay(true); socket.setNoDelay(true);
...@@ -580,20 +592,25 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -580,20 +592,25 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
} }
} }
function onUpgrade(reverseProxy, proxySocket) { //
// On `upgrade` from the Agent socket, listen to
// the appropriate events.
//
function onUpgrade (reverseProxy, proxySocket) {
if (!reverseProxy) { if (!reverseProxy) {
proxySocket.end(); proxySocket.end();
socket.end(); socket.end();
return; return;
} }
var listeners = {}; //
// Any incoming data on this WebSocket to the proxy target
// We're now connected to the server, so lets change server socket // will be written to the `reverseProxy` socket.
proxySocket.on('data', listeners._r_data = function(data) { //
// Pass data to client proxySocket.on('data', listeners.onIncoming = function (data) {
if (reverseProxy.incoming.socket.writable) { if (reverseProxy.incoming.socket.writable) {
try { try {
self.emit('websocket:outgoing', req, socket, head, data);
reverseProxy.incoming.socket.write(data); reverseProxy.incoming.socket.write(data);
} }
catch (e) { catch (e) {
...@@ -603,9 +620,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -603,9 +620,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
} }
}); });
reverseProxy.incoming.socket.on('data', listeners._data = function(data) { //
// Pass data from client to server // Any outgoing data on this Websocket from the proxy target
// will be written to the `proxySocket` socket.
//
reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function(data) {
try { try {
self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data);
proxySocket.write(data); proxySocket.write(data);
} }
catch (e) { catch (e) {
...@@ -614,40 +635,59 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -614,40 +635,59 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
} }
}); });
// Detach event listeners from reverseProxy //
// Helper function to detach all event listeners
// from `reverseProxy` and `proxySocket`.
//
function detach() { function detach() {
proxySocket.removeListener('end', listeners._r_close); proxySocket.removeListener('end', listeners.onIncomingClose);
proxySocket.removeListener('data', listeners._r_data); proxySocket.removeListener('data', listeners.onIncoming);
reverseProxy.incoming.socket.removeListener('data', listeners._data); reverseProxy.incoming.socket.removeListener('end', listeners.onOutgoingClose);
reverseProxy.incoming.socket.removeListener('end', listeners._close); reverseProxy.incoming.socket.removeListener('data', listeners.onOutgoing);
} }
// Hook disconnections //
proxySocket.on('end', listeners._r_close = function() { // If the incoming `proxySocket` socket closes, then
// detach all event listeners.
//
proxySocket.on('end', listeners.onIncomingClose = function() {
reverseProxy.incoming.socket.end(); reverseProxy.incoming.socket.end();
detach(); detach();
// Emit the `end` event now that we have completed proxying
self.emit('websocket:end', req, socket, head);
}); });
reverseProxy.incoming.socket.on('end', listeners._close = function() { //
// If the `reverseProxy` socket closes, then detach all
// event listeners.
//
reverseProxy.incoming.socket.on('end', listeners.onOutgoingClose = function() {
proxySocket.end(); proxySocket.end();
detach(); detach();
}); });
}; };
// Client socket // Setup the incoming client socket.
_socket(socket); _socket(socket);
// Remote host address //
// Get the protocol, and host for this request and create an instance
// of `http.Agent` or `https.Agent` from the pool managed by `node-http-proxy`.
//
var protocolName = options.https || this.target.https ? 'https' : 'http', var protocolName = options.https || this.target.https ? 'https' : 'http',
agent = _getAgent(options.host, options.port, options.https || this.target.https), remoteHost = options.host + (options.port - 80 === 0 ? '' : ':' + options.port),
remoteHost = options.host + (options.port - 80 === 0 ? '' : ':' + options.port); agent = _getAgent(options.host, options.port, options.https || this.target.https);
// Change headers // Change headers (if requested).
if (this.changeOrigin) { if (this.changeOrigin) {
req.headers.host = remoteHost; req.headers.host = remoteHost;
req.headers.origin = protocolName + '://' + remoteHost; req.headers.origin = protocolName + '://' + remoteHost;
} }
//
// Make the outgoing WebSocket request
//
outgoing = { outgoing = {
host: options.host, host: options.host,
port: options.port, port: options.port,
...@@ -655,10 +695,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -655,10 +695,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
path: req.url, path: req.url,
headers: req.headers, headers: req.headers,
}; };
// Make the outgoing WebSocket request
var reverseProxy = agent.appendMessage(outgoing); var reverseProxy = agent.appendMessage(outgoing);
//
// On any errors from the `reverseProxy` emit the
// `webSocketProxyError` and close the appropriate
// connections.
//
function proxyError (err) { function proxyError (err) {
reverseProxy.end(); reverseProxy.end();
if (self.emit('webSocketProxyError', req, socket, head)) { if (self.emit('webSocketProxyError', req, socket, head)) {
...@@ -703,7 +746,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -703,7 +746,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
// //
// If the reverseProxy connection has an underlying socket, // If the reverseProxy connection has an underlying socket,
// then behing the handshake. // then execute the WebSocket handshake.
// //
if (typeof reverseProxy.socket !== 'undefined') { if (typeof reverseProxy.socket !== 'undefined') {
reverseProxy.socket.on('data', function handshake (data) { reverseProxy.socket.on('data', function handshake (data) {
...@@ -741,6 +784,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options ...@@ -741,6 +784,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
// Write the printable and non-printable data to the socket // Write the printable and non-printable data to the socket
// from the original incoming request. // from the original incoming request.
// //
self.emit('websocket:handshake', req, socket, head, sdata, data);
socket.write(sdata); socket.write(sdata);
socket.write(data); socket.write(data);
} }
......
...@@ -12,6 +12,7 @@ var fs = require('fs'), ...@@ -12,6 +12,7 @@ var fs = require('fs'),
vows = require('vows'), vows = require('vows'),
assert = require('assert'), assert = require('assert'),
request = require('request'), request = require('request'),
websocket = require('./../vendor/websocket'),
httpProxy = require('./../lib/node-http-proxy'); httpProxy = require('./../lib/node-http-proxy');
function merge (target) { function merge (target) {
...@@ -120,6 +121,39 @@ TestRunner.prototype.assertResponseCode = function (proxyPort, statusCode, creat ...@@ -120,6 +121,39 @@ TestRunner.prototype.assertResponseCode = function (proxyPort, statusCode, creat
return test; return test;
}; };
TestRunner.prototype.webSocketTest = function (options) {
var self = this;
this.startTargetServer(options.ports.target, 'hello websocket', function (err, target) {
var socket = options.io.listen(target);
if (options.onListen) {
options.onListen(socket);
}
self.startProxyServer(
options.ports.proxy,
options.ports.target,
options.host,
function (err, proxy) {
if (options.onServer) { options.onServer(proxy) }
//
// Setup the web socket against our proxy
//
var uri = options.wsprotocol + '://' + options.host + ':' + options.ports.proxy;
var ws = new websocket.WebSocket(uri + '/socket.io/websocket/', 'borf', {
origin: options.protocol + '://' + options.host
});
if (options.onWsupgrade) { ws.on('wsupgrade', options.onWsupgrade) }
if (options.onMessage) { ws.on('message', options.onMessage) }
if (options.onOpen) { ws.on('open', function () { options.onOpen(ws) }) }
}
);
});
}
// //
// Creates the reverse proxy server // Creates the reverse proxy server
// //
......
...@@ -24,11 +24,11 @@ ...@@ -24,11 +24,11 @@
*/ */
var vows = require('vows'), var assert = require('assert'),
util = require('util'), util = require('util'),
request = require('request'),
assert = require('assert'),
argv = require('optimist').argv, argv = require('optimist').argv,
request = require('request'),
vows = require('vows'),
helpers = require('./helpers'); helpers = require('./helpers');
var forwardOptions = { var forwardOptions = {
......
...@@ -5,14 +5,14 @@ ...@@ -5,14 +5,14 @@
* *
*/ */
var fs = require('fs'), var assert = require('assert'),
vows = require('vows'), fs = require('fs'),
util = require('util'),
path = require('path'), path = require('path'),
util = require('util'),
argv = require('optimist').argv,
request = require('request'), request = require('request'),
assert = require('assert'), vows = require('vows'),
helpers = require('./helpers'), helpers = require('./helpers'),
argv = require('optimist').argv,
TestRunner = helpers.TestRunner; TestRunner = helpers.TestRunner;
var protocol = argv.https ? 'https' : 'http', var protocol = argv.https ? 'https' : 'http',
......
...@@ -24,12 +24,12 @@ ...@@ -24,12 +24,12 @@
*/ */
var vows = require('vows'), var util = require('util'),
util = require('util'),
colors = require('colors'),
request = require('request'),
assert = require('assert'), assert = require('assert'),
argv = require('optimist').argv, argv = require('optimist').argv,
colors = require('colors'),
request = require('request'),
vows = require('vows'),
websocket = require('./../vendor/websocket'), websocket = require('./../vendor/websocket'),
helpers = require('./helpers'); helpers = require('./helpers');
...@@ -38,8 +38,8 @@ try { ...@@ -38,8 +38,8 @@ try {
io = require('socket.io'); io = require('socket.io');
} }
catch (ex) { catch (ex) {
console.error('Socket.io is required for this test:'); console.error('Socket.io is required for this example:');
console.error('npm ' + 'install'.green + ' socket.io'.magenta); console.error('npm ' + 'install'.green + ' socket.io@0.6.18'.magenta);
process.exit(1); process.exit(1);
} }
...@@ -52,35 +52,32 @@ vows.describe('node-http-proxy/websocket/' + wsprotocol).addBatch({ ...@@ -52,35 +52,32 @@ vows.describe('node-http-proxy/websocket/' + wsprotocol).addBatch({
"with no latency" : { "with no latency" : {
"when an inbound message is sent from a WebSocket client": { "when an inbound message is sent from a WebSocket client": {
topic: function () { topic: function () {
var that = this; var that = this
runner.startTargetServer(8130, 'hello websocket', function (err, target) {
var socket = io.listen(target),
headers = {}; headers = {};
runner.webSocketTest({
io: io,
host: 'localhost',
wsprotocol: wsprotocol,
protocol: protocol,
ports: {
target: 8130,
proxy: 8131
},
onListen: function (socket) {
socket.on('connection', function (client) { socket.on('connection', function (client) {
client.on('message', function (msg) { client.on('message', function (msg) {
that.callback(null, msg, headers); that.callback(null, msg, headers);
}); });
}); });
},
runner.startProxyServer(8131, 8130, 'localhost', function (err, proxy) { onWsupgrade: function (req, res) {
//
// Setup the web socket against our proxy
//
var ws = new websocket.WebSocket(wsprotocol + '://localhost:8131/socket.io/websocket/', 'borf', {
origin: protocol + '://localhost'
});
ws.on('wsupgrade', function (req, res) {
headers.request = req; headers.request = req;
headers.response = res.headers; headers.response = res.headers;
}); },
onOpen: function (ws) {
ws.on('open', function () {
ws.send(utils.encode('from client')); ws.send(utils.encode('from client'));
}); }
});
}); });
}, },
"the target server should receive the message": function (err, msg, headers) { "the target server should receive the message": function (err, msg, headers) {
...@@ -92,38 +89,63 @@ vows.describe('node-http-proxy/websocket/' + wsprotocol).addBatch({ ...@@ -92,38 +89,63 @@ vows.describe('node-http-proxy/websocket/' + wsprotocol).addBatch({
assert.equal(headers.request.Origin, headers.response['sec-websocket-origin']); assert.equal(headers.request.Origin, headers.response['sec-websocket-origin']);
} }
}, },
"when an outbound message is sent from the target server": { "when an inbound message is sent from a WebSocket client with event listeners": {
topic: function () { topic: function () {
var that = this; var that = this
headers = {};
runner.startTargetServer(8132, 'hello websocket', function (err, target) { runner.webSocketTest({
var socket = io.listen(target), io: io,
host: 'localhost',
wsprotocol: wsprotocol,
protocol: protocol,
ports: {
target: 8132,
proxy: 8133
},
onServer: function (server) {
server.proxy.on('websocket:incoming', function (req, socket, head, data) {
that.callback(null, data);
});
},
onOpen: function (ws) {
ws.send(utils.encode('from client'));
}
});
},
"should raise the `websocket:incoming` event": function (ign, data) {
assert.equal(utils.decode(data.toString().replace('\u0000', '')), 'from client');
},
},
"when an outbound message is sent from the target server": {
topic: function () {
var that = this,
headers = {}; headers = {};
runner.webSocketTest({
io: io,
host: 'localhost',
wsprotocol: wsprotocol,
protocol: protocol,
ports: {
target: 8134,
proxy: 8135
},
onListen: function (socket) {
socket.on('connection', function (client) { socket.on('connection', function (client) {
socket.broadcast('from server'); socket.broadcast('from server');
}); });
},
runner.startProxyServer(8133, 8132, 'localhost', function (err, proxy) { onWsupgrade: function (req, res) {
//
// Setup the web socket against our proxy
//
var ws = new websocket.WebSocket(wsprotocol + '://localhost:8133/socket.io/websocket/', 'borf', {
origin: protocol + '://localhost'
});
ws.on('wsupgrade', function (req, res) {
headers.request = req; headers.request = req;
headers.response = res.headers; headers.response = res.headers;
}); },
onMessage: function (msg) {
ws.on('message', function (msg) {
msg = utils.decode(msg); msg = utils.decode(msg);
if (!/\d+/.test(msg)) { if (!/\d+/.test(msg)) {
that.callback(null, msg, headers); that.callback(null, msg, headers);
} }
}); }
});
}); });
}, },
"the client should receive the message": function (err, msg, headers) { "the client should receive the message": function (err, msg, headers) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment