Test against AutoBahn WebSocket testsuite. Work in progress

This commit is contained in:
Veebs 2011-10-16 17:01:24 +11:00
parent 7057c59f3d
commit 6f9a886a36
16 changed files with 403 additions and 223 deletions

View File

@ -68,7 +68,7 @@ public class WebSocketClientHandler extends SimpleChannelUpstreamHandler impleme
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
channel = e.getChannel();
this.handshaker = new WebSocketClientHandshakerFactory().newHandshaker(url, version, null);
this.handshaker = new WebSocketClientHandshakerFactory().newHandshaker(url, version, null, false);
handshaker.beginOpeningHandshake(ctx, channel);
}

View File

@ -97,7 +97,7 @@ public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
this.getWebSocketLocation(req), null);
this.getWebSocketLocation(req), null, false);
this.handshaker = wsFactory.newHandshaker(ctx, req);
if (this.handshaker == null) {
wsFactory.sendUnsupportedWebSocketVersionResponse(ctx);

View File

@ -25,11 +25,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
*/
public class BinaryWebSocketFrame extends WebSocketFrame {
@Override
public WebSocketFrameType getType() {
return WebSocketFrameType.BINARY;
}
/**
* Creates a new empty binary frame.
*/
@ -38,7 +33,8 @@ public class BinaryWebSocketFrame extends WebSocketFrame {
}
/**
* Creates a new frame with the specified binary data.
* Creates a new binary frame with the specified binary data. The final
* fragment flag is set to true.
*
* @param binaryData
* the content of the frame.
@ -47,9 +43,26 @@ public class BinaryWebSocketFrame extends WebSocketFrame {
this.setBinaryData(binaryData);
}
/**
* Creates a new binary frame with the specified binary data and the final
* fragment flag.
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param binaryData
* the content of the frame.
*/
public BinaryWebSocketFrame(boolean finalFragment, int rsv, ChannelBuffer binaryData) {
this.setFinalFragment(finalFragment);
this.setRsv(rsv);
this.setBinaryData(binaryData);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(type: " + getType() + ", " + "data: " + getBinaryData() + ')';
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
}
}

View File

@ -24,15 +24,28 @@ import org.jboss.netty.buffer.ChannelBuffers;
*/
public class CloseWebSocketFrame extends WebSocketFrame {
@Override
public WebSocketFrameType getType() {
return WebSocketFrameType.CLOSE;
}
/**
* Creates a new empty binary frame.
* Creates a new empty close frame.
*/
public CloseWebSocketFrame() {
this.setBinaryData(ChannelBuffers.EMPTY_BUFFER);
}
/**
* Creates a new close frame
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
*/
public CloseWebSocketFrame(boolean finalFragment, int rsv) {
this.setFinalFragment(finalFragment);
this.setRsv(rsv);
}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -25,20 +25,16 @@ import org.jboss.netty.buffer.ChannelBuffers;
*/
public class PingWebSocketFrame extends WebSocketFrame {
@Override
public WebSocketFrameType getType() {
return WebSocketFrameType.PING;
}
/**
* Creates a new empty binary frame.
* Creates a new empty ping frame.
*/
public PingWebSocketFrame() {
this.setFinalFragment(true);
this.setBinaryData(ChannelBuffers.EMPTY_BUFFER);
}
/**
* Creates a new frame with the specified binary data.
* Creates a new ping frame with the specified binary data.
*
* @param binaryData
* the content of the frame.
@ -47,9 +43,25 @@ public class PingWebSocketFrame extends WebSocketFrame {
this.setBinaryData(binaryData);
}
/**
* Creates a new ping frame with the specified binary data
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param binaryData
* the content of the frame.
*/
public PingWebSocketFrame(boolean finalFragment, int rsv, ChannelBuffer binaryData) {
this.setFinalFragment(finalFragment);
this.setRsv(rsv);
this.setBinaryData(binaryData);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(type: " + getType() + ", " + "data: " + getBinaryData() + ')';
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
}
}

View File

@ -25,20 +25,15 @@ import org.jboss.netty.buffer.ChannelBuffers;
*/
public class PongWebSocketFrame extends WebSocketFrame {
@Override
public WebSocketFrameType getType() {
return WebSocketFrameType.PONG;
}
/**
* Creates a new empty binary frame.
* Creates a new empty pong frame.
*/
public PongWebSocketFrame() {
this.setBinaryData(ChannelBuffers.EMPTY_BUFFER);
}
/**
* Creates a new frame with the specified binary data.
* Creates a new pong frame with the specified binary data.
*
* @param binaryData
* the content of the frame.
@ -47,9 +42,24 @@ public class PongWebSocketFrame extends WebSocketFrame {
this.setBinaryData(binaryData);
}
/**
* Creates a new pong frame with the specified binary data
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param binaryData
* the content of the frame.
*/
public PongWebSocketFrame(boolean finalFragment, int rsv, ChannelBuffer binaryData) {
this.setFinalFragment(finalFragment);
this.setRsv(rsv);
this.setBinaryData(binaryData);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(type: " + getType() + ", " + "data: " + getBinaryData() + ')';
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
}
}

View File

@ -27,11 +27,6 @@ import org.jboss.netty.util.CharsetUtil;
*/
public class TextWebSocketFrame extends WebSocketFrame {
@Override
public WebSocketFrameType getType() {
return WebSocketFrameType.TEXT;
}
/**
* Creates a new empty text frame.
*/
@ -40,17 +35,23 @@ public class TextWebSocketFrame extends WebSocketFrame {
}
/**
* Creates a new text frame with the specified text string.
* Creates a new text frame with the specified text string. The final
* fragment flag is set to true.
*
* @param text
* String to put in the frame
*/
public TextWebSocketFrame(String text) {
if (text == null || text.isEmpty()) {
this.setBinaryData(ChannelBuffers.EMPTY_BUFFER);
} else {
this.setBinaryData(ChannelBuffers.copiedBuffer(text, CharsetUtil.UTF_8));
}
}
/**
* Creates a new frame with the specified binary data.
* Creates a new text frame with the specified binary data. The final
* fragment flag is set to true.
*
* @param binaryData
* the content of the frame. Must be UTF-8 encoded
@ -59,6 +60,44 @@ public class TextWebSocketFrame extends WebSocketFrame {
this.setBinaryData(binaryData);
}
/**
* Creates a new text frame with the specified text string. The final
* fragment flag is set to true.
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param text
* String to put in the frame
*/
public TextWebSocketFrame(boolean finalFragment, int rsv, String text) {
this.setFinalFragment(finalFragment);
this.setRsv(rsv);
if (text == null || text.isEmpty()) {
this.setBinaryData(ChannelBuffers.EMPTY_BUFFER);
} else {
this.setBinaryData(ChannelBuffers.copiedBuffer(text, CharsetUtil.UTF_8));
}
}
/**
* Creates a new text frame with the specified binary data. The final
* fragment flag is set to true.
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param binaryData
* the content of the frame. Must be UTF-8 encoded
*/
public TextWebSocketFrame(boolean finalFragment, int rsv, ChannelBuffer binaryData) {
this.setFinalFragment(finalFragment);
this.setRsv(rsv);
this.setBinaryData(binaryData);
}
/**
* Returns the text data in this frame
*/

View File

@ -43,7 +43,7 @@ public class WebSocket00FrameEncoder extends OneToOneEncoder {
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame.getType() == WebSocketFrameType.TEXT) {
if (frame instanceof TextWebSocketFrame) {
// Text frame
ChannelBuffer data = frame.getBinaryData();
ChannelBuffer encoded = channel.getConfig().getBufferFactory()
@ -52,7 +52,7 @@ public class WebSocket00FrameEncoder extends OneToOneEncoder {
encoded.writeBytes(data, data.readerIndex(), data.readableBytes());
encoded.writeByte((byte) 0xFF);
return encoded;
} else if (frame.getType() == WebSocketFrameType.CLOSE) {
} else if (frame instanceof CloseWebSocketFrame) {
// Close frame
ChannelBuffer data = frame.getBinaryData();
ChannelBuffer encoded = channel.getConfig().getBufferFactory().getBuffer(data.order(), 2);

View File

@ -39,14 +39,15 @@
package org.jboss.netty.handler.codec.http.websocketx;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import java.util.ArrayList;
import java.util.List;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* Decodes a web socket frame from wire protocol version 8 format. This code was
@ -57,6 +58,8 @@ import java.util.List;
*/
public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDecoder.State> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameDecoder.class);
private static final byte OPCODE_CONT = 0x0;
private static final byte OPCODE_TEXT = 0x1;
private static final byte OPCODE_BINARY = 0x2;
@ -64,20 +67,22 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
private static final byte OPCODE_PING = 0x9;
private static final byte OPCODE_PONG = 0xA;
public static final int MAX_LENGTH = 16384;
private int fragmentedFramesCount = 0;
private Byte fragmentOpcode;
private Byte opcode = null;
private int currentFrameLength;
private boolean frameFinalFlag;
private int frameRsv;
private int frameOpcode;
private long framePayloadLength;
private ChannelBuffer framePayload = null;
private int framePayloadBytesRead = 0;
private ChannelBuffer maskingKey;
private int currentPayloadBytesRead = 0;
private ChannelBuffer currentPayload = null;
private List<ChannelBuffer> frames = new ArrayList<ChannelBuffer>();
private boolean allowExtensions = false;
private boolean maskedPayload = false;
private boolean receivedClosingHandshake = false;
public static enum State {
FRAME_START, PARSING_LENGTH, MASKING_KEY, PARSING_LENGTH_2, PARSING_LENGTH_3, PAYLOAD
FRAME_START, MASKING_KEY, PAYLOAD, CORRUPT
}
/**
@ -86,10 +91,13 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
* @param maskedPayload
* Web socket servers must set this to true processed incoming
* masked payload. Client implementations must set this to false.
* @param allowExtensions
* Flag to allow reserved extension bits to be used or not
*/
public WebSocket08FrameDecoder(boolean maskedPayload) {
public WebSocket08FrameDecoder(boolean maskedPayload, boolean allowExtensions) {
super(State.FRAME_START);
this.maskedPayload = maskedPayload;
this.allowExtensions = allowExtensions;
}
@Override
@ -104,76 +112,99 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
switch (state) {
case FRAME_START:
currentPayloadBytesRead = 0;
currentFrameLength = -1;
currentPayload = null;
framePayloadBytesRead = 0;
framePayloadLength = -1;
framePayload = null;
// FIN, RSV, OPCODE
byte b = buffer.readByte();
byte fin = (byte) (b & 0x80);
byte reserved = (byte) (b & 0x70);
byte opcode = (byte) (b & 0x0F);
frameFinalFlag = (b & 0x80) != 0;
frameRsv = (b & 0x70) >> 4;
frameOpcode = (b & 0x0F);
if (reserved != 0) {
throw new CorruptedFrameException("Reserved bits set: " + bits(reserved));
}
if (!isOpcode(opcode)) {
throw new CorruptedFrameException("Invalid opcode " + hex(opcode));
}
logger.debug("Decoding WebSocket Frame opCode=" + frameOpcode);
if (fin == 0) {
if (fragmentOpcode == null) {
if (!isDataOpcode(opcode)) {
throw new CorruptedFrameException("Fragmented frame with invalid opcode " + hex(opcode));
}
fragmentOpcode = opcode;
} else if (opcode != OPCODE_CONT) {
throw new CorruptedFrameException("Continuation frame with invalid opcode " + hex(opcode));
}
} else {
if (fragmentOpcode != null) {
if (!isControlOpcode(opcode) && opcode != OPCODE_CONT) {
throw new CorruptedFrameException("Final frame with invalid opcode " + hex(opcode));
}
} else if (opcode == OPCODE_CONT) {
throw new CorruptedFrameException("Final frame with invalid opcode " + hex(opcode));
}
this.opcode = opcode;
}
checkpoint(State.PARSING_LENGTH);
case PARSING_LENGTH:
// MASK, PAYLOAD LEN 1
b = buffer.readByte();
int length = (byte) (b);
boolean frameMasked = (b & 0x80) != 0;
int framePayloadLen1 = (b & 0x7F);
if (this.maskedPayload) {
byte masked = (byte) (b & 0x80);
if (masked == 0) {
throw new CorruptedFrameException("Unmasked frame received");
}
length = (byte) (b & 0x7F);
if (frameRsv != 0 && !this.allowExtensions) {
protocolViolation(channel, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
return null;
}
if (length < 126) {
currentFrameLength = length;
if (currentFrameLength == 0) {
checkpoint(State.PAYLOAD);
if (this.maskedPayload && !frameMasked) {
protocolViolation(channel, "unmasked client to server frame");
return null;
}
if (frameOpcode > 7) { // control frame (have MSB in opcode set)
// control frames MUST NOT be fragmented
if (!frameFinalFlag) {
protocolViolation(channel, "fragmented control frame");
return null;
}
// control frames MUST have payload 125 octets or less
if (framePayloadLen1 > 125) {
protocolViolation(channel, "control frame with payload length > 125 octets");
return null;
}
// check for reserved control frame opcodes
if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) {
protocolViolation(channel, "control frame using reserved opcode " + frameOpcode);
return null;
}
// close frame : if there is a body, the first two bytes of the
// body MUST be a 2-byte
// unsigned integer (in network byte order) representing a
// status code
if (frameOpcode == 8 && framePayloadLen1 == 1) {
protocolViolation(channel, "received close control frame with payload len 1");
return null;
}
} else { // data frame
// check for reserved data frame opcodes
if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT || frameOpcode == OPCODE_BINARY)) {
protocolViolation(channel, "data frame using reserved opcode " + frameOpcode);
return null;
}
// check opcode vs message fragmentation state 1/2
if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {
protocolViolation(channel, "received continuation data frame outside fragmented message");
return null;
}
// check opcode vs message fragmentation state 2/2
if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
protocolViolation(channel, "received non-continuation data frame while inside fragmented message");
return null;
}
}
if (framePayloadLen1 == 126) {
framePayloadLength = buffer.readUnsignedShort();
if (framePayloadLength < 126) {
protocolViolation(channel, "invalid data frame length (not using minimal length encoding)");
return null;
}
} else if (framePayloadLen1 == 127) {
framePayloadLength = buffer.readLong();
// TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
// just check if it's negative?
if (framePayloadLength < 65536) {
protocolViolation(channel, "invalid data frame length (not using minimal length encoding)");
return null;
}
} else {
checkpoint(this.maskedPayload ? State.MASKING_KEY : State.PAYLOAD);
framePayloadLength = framePayloadLen1;
}
} else if (length == 126) {
checkpoint(State.PARSING_LENGTH_2);
} else if (length == 127) {
checkpoint(State.PARSING_LENGTH_3);
}
return null;
case PARSING_LENGTH_2:
currentFrameLength = buffer.readShort();
checkpoint(this.maskedPayload ? State.MASKING_KEY : State.PAYLOAD);
return null;
case PARSING_LENGTH_3:
currentFrameLength = buffer.readInt();
checkpoint(this.maskedPayload ? State.MASKING_KEY : State.PAYLOAD);
return null;
checkpoint(State.MASKING_KEY);
case MASKING_KEY:
maskingKey = buffer.readBytes(4);
checkpoint(State.PAYLOAD);
@ -182,28 +213,28 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Some times, the payload may not be delivered in 1 nice packet
// We need to accumulate the data until we have it all
int rbytes = actualReadableBytes();
ChannelBuffer payload = null;
ChannelBuffer payloadBuffer = null;
int willHaveReadByteCount = currentPayloadBytesRead + rbytes;
if (willHaveReadByteCount == currentFrameLength) {
int willHaveReadByteCount = framePayloadBytesRead + rbytes;
if (willHaveReadByteCount == framePayloadLength) {
// We have all our content so proceed to process
payload = buffer.readBytes(rbytes);
} else if (willHaveReadByteCount < currentFrameLength) {
payloadBuffer = buffer.readBytes(rbytes);
} else if (willHaveReadByteCount < framePayloadLength) {
// We don't have all our content so accumulate payload.
// Returning null means we will get called back
payload = buffer.readBytes(rbytes);
if (currentPayload == null) {
currentPayload = channel.getConfig().getBufferFactory().getBuffer(currentFrameLength);
payloadBuffer = buffer.readBytes(rbytes);
if (framePayload == null) {
framePayload = channel.getConfig().getBufferFactory().getBuffer(toFrameLength(framePayloadLength));
}
currentPayload.writeBytes(payload);
currentPayloadBytesRead = currentPayloadBytesRead + rbytes;
framePayload.writeBytes(payloadBuffer);
framePayloadBytesRead = framePayloadBytesRead + rbytes;
// Return null to wait for more bytes to arrive
return null;
} else if (willHaveReadByteCount > currentFrameLength) {
} else if (willHaveReadByteCount > framePayloadLength) {
// We have more than what we need so read up to the end of frame
// Leave the remainder in the buffer for next frame
payload = buffer.readBytes(currentFrameLength - currentPayloadBytesRead);
payloadBuffer = buffer.readBytes(toFrameLength(framePayloadLength - framePayloadBytesRead));
}
// Now we have all the data, the next checkpoint must be the next
@ -211,53 +242,46 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
checkpoint(State.FRAME_START);
// Take the data that we have in this packet
if (currentPayload == null) {
currentPayload = payload;
if (framePayload == null) {
framePayload = payloadBuffer;
} else {
currentPayload.writeBytes(payload);
framePayload.writeBytes(payloadBuffer);
}
// Unmask data if needed
if (this.maskedPayload) {
unmask(currentPayload);
unmask(framePayload);
}
// Accumulate fragments
if (this.opcode == OPCODE_CONT) {
this.opcode = fragmentOpcode;
frames.add(currentPayload);
int totalBytes = 0;
for (ChannelBuffer channelBuffer : frames) {
totalBytes += channelBuffer.readableBytes();
}
currentPayload = channel.getConfig().getBufferFactory().getBuffer(totalBytes);
for (ChannelBuffer channelBuffer : frames) {
currentPayload.writeBytes(channelBuffer);
}
this.fragmentOpcode = null;
frames.clear();
}
if (this.opcode == OPCODE_TEXT) {
if (currentPayload.readableBytes() > MAX_LENGTH) {
throw new TooLongFrameException();
}
return new TextWebSocketFrame(currentPayload);
} else if (this.opcode == OPCODE_BINARY) {
return new BinaryWebSocketFrame(currentPayload);
} else if (this.opcode == OPCODE_PING) {
return new PingWebSocketFrame(currentPayload);
} else if (this.opcode == OPCODE_PONG) {
return new PongWebSocketFrame(currentPayload);
} else if (this.opcode == OPCODE_CLOSE) {
this.receivedClosingHandshake = true;
return new CloseWebSocketFrame();
// Count the number of fragments
if (frameFinalFlag) {
fragmentedFramesCount = 0;
} else {
throw new UnsupportedOperationException("Cannot decode opcode: " + this.opcode);
fragmentedFramesCount++;
}
// Return the frame
if (frameOpcode == OPCODE_TEXT) {
return new TextWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_BINARY) {
return new BinaryWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_PING) {
return new PingWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_PONG) {
return new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_CONT) {
return new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_CLOSE) {
this.receivedClosingHandshake = true;
return new CloseWebSocketFrame(frameFinalFlag, frameRsv);
} else {
throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: " + frameOpcode);
}
case CORRUPT:
// If we don't keep reading Netty will throw an exception saying
// we can't return null if no bytes read and state not changed.
buffer.readByte();
return null;
default:
throw new Error("Shouldn't reach here.");
}
@ -270,24 +294,20 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
}
private String bits(byte b) {
return Integer.toBinaryString(b).substring(24);
private void protocolViolation(Channel channel, String reason) throws CorruptedFrameException {
checkpoint(State.CORRUPT);
if (channel.isConnected()) {
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
channel.close().awaitUninterruptibly();
}
throw new CorruptedFrameException(reason);
}
private String hex(byte b) {
return Integer.toHexString(b);
private int toFrameLength(long l) throws TooLongFrameException {
if (l > Integer.MAX_VALUE) {
throw new TooLongFrameException("Length:" + l);
} else {
return (int) l;
}
private boolean isOpcode(int opcode) {
return opcode == OPCODE_CONT || opcode == OPCODE_TEXT || opcode == OPCODE_BINARY || opcode == OPCODE_CLOSE
|| opcode == OPCODE_PING || opcode == OPCODE_PONG;
}
private boolean isControlOpcode(int opcode) {
return opcode == OPCODE_CLOSE || opcode == OPCODE_PING || opcode == OPCODE_PONG;
}
private boolean isDataOpcode(int opcode) {
return opcode == OPCODE_TEXT || opcode == OPCODE_BINARY;
}
}

View File

@ -41,9 +41,13 @@ package org.jboss.netty.handler.codec.http.websocketx;
import java.nio.ByteBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* <p>
@ -59,6 +63,9 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
*/
public class WebSocket08FrameEncoder extends OneToOneEncoder {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameEncoder.class);
private static final byte OPCODE_CONT = 0x0;
private static final byte OPCODE_TEXT = 0x1;
private static final byte OPCODE_BINARY = 0x2;
private static final byte OPCODE_CLOSE = 0x8;
@ -87,14 +94,6 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
WebSocketFrame frame = (WebSocketFrame) msg;
ChannelBuffer data = frame.getBinaryData();
// Create buffer with 10 extra bytes for:
// 1 byte opCode
// 5 bytes length in worst case scenario
// 4 bites mask
ChannelBuffer encoded = channel.getConfig().getBufferFactory()
.getBuffer(data.order(), data.readableBytes() + 10);
// Write opcode and length
byte opcode;
if (frame instanceof TextWebSocketFrame) {
opcode = OPCODE_TEXT;
@ -106,47 +105,70 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
opcode = OPCODE_CLOSE;
} else if (frame instanceof BinaryWebSocketFrame) {
opcode = OPCODE_BINARY;
} else if (frame instanceof ContinuationWebSocketFrame) {
opcode = OPCODE_CONT;
} else {
throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName());
}
encoded.writeByte(0x80 | opcode); // Fragmentation currently not
// supported
logger.debug("Encoding WebSocket Frame opCode=" + opcode);
int length = data.readableBytes();
if (length < 126) {
int b0 = 0;
if (frame.isFinalFragment()) {
b0 |= (1 << 7);
}
b0 |= (frame.getRsv() % 8) << 4;
b0 |= opcode % 128;
ChannelBuffer header;
ChannelBuffer body;
if (opcode == OPCODE_PING && length > 125) {
throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was "
+ length);
}
int maskLength = this.maskPayload ? 4 : 0;
if (length <= 125) {
header = ChannelBuffers.buffer(2 + maskLength);
header.writeByte(b0);
byte b = (byte) (this.maskPayload ? (0x80 | (byte) length) : (byte) length);
encoded.writeByte(b);
} else if (length < 65535) {
byte b = (byte) (this.maskPayload ? (0xFE) : 126);
encoded.writeByte(b);
encoded.writeShort(length);
header.writeByte(b);
} else if (length <= 0xFFFF) {
header = ChannelBuffers.buffer(4 + maskLength);
header.writeByte(b0);
header.writeByte(this.maskPayload ? (0xFE) : 126);
header.writeByte((length >>> 8) & 0xFF);
header.writeByte((length) & 0xFF);
} else {
byte b = (byte) (this.maskPayload ? (0xFF) : 127);
encoded.writeByte(b);
encoded.writeInt(length);
header = ChannelBuffers.buffer(10 + maskLength);
header.writeByte(b0);
header.writeByte(this.maskPayload ? (0xFF) : 127);
header.writeLong(length);
}
// Write payload
if (this.maskPayload) {
Integer random = (int) (Math.random() * Integer.MAX_VALUE);
mask = ByteBuffer.allocate(4).putInt(random).array();
header.writeBytes(mask);
encoded.writeBytes(mask);
body = ChannelBuffers.buffer(length);
int counter = 0;
while (data.readableBytes() > 0) {
byte byteData = data.readByte();
encoded.writeByte(byteData ^ mask[+counter++ % 4]);
body.writeByte(byteData ^ mask[+counter++ % 4]);
}
counter++;
} else {
encoded.writeBytes(data, data.readerIndex(), data.readableBytes());
encoded = encoded.slice(0, encoded.writerIndex());
body = data;
}
return ChannelBuffers.wrappedBuffer(header, body);
}
return encoded;
}
// If not websocket, then just return the message
return msg;
}
}

View File

@ -51,6 +51,8 @@ public class WebSocketClientHandshaker10 extends WebSocketClientHandshaker {
private String protocol = null;
private boolean allowExtensions = false;
/**
* Constructor specifying the destination web socket location and version to
* initiate
@ -64,9 +66,13 @@ public class WebSocketClientHandshaker10 extends WebSocketClientHandshaker {
* server
* @param subProtocol
* Sub protocol request sent to the server.
* @param allowExtensions
* Allow extensions to be used in the reserved bits of the web socket frame
*/
public WebSocketClientHandshaker10(URI webSocketURL, WebSocketSpecificationVersion version, String subProtocol) {
public WebSocketClientHandshaker10(URI webSocketURL, WebSocketSpecificationVersion version, String subProtocol,
boolean allowExtensions) {
super(webSocketURL, version, subProtocol);
this.allowExtensions = allowExtensions;
return;
}
@ -179,7 +185,7 @@ public class WebSocketClientHandshaker10 extends WebSocketClientHandshaker {
this.expectedChallengeResponseString));
}
ctx.getPipeline().replace("decoder", "ws-decoder", new WebSocket08FrameDecoder(false));
ctx.getPipeline().replace("decoder", "ws-decoder", new WebSocket08FrameDecoder(false, this.allowExtensions));
this.setOpenningHandshakeCompleted(true);
return;

View File

@ -37,12 +37,14 @@ public class WebSocketClientHandshakerFactory {
* @param subProtocol
* Sub protocol request sent to the server. Null if no
* sub-protocol support is required.
* @param allowExtensions
* Allow extensions to be used in the reserved bits of the web socket frame
* @throws WebSocketHandshakeException
*/
public WebSocketClientHandshaker newHandshaker(URI webSocketURL, WebSocketSpecificationVersion version,
String subProtocol) throws WebSocketHandshakeException {
String subProtocol, boolean allowExtensions) throws WebSocketHandshakeException {
if (version == WebSocketSpecificationVersion.V10) {
return new WebSocketClientHandshaker10(webSocketURL, version, subProtocol);
return new WebSocketClientHandshaker10(webSocketURL, version, subProtocol, allowExtensions);
}
if (version == WebSocketSpecificationVersion.V00) {
return new WebSocketClientHandshaker00(webSocketURL, version, subProtocol);

View File

@ -24,16 +24,22 @@ import org.jboss.netty.buffer.ChannelBuffer;
*/
public abstract class WebSocketFrame {
/**
* Flag to indicate if this frame is the final fragment in a message. The
* first fragment (frame) may also be the final fragment.
*/
private boolean finalFragment = true;
/**
* RSV1, RSV2, RSV3 used for extensions
*/
private int rsv = 0;
/**
* Contents of this frame
*/
private ChannelBuffer binaryData;
/**
* Returns the type of this frame.
*/
public abstract WebSocketFrameType getType();
/**
* Returns binary data
*/
@ -48,4 +54,29 @@ public abstract class WebSocketFrame {
this.binaryData = binaryData;
}
/**
* Flag to indicate if this frame is the final fragment in a message. The
* first fragment (frame) may also be the final fragment.
*/
public boolean isFinalFragment() {
return finalFragment;
}
public void setFinalFragment(boolean finalFragment) {
this.finalFragment = finalFragment;
}
/**
* Bits used for extensions to the standard.
*/
public int getRsv() {
return rsv;
}
public void setRsv(int rsv) {
this.rsv = rsv;
}
}

View File

@ -21,5 +21,5 @@ package org.jboss.netty.handler.codec.http.websocketx;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
*/
public enum WebSocketFrameType {
TEXT, BINARY, PING, PONG, CLOSE
TEXT, BINARY, PING, PONG, CLOSE, CONTINUATION
}

View File

@ -47,6 +47,8 @@ public class WebSocketServerHandshaker10 extends WebSocketServerHandshaker {
public static final String WEBSOCKET_08_ACCEPT_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private boolean allowExtensions = false;
/**
* Constructor specifying the destination web socket location
*
@ -56,9 +58,13 @@ public class WebSocketServerHandshaker10 extends WebSocketServerHandshaker {
* sent to this URL.
* @param subProtocols
* CSV of supported protocols
* @param allowExtensions
* Allow extensions to be used in the reserved bits of the web
* socket frame
*/
public WebSocketServerHandshaker10(String webSocketURL, String subProtocols) {
public WebSocketServerHandshaker10(String webSocketURL, String subProtocols, boolean allowExtensions) {
super(webSocketURL, subProtocols);
this.allowExtensions = allowExtensions;
return;
}
@ -139,7 +145,7 @@ public class WebSocketServerHandshaker10 extends WebSocketServerHandshaker {
// Upgrade the connection and send the handshake response.
ChannelPipeline p = ctx.getChannel().getPipeline();
p.remove("aggregator");
p.replace("decoder", "wsdecoder", new WebSocket08FrameDecoder(true));
p.replace("decoder", "wsdecoder", new WebSocket08FrameDecoder(true, this.allowExtensions));
p.replace("encoder", "wsencoder", new WebSocket08FrameEncoder(false));
return;

View File

@ -34,6 +34,8 @@ public class WebSocketServerHandshakerFactory {
private String subProtocols;
private boolean allowExtensions = false;
/**
* Constructor specifying the destination web socket location
*
@ -44,10 +46,14 @@ public class WebSocketServerHandshakerFactory {
* @param subProtocols
* CSV of supported protocols. Null if sub protocols not
* supported.
* @param allowExtensions
* Allow extensions to be used in the reserved bits of the web
* socket frame
*/
public WebSocketServerHandshakerFactory(String webSocketURL, String subProtocols) {
public WebSocketServerHandshakerFactory(String webSocketURL, String subProtocols, boolean allowExtensions) {
this.webSocketURL = webSocketURL;
this.subProtocols = subProtocols;
this.allowExtensions = allowExtensions;
return;
}
@ -74,7 +80,7 @@ public class WebSocketServerHandshakerFactory {
if (version.equals("8")) {
// Version 8 of the wire protocol - assume version 10 of the
// specification.
return new WebSocketServerHandshaker10(webSocketURL, subProtocols);
return new WebSocketServerHandshaker10(webSocketURL, subProtocols, this.allowExtensions);
} else {
return null;
}