WIP. AutoBahn tests 1-5 working. Some tests in 6 and 9 still failing.

This commit is contained in:
Veebs 2011-10-16 23:39:27 +11:00
parent 91796814eb
commit 234952a516
2 changed files with 62 additions and 7 deletions

View File

@ -67,6 +67,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
private static final byte OPCODE_PING = 0x9; private static final byte OPCODE_PING = 0x9;
private static final byte OPCODE_PONG = 0xA; private static final byte OPCODE_PONG = 0xA;
private UTF8Output fragmentedFramesText = null;
private int fragmentedFramesCount = 0; private int fragmentedFramesCount = 0;
private boolean frameFinalFlag; private boolean frameFinalFlag;
@ -204,11 +205,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else { } else {
framePayloadLength = framePayloadLen1; framePayloadLength = framePayloadLen1;
} }
logger.debug("Frame length =" + framePayloadLength);
checkpoint(State.MASKING_KEY); checkpoint(State.MASKING_KEY);
case MASKING_KEY: case MASKING_KEY:
maskingKey = buffer.readBytes(4); maskingKey = buffer.readBytes(4);
checkpoint(State.PAYLOAD); checkpoint(State.PAYLOAD);
return null;
case PAYLOAD: case PAYLOAD:
// Some times, the payload may not be delivered in 1 nice packet // Some times, the payload may not be delivered in 1 nice packet
// We need to accumulate the data until we have it all // We need to accumulate the data until we have it all
@ -253,10 +255,43 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
unmask(framePayload); unmask(framePayload);
} }
// Count the number of fragments // Processing for fragmented messages
String aggregatedText = null;
if (frameFinalFlag) { if (frameFinalFlag) {
fragmentedFramesCount = 0; // Final frame of the sequence. Apparently ping frames are
// allowed in the middle of a fragmented message
if (frameOpcode != OPCODE_PING) {
fragmentedFramesCount = 0;
// Check text for UTF8 correctness
if (frameOpcode == OPCODE_TEXT || fragmentedFramesText != null) {
checkUTF8String(channel, framePayload.array());
}
// If final frame in a fragmented message, then set
// aggregated text so it can be returned
if (fragmentedFramesText != null) {
aggregatedText = fragmentedFramesText.toString();
fragmentedFramesText = null;
}
}
} else { } else {
// Not final frame so we can expect more frames in the
// fragmented sequence
if (fragmentedFramesCount == 0) {
// First text or binary frame for a fragmented set
fragmentedFramesText = null;
if (frameOpcode == OPCODE_TEXT) {
checkUTF8String(channel, framePayload.array());
}
} else {
// Subsequent frames - only check if init frame is text
if (fragmentedFramesText != null) {
checkUTF8String(channel, framePayload.array());
}
}
// Increment counter
fragmentedFramesCount++; fragmentedFramesCount++;
} }
@ -270,7 +305,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else if (frameOpcode == OPCODE_PONG) { } else if (frameOpcode == OPCODE_PONG) {
return new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload); return new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_CONT) { } else if (frameOpcode == OPCODE_CONT) {
return new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload); return new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload, aggregatedText);
} else if (frameOpcode == OPCODE_CLOSE) { } else if (frameOpcode == OPCODE_CLOSE) {
this.receivedClosingHandshake = true; this.receivedClosingHandshake = true;
return new CloseWebSocketFrame(frameFinalFlag, frameRsv); return new CloseWebSocketFrame(frameFinalFlag, frameRsv);
@ -310,4 +345,23 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
return (int) l; return (int) l;
} }
} }
private void checkUTF8String(Channel channel, byte[] bytes) throws CorruptedFrameException {
try {
StringBuilder sb = new StringBuilder("UTF8 " + bytes.length + " bytes: ");
for (byte b : bytes) {
sb.append(Integer.toHexString(b)).append(" ");
}
logger.debug(sb.toString());
if (fragmentedFramesText == null) {
fragmentedFramesText = new UTF8Output(bytes);
} else {
fragmentedFramesText.write(bytes);
}
} catch (UTF8Exception ex) {
protocolViolation(channel, "invalid UTF-8 bytes");
}
}
} }

View File

@ -110,10 +110,11 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
} else { } else {
throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName()); throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName());
} }
logger.debug("Encoding WebSocket Frame opCode=" + opcode);
int length = data.readableBytes(); int length = data.readableBytes();
logger.debug("Encoding WebSocket Frame opCode=" + opcode + " length=" + length);
int b0 = 0; int b0 = 0;
if (frame.isFinalFragment()) { if (frame.isFinalFragment()) {
b0 |= (1 << 7); b0 |= (1 << 7);