[#910] Make use of ByteBufHolder in SPDY, HTTP multipart and WebSockets to allow for buffer pooling

This commit is contained in:
Norman Maurer 2013-01-11 07:46:56 +01:00
parent f568aa42f0
commit dfbecb796c
40 changed files with 585 additions and 416 deletions

View File

@ -353,4 +353,12 @@ public abstract class AbstractDiskHttpData extends AbstractHttpData {
return file;
}
@Override
public boolean isFreed() {
if (file == null || !file.exists()) {
return true;
}
return false;
}
}

View File

@ -15,8 +15,11 @@
*/
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.handler.codec.http.HttpConstants;
import java.io.IOException;
import java.nio.charset.Charset;
/**
@ -96,4 +99,19 @@ public abstract class AbstractHttpData implements HttpData {
public long length() {
return size;
}
@Override
public ByteBuf data() {
try {
return getByteBuf();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void free() {
delete();
}
}

View File

@ -226,4 +226,9 @@ public abstract class AbstractMemoryHttpData extends AbstractHttpData {
public File getFile() throws IOException {
throw new IOException("Not represented by a file");
}
@Override
public boolean isFreed() {
return data().isFreed();
}
}

View File

@ -30,4 +30,7 @@ public interface Attribute extends HttpData {
* Sets the value of this HttpData.
*/
void setValue(String value) throws IOException;
@Override
Attribute copy();
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.handler.codec.http.HttpConstants;
import java.io.IOException;
@ -138,4 +139,19 @@ public class DiskAttribute extends AbstractDiskHttpData implements Attribute {
protected String getPrefix() {
return prefix;
}
@Override
public DiskAttribute copy() {
DiskAttribute attr = new DiskAttribute(getName());
attr.setCharset(getCharset());
ByteBuf content = data();
if (content != null) {
try {
attr.setContent(content.copy());
} catch (IOException e) {
throw new ChannelException(e);
}
}
return attr;
}
}

View File

@ -15,9 +15,12 @@
*/
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
/**
@ -159,4 +162,19 @@ public class DiskFileUpload extends AbstractDiskHttpData implements FileUpload {
protected String getPrefix() {
return prefix;
}
@Override
public DiskFileUpload copy() {
DiskFileUpload upload = new DiskFileUpload(getName(),
getFilename(), getContentType(), getContentTransferEncoding(), getCharset(), size);
ByteBuf buf = data();
if (buf != null) {
try {
upload.setContent(buf.copy());
} catch (IOException e) {
throw new ChannelException(e);
}
}
return upload;
}
}

View File

@ -55,4 +55,7 @@ public interface FileUpload extends HttpData {
* @return the Content-Transfer-Encoding
*/
String getContentTransferEncoding();
@Override
FileUpload copy();
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import java.io.File;
import java.io.IOException;
@ -25,7 +26,7 @@ import java.nio.charset.Charset;
/**
* Extended interface for InterfaceHttpData
*/
public interface HttpData extends InterfaceHttpData {
public interface HttpData extends InterfaceHttpData, ByteBufHolder {
/**
* Set the content from the ChannelBuffer (erase any previous data)
*
@ -176,4 +177,6 @@ public interface HttpData extends InterfaceHttpData {
*/
File getFile() throws IOException;
@Override
HttpData copy();
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.handler.codec.http.HttpConstants;
import java.io.IOException;
@ -100,4 +101,18 @@ public class MemoryAttribute extends AbstractMemoryHttpData implements Attribute
return getName() + '=' + getValue();
}
@Override
public MemoryAttribute copy() {
MemoryAttribute attr = new MemoryAttribute(getName());
attr.setCharset(getCharset());
ByteBuf content = data();
if (content != null) {
try {
attr.setContent(content.copy());
} catch (IOException e) {
throw new ChannelException(e);
}
}
return attr;
}
}

View File

@ -15,8 +15,11 @@
*/
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.IOException;
import java.nio.charset.Charset;
/**
@ -125,4 +128,20 @@ public class MemoryFileUpload extends AbstractMemoryHttpData implements FileUplo
"Completed: " + isCompleted() +
"\r\nIsInMemory: " + isInMemory();
}
@Override
public MemoryFileUpload copy() {
MemoryFileUpload upload = new MemoryFileUpload(getName(), getFilename(), getContentType(),
getContentTransferEncoding(), getCharset(), size);
ByteBuf buf = data();
if (buf != null) {
try {
upload.setContent(buf.copy());
return upload;
} catch (IOException e) {
throw new ChannelException(e);
}
}
return upload;
}
}

View File

@ -199,4 +199,23 @@ public class MixedAttribute implements Attribute {
return attribute.getFile();
}
@Override
public Attribute copy() {
return attribute.copy();
}
@Override
public ByteBuf data() {
return attribute.data();
}
@Override
public void free() {
attribute.free();
}
@Override
public boolean isFreed() {
return attribute.isFreed();
}
}

View File

@ -224,4 +224,23 @@ public class MixedFileUpload implements FileUpload {
return fileUpload.getFile();
}
@Override
public FileUpload copy() {
return fileUpload.copy();
}
@Override
public ByteBuf data() {
return fileUpload.data();
}
@Override
public void free() {
fileUpload.free();
}
@Override
public boolean isFreed() {
return fileUpload.isFreed();
}
}

View File

@ -27,7 +27,7 @@ public class BinaryWebSocketFrame extends WebSocketFrame {
* Creates a new empty binary frame.
*/
public BinaryWebSocketFrame() {
setBinaryData(Unpooled.EMPTY_BUFFER);
super(Unpooled.buffer(0));
}
/**
@ -37,7 +37,7 @@ public class BinaryWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public BinaryWebSocketFrame(ByteBuf binaryData) {
setBinaryData(binaryData);
super(binaryData);
}
/**
@ -51,14 +51,11 @@ public class BinaryWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public BinaryWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
setBinaryData(binaryData);
super(finalFragment, rsv, binaryData);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
public BinaryWebSocketFrame copy() {
return new BinaryWebSocketFrame(isFinalFragment(), rsv(), data().copy());
}
}

View File

@ -30,7 +30,7 @@ public class CloseWebSocketFrame extends WebSocketFrame {
* Creates a new empty close frame.
*/
public CloseWebSocketFrame() {
setBinaryData(Unpooled.EMPTY_BUFFER);
super(Unpooled.buffer(0));
}
/**
@ -72,9 +72,10 @@ public class CloseWebSocketFrame extends WebSocketFrame {
* Reason text. Set to null if no text.
*/
public CloseWebSocketFrame(boolean finalFragment, int rsv, int statusCode, String reasonText) {
setFinalFragment(finalFragment);
setRsv(rsv);
super(finalFragment, rsv, newBinaryData(statusCode, reasonText));
}
private static ByteBuf newBinaryData(int statusCode, String reasonText) {
byte[] reasonBytes = EMTPY_REASON;
if (reasonText != null) {
reasonBytes = reasonText.getBytes(CharsetUtil.UTF_8);
@ -87,7 +88,7 @@ public class CloseWebSocketFrame extends WebSocketFrame {
}
binaryData.readerIndex(0);
setBinaryData(binaryData);
return binaryData;
}
/**
@ -101,21 +102,15 @@ public class CloseWebSocketFrame extends WebSocketFrame {
* the content of the frame. Must be 2 byte integer followed by optional UTF-8 encoded string.
*/
public CloseWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
if (binaryData == null) {
setBinaryData(Unpooled.EMPTY_BUFFER);
} else {
setBinaryData(binaryData);
}
super(finalFragment, rsv, binaryData);
}
/**
* Returns the closing status code as per <a href="http://tools.ietf.org/html/rfc6455#section-7.4">RFC 6455</a>. If
* a status code is set, -1 is returned.
*/
public int getStatusCode() {
ByteBuf binaryData = getBinaryData();
public int statusCode() {
ByteBuf binaryData = data();
if (binaryData == null || binaryData.capacity() == 0) {
return -1;
}
@ -131,8 +126,8 @@ public class CloseWebSocketFrame extends WebSocketFrame {
* Returns the reason text as per <a href="http://tools.ietf.org/html/rfc6455#section-7.4">RFC 6455</a> If a reason
* text is not supplied, an empty string is returned.
*/
public String getReasonText() {
ByteBuf binaryData = getBinaryData();
public String reasonText() {
ByteBuf binaryData = data();
if (binaryData == null || binaryData.capacity() <= 2) {
return "";
}
@ -145,7 +140,7 @@ public class CloseWebSocketFrame extends WebSocketFrame {
}
@Override
public String toString() {
return getClass().getSimpleName();
public CloseWebSocketFrame copy() {
return new CloseWebSocketFrame(isFinalFragment(), rsv(), data().copy());
}
}

View File

@ -31,7 +31,7 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
* Creates a new empty continuation frame.
*/
public ContinuationWebSocketFrame() {
setBinaryData(Unpooled.EMPTY_BUFFER);
super(Unpooled.buffer(0));
}
/**
@ -41,7 +41,7 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
* @param binaryData the content of the frame.
*/
public ContinuationWebSocketFrame(ByteBuf binaryData) {
setBinaryData(binaryData);
super(binaryData);
}
/**
@ -55,9 +55,7 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public ContinuationWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
setBinaryData(binaryData);
super(finalFragment, rsv, binaryData);
}
/**
@ -75,9 +73,7 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
*/
public ContinuationWebSocketFrame(
boolean finalFragment, int rsv, ByteBuf binaryData, String aggregatedText) {
setFinalFragment(finalFragment);
setRsv(rsv);
setBinaryData(binaryData);
super(finalFragment, rsv, binaryData);
this.aggregatedText = aggregatedText;
}
@ -92,19 +88,14 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
* text content of the frame.
*/
public ContinuationWebSocketFrame(boolean finalFragment, int rsv, String text) {
setFinalFragment(finalFragment);
setRsv(rsv);
setText(text);
this(finalFragment, rsv, fromText(text), null);
}
/**
* Returns the text data in this frame
*/
public String getText() {
if (getBinaryData() == null) {
return null;
}
return getBinaryData().toString(CharsetUtil.UTF_8);
public String text() {
return data().toString(CharsetUtil.UTF_8);
}
/**
@ -113,28 +104,24 @@ public class ContinuationWebSocketFrame extends WebSocketFrame {
* @param text
* text to store
*/
public void setText(String text) {
private static ByteBuf fromText(String text) {
if (text == null || text.isEmpty()) {
setBinaryData(Unpooled.EMPTY_BUFFER);
return Unpooled.EMPTY_BUFFER;
} else {
setBinaryData(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
return Unpooled.copiedBuffer(text, CharsetUtil.UTF_8);
}
}
@Override
public String toString() {
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
}
/**
* Aggregated text returned by decoder on the final continuation frame of a fragmented text message
*/
public String getAggregatedText() {
public String aggregatedText() {
return aggregatedText;
}
public void setAggregatedText(String aggregatedText) {
this.aggregatedText = aggregatedText;
@Override
public ContinuationWebSocketFrame copy() {
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), data().copy(), aggregatedText());
}
}

View File

@ -27,8 +27,7 @@ public class PingWebSocketFrame extends WebSocketFrame {
* Creates a new empty ping frame.
*/
public PingWebSocketFrame() {
setFinalFragment(true);
setBinaryData(Unpooled.EMPTY_BUFFER);
super(true, 0, Unpooled.buffer(0));
}
/**
@ -38,7 +37,7 @@ public class PingWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public PingWebSocketFrame(ByteBuf binaryData) {
setBinaryData(binaryData);
super(binaryData);
}
/**
@ -52,14 +51,11 @@ public class PingWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public PingWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
setBinaryData(binaryData);
super(finalFragment, rsv, binaryData);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
public PingWebSocketFrame copy() {
return new PingWebSocketFrame(isFinalFragment(), rsv(), data().copy());
}
}

View File

@ -27,7 +27,7 @@ public class PongWebSocketFrame extends WebSocketFrame {
* Creates a new empty pong frame.
*/
public PongWebSocketFrame() {
setBinaryData(Unpooled.EMPTY_BUFFER);
super(Unpooled.buffer(0));
}
/**
@ -37,7 +37,7 @@ public class PongWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public PongWebSocketFrame(ByteBuf binaryData) {
setBinaryData(binaryData);
super(binaryData);
}
/**
@ -51,14 +51,12 @@ public class PongWebSocketFrame extends WebSocketFrame {
* the content of the frame.
*/
public PongWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
setBinaryData(binaryData);
super(finalFragment, rsv, binaryData);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(data: " + getBinaryData() + ')';
public PongWebSocketFrame copy() {
return new PongWebSocketFrame(isFinalFragment(), rsv(), data().copy());
}
}

View File

@ -28,7 +28,7 @@ public class TextWebSocketFrame extends WebSocketFrame {
* Creates a new empty text frame.
*/
public TextWebSocketFrame() {
setBinaryData(Unpooled.EMPTY_BUFFER);
super(Unpooled.buffer(0));
}
/**
@ -38,11 +38,7 @@ public class TextWebSocketFrame extends WebSocketFrame {
* String to put in the frame
*/
public TextWebSocketFrame(String text) {
if (text == null || text.isEmpty()) {
setBinaryData(Unpooled.EMPTY_BUFFER);
} else {
setBinaryData(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
}
super(fromText(text));
}
/**
@ -52,7 +48,7 @@ public class TextWebSocketFrame extends WebSocketFrame {
* the content of the frame. Must be UTF-8 encoded
*/
public TextWebSocketFrame(ByteBuf binaryData) {
setBinaryData(binaryData);
super(binaryData);
}
/**
@ -66,12 +62,14 @@ public class TextWebSocketFrame extends WebSocketFrame {
* String to put in the frame
*/
public TextWebSocketFrame(boolean finalFragment, int rsv, String text) {
setFinalFragment(finalFragment);
setRsv(rsv);
super(finalFragment, rsv, fromText(text));
}
private static ByteBuf fromText(String text) {
if (text == null || text.isEmpty()) {
setBinaryData(Unpooled.EMPTY_BUFFER);
return Unpooled.EMPTY_BUFFER;
} else {
setBinaryData(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
return Unpooled.copiedBuffer(text, CharsetUtil.UTF_8);
}
}
@ -86,36 +84,18 @@ public class TextWebSocketFrame extends WebSocketFrame {
* the content of the frame. Must be UTF-8 encoded
*/
public TextWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
setBinaryData(binaryData);
super(finalFragment, rsv, binaryData);
}
/**
* Returns the text data in this frame
*/
public String getText() {
if (getBinaryData() == null) {
return null;
}
return getBinaryData().toString(CharsetUtil.UTF_8);
}
/**
* Sets the string for this frame
*
* @param text
* text to store
*/
public void setText(String text) {
if (text == null) {
throw new NullPointerException("text");
}
setBinaryData(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
public String text() {
return data().toString(CharsetUtil.UTF_8);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(text: " + getText() + ')';
public TextWebSocketFrame copy() {
return new TextWebSocketFrame(isFinalFragment(), rsv(), data().copy());
}
}

View File

@ -35,6 +35,8 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
/**
* Checks UTF8 bytes for validity before converting it into a string
*/
@ -66,9 +68,15 @@ final class UTF8Output {
private final StringBuilder stringBuilder;
UTF8Output(byte[] bytes) {
stringBuilder = new StringBuilder(bytes.length);
write(bytes);
UTF8Output(ByteBuf buffer) {
stringBuilder = new StringBuilder(buffer.readableBytes());
write(buffer);
}
public void write(ByteBuf buffer) {
for (int i = buffer.readerIndex(); i < buffer.writerIndex(); i++) {
write(buffer.getByte(i));
}
}
public void write(byte[] bytes) {

View File

@ -63,14 +63,14 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<Void> {
byte type = in.readByte();
if ((type & 0x80) == 0x80) {
// If the MSB on type is set, decode the frame length
return decodeBinaryFrame(type, in);
return decodeBinaryFrame(ctx, type, in);
} else {
// Decode a 0xff terminated UTF-8 string
return decodeTextFrame(in);
return decodeTextFrame(ctx, in);
}
}
private WebSocketFrame decodeBinaryFrame(byte type, ByteBuf buffer) {
private WebSocketFrame decodeBinaryFrame(ChannelHandlerContext ctx, byte type, ByteBuf buffer) {
long frameSize = 0;
int lengthFieldSize = 0;
byte b;
@ -92,11 +92,12 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<Void> {
receivedClosingHandshake = true;
return new CloseWebSocketFrame();
}
return new BinaryWebSocketFrame(buffer.readBytes((int) frameSize));
ByteBuf payload = ctx.alloc().buffer((int) frameSize);
buffer.readBytes(payload);
return new BinaryWebSocketFrame(payload);
}
private WebSocketFrame decodeTextFrame(ByteBuf buffer) {
private WebSocketFrame decodeTextFrame(ChannelHandlerContext ctx, ByteBuf buffer) {
int ridx = buffer.readerIndex();
int rbytes = actualReadableBytes();
int delimPos = buffer.indexOf(ridx, ridx + rbytes, (byte) 0xFF);
@ -116,7 +117,8 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<Void> {
throw new TooLongFrameException();
}
ByteBuf binaryData = buffer.readBytes(frameSize);
ByteBuf binaryData = ctx.alloc().buffer(frameSize);
buffer.readBytes(binaryData);
buffer.skipBytes(1);
int ffDelimPos = binaryData.indexOf(binaryData.readerIndex(), binaryData.writerIndex(), (byte) 0xFF);

View File

@ -42,7 +42,7 @@ public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame
WebSocketFrame msg, ByteBuf out) throws Exception {
if (msg instanceof TextWebSocketFrame) {
// Text frame
ByteBuf data = msg.getBinaryData();
ByteBuf data = msg.data();
out.writeByte((byte) 0x00);
out.writeBytes(data, data.readerIndex(), data.readableBytes());
out.writeByte((byte) 0xFF);
@ -52,7 +52,7 @@ public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame
out.writeByte((byte) 0x00);
} else {
// Binary frame
ByteBuf data = msg.getBinaryData();
ByteBuf data = msg.data();
int dataLen = data.readableBytes();
out.ensureWritableBytes(dataLen + 5);

View File

@ -54,7 +54,6 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
@ -251,15 +250,16 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// framePayloadLength);
if (willHaveReadByteCount == framePayloadLength) {
// We have all our content so proceed to process
payloadBuffer = in.readBytes(rbytes);
payloadBuffer = ctx.alloc().buffer(rbytes);
payloadBuffer.writeBytes(in, rbytes);
} else if (willHaveReadByteCount < framePayloadLength) {
// We don't have all our content so accumulate payload.
// Returning null means we will get called back
payloadBuffer = in.readBytes(rbytes);
if (framePayload == null) {
framePayload = Unpooled.buffer(toFrameLength(framePayloadLength));
framePayload = ctx.alloc().buffer(toFrameLength(framePayloadLength));
}
framePayload.writeBytes(payloadBuffer);
framePayload.writeBytes(in, rbytes);
framePayloadBytesRead += rbytes;
// Return null to wait for more bytes to arrive
@ -267,7 +267,10 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else if (willHaveReadByteCount > framePayloadLength) {
// We have more than what we need so read up to the end of frame
// Leave the remainder in the buffer for next frame
payloadBuffer = in.readBytes(toFrameLength(framePayloadLength - framePayloadBytesRead));
if (framePayload == null) {
framePayload = ctx.alloc().buffer(toFrameLength(framePayloadLength));
}
framePayload.writeBytes(in, toFrameLength(framePayloadLength - framePayloadBytesRead));
}
// Now we have all the data, the next checkpoint must be the next
@ -277,7 +280,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Take the data that we have in this packet
if (framePayload == null) {
framePayload = payloadBuffer;
} else {
} else if (payloadBuffer != null) {
framePayload.writeBytes(payloadBuffer);
}
@ -312,7 +315,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Check text for UTF8 correctness
if (frameOpcode == OPCODE_TEXT || fragmentedFramesText != null) {
// Check UTF-8 correctness for this payload
checkUTF8String(ctx, framePayload.array());
checkUTF8String(ctx, framePayload);
// This does a second check to make sure UTF-8
// correctness for entire text message
@ -328,12 +331,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// First text or binary frame for a fragmented set
fragmentedFramesText = null;
if (frameOpcode == OPCODE_TEXT) {
checkUTF8String(ctx, framePayload.array());
checkUTF8String(ctx, framePayload);
}
} else {
// Subsequent frames - only check if init frame is text
if (fragmentedFramesText != null) {
checkUTF8String(ctx, framePayload.array());
checkUTF8String(ctx, framePayload);
}
}
@ -349,7 +352,8 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else if (frameOpcode == OPCODE_CONT) {
return new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload, aggregatedText);
} else {
throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: " + frameOpcode);
throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
+ frameOpcode);
}
case CORRUPT:
// If we don't keep reading Netty will throw an exception saying
@ -362,8 +366,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
private void unmask(ByteBuf frame) {
byte[] bytes = frame.array();
for (int i = 0; i < bytes.length; i++) {
for (int i = frame.readerIndex(); i < frame.writerIndex(); i++) {
frame.setByte(i, frame.getByte(i) ^ maskingKey.getByte(i % 4));
}
}
@ -384,12 +387,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
}
private void checkUTF8String(ChannelHandlerContext ctx, byte[] bytes) {
private void checkUTF8String(ChannelHandlerContext ctx, ByteBuf buffer) {
try {
if (fragmentedFramesText == null) {
fragmentedFramesText = new UTF8Output(bytes);
fragmentedFramesText = new UTF8Output(buffer);
} else {
fragmentedFramesText.write(bytes);
fragmentedFramesText.write(buffer);
}
} catch (UTF8Exception ex) {
protocolViolation(ctx, "invalid UTF-8 bytes");
@ -418,11 +421,10 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
// May have UTF-8 message
if (buffer.readableBytes() > 0) {
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
if (buffer.readable()) {
try {
new UTF8Output(b);
new UTF8Output(buffer);
} catch (UTF8Exception ex) {
protocolViolation(ctx, "Invalid close frame reason text. Invalid UTF-8 bytes");
}

View File

@ -100,7 +100,7 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame
byte[] mask;
ByteBuf data = msg.getBinaryData();
ByteBuf data = msg.data();
if (data == null) {
data = Unpooled.EMPTY_BUFFER;
}
@ -132,7 +132,7 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame
if (msg.isFinalFragment()) {
b0 |= 1 << 7;
}
b0 |= msg.getRsv() % 8 << 4;
b0 |= msg.rsv() % 8 << 4;
b0 |= opcode % 128;
if (opcode == OPCODE_PING && length > 125) {

View File

@ -16,40 +16,32 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
/**
* Base class for web socket frames
*/
public abstract class WebSocketFrame {
public abstract class WebSocketFrame extends DefaultByteBufHolder {
/**
* Flag to indicate if this frame is the final fragment in a message. The first fragment (frame) may also be the
* final fragment.
*/
private boolean finalFragment = true;
private final boolean finalFragment;
/**
* RSV1, RSV2, RSV3 used for extensions
*/
private int rsv;
private final int rsv;
/**
* Contents of this frame
*/
private ByteBuf binaryData;
/**
* Returns binary data
*/
public ByteBuf getBinaryData() {
return binaryData;
protected WebSocketFrame(ByteBuf binaryData) {
this(true, 0, binaryData);
}
/**
* Sets the binary data for this frame
*/
public void setBinaryData(ByteBuf binaryData) {
this.binaryData = binaryData;
protected WebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) {
super(binaryData);
this.finalFragment = finalFragment;
this.rsv = rsv;
}
/**
@ -60,19 +52,19 @@ public abstract class WebSocketFrame {
return finalFragment;
}
public void setFinalFragment(boolean finalFragment) {
this.finalFragment = finalFragment;
}
/**
* Bits used for extensions to the standard.
*/
public int getRsv() {
public int rsv() {
return rsv;
}
public void setRsv(int rsv) {
this.rsv = rsv;
@Override
public abstract WebSocketFrame copy();
@Override
public String toString() {
return getClass().getSimpleName() + "(data: " + data().toString() + ')';
}
}

View File

@ -81,7 +81,7 @@ public class WebSocketServerProtocolHandler extends ChannelInboundMessageHandler
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
ctx.channel().write(new PongWebSocketFrame(frame.data()));
return;
}
@ -100,6 +100,15 @@ public class WebSocketServerProtocolHandler extends ChannelInboundMessageHandler
}
}
@Override
protected void freeInboundMessage(WebSocketFrame msg) throws Exception {
if (msg instanceof PingWebSocketFrame || msg instanceof CloseWebSocketFrame) {
// Will be freed once wrote back
return;
}
super.freeInboundMessage(msg);
}
static WebSocketServerHandshaker getHandshaker(ChannelHandlerContext ctx) {
return ctx.attr(HANDSHAKER_ATTR_KEY).get();
}

View File

@ -16,17 +16,17 @@
package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.StringUtil;
/**
* The default {@link SpdyDataFrame} implementation.
*/
public class DefaultSpdyDataFrame implements SpdyDataFrame {
public class DefaultSpdyDataFrame extends DefaultByteBufHolder implements SpdyDataFrame {
private int streamId;
private boolean last;
private ByteBuf data = Unpooled.EMPTY_BUFFER;
/**
* Creates a new instance.
@ -34,9 +34,28 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame {
* @param streamId the Stream-ID of this frame
*/
public DefaultSpdyDataFrame(int streamId) {
this(streamId, Unpooled.buffer(0));
}
/**
* Creates a new instance.
*
* @param streamId the Stream-ID of this frame
* @param data the payload of the frame. Can not exceed {@link SpdyCodecUtil#SPDY_MAX_LENGTH}
*/
public DefaultSpdyDataFrame(int streamId, ByteBuf data) {
super(validate(data));
setStreamId(streamId);
}
private static ByteBuf validate(ByteBuf data) {
if (data.readableBytes() > SpdyCodecUtil.SPDY_MAX_LENGTH) {
throw new IllegalArgumentException("data payload cannot exceed "
+ SpdyCodecUtil.SPDY_MAX_LENGTH + " bytes");
}
return data;
}
@Override
public int getStreamId() {
return streamId;
@ -62,20 +81,10 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame {
}
@Override
public ByteBuf getData() {
return data;
}
@Override
public void setData(ByteBuf data) {
if (data == null) {
data = Unpooled.EMPTY_BUFFER;
}
if (data.readableBytes() > SpdyCodecUtil.SPDY_MAX_LENGTH) {
throw new IllegalArgumentException("data payload cannot exceed "
+ SpdyCodecUtil.SPDY_MAX_LENGTH + " bytes");
}
this.data = data;
public DefaultSpdyDataFrame copy() {
DefaultSpdyDataFrame frame = new DefaultSpdyDataFrame(getStreamId(), data().copy());
frame.setLast(isLast());
return frame;
}
@Override
@ -90,7 +99,11 @@ public class DefaultSpdyDataFrame implements SpdyDataFrame {
buf.append(streamId);
buf.append(StringUtil.NEWLINE);
buf.append("--> Size = ");
buf.append(data.readableBytes());
if (isFreed()) {
buf.append("(freed)");
} else {
buf.append(data().readableBytes());
}
return buf.toString();
}
}

View File

@ -16,12 +16,13 @@
package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
/**
* A SPDY Protocol Data Frame
*/
public interface SpdyDataFrame {
public interface SpdyDataFrame extends ByteBufHolder {
/**
* Returns the Stream-ID of this frame.
@ -47,13 +48,12 @@ public interface SpdyDataFrame {
/**
* Returns the data payload of this frame. If there is no data payload
* {@link Unpooled#EMPTY_BUFFER} is returned.
*/
ByteBuf getData();
/**
* Sets the data payload of this frame. If {@code null} is specified,
* the data payload will be set to {@link Unpooled#EMPTY_BUFFER}.
*
* The data payload cannot exceed 16777215 bytes.
*/
void setData(ByteBuf data);
@Override
ByteBuf data();
@Override
SpdyDataFrame copy();
}

View File

@ -281,8 +281,9 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
return null;
}
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID);
spdyDataFrame.setData(buffer.readBytes(dataLength));
ByteBuf data = ctx.alloc().buffer(dataLength);
data.writeBytes(buffer, dataLength);
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID, data);
length -= dataLength;
if (length == 0) {

View File

@ -82,7 +82,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
if (msg instanceof SpdyDataFrame) {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
ByteBuf data = spdyDataFrame.getData();
ByteBuf data = spdyDataFrame.data();
byte flags = spdyDataFrame.isLast() ? SPDY_DATA_FLAG_FIN : 0;
out.ensureWritableBytes(SPDY_HEADER_SIZE + data.readableBytes());
out.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF);

View File

@ -199,13 +199,13 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<Object> {
}
ByteBuf content = fullHttpMessage.data();
if (content.readableBytes() > maxContentLength - spdyDataFrame.getData().readableBytes()) {
if (content.readableBytes() > maxContentLength - spdyDataFrame.data().readableBytes()) {
messageMap.remove(streamID);
throw new TooLongFrameException(
"HTTP content length exceeded " + maxContentLength + " bytes.");
}
ByteBuf spdyDataFrameData = spdyDataFrame.getData();
ByteBuf spdyDataFrameData = spdyDataFrame.data();
int spdyDataFrameDataLen = spdyDataFrameData.readableBytes();
content.writeBytes(spdyDataFrameData, spdyDataFrameData.readerIndex(), spdyDataFrameDataLen);

View File

@ -164,10 +164,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object> {
if (msg instanceof HttpContent) {
HttpContent chunk = (HttpContent) msg;
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamId);
spdyDataFrame.setData(chunk.data());
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamId, chunk.data());
spdyDataFrame.setLast(chunk instanceof LastHttpContent);
if (chunk instanceof LastHttpContent) {
LastHttpContent trailer = (LastHttpContent) chunk;
List<Map.Entry<String, String>> trailers = trailer.trailingHeaders().entries();
@ -288,4 +286,13 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object> {
return spdySynReplyFrame;
}
@Override
protected void freeOutboundMessage(Object msg) throws Exception {
if (msg instanceof HttpContent) {
// Will be freed later as the content of them is just reused here
return;
}
super.freeOutboundMessage(msg);
}
}

View File

@ -61,4 +61,13 @@ public class SpdyHttpResponseStreamIdHandler extends
return msg;
}
@Override
protected void freeInboundMessage(Object msg) throws Exception {
// just pass through so no free
}
@Override
protected void freeOutboundMessage(HttpMessage msg) throws Exception {
// just pass through so no free
}
}

View File

@ -190,7 +190,7 @@ public class SpdySessionHandler
if (flowControl) {
// Update receive window size
int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
int deltaWindowSize = -1 * spdyDataFrame.data().readableBytes();
int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
// Window size can become negative if we sent a SETTINGS frame that reduces the
@ -206,9 +206,9 @@ public class SpdySessionHandler
// Window size became negative due to sender writing frame before receiving SETTINGS
// Send data frames upstream in initialReceiveWindowSize chunks
if (newWindowSize < 0) {
while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
while (spdyDataFrame.data().readableBytes() > initialReceiveWindowSize) {
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID,
spdyDataFrame.data().readSlice(initialReceiveWindowSize));
ctx.nextOutboundMessageBuffer().add(partialDataFrame);
ctx.flush();
}
@ -496,7 +496,7 @@ public class SpdySessionHandler
if (flowControl) {
synchronized (flowControlLock) {
int dataLength = spdyDataFrame.getData().readableBytes();
int dataLength = spdyDataFrame.data().readableBytes();
int sendWindowSize = spdySession.getSendWindowSize(streamID);
if (sendWindowSize >= dataLength) {
@ -524,8 +524,8 @@ public class SpdySessionHandler
spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID,
spdyDataFrame.data().readSlice(sendWindowSize));
// Enqueue the remaining data (will be the first frame queued)
spdySession.putPendingWrite(streamID, spdyDataFrame);
@ -814,7 +814,7 @@ public class SpdySessionHandler
break;
}
int dataFrameSize = spdyDataFrame.getData().readableBytes();
int dataFrameSize = spdyDataFrame.data().readableBytes();
if (newWindowSize >= dataFrameSize) {
// Window size is large enough to send entire data frame
@ -848,8 +848,8 @@ public class SpdySessionHandler
spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
// Create a partial data frame whose length is the current window size
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID,
spdyDataFrame.data().readSlice(newWindowSize));
// The transfer window size is pre-decremented when sending a data frame downstream.
// Close the stream on write failures that leaves the transfer window in a corrupt state.

View File

@ -150,7 +150,7 @@ public class WebSocketServerProtocolHandlerTest {
@Override
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
content = "processed: " + msg.getText();
content = "processed: " + msg.text();
}
public String getContent() {

View File

@ -145,7 +145,6 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
private void writeResponse(ChannelHandlerContext ctx, HttpObject currentObj) {
// Decide whether to close the connection or not.
boolean keepAlive = isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, currentObj.decoderResult().isSuccess()? OK : BAD_REQUEST,

View File

@ -93,17 +93,17 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(
new PongWebSocketFrame(frame.isFinalFragment(), frame.getRsv(), frame.getBinaryData()));
new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof TextWebSocketFrame) {
// String text = ((TextWebSocketFrame) frame).getText();
ctx.channel().write(
new TextWebSocketFrame(frame.isFinalFragment(), frame.getRsv(), frame.getBinaryData()));
new TextWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof BinaryWebSocketFrame) {
ctx.channel().write(
new BinaryWebSocketFrame(frame.isFinalFragment(), frame.getRsv(), frame.getBinaryData()));
new BinaryWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof ContinuationWebSocketFrame) {
ctx.channel().write(
new ContinuationWebSocketFrame(frame.isFinalFragment(), frame.getRsv(), frame.getBinaryData()));
new ContinuationWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof PongWebSocketFrame) {
// Ignore
} else {
@ -136,4 +136,14 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
private static String getWebSocketLocation(FullHttpRequest req) {
return "ws://" + req.headers().get(HttpHeaders.Names.HOST);
}
@Override
protected void freeInboundMessage(Object msg) throws Exception {
if (!(msg instanceof PongWebSocketFrame) && msg instanceof WebSocketFrame) {
// will be freed once written by the encoder
return;
}
super.freeInboundMessage(msg);
}
}

View File

@ -97,7 +97,7 @@ public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("WebSocket Client received message: " + textFrame.getText());
System.out.println("WebSocket Client received message: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {

View File

@ -23,7 +23,7 @@ public class CustomTextFrameHandler extends ChannelInboundMessageHandlerAdapter<
@Override
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
String request = frame.getText();
String request = frame.text();
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}

View File

@ -109,7 +109,7 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
ctx.channel().write(new PongWebSocketFrame(frame.data()));
return;
}
if (!(frame instanceof TextWebSocketFrame)) {
@ -118,7 +118,7 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
}
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).getText();
String request = ((TextWebSocketFrame) frame).text();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s received %s", ctx.channel().id(), request));
}
@ -146,6 +146,15 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
ctx.close();
}
@Override
protected void freeInboundMessage(Object msg) throws Exception {
if (msg instanceof PingWebSocketFrame || msg instanceof CloseWebSocketFrame) {
// Will be freed once wrote back
return;
}
super.freeInboundMessage(msg);
}
private static String getWebSocketLocation(FullHttpRequest req) {
return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH;
}

View File

@ -111,7 +111,7 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
ctx.channel().write(new PongWebSocketFrame(frame.data()));
return;
}
if (!(frame instanceof TextWebSocketFrame)) {
@ -120,7 +120,7 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
}
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).getText();
String request = ((TextWebSocketFrame) frame).text();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s received %s", ctx.channel().id(), request));
}
@ -151,4 +151,13 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
private static String getWebSocketLocation(FullHttpRequest req) {
return "wss://" + req.headers().get(HOST) + WEBSOCKET_PATH;
}
@Override
protected void freeInboundMessage(Object msg) throws Exception {
if (msg instanceof PingWebSocketFrame || msg instanceof CloseWebSocketFrame) {
// Will be freed once wrote back
return;
}
super.freeInboundMessage(msg);
}
}