diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java index 12253d0302..408da1423c 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java @@ -17,10 +17,8 @@ package io.netty.handler.codec.http; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.embedder.DecoderEmbedder; /** @@ -42,7 +40,7 @@ import io.netty.handler.codec.embedder.DecoderEmbedder; * so that this handler can intercept HTTP requests after {@link HttpMessageDecoder} * converts {@link ChannelBuffer}s into HTTP requests. */ -public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler { +public abstract class HttpContentDecoder extends MessageToMessageDecoder { private DecoderEmbedder decoder; @@ -53,11 +51,10 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - Object msg = e.getMessage(); + public Object decode(ChannelInboundHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) { // 100-continue response must be passed through. - ctx.sendUpstream(e); + return msg; } else if (msg instanceof HttpMessage) { HttpMessage m = (HttpMessage) msg; @@ -94,9 +91,6 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler { } } } - - // Because HttpMessage is a mutable object, we can simply forward the received event. - ctx.sendUpstream(e); } else if (msg instanceof HttpChunk) { HttpChunk c = (HttpChunk) msg; ChannelBuffer content = c.getContent(); @@ -107,7 +101,6 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler { content = decode(content); if (content.readable()) { c.setContent(content); - ctx.sendUpstream(e); } } else { ChannelBuffer lastProduct = finishDecode(); @@ -115,19 +108,14 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler { // Generate an additional chunk if the decoder produced // the last product on closure, if (lastProduct.readable()) { - Channels.fireMessageReceived( - ctx, new DefaultHttpChunk(lastProduct), e.getRemoteAddress()); + return new Object[] { new DefaultHttpChunk(lastProduct), c }; } - - // Emit the last chunk. - ctx.sendUpstream(e); } - } else { - ctx.sendUpstream(e); } - } else { - ctx.sendUpstream(e); } + + // Because HttpMessage and HttpChunk is a mutable object, we can simply forward it. + return msg; } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java index 191f0acfb5..9ea38ffdeb 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java @@ -15,17 +15,16 @@ */ package io.netty.handler.codec.http; -import java.util.Queue; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelHandler; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.embedder.EncoderEmbedder; import io.netty.util.internal.QueueFactory; +import java.util.Queue; + /** * Encodes the content of the outbound {@link HttpResponse} and {@link HttpChunk}. * The original content is replaced with the new content encoded by the @@ -48,7 +47,7 @@ import io.netty.util.internal.QueueFactory; * so that this handler can intercept HTTP responses before {@link HttpMessageEncoder} * converts them into {@link ChannelBuffer}s. */ -public abstract class HttpContentEncoder extends SimpleChannelHandler { +public abstract class HttpContentEncoder extends MessageToMessageCodec { private final Queue acceptEncodingQueue = QueueFactory.createQueue(String.class); private volatile EncoderEmbedder encoder; @@ -60,12 +59,10 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public Object decode(ChannelInboundHandlerContext ctx, Object msg) throws Exception { - Object msg = e.getMessage(); if (!(msg instanceof HttpMessage)) { - ctx.sendUpstream(e); - return; + return msg; } HttpMessage m = (HttpMessage) msg; @@ -75,18 +72,15 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler { } boolean offered = acceptEncodingQueue.offer(acceptedEncoding); assert offered; - - ctx.sendUpstream(e); + return m; } @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) + public Object encode(ChannelOutboundHandlerContext ctx, Object msg) throws Exception { - - Object msg = e.getMessage(); if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) { // 100-continue response must be passed through. - ctx.sendDownstream(e); + return msg; } else if (msg instanceof HttpMessage) { HttpMessage m = (HttpMessage) msg; @@ -100,14 +94,12 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler { boolean hasContent = m.isChunked() || m.getContent().readable(); if (!hasContent) { - ctx.sendDownstream(e); - return; + return m; } Result result = beginEncode(m, acceptEncoding); if (result == null) { - ctx.sendDownstream(e); - return; + return m; } encoder = result.getContentEncoder(); @@ -132,9 +124,6 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler { Integer.toString(content.readableBytes())); } } - - // Because HttpMessage is a mutable object, we can simply forward the write request. - ctx.sendDownstream(e); } else if (msg instanceof HttpChunk) { HttpChunk c = (HttpChunk) msg; ChannelBuffer content = c.getContent(); @@ -145,7 +134,6 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler { content = encode(content); if (content.readable()) { c.setContent(content); - ctx.sendDownstream(e); } } else { ChannelBuffer lastProduct = finishEncode(); @@ -153,19 +141,14 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler { // Generate an additional chunk if the decoder produced // the last product on closure, if (lastProduct.readable()) { - Channels.write( - ctx, Channels.succeededFuture(e.channel()), new DefaultHttpChunk(lastProduct), e.getRemoteAddress()); + return new Object[] { new DefaultHttpChunk(lastProduct), c }; } - - // Emit the last chunk. - ctx.sendDownstream(e); } - } else { - ctx.sendDownstream(e); } - } else { - ctx.sendDownstream(e); } + + // Because HttpMessage and HttpChunk is a mutable object, we can simply forward it. + return msg; } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java index 2050a66c20..18118264b1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageEncoder.java @@ -116,7 +116,6 @@ public abstract class HttpMessageEncoder extends MessageToStreamEncoder out.writeBytes(chunkContent, chunkContent.readerIndex(), chunkContent.readableBytes()); } } - } else { throw new UnsupportedMessageTypeException(msg, HttpMessage.class, HttpChunk.class); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java index 74b9f2eddd..9969cb3842 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java @@ -16,8 +16,7 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; import io.netty.util.VoidEnum; @@ -27,11 +26,11 @@ import io.netty.util.VoidEnum; *

* For the detailed instruction on adding add Web Socket support to your HTTP server, take a look into the * WebSocketServer example located in the {@code io.netty.example.http.websocket} package. - * + * * @apiviz.landmark * @apiviz.uses io.netty.handler.codec.http.websocket.WebSocketFrame */ -public class WebSocket00FrameDecoder extends ReplayingDecoder { +public class WebSocket00FrameDecoder extends ReplayingDecoder { private static final int DEFAULT_MAX_FRAME_SIZE = 16384; @@ -45,7 +44,7 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder { /** * Creates a new instance of {@code WebSocketFrameDecoder} with the specified {@code maxFrameSize}. If the client * sends a frame size larger than {@code maxFrameSize}, the channel will be closed. - * + * * @param maxFrameSize * the maximum frame size to decode */ @@ -54,23 +53,21 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder { } @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) - throws Exception { - + public WebSocketFrame decode(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { - buffer.skipBytes(actualReadableBytes()); + in.skipBytes(actualReadableBytes()); return null; } // Decode a frame otherwise. - byte type = buffer.readByte(); + byte type = in.readByte(); if ((type & 0x80) == 0x80) { // If the MSB on type is set, decode the frame length - return decodeBinaryFrame(type, buffer); + return decodeBinaryFrame(type, in); } else { // Decode a 0xff terminated UTF-8 string - return decodeTextFrame(buffer); + return decodeTextFrame(in); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java index ea0d36feb6..15caba3f52 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java @@ -16,82 +16,72 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToStreamEncoder; /** * Encodes a {@link WebSocketFrame} into a {@link ChannelBuffer}. *

* For the detailed instruction on adding add Web Socket support to your HTTP server, take a look into the * WebSocketServer example located in the {@code io.netty.example.http.websocket} package. - * + * * @apiviz.landmark * @apiviz.uses io.netty.handler.codec.http.websocket.WebSocketFrame */ @Sharable -public class WebSocket00FrameEncoder extends OneToOneEncoder { +public class WebSocket00FrameEncoder extends MessageToStreamEncoder { @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - if (msg instanceof WebSocketFrame) { - WebSocketFrame frame = (WebSocketFrame) msg; - if (frame instanceof TextWebSocketFrame) { - // Text frame - ChannelBuffer data = frame.getBinaryData(); - ChannelBuffer encoded = channel.getConfig().getBufferFactory() - .getBuffer(data.order(), data.readableBytes() + 2); - encoded.writeByte((byte) 0x00); - encoded.writeBytes(data, data.readerIndex(), data.readableBytes()); - encoded.writeByte((byte) 0xFF); - return encoded; - } else if (frame instanceof CloseWebSocketFrame) { - // Close frame - ChannelBuffer data = frame.getBinaryData(); - ChannelBuffer encoded = channel.getConfig().getBufferFactory().getBuffer(data.order(), 2); - encoded.writeByte((byte) 0xFF); - encoded.writeByte((byte) 0x00); - return encoded; - } else { - // Binary frame - ChannelBuffer data = frame.getBinaryData(); - int dataLen = data.readableBytes(); - ChannelBuffer encoded = channel.getConfig().getBufferFactory().getBuffer(data.order(), dataLen + 5); + public void encode( + ChannelOutboundHandlerContext ctx, + WebSocketFrame msg, ChannelBuffer out) throws Exception { + if (msg instanceof TextWebSocketFrame) { + // Text frame + ChannelBuffer data = msg.getBinaryData(); + out.writeByte((byte) 0x00); + out.writeBytes(data, data.readerIndex(), data.readableBytes()); + out.writeByte((byte) 0xFF); + } else if (msg instanceof CloseWebSocketFrame) { + // Close frame + out.writeByte((byte) 0xFF); + out.writeByte((byte) 0x00); + } else { + // Binary frame + ChannelBuffer data = msg.getBinaryData(); + int dataLen = data.readableBytes(); + out.ensureWritableBytes(dataLen + 5); - // Encode type. - encoded.writeByte((byte) 0x80); + // Encode type. + out.writeByte((byte) 0x80); - // Encode length. - int b1 = dataLen >>> 28 & 0x7F; - int b2 = dataLen >>> 14 & 0x7F; - int b3 = dataLen >>> 7 & 0x7F; - int b4 = dataLen & 0x7F; - if (b1 == 0) { - if (b2 == 0) { - if (b3 == 0) { - encoded.writeByte(b4); - } else { - encoded.writeByte(b3 | 0x80); - encoded.writeByte(b4); - } + // Encode length. + int b1 = dataLen >>> 28 & 0x7F; + int b2 = dataLen >>> 14 & 0x7F; + int b3 = dataLen >>> 7 & 0x7F; + int b4 = dataLen & 0x7F; + if (b1 == 0) { + if (b2 == 0) { + if (b3 == 0) { + out.writeByte(b4); } else { - encoded.writeByte(b2 | 0x80); - encoded.writeByte(b3 | 0x80); - encoded.writeByte(b4); + out.writeByte(b3 | 0x80); + out.writeByte(b4); } } else { - encoded.writeByte(b1 | 0x80); - encoded.writeByte(b2 | 0x80); - encoded.writeByte(b3 | 0x80); - encoded.writeByte(b4); + out.writeByte(b2 | 0x80); + out.writeByte(b3 | 0x80); + out.writeByte(b4); } - - // Encode binary data. - encoded.writeBytes(data, data.readerIndex(), dataLen); - return encoded; + } else { + out.writeByte(b1 | 0x80); + out.writeByte(b2 | 0x80); + out.writeByte(b3 | 0x80); + out.writeByte(b4); } + + // Encode binary data. + out.writeBytes(data, data.readerIndex(), dataLen); } - return msg; } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java index 85a3a427b2..7eb8320a3c 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java @@ -55,9 +55,8 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.CorruptedFrameException; import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; @@ -68,7 +67,7 @@ import io.netty.logging.InternalLoggerFactory; * Decodes a web socket frame from wire protocol version 8 format. This code was forked from webbit and modified. */ -public class WebSocket08FrameDecoder extends ReplayingDecoder { +public class WebSocket08FrameDecoder extends ReplayingDecoder { private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameDecoder.class); @@ -100,7 +99,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder ctx, ChannelBuffer in) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { - buffer.skipBytes(actualReadableBytes()); + in.skipBytes(actualReadableBytes()); return null; } - switch (state) { + switch (state()) { case FRAME_START: framePayloadBytesRead = 0; framePayloadLength = -1; framePayload = null; // FIN, RSV, OPCODE - byte b = buffer.readByte(); + byte b = in.readByte(); frameFinalFlag = (b & 0x80) != 0; frameRsv = (b & 0x70) >> 4; frameOpcode = b & 0x0F; @@ -140,36 +139,36 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder 7) { // control frame (have MSB in opcode set) // control frames MUST NOT be fragmented if (!frameFinalFlag) { - protocolViolation(channel, "fragmented control frame"); + protocolViolation(ctx, "fragmented control frame"); return null; } // control frames MUST have payload 125 octets or less if (framePayloadLen1 > 125) { - protocolViolation(channel, "control frame with payload length > 125 octets"); + protocolViolation(ctx, "control frame with payload length > 125 octets"); return null; } // check for reserved control frame opcodes if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) { - protocolViolation(channel, "control frame using reserved opcode " + frameOpcode); + protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode); return null; } @@ -177,43 +176,43 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder framePayloadLength) { // We have more than what we need so read up to the end of frame // Leave the remainder in the buffer for next frame - payloadBuffer = buffer.readBytes(toFrameLength(framePayloadLength - framePayloadBytesRead)); + payloadBuffer = in.readBytes(toFrameLength(framePayloadLength - framePayloadBytesRead)); } // Now we have all the data, the next checkpoint must be the next @@ -284,7 +283,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder ctx, String reason) throws CorruptedFrameException { checkpoint(State.CORRUPT); - if (channel.isConnected()) { - channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); - channel.close().awaitUninterruptibly(); + if (ctx.channel().isActive()) { + ctx.flush().addListener(ChannelFutureListener.CLOSE); } throw new CorruptedFrameException(reason); } - private int toFrameLength(long l) throws TooLongFrameException { + private static int toFrameLength(long l) throws TooLongFrameException { if (l > Integer.MAX_VALUE) { throw new TooLongFrameException("Length:" + l); } else { @@ -374,7 +372,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder ctx, byte[] bytes) throws CorruptedFrameException { try { // StringBuilder sb = new StringBuilder("UTF8 " + bytes.length + // " bytes: "); @@ -389,16 +387,16 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder ctx, ChannelBuffer buffer) throws CorruptedFrameException { if (buffer == null || buffer.capacity() == 0) { return; } if (buffer.capacity() == 1) { - protocolViolation(channel, "Invalid close frame body"); + protocolViolation(ctx, "Invalid close frame body"); } // Save reader index @@ -407,9 +405,9 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder= 0 && statusCode <= 999) || (statusCode >= 1004 && statusCode <= 1006) - || (statusCode >= 1012 && statusCode <= 2999)) { - protocolViolation(channel, "Invalid close frame status code: " + statusCode); + if (statusCode >= 0 && statusCode <= 999 || statusCode >= 1004 && statusCode <= 1006 + || statusCode >= 1012 && statusCode <= 2999) { + protocolViolation(ctx, "Invalid close frame status code: " + statusCode); } // May have UTF-8 message @@ -419,10 +417,10 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder * Encodes a web socket frame into wire protocol version 8 format. This code was forked from webbit and modified. *

*/ -public class WebSocket08FrameEncoder extends OneToOneEncoder { +public class WebSocket08FrameEncoder extends MessageToStreamEncoder { private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameEncoder.class); @@ -85,7 +84,7 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder { /** * Constructor - * + * * @param maskPayload * Web socket clients must set this to true to mask payload. Server implementations must set this to * false. @@ -95,94 +94,83 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder { } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - + public void encode(ChannelOutboundHandlerContext ctx, + WebSocketFrame msg, ChannelBuffer out) throws Exception { byte[] mask; - if (msg instanceof WebSocketFrame) { - WebSocketFrame frame = (WebSocketFrame) msg; - ChannelBuffer data = frame.getBinaryData(); - if (data == null) { - data = ChannelBuffers.EMPTY_BUFFER; - } - - byte opcode; - if (frame instanceof TextWebSocketFrame) { - opcode = OPCODE_TEXT; - } else if (frame instanceof PingWebSocketFrame) { - opcode = OPCODE_PING; - } else if (frame instanceof PongWebSocketFrame) { - opcode = OPCODE_PONG; - } else if (frame instanceof CloseWebSocketFrame) { - opcode = OPCODE_CLOSE; - } else if (frame instanceof BinaryWebSocketFrame) { - opcode = OPCODE_BINARY; - } else if (frame instanceof ContinuationWebSocketFrame) { - opcode = OPCODE_CONT; - } else { - throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName()); - } - - int length = data.readableBytes(); - - if (logger.isDebugEnabled()) { - logger.debug("Encoding WebSocket Frame opCode=" + opcode + " length=" + length); - } - - int b0 = 0; - if (frame.isFinalFragment()) { - b0 |= 1 << 7; - } - b0 |= frame.getRsv() % 8 << 4; - b0 |= opcode % 128; - - ChannelBuffer header; - ChannelBuffer body; - - if (opcode == OPCODE_PING && length > 125) { - throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was " - + length); - } - - int maskLength = maskPayload ? 4 : 0; - if (length <= 125) { - header = ChannelBuffers.buffer(2 + maskLength); - header.writeByte(b0); - byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length); - header.writeByte(b); - } else if (length <= 0xFFFF) { - header = ChannelBuffers.buffer(4 + maskLength); - header.writeByte(b0); - header.writeByte(maskPayload ? 0xFE : 126); - header.writeByte(length >>> 8 & 0xFF); - header.writeByte(length & 0xFF); - } else { - header = ChannelBuffers.buffer(10 + maskLength); - header.writeByte(b0); - header.writeByte(maskPayload ? 0xFF : 127); - header.writeLong(length); - } - - // Write payload - if (maskPayload) { - Integer random = (int) (Math.random() * Integer.MAX_VALUE); - mask = ByteBuffer.allocate(4).putInt(random).array(); - header.writeBytes(mask); - - body = ChannelBuffers.buffer(length); - int counter = 0; - while (data.readableBytes() > 0) { - byte byteData = data.readByte(); - body.writeByte(byteData ^ mask[+counter++ % 4]); - } - } else { - body = data; - } - return ChannelBuffers.wrappedBuffer(header, body); + WebSocketFrame frame = msg; + ChannelBuffer data = frame.getBinaryData(); + if (data == null) { + data = ChannelBuffers.EMPTY_BUFFER; } - // If not websocket, then just return the message - return msg; - } + byte opcode; + if (frame instanceof TextWebSocketFrame) { + opcode = OPCODE_TEXT; + } else if (frame instanceof PingWebSocketFrame) { + opcode = OPCODE_PING; + } else if (frame instanceof PongWebSocketFrame) { + opcode = OPCODE_PONG; + } else if (frame instanceof CloseWebSocketFrame) { + opcode = OPCODE_CLOSE; + } else if (frame instanceof BinaryWebSocketFrame) { + opcode = OPCODE_BINARY; + } else if (frame instanceof ContinuationWebSocketFrame) { + opcode = OPCODE_CONT; + } else { + throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName()); + } + int length = data.readableBytes(); + + if (logger.isDebugEnabled()) { + logger.debug("Encoding WebSocket Frame opCode=" + opcode + " length=" + length); + } + + int b0 = 0; + if (frame.isFinalFragment()) { + b0 |= 1 << 7; + } + b0 |= frame.getRsv() % 8 << 4; + b0 |= opcode % 128; + + if (opcode == OPCODE_PING && length > 125) { + throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was " + + length); + } + + int maskLength = maskPayload ? 4 : 0; + if (length <= 125) { + out.ensureWritableBytes(2 + maskLength + length); + out.writeByte(b0); + byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length); + out.writeByte(b); + } else if (length <= 0xFFFF) { + out.ensureWritableBytes(4 + maskLength + length); + out.writeByte(b0); + out.writeByte(maskPayload ? 0xFE : 126); + out.writeByte(length >>> 8 & 0xFF); + out.writeByte(length & 0xFF); + } else { + out.ensureWritableBytes(10 + maskLength + length); + out.writeByte(b0); + out.writeByte(maskPayload ? 0xFF : 127); + out.writeLong(length); + } + + // Write payload + if (maskPayload) { + int random = (int) (Math.random() * Integer.MAX_VALUE); + mask = ByteBuffer.allocate(4).putInt(random).array(); + out.writeInt((int) (Math.random() * Integer.MAX_VALUE)); + + int counter = 0; + for (int i = data.readerIndex(); i < data.writerIndex(); i ++) { + byte byteData = data.getByte(i); + out.writeByte(byteData ^ mask[+counter++ % 4]); + } + } else { + out.writeBytes(data, data.readerIndex(), data.readableBytes()); + } + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java index 6d3da03dc2..ff6c31cdd6 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java @@ -18,7 +18,6 @@ package io.netty.handler.codec.http.websocketx; import static io.netty.handler.codec.http.HttpHeaders.Names.*; import static io.netty.handler.codec.http.HttpHeaders.Values.*; import static io.netty.handler.codec.http.HttpVersion.*; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; @@ -52,7 +51,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker { /** * Constructor specifying the destination web socket location - * + * * @param webSocketURL * URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be * sent to this URL. @@ -70,11 +69,11 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker { * is really a rehash of hixie-76 and * hixie-75. *

- * + * *

* Browser request to the server: *

- * + * *
      * GET /demo HTTP/1.1
      * Upgrade: WebSocket
@@ -84,14 +83,14 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
      * Sec-WebSocket-Protocol: chat, sample
      * Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5
      * Sec-WebSocket-Key2: 12998 5 Y3 1  .P00
-     * 
+     *
      * ^n:ds[4U
      * 
- * + * *

* Server response: *

- * + * *
      * HTTP/1.1 101 WebSocket Protocol Handshake
      * Upgrade: WebSocket
@@ -99,10 +98,10 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
      * Sec-WebSocket-Origin: http://example.com
      * Sec-WebSocket-Location: ws://example.com/demo
      * Sec-WebSocket-Protocol: sample
-     * 
+     *
      * 8jKS'y:G*Co,Wxa-
      * 
- * + * * @param channel * Channel * @param req @@ -112,7 +111,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker { public ChannelFuture handshake(Channel channel, HttpRequest req) { if (logger.isDebugEnabled()) { - logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.getId())); + logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.id())); } // Serve the WebSocket handshake request. @@ -178,7 +177,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker { /** * Echo back the closing frame - * + * * @param channel * Channel * @param frame diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java index a9286374f8..2e5b328a4b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java @@ -17,7 +17,6 @@ package io.netty.handler.codec.http.websocketx; import static io.netty.handler.codec.http.HttpHeaders.Values.*; import static io.netty.handler.codec.http.HttpVersion.*; - import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -51,7 +50,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker { /** * Constructor specifying the destination web socket location - * + * * @param webSocketURL * URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be * sent to this URL. @@ -71,11 +70,11 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker { * "http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-08">HyBi version 8 to 10. Version 8, 9 and * 10 share the same wire protocol. *

- * + * *

* Browser request to the server: *

- * + * *
      * GET /chat HTTP/1.1
      * Host: server.example.com
@@ -86,11 +85,11 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
      * Sec-WebSocket-Protocol: chat, superchat
      * Sec-WebSocket-Version: 8
      * 
- * + * *

* Server response: *

- * + * *
      * HTTP/1.1 101 Switching Protocols
      * Upgrade: websocket
@@ -98,7 +97,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
      * Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
      * Sec-WebSocket-Protocol: chat
      * 
- * + * * @param channel * Channel * @param req @@ -108,7 +107,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker { public ChannelFuture handshake(Channel channel, HttpRequest req) { if (logger.isDebugEnabled()) { - logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.getId())); + logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.id())); } HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS); @@ -150,7 +149,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker { /** * Echo back the closing frame and close the connection - * + * * @param channel * Channel * @param frame diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java index 52749657b2..10ddb243f9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java @@ -15,21 +15,20 @@ */ package io.netty.handler.codec.http.websocketx; -import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - +import static io.netty.handler.codec.http.HttpHeaders.Values.*; +import static io.netty.handler.codec.http.HttpVersion.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.http.HttpChunkAggregator; import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpChunkAggregator; +import io.netty.handler.codec.http.HttpHeaders.Names; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpHeaders.Names; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.CharsetUtil; @@ -52,7 +51,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker { /** * Constructor specifying the destination web socket location - * + * * @param webSocketURL * URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be * sent to this URL. @@ -72,11 +71,11 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker { * "http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17">HyBi versions 13-17. Versions 13-17 * share the same wire protocol. *

- * + * *

* Browser request to the server: *

- * + * *
      * GET /chat HTTP/1.1
      * Host: server.example.com
@@ -87,11 +86,11 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
      * Sec-WebSocket-Protocol: chat, superchat
      * Sec-WebSocket-Version: 13
      * 
- * + * *

* Server response: *

- * + * *
      * HTTP/1.1 101 Switching Protocols
      * Upgrade: websocket
@@ -99,7 +98,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
      * Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
      * Sec-WebSocket-Protocol: chat
      * 
- * + * * @param channel * Channel * @param req @@ -109,7 +108,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker { public ChannelFuture handshake(Channel channel, HttpRequest req) { if (logger.isDebugEnabled()) { - logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.getId())); + logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.id())); } HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS); @@ -151,7 +150,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker { /** * Echo back the closing frame and close the connection - * + * * @param channel * Channel * @param frame diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageDecoder.java index 674e4d1744..64ef3bd713 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageDecoder.java @@ -16,8 +16,7 @@ package io.netty.handler.codec.rtsp; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.handler.codec.http.HttpChunkAggregator; @@ -74,10 +73,10 @@ public abstract class RtspMessageDecoder extends HttpMessageDecoder { aggregator = new DecoderEmbedder(new HttpChunkAggregator(maxContentLength)); } + @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer, State state) throws Exception { - Object o = super.decode(ctx, channel, buffer, state); + public Object decode(ChannelInboundHandlerContext ctx, ChannelBuffer buffer) throws Exception { + Object o = super.decode(ctx, buffer); if (o != null && aggregator.offer(o)) { return aggregator.poll(); } else { diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageEncoder.java index 214a5f7f8d..59769940fe 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspMessageEncoder.java @@ -16,9 +16,9 @@ package io.netty.handler.codec.rtsp; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.UnsupportedMessageTypeException; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpMessageEncoder; @@ -39,12 +39,13 @@ public abstract class RtspMessageEncoder extends HttpMessageEncoder { } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { + public void encode(ChannelOutboundHandlerContext ctx, Object msg, + ChannelBuffer out) throws Exception { // Ignore unrelated message types such as HttpChunk. if (!(msg instanceof HttpMessage)) { - return msg; + throw new UnsupportedMessageTypeException(msg, HttpMessage.class); } - return super.encode(ctx, channel, msg); + + super.encode(ctx, msg, out); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java index 8fe63d66c2..5a621e2962 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java @@ -15,21 +15,14 @@ */ package io.netty.handler.codec.spdy; -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelUpstreamHandler; +import io.netty.channel.CombinedChannelHandler; /** * A combination of {@link SpdyFrameDecoder} and {@link SpdyFrameEncoder}. * @apiviz.has io.netty.handler.codec.spdy.SpdyFrameDecoder * @apiviz.has io.netty.handler.codec.spdy.SpdyFrameEncoder */ -public class SpdyFrameCodec implements ChannelUpstreamHandler, - ChannelDownstreamHandler { - - private final SpdyFrameDecoder decoder; - private final SpdyFrameEncoder encoder; +public class SpdyFrameCodec extends CombinedChannelHandler { /** * Creates a new instance with the default decoder and encoder options @@ -47,17 +40,8 @@ public class SpdyFrameCodec implements ChannelUpstreamHandler, public SpdyFrameCodec( int maxChunkSize, int maxFrameSize, int maxHeaderSize, int compressionLevel, int windowBits, int memLevel) { - decoder = new SpdyFrameDecoder(maxChunkSize, maxFrameSize, maxHeaderSize); - encoder = new SpdyFrameEncoder(compressionLevel, windowBits, memLevel); - } - - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - decoder.handleUpstream(ctx, e); - } - - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - encoder.handleDownstream(ctx, e); + super( + new SpdyFrameDecoder(maxChunkSize, maxFrameSize, maxHeaderSize), + new SpdyFrameEncoder(compressionLevel, windowBits, memLevel)); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java index a34e89c91d..9b29a8d29f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java @@ -15,18 +15,16 @@ */ package io.netty.handler.codec.spdy; +import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.FrameDecoder; - -import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.handler.codec.StreamToMessageDecoder; /** * Decodes {@link ChannelBuffer}s into SPDY Data and Control Frames. */ -public class SpdyFrameDecoder extends FrameDecoder { +public class SpdyFrameDecoder extends StreamToMessageDecoder { private final int maxChunkSize; private final int maxFrameSize; @@ -48,7 +46,6 @@ public class SpdyFrameDecoder extends FrameDecoder { */ public SpdyFrameDecoder( int maxChunkSize, int maxFrameSize, int maxHeaderSize) { - super(true); // Enable unfold for data frames if (maxChunkSize <= 0) { throw new IllegalArgumentException( "maxChunkSize must be a positive integer: " + maxChunkSize); @@ -67,32 +64,27 @@ public class SpdyFrameDecoder extends FrameDecoder { } @Override - protected Object decodeLast( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) - throws Exception { + public Object decodeLast(ChannelInboundHandlerContext ctx, + ChannelBuffer in) throws Exception { try { - Object frame = decode(ctx, channel, buffer); - return frame; + return decode(ctx, in); } finally { headerBlockDecompressor.end(); } } - @Override - protected Object decode( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) - throws Exception { - + public Object decode(ChannelInboundHandlerContext ctx, + ChannelBuffer in) throws Exception { // Must read common header to determine frame length - if (buffer.readableBytes() < SPDY_HEADER_SIZE) { + if (in.readableBytes() < SPDY_HEADER_SIZE) { return null; } // Get frame length from common header - int frameOffset = buffer.readerIndex(); + int frameOffset = in.readerIndex(); int lengthOffset = frameOffset + SPDY_HEADER_LENGTH_OFFSET; - int dataLength = getUnsignedMedium(buffer, lengthOffset); + int dataLength = getUnsignedMedium(in, lengthOffset); int frameLength = SPDY_HEADER_SIZE + dataLength; // Throw exception if frameLength exceeds maxFrameSize @@ -102,37 +94,37 @@ public class SpdyFrameDecoder extends FrameDecoder { } // Wait until entire frame is readable - if (buffer.readableBytes() < frameLength) { + if (in.readableBytes() < frameLength) { return null; } // Read common header fields - boolean control = (buffer.getByte(frameOffset) & 0x80) != 0; + boolean control = (in.getByte(frameOffset) & 0x80) != 0; int flagsOffset = frameOffset + SPDY_HEADER_FLAGS_OFFSET; - byte flags = buffer.getByte(flagsOffset); + byte flags = in.getByte(flagsOffset); if (control) { // Decode control frame common header - int version = getUnsignedShort(buffer, frameOffset) & 0x7FFF; + int version = getUnsignedShort(in, frameOffset) & 0x7FFF; // Spdy versioning spec is broken if (version != SPDY_VERSION) { - buffer.skipBytes(frameLength); + in.skipBytes(frameLength); throw new SpdyProtocolException( "Unsupported version: " + version); } int typeOffset = frameOffset + SPDY_HEADER_TYPE_OFFSET; - int type = getUnsignedShort(buffer, typeOffset); - buffer.skipBytes(SPDY_HEADER_SIZE); + int type = getUnsignedShort(in, typeOffset); + in.skipBytes(SPDY_HEADER_SIZE); - int readerIndex = buffer.readerIndex(); - buffer.skipBytes(dataLength); - return decodeControlFrame(type, flags, buffer.slice(readerIndex, dataLength)); + int readerIndex = in.readerIndex(); + in.skipBytes(dataLength); + return decodeControlFrame(type, flags, in.slice(readerIndex, dataLength)); } else { // Decode data frame common header - int streamID = getUnsignedInt(buffer, frameOffset); - buffer.skipBytes(SPDY_HEADER_SIZE); + int streamID = getUnsignedInt(in, frameOffset); + in.skipBytes(SPDY_HEADER_SIZE); // Generate data frames that do not exceed maxChunkSize int numFrames = dataLength / maxChunkSize; @@ -144,7 +136,7 @@ public class SpdyFrameDecoder extends FrameDecoder { int chunkSize = Math.min(maxChunkSize, dataLength); SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID); spdyDataFrame.setCompressed((flags & SPDY_DATA_FLAG_COMPRESS) != 0); - spdyDataFrame.setData(buffer.readBytes(chunkSize)); + spdyDataFrame.setData(in.readBytes(chunkSize)); dataLength -= chunkSize; if (dataLength == 0) { spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0); @@ -224,8 +216,8 @@ public class SpdyFrameDecoder extends FrameDecoder { // Each ID/Value entry is 8 bytes // The number of entries cannot exceed SPDY_MAX_LENGTH / 8; int numEntries = getUnsignedInt(data, data.readerIndex()); - if ((numEntries > (SPDY_MAX_LENGTH - 4) / 8) || - (data.readableBytes() != numEntries * 8 + 4)) { + if (numEntries > (SPDY_MAX_LENGTH - 4) / 8 || + data.readableBytes() != numEntries * 8 + 4) { throw new SpdyProtocolException( "Received invalid SETTINGS control frame"); } @@ -240,14 +232,14 @@ public class SpdyFrameDecoder extends FrameDecoder { // Chromium Issue 79156 // SPDY setting ids are not written in network byte order // Read id assuming the architecture is little endian - int ID = (data.readByte() & 0xFF) | + int ID = data.readByte() & 0xFF | (data.readByte() & 0xFF) << 8 | (data.readByte() & 0xFF) << 16; byte ID_flags = data.readByte(); int value = getSignedInt(data, data.readerIndex()); data.skipBytes(4); - if (!(spdySettingsFrame.isSet(ID))) { + if (!spdySettingsFrame.isSet(ID)) { boolean persistVal = (ID_flags & SPDY_SETTINGS_PERSIST_VALUE) != 0; boolean persisted = (ID_flags & SPDY_SETTINGS_PERSISTED) != 0; spdySettingsFrame.setValue(ID, value, persistVal, persisted); @@ -321,8 +313,8 @@ public class SpdyFrameDecoder extends FrameDecoder { private void decodeHeaderBlock(SpdyHeaderBlock headerFrame, ChannelBuffer headerBlock) throws Exception { - if ((headerBlock.readableBytes() == 2) && - (headerBlock.getShort(headerBlock.readerIndex()) == 0)) { + if (headerBlock.readableBytes() == 2 && + headerBlock.getShort(headerBlock.readerIndex()) == 0) { return; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 4b8e95393a..478a645b54 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -16,22 +16,20 @@ package io.netty.handler.codec.spdy; import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToStreamEncoder; +import io.netty.handler.codec.UnsupportedMessageTypeException; import java.nio.ByteOrder; import java.util.Set; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.handler.codec.oneone.OneToOneEncoder; - /** * Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}. */ -public class SpdyFrameEncoder extends OneToOneEncoder { +public class SpdyFrameEncoder extends MessageToStreamEncoder { private volatile boolean finished; private final SpdyHeaderBlockCompressor headerBlockCompressor; @@ -52,48 +50,46 @@ public class SpdyFrameEncoder extends OneToOneEncoder { headerBlockCompressor = SpdyHeaderBlockCompressor.newInstance(compressionLevel, windowBits, memLevel); } + @Override - public void handleDownstream( - ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { - if (evt instanceof ChannelStateEvent) { - ChannelStateEvent e = (ChannelStateEvent) evt; - switch (e.state()) { - case OPEN: - case CONNECTED: - case BOUND: - if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) { - synchronized (headerBlockCompressor) { - finished = true; - headerBlockCompressor.end(); - } - } - } - } - super.handleDownstream(ctx, evt); + public void disconnect(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + finish(); + super.disconnect(ctx, future); } @Override - protected Object encode( - ChannelHandlerContext ctx, Channel channel, Object msg) - throws Exception { + public void close(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + finish(); + super.close(ctx, future); + } + private void finish() { + synchronized (headerBlockCompressor) { + if (!finished) { + finished = true; + headerBlockCompressor.end(); + } + } + } + + @Override + public void encode(ChannelOutboundHandlerContext ctx, Object msg, + ChannelBuffer out) throws Exception { if (msg instanceof SpdyDataFrame) { - SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; ChannelBuffer data = spdyDataFrame.getData(); byte flags = spdyDataFrame.isLast() ? SPDY_DATA_FLAG_FIN : 0; if (spdyDataFrame.isCompressed()) { flags |= SPDY_DATA_FLAG_COMPRESS; } - ChannelBuffer header = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE); - header.writeInt(spdyDataFrame.getStreamID() & 0x7FFFFFFF); - header.writeByte(flags); - header.writeMedium(data.readableBytes()); - return ChannelBuffers.wrappedBuffer(header, data); - + out.ensureWritableBytes(SPDY_HEADER_SIZE + data.readableBytes()); + out.writeInt(spdyDataFrame.getStreamID() & 0x7FFFFFFF); + out.writeByte(flags); + out.writeMedium(data.readableBytes()); + out.writeBytes(data, data.readerIndex(), data.readableBytes()); } else if (msg instanceof SpdySynStreamFrame) { - SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; ChannelBuffer data = compressHeaderBlock( encodeHeaderBlock(spdySynStreamFrame)); @@ -102,70 +98,60 @@ public class SpdyFrameEncoder extends OneToOneEncoder { flags |= SPDY_FLAG_UNIDIRECTIONAL; } int headerBlockLength = data.readableBytes(); - int length = (headerBlockLength == 0) ? 12 : 10 + headerBlockLength; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_SYN_STREAM_FRAME); - frame.writeByte(flags); - frame.writeMedium(length); - frame.writeInt(spdySynStreamFrame.getStreamID()); - frame.writeInt(spdySynStreamFrame.getAssociatedToStreamID()); - frame.writeShort(((short) spdySynStreamFrame.getPriority()) << 14); + int length = headerBlockLength == 0 ? 12 : 10 + headerBlockLength; + out.ensureWritableBytes(SPDY_HEADER_SIZE + length + data.readableBytes()); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_SYN_STREAM_FRAME); + out.writeByte(flags); + out.writeMedium(length); + out.writeInt(spdySynStreamFrame.getStreamID()); + out.writeInt(spdySynStreamFrame.getAssociatedToStreamID()); + out.writeShort(spdySynStreamFrame.getPriority() << 14); if (data.readableBytes() == 0) { - frame.writeShort(0); + out.writeShort(0); + } else { + out.writeBytes(data, data.readerIndex(), data.readableBytes()); } - return ChannelBuffers.wrappedBuffer(frame, data); - } else if (msg instanceof SpdySynReplyFrame) { - SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg; ChannelBuffer data = compressHeaderBlock( encodeHeaderBlock(spdySynReplyFrame)); byte flags = spdySynReplyFrame.isLast() ? SPDY_FLAG_FIN : 0; int headerBlockLength = data.readableBytes(); - int length = (headerBlockLength == 0) ? 8 : 6 + headerBlockLength; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_SYN_REPLY_FRAME); - frame.writeByte(flags); - frame.writeMedium(length); - frame.writeInt(spdySynReplyFrame.getStreamID()); + int length = headerBlockLength == 0 ? 8 : 6 + headerBlockLength; + out.ensureWritableBytes(SPDY_HEADER_SIZE + length + data.readableBytes()); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_SYN_REPLY_FRAME); + out.writeByte(flags); + out.writeMedium(length); + out.writeInt(spdySynReplyFrame.getStreamID()); if (data.readableBytes() == 0) { - frame.writeInt(0); + out.writeInt(0); } else { - frame.writeShort(0); + out.writeShort(0); + out.writeBytes(data, data.readerIndex(), data.readableBytes()); } - return ChannelBuffers.wrappedBuffer(frame, data); - } else if (msg instanceof SpdyRstStreamFrame) { - SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 8); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_RST_STREAM_FRAME); - frame.writeInt(8); - frame.writeInt(spdyRstStreamFrame.getStreamID()); - frame.writeInt(spdyRstStreamFrame.getStatus().getCode()); - return frame; - + out.ensureWritableBytes(SPDY_HEADER_SIZE + 8); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_RST_STREAM_FRAME); + out.writeInt(8); + out.writeInt(spdyRstStreamFrame.getStreamID()); + out.writeInt(spdyRstStreamFrame.getStatus().getCode()); } else if (msg instanceof SpdySettingsFrame) { - SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg; byte flags = spdySettingsFrame.clearPreviouslyPersistedSettings() ? SPDY_SETTINGS_CLEAR : 0; Set IDs = spdySettingsFrame.getIDs(); int numEntries = IDs.size(); int length = 4 + numEntries * 8; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_SETTINGS_FRAME); - frame.writeByte(flags); - frame.writeMedium(length); - frame.writeInt(numEntries); + out.ensureWritableBytes(SPDY_HEADER_SIZE + length); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_SETTINGS_FRAME); + out.writeByte(flags); + out.writeMedium(length); + out.writeInt(numEntries); for (Integer ID: IDs) { int id = ID.intValue(); byte ID_flags = (byte) 0; @@ -178,69 +164,53 @@ public class SpdyFrameEncoder extends OneToOneEncoder { // Chromium Issue 79156 // SPDY setting ids are not written in network byte order // Write id assuming the architecture is little endian - frame.writeByte((id >> 0) & 0xFF); - frame.writeByte((id >> 8) & 0xFF); - frame.writeByte((id >> 16) & 0xFF); - frame.writeByte(ID_flags); - frame.writeInt(spdySettingsFrame.getValue(id)); + out.writeByte(id >> 0 & 0xFF); + out.writeByte(id >> 8 & 0xFF); + out.writeByte(id >> 16 & 0xFF); + out.writeByte(ID_flags); + out.writeInt(spdySettingsFrame.getValue(id)); } - return frame; - } else if (msg instanceof SpdyNoOpFrame) { - - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_NOOP_FRAME); - frame.writeInt(0); - return frame; - + out.ensureWritableBytes(SPDY_HEADER_SIZE); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_NOOP_FRAME); + out.writeInt(0); } else if (msg instanceof SpdyPingFrame) { - SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 4); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_PING_FRAME); - frame.writeInt(4); - frame.writeInt(spdyPingFrame.getID()); - return frame; - + out.ensureWritableBytes(SPDY_HEADER_SIZE + 4); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_PING_FRAME); + out.writeInt(4); + out.writeInt(spdyPingFrame.getID()); } else if (msg instanceof SpdyGoAwayFrame) { - SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 4); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_GOAWAY_FRAME); - frame.writeInt(4); - frame.writeInt(spdyGoAwayFrame.getLastGoodStreamID()); - return frame; - + out.ensureWritableBytes(SPDY_HEADER_SIZE + 4); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_GOAWAY_FRAME); + out.writeInt(4); + out.writeInt(spdyGoAwayFrame.getLastGoodStreamID()); } else if (msg instanceof SpdyHeadersFrame) { - SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg; ChannelBuffer data = compressHeaderBlock( encodeHeaderBlock(spdyHeadersFrame)); int headerBlockLength = data.readableBytes(); - int length = (headerBlockLength == 0) ? 4 : 6 + headerBlockLength; - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(SPDY_VERSION | 0x8000); - frame.writeShort(SPDY_HEADERS_FRAME); - frame.writeInt(length); - frame.writeInt(spdyHeadersFrame.getStreamID()); + int length = headerBlockLength == 0 ? 4 : 6 + headerBlockLength; + out.ensureWritableBytes(SPDY_HEADER_SIZE + length + data.readableBytes()); + out.writeShort(SPDY_VERSION | 0x8000); + out.writeShort(SPDY_HEADERS_FRAME); + out.writeInt(length); + out.writeInt(spdyHeadersFrame.getStreamID()); if (data.readableBytes() != 0) { - frame.writeShort(0); + out.writeShort(0); + out.writeBytes(data, data.readerIndex(), data.readableBytes()); } - return ChannelBuffers.wrappedBuffer(frame, data); + } else { + // Unknown message type + throw new UnsupportedMessageTypeException(msg); } - - // Unknown message type - return msg; } - private ChannelBuffer encodeHeaderBlock(SpdyHeaderBlock headerFrame) + private static ChannelBuffer encodeHeaderBlock(SpdyHeaderBlock headerFrame) throws Exception { Set names = headerFrame.getHeaderNames(); int numHeaders = names.size(); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java index 1e56429002..aebaf5ae62 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java @@ -15,35 +15,19 @@ */ package io.netty.handler.codec.spdy; -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelUpstreamHandler; +import io.netty.channel.CombinedChannelHandler; /** * A combination of {@link SpdyHttpDecoder} and {@link SpdyHttpEncoder} * @apiviz.has io.netty.handler.codec.sdpy.SpdyHttpDecoder * @apiviz.has io.netty.handler.codec.spdy.SpdyHttpEncoder */ -public class SpdyHttpCodec implements ChannelUpstreamHandler, ChannelDownstreamHandler { - - private final SpdyHttpDecoder decoder; - private final SpdyHttpEncoder encoder = new SpdyHttpEncoder(); +public class SpdyHttpCodec extends CombinedChannelHandler { /** * Creates a new instance with the specified decoder options. */ public SpdyHttpCodec(int maxContentLength) { - decoder = new SpdyHttpDecoder(maxContentLength); - } - - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - decoder.handleUpstream(ctx, e); - } - - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - encoder.handleDownstream(ctx, e); + super(new SpdyHttpDecoder(maxContentLength), new SpdyHttpEncoder()); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java index 7ceae7ff2c..d227c8a12b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpDecoder.java @@ -15,15 +15,10 @@ */ package io.netty.handler.codec.spdy; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.Channels; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultHttpResponse; @@ -34,13 +29,16 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.oneone.OneToOneDecoder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Decodes {@link SpdySynStreamFrame}s, {@link SpdySynReplyFrame}s, * and {@link SpdyDataFrame}s into {@link HttpRequest}s and {@link HttpResponse}s. */ -public class SpdyHttpDecoder extends OneToOneDecoder { +public class SpdyHttpDecoder extends MessageToMessageDecoder { private final int maxContentLength; private final Map messageMap = new HashMap(); @@ -61,8 +59,9 @@ public class SpdyHttpDecoder extends OneToOneDecoder { this.maxContentLength = maxContentLength; } + @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) + public Object decode(ChannelInboundHandlerContext ctx, Object msg) throws Exception { if (msg instanceof SpdySynStreamFrame) { @@ -80,7 +79,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { if (associatedToStreamID == 0) { SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.INVALID_STREAM); - Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame); + ctx.write(spdyRstStreamFrame); } String URL = SpdyHeaders.getUrl(spdySynStreamFrame); @@ -90,7 +89,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { if (URL == null) { SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.PROTOCOL_ERROR); - Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame); + ctx.write(spdyRstStreamFrame); } try { @@ -112,7 +111,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { } catch (Exception e) { SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.PROTOCOL_ERROR); - Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame); + ctx.write(spdyRstStreamFrame); } } else { @@ -137,7 +136,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { spdySynReplyFrame.setLast(true); SpdyHeaders.setStatus(spdySynReplyFrame, HttpResponseStatus.BAD_REQUEST); SpdyHeaders.setVersion(spdySynReplyFrame, HttpVersion.HTTP_1_0); - Channels.write(ctx, Channels.future(channel), spdySynReplyFrame); + ctx.write(spdySynReplyFrame); } } @@ -164,7 +163,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { // the client must reply with a RST_STREAM frame indicating a PROTOCOL_ERROR SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.PROTOCOL_ERROR); - Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame); + ctx.write(spdyRstStreamFrame); } } else if (msg instanceof SpdyHeadersFrame) { @@ -203,8 +202,9 @@ public class SpdyHttpDecoder extends OneToOneDecoder { } if (content == ChannelBuffers.EMPTY_BUFFER) { - content = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory()); - content.writeBytes(spdyDataFrame.getData()); + ChannelBuffer data = spdyDataFrame.getData(); + content = ChannelBuffers.dynamicBuffer(data.readableBytes()); + content.writeBytes(data, data.readerIndex(), data.readableBytes()); httpMessage.setContent(content); } else { content.writeBytes(spdyDataFrame.getData()); @@ -220,7 +220,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { return null; } - private HttpRequest createHttpRequest(SpdyHeaderBlock requestFrame) + private static HttpRequest createHttpRequest(SpdyHeaderBlock requestFrame) throws Exception { // Create the first line of the request from the name/value pairs HttpMethod method = SpdyHeaders.getMethod(requestFrame); @@ -250,7 +250,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder { return httpRequest; } - private HttpResponse createHttpResponse(SpdyHeaderBlock responseFrame) + private static HttpResponse createHttpResponse(SpdyHeaderBlock responseFrame) throws Exception { // Create the first line of the response from the name/value pairs HttpResponseStatus status = SpdyHeaders.getStatus(responseFrame); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java index 7a7b5f9590..e63e8a5465 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -15,16 +15,9 @@ */ package io.netty.handler.codec.spdy; -import java.util.List; -import java.util.Map; - -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.UnsupportedMessageTypeException; import io.netty.handler.codec.http.HttpChunk; import io.netty.handler.codec.http.HttpChunkTrailer; import io.netty.handler.codec.http.HttpHeaders; @@ -32,6 +25,9 @@ import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import java.util.List; +import java.util.Map; + /** * Encodes {@link HttpRequest}s, {@link HttpResponse}s, and {@link HttpChunk}s * into {@link SpdySynStreamFrame}s and {@link SpdySynReplyFrame}s. @@ -110,44 +106,31 @@ import io.netty.handler.codec.http.HttpResponse; * All pushed resources should be sent before sending the response * that corresponds to the initial request. */ -public class SpdyHttpEncoder implements ChannelDownstreamHandler { +public class SpdyHttpEncoder extends MessageToMessageEncoder { private volatile int currentStreamID; - public SpdyHttpEncoder() { - } - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt) + @Override + public Object encode(ChannelOutboundHandlerContext ctx, Object msg) throws Exception { - if (!(evt instanceof MessageEvent)) { - ctx.sendDownstream(evt); - return; - } - - MessageEvent e = (MessageEvent) evt; - Object msg = e.getMessage(); - if (msg instanceof HttpRequest) { HttpRequest httpRequest = (HttpRequest) msg; SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpRequest); int streamID = spdySynStreamFrame.getStreamID(); - ChannelFuture future = getContentFuture(ctx, e, streamID, httpRequest); - Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress()); - + return new Object[] { spdySynStreamFrame, dataFrame(streamID, httpRequest) }; } else if (msg instanceof HttpResponse) { HttpResponse httpResponse = (HttpResponse) msg; if (httpResponse.containsHeader(SpdyHttpHeaders.Names.ASSOCIATED_TO_STREAM_ID)) { SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpResponse); int streamID = spdySynStreamFrame.getStreamID(); - ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse); - Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress()); + return new Object[] { spdySynStreamFrame, dataFrame(streamID, httpResponse) }; } else { SpdySynReplyFrame spdySynReplyFrame = createSynReplyFrame(httpResponse); int streamID = spdySynReplyFrame.getStreamID(); - ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse); - Channels.write(ctx, future, spdySynReplyFrame, e.getRemoteAddress()); + return new Object[] { spdySynReplyFrame, dataFrame(streamID, httpResponse) }; } } else if (msg instanceof HttpChunk) { @@ -161,7 +144,7 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { HttpChunkTrailer trailer = (HttpChunkTrailer) chunk; List> trailers = trailer.getHeaders(); if (trailers.isEmpty()) { - Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress()); + return spdyDataFrame; } else { // Create SPDY HEADERS frame out of trailers SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(currentStreamID); @@ -170,23 +153,20 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { } // Write HEADERS frame and append Data Frame - ChannelFuture future = Channels.future(e.channel()); - future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame)); - Channels.write(ctx, future, spdyHeadersFrame, e.getRemoteAddress()); + return new Object[] { spdyHeadersFrame, spdyDataFrame }; } } else { - Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress()); + return spdyDataFrame; } } else { // Unknown message type - ctx.sendDownstream(evt); + throw new UnsupportedMessageTypeException(); } } - private ChannelFuture getContentFuture( - ChannelHandlerContext ctx, MessageEvent e, int streamID, HttpMessage httpMessage) { + private static SpdyDataFrame dataFrame(int streamID, HttpMessage httpMessage) { if (httpMessage.getContent().readableBytes() == 0) { - return e.getFuture(); + return null; } // Create SPDY Data Frame out of message content @@ -194,34 +174,7 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { spdyDataFrame.setData(httpMessage.getContent()); spdyDataFrame.setLast(true); - // Create new future and add listener - ChannelFuture future = Channels.future(e.channel()); - future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame)); - - return future; - } - - private class SpdyFrameWriter implements ChannelFutureListener { - - private final ChannelHandlerContext ctx; - private final MessageEvent e; - private final Object spdyFrame; - - SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e, Object spdyFrame) { - this.ctx = ctx; - this.e = e; - this.spdyFrame = spdyFrame; - } - - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - Channels.write(ctx, e.getFuture(), spdyFrame, e.getRemoteAddress()); - } else if (future.isCancelled()) { - e.getFuture().cancel(); - } else { - e.getFuture().setFailure(future.cause()); - } - } + return spdyDataFrame; } private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyProtocolException.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyProtocolException.java index 4148e4d16c..70b4d24b14 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyProtocolException.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyProtocolException.java @@ -15,12 +15,14 @@ */ package io.netty.handler.codec.spdy; +import io.netty.handler.codec.CodecException; + /** * An {@link Exception} which is thrown when the received frame cannot * be decoded by the {@link SpdyFrameDecoder}. * @apiviz.exclude */ -public class SpdyProtocolException extends Exception { +public class SpdyProtocolException extends CodecException { private static final long serialVersionUID = -1097979786367505658L; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java index 4c68061ca0..db1e9d774e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java @@ -53,7 +53,7 @@ final class SpdySession { public boolean isRemoteSideClosed(int streamID) { StreamState state = activeStreams.get(new Integer(streamID)); - return (state == null) || state.isRemoteSideClosed(); + return state == null || state.isRemoteSideClosed(); } public void closeRemoteSide(int streamID) { @@ -69,7 +69,7 @@ final class SpdySession { public boolean isLocalSideClosed(int streamID) { StreamState state = activeStreams.get(new Integer(streamID)); - return (state == null) || state.isLocalSideClosed(); + return state == null || state.isLocalSideClosed(); } public void closeLocalSide(int streamID) { @@ -85,7 +85,7 @@ final class SpdySession { public boolean hasReceivedReply(int streamID) { StreamState state = activeStreams.get(new Integer(streamID)); - return (state != null) && state.hasReceivedReply(); + return state != null && state.hasReceivedReply(); } public void receivedReply(int streamID) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 3ed64fef5a..3b5eff09ba 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -15,26 +15,22 @@ */ package io.netty.handler.codec.spdy; -import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; + +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; /** * Manages streams within a SPDY session. */ -public class SpdySessionHandler extends SimpleChannelUpstreamHandler - implements ChannelDownstreamHandler { +public class SpdySessionHandler extends ChannelHandlerAdapter { private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException(); @@ -68,10 +64,36 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(); + } + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) + throws Exception { + Queue in = ctx.in().messageBuffer(); + for (;;) { + Object msg = in.poll(); + if (msg == null) { + break; + } + + handleInboundMessage(ctx, msg); + } + + ctx.fireInboundBufferUpdated(); + } + + private void handleInboundMessage(ChannelInboundHandlerContext ctx, Object msg) throws Exception { - Object msg = e.getMessage(); if (msg instanceof SpdyDataFrame) { /* @@ -99,14 +121,14 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler // Check if we received a data frame for a Stream-ID which is not open if (spdySession.isRemoteSideClosed(streamID)) { if (!sentGoAwayFrame) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM); + issueStreamError(ctx, streamID, SpdyStreamStatus.INVALID_STREAM); } return; } // Check if we received a data frame before receiving a SYN_REPLY if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR); + issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR); return; } @@ -134,13 +156,13 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler if (spdySynStreamFrame.isInvalid() || !isRemoteInitiatedID(streamID) || spdySession.isActiveStream(streamID)) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR); + issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR); return; } // Stream-IDs must be monotonically increassing if (streamID < lastGoodStreamID) { - issueSessionError(ctx, e.channel(), e.getRemoteAddress()); + issueSessionError(ctx); return; } @@ -148,7 +170,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler boolean remoteSideClosed = spdySynStreamFrame.isLast(); boolean localSideClosed = spdySynStreamFrame.isUnidirectional(); if (!acceptStream(streamID, remoteSideClosed, localSideClosed)) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.REFUSED_STREAM); + issueStreamError(ctx, streamID, SpdyStreamStatus.REFUSED_STREAM); return; } @@ -168,13 +190,13 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler if (spdySynReplyFrame.isInvalid() || isRemoteInitiatedID(streamID) || spdySession.isRemoteSideClosed(streamID)) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM); + issueStreamError(ctx, streamID, SpdyStreamStatus.INVALID_STREAM); return; } // Check if we have received multiple frames for the same Stream-ID if (spdySession.hasReceivedReply(streamID)) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR); + issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR); return; } @@ -217,12 +239,12 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler */ SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - + if (isRemoteInitiatedID(spdyPingFrame.getID())) { - Channels.write(ctx, Channels.future(e.channel()), spdyPingFrame, e.getRemoteAddress()); + ctx.write(spdyPingFrame); return; } - + // Note: only checks that there are outstanding pings since uniqueness is not inforced if (pings.get() == 0) { return; @@ -240,48 +262,69 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler // Check if we received a valid HEADERS frame if (spdyHeadersFrame.isInvalid()) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR); + issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR); return; } if (spdySession.isRemoteSideClosed(streamID)) { - issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM); + issueStreamError(ctx, streamID, SpdyStreamStatus.INVALID_STREAM); return; } } - super.messageReceived(ctx, e); + ctx.nextIn().messageBuffer().add(msg); } - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt) - throws Exception { - if (evt instanceof ChannelStateEvent) { - ChannelStateEvent e = (ChannelStateEvent) evt; - switch (e.state()) { - case OPEN: - case CONNECTED: - case BOUND: - if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) { - sendGoAwayFrame(ctx, e); - return; - } + @Override + public void disconnect(final ChannelOutboundHandlerContext ctx, + final ChannelFuture future) throws Exception { + sendGoAwayFrame(ctx).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) + throws Exception { + ctx.disconnect(future); } - } - if (!(evt instanceof MessageEvent)) { - ctx.sendDownstream(evt); - return; + }); + } + + @Override + public void close(final ChannelOutboundHandlerContext ctx, + final ChannelFuture future) throws Exception { + sendGoAwayFrame(ctx).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) + throws Exception { + ctx.close(future); + } + }); + } + + @Override + public void flush(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + + Queue in = ctx.prevOut().messageBuffer(); + for (;;) { + Object msg = in.poll(); + if (msg == null) { + break; + } + + handleOutboundMessage(ctx, msg); } - MessageEvent e = (MessageEvent) evt; - Object msg = e.getMessage(); + ctx.flush(future); + } + private void handleOutboundMessage(ChannelOutboundHandlerContext ctx, Object msg) + throws Exception { if (msg instanceof SpdyDataFrame) { SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; int streamID = spdyDataFrame.getStreamID(); if (spdySession.isLocalSideClosed(streamID)) { - e.getFuture().setFailure(PROTOCOL_EXCEPTION); + issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR); return; } @@ -292,10 +335,11 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } else if (msg instanceof SpdySynStreamFrame) { SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; + int streamID = spdySynStreamFrame.getStreamID(); boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional(); boolean localSideClosed = spdySynStreamFrame.isLast(); - if (!acceptStream(spdySynStreamFrame.getStreamID(), remoteSideClosed, localSideClosed)) { - e.getFuture().setFailure(PROTOCOL_EXCEPTION); + if (!acceptStream(streamID, remoteSideClosed, localSideClosed)) { + issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR); return; } @@ -305,7 +349,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler int streamID = spdySynReplyFrame.getStreamID(); if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) { - e.getFuture().setFailure(PROTOCOL_EXCEPTION); + ctx.fireExceptionCaught(PROTOCOL_EXCEPTION); return; } @@ -327,7 +371,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; if (isRemoteInitiatedID(spdyPingFrame.getID())) { - e.getFuture().setFailure(new IllegalArgumentException( + ctx.fireExceptionCaught(new IllegalArgumentException( "invalid PING ID: " + spdyPingFrame.getID())); return; } @@ -336,7 +380,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } else if (msg instanceof SpdyGoAwayFrame) { // Should send a CLOSE ChannelStateEvent - e.getFuture().setFailure(PROTOCOL_EXCEPTION); + ctx.fireExceptionCaught(PROTOCOL_EXCEPTION); return; } else if (msg instanceof SpdyHeadersFrame) { @@ -345,33 +389,30 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler int streamID = spdyHeadersFrame.getStreamID(); if (spdySession.isLocalSideClosed(streamID)) { - e.getFuture().setFailure(PROTOCOL_EXCEPTION); + ctx.fireExceptionCaught(PROTOCOL_EXCEPTION); return; } } - ctx.sendDownstream(evt); + ctx.out().messageBuffer().add(msg); } /* * Error Handling */ - private void issueSessionError( - ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress) { - - ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress); - future.addListener(ChannelFutureListener.CLOSE); + private void issueSessionError(ChannelHandlerContext ctx) { + sendGoAwayFrame(ctx).addListener(ChannelFutureListener.CLOSE); } // Send a RST_STREAM frame in response to an incoming MessageEvent // Only called in the upstream direction private void issueStreamError( - ChannelHandlerContext ctx, MessageEvent e, int streamID, SpdyStreamStatus status) { + ChannelHandlerContext ctx, int streamID, SpdyStreamStatus status) { removeStream(streamID); SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status); - Channels.write(ctx, Channels.future(e.channel()), spdyRstStreamFrame, e.getRemoteAddress()); + ctx.write(spdyRstStreamFrame); } /* @@ -380,7 +421,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler private boolean isRemoteInitiatedID(int ID) { boolean serverID = SpdyCodecUtil.isServerID(ID); - return (server && !serverID) || (!server && serverID); + return server && !serverID || !server && serverID; } private synchronized void updateConcurrentStreams(SpdySettingsFrame settings, boolean remote) { @@ -416,8 +457,8 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler if (receivedGoAwayFrame || sentGoAwayFrame) { return false; } - if ((maxConcurrentStreams != 0) && - (spdySession.numActiveStreams() >= maxConcurrentStreams)) { + if (maxConcurrentStreams != 0 && + spdySession.numActiveStreams() >= maxConcurrentStreams) { return false; } spdySession.acceptStream(streamID, remoteSideClosed, localSideClosed); @@ -433,61 +474,23 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler } else { spdySession.closeLocalSide(streamID); } - if ((closeSessionFuture != null) && spdySession.noActiveStreams()) { + if (closeSessionFuture != null && spdySession.noActiveStreams()) { closeSessionFuture.setSuccess(); } } private void removeStream(int streamID) { spdySession.removeStream(streamID); - if ((closeSessionFuture != null) && spdySession.noActiveStreams()) { + if (closeSessionFuture != null && spdySession.noActiveStreams()) { closeSessionFuture.setSuccess(); } } - private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) { - // Avoid NotYetConnectedException - if (!e.channel().isConnected()) { - ctx.sendDownstream(e); - return; - } - - ChannelFuture future = sendGoAwayFrame(ctx, e.channel(), null); - if (spdySession.noActiveStreams()) { - future.addListener(new ClosingChannelFutureListener(ctx, e)); - } else { - closeSessionFuture = Channels.future(e.channel()); - closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e)); - } - } - - private synchronized ChannelFuture sendGoAwayFrame( - ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress) { + private synchronized ChannelFuture sendGoAwayFrame(ChannelHandlerContext ctx) { if (!sentGoAwayFrame) { sentGoAwayFrame = true; - ChannelFuture future = Channels.future(channel); - Channels.write(ctx, future, new DefaultSpdyGoAwayFrame(lastGoodStreamID)); - return future; - } - return Channels.succeededFuture(channel); - } - - private static final class ClosingChannelFutureListener implements ChannelFutureListener { - - private final ChannelHandlerContext ctx; - private final ChannelStateEvent e; - - ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) { - this.ctx = ctx; - this.e = e; - } - - public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception { - if (!(sentGoAwayFuture.cause() instanceof ClosedChannelException)) { - Channels.close(ctx, e.getFuture()); - } else { - e.getFuture().setSuccess(); - } + return ctx.write(new DefaultSpdyGoAwayFrame(lastGoodStreamID)); } + return ctx.newSucceededFuture(); } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00Test.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00Test.java index 08dffd0999..0a9ae8cede 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00Test.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00Test.java @@ -15,10 +15,9 @@ */ package io.netty.handler.codec.http.websocketx; -import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.replay; +import static io.netty.handler.codec.http.HttpHeaders.Values.*; +import static io.netty.handler.codec.http.HttpVersion.*; +import static org.easymock.EasyMock.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; @@ -41,43 +40,43 @@ import org.junit.Assert; import org.junit.Test; public class WebSocketServerHandshaker00Test { - - private DefaultChannelPipeline createPipeline() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(); + + private static DefaultChannelPipeline createPipeline(Channel ch) { + DefaultChannelPipeline pipeline = new DefaultChannelPipeline(ch); pipeline.addLast("chunkAggregator", new HttpChunkAggregator(42)); pipeline.addLast("wsdecoder", new HttpRequestDecoder()); pipeline.addLast("wsencoder", new HttpResponseEncoder()); return pipeline; } - + @Test public void testPerformOpeningHandshake() { Channel channelMock = EasyMock.createMock(Channel.class); - - DefaultChannelPipeline pipeline = createPipeline(); + + DefaultChannelPipeline pipeline = createPipeline(channelMock); EasyMock.expect(channelMock.pipeline()).andReturn(pipeline); - + // capture the http response in order to verify the headers Capture res = new Capture(); EasyMock.expect(channelMock.write(capture(res))).andReturn(new DefaultChannelFuture(channelMock, true)); - + replay(channelMock); - + HttpRequest req = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"); req.setHeader(Names.HOST, "server.example.com"); req.setHeader(Names.UPGRADE, WEBSOCKET.toLowerCase()); req.setHeader(Names.CONNECTION, "Upgrade"); req.setHeader(Names.ORIGIN, "http://example.com"); req.setHeader(Names.SEC_WEBSOCKET_KEY1, "4 @1 46546xW%0l 1 5"); - req.setHeader(Names.SEC_WEBSOCKET_KEY2, "12998 5 Y3 1 .P00"); + req.setHeader(Names.SEC_WEBSOCKET_KEY2, "12998 5 Y3 1 .P00"); req.setHeader(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat"); - + ChannelBuffer buffer = ChannelBuffers.copiedBuffer("^n:ds[4U", Charset.defaultCharset()); req.setContent(buffer); - + WebSocketServerHandshaker00 handsaker = new WebSocketServerHandshaker00("ws://example.com/chat", "chat"); handsaker.handshake(channelMock, req); - + Assert.assertEquals("ws://example.com/chat", res.getValue().getHeader(Names.SEC_WEBSOCKET_LOCATION)); Assert.assertEquals("chat", res.getValue().getHeader(Names.SEC_WEBSOCKET_PROTOCOL)); Assert.assertEquals("8jKS'y:G*Co,Wxa-", res.getValue().getContent().toString(Charset.defaultCharset())); diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08Test.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08Test.java index b61afb3efc..ae67d5f9d2 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08Test.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08Test.java @@ -15,10 +15,9 @@ */ package io.netty.handler.codec.http.websocketx; -import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.replay; +import static io.netty.handler.codec.http.HttpHeaders.Values.*; +import static io.netty.handler.codec.http.HttpVersion.*; +import static org.easymock.EasyMock.*; import io.netty.channel.Channel; import io.netty.channel.DefaultChannelFuture; import io.netty.channel.DefaultChannelPipeline; @@ -37,28 +36,28 @@ import org.junit.Assert; import org.junit.Test; public class WebSocketServerHandshaker08Test { - - private DefaultChannelPipeline createPipeline() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(); + + private static DefaultChannelPipeline createPipeline(Channel ch) { + DefaultChannelPipeline pipeline = new DefaultChannelPipeline(ch); pipeline.addLast("chunkAggregator", new HttpChunkAggregator(42)); pipeline.addLast("requestDecoder", new HttpRequestDecoder()); pipeline.addLast("responseEncoder", new HttpResponseEncoder()); return pipeline; } - + @Test public void testPerformOpeningHandshake() { Channel channelMock = EasyMock.createMock(Channel.class); - - DefaultChannelPipeline pipeline = createPipeline(); + + DefaultChannelPipeline pipeline = createPipeline(channelMock); EasyMock.expect(channelMock.pipeline()).andReturn(pipeline); - + // capture the http response in order to verify the headers Capture res = new Capture(); EasyMock.expect(channelMock.write(capture(res))).andReturn(new DefaultChannelFuture(channelMock, true)); - + replay(channelMock); - + HttpRequest req = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"); req.setHeader(Names.HOST, "server.example.com"); req.setHeader(Names.UPGRADE, WEBSOCKET.toLowerCase()); @@ -67,10 +66,10 @@ public class WebSocketServerHandshaker08Test { req.setHeader(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com"); req.setHeader(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat"); req.setHeader(Names.SEC_WEBSOCKET_VERSION, "8"); - + WebSocketServerHandshaker08 handsaker = new WebSocketServerHandshaker08("ws://example.com/chat", "chat", false); handsaker.handshake(channelMock, req); - + Assert.assertEquals("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.getValue().getHeader(Names.SEC_WEBSOCKET_ACCEPT)); Assert.assertEquals("chat", res.getValue().getHeader(Names.SEC_WEBSOCKET_PROTOCOL)); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13Test.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13Test.java index b9168eb096..7c9739103f 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13Test.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13Test.java @@ -15,10 +15,9 @@ */ package io.netty.handler.codec.http.websocketx; -import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.replay; +import static io.netty.handler.codec.http.HttpHeaders.Values.*; +import static io.netty.handler.codec.http.HttpVersion.*; +import static org.easymock.EasyMock.*; import io.netty.channel.Channel; import io.netty.channel.DefaultChannelFuture; import io.netty.channel.DefaultChannelPipeline; @@ -38,8 +37,8 @@ import org.junit.Test; public class WebSocketServerHandshaker13Test { - private DefaultChannelPipeline createPipeline() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(); + private static DefaultChannelPipeline createPipeline(Channel ch) { + DefaultChannelPipeline pipeline = new DefaultChannelPipeline(ch); pipeline.addLast("chunkAggregator", new HttpChunkAggregator(42)); pipeline.addLast("requestDecoder", new HttpRequestDecoder()); pipeline.addLast("responseEncoder", new HttpResponseEncoder()); @@ -50,7 +49,7 @@ public class WebSocketServerHandshaker13Test { public void testPerformOpeningHandshake() { Channel channelMock = EasyMock.createMock(Channel.class); - DefaultChannelPipeline pipeline = createPipeline(); + DefaultChannelPipeline pipeline = createPipeline(channelMock); EasyMock.expect(channelMock.pipeline()).andReturn(pipeline); // capture the http response in order to verify the headers diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java index b956844a6d..85382f76f5 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java @@ -16,30 +16,26 @@ package io.netty.handler.codec.spdy; import static org.junit.Assert.*; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.Channel; +import io.netty.channel.ChannelBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelInboundStreamHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ServerChannelBootstrap; +import io.netty.util.internal.ExecutorUtil; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Random; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; -import io.netty.bootstrap.ClientBootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.Channels; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.util.internal.ExecutorUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -146,27 +142,38 @@ public abstract class AbstractSocketSpdyEchoTest { ExecutorUtil.terminate(executor); } - protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); - protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); + protected abstract ServerChannelBootstrap newServerBootstrap(); + protected abstract ChannelBootstrap newClientBootstrap(); @Test(timeout = 10000) public void testSpdyEcho() throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + ServerChannelBootstrap sb = newServerBootstrap(); + ChannelBootstrap cb = newClientBootstrap(); - EchoHandler sh = new EchoHandler(true); - EchoHandler ch = new EchoHandler(false); + final ServerHandler sh = new ServerHandler(); + final ClientHandler ch = new ClientHandler(); - sb.pipeline().addLast("decoder", new SpdyFrameDecoder()); - sb.pipeline().addLast("encoder", new SpdyFrameEncoder()); - sb.pipeline().addLast("handler", sh); + sb.childInitializer(new ChannelInitializer() { + @Override + public void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast( + new SpdyFrameDecoder(), + new SpdyFrameEncoder(), + sh); + } + }); - cb.pipeline().addLast("handler", ch); + cb.initializer(new ChannelInitializer() { + @Override + public void initChannel(Channel channel) throws Exception { + channel.pipeline().addLast(ch); + } + }); - Channel sc = sb.bind(new InetSocketAddress(0)); - int port = ((InetSocketAddress) sc.getLocalAddress()).getPort(); + Channel sc = sb.localAddress(new InetSocketAddress(0)).bind().sync().channel(); + int port = ((InetSocketAddress) sc.localAddress()).getPort(); - ChannelFuture ccf = cb.connect(new InetSocketAddress(InetAddress.getLocalHost(), port)); + ChannelFuture ccf = cb.remoteAddress(new InetSocketAddress(InetAddress.getLocalHost(), port)).connect(); assertTrue(ccf.awaitUninterruptibly().isSuccess()); Channel cc = ccf.channel(); @@ -205,47 +212,62 @@ public abstract class AbstractSocketSpdyEchoTest { } } - private class EchoHandler extends SimpleChannelUpstreamHandler { + private class ServerHandler extends ChannelInboundMessageHandlerAdapter { + volatile Channel channel; + final AtomicReference exception = new AtomicReference(); + + @Override + public void channelRegistered(ChannelInboundHandlerContext ctx) + throws Exception { + channel = ctx.channel(); + } + + @Override + public void messageReceived(ChannelInboundHandlerContext ctx, Object msg) + throws Exception { + ctx.write(msg); + } + + @Override + public void exceptionCaught(ChannelInboundHandlerContext ctx, + Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + ctx.close(); + } + } + } + + private class ClientHandler extends ChannelInboundStreamHandlerAdapter { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; - final boolean server; - EchoHandler(boolean server) { - super(); - this.server = server; + @Override + public void channelRegistered(ChannelInboundHandlerContext ctx) + throws Exception { + channel = ctx.channel(); } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - channel = e.channel(); - } + ChannelBuffer m = ctx.in().byteBuffer().readBytes(ctx.in().byteBuffer().readableBytes()); + byte[] actual = new byte[m.readableBytes()]; + m.getBytes(0, actual); - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - if (server) { - Channels.write(channel, e.getMessage(), e.getRemoteAddress()); - } else { - ChannelBuffer m = (ChannelBuffer) e.getMessage(); - byte[] actual = new byte[m.readableBytes()]; - m.getBytes(0, actual); - - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]); - } - - counter += actual.length; + int lastIdx = counter; + for (int i = 0; i < actual.length; i ++) { + assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]); } + + counter += actual.length; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - if (exception.compareAndSet(null, e.cause())) { - e.channel().close(); + public void exceptionCaught(ChannelInboundHandlerContext ctx, + Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + ctx.close(); } } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java index 4a57c20623..b50262d3f2 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java @@ -15,22 +15,29 @@ */ package io.netty.handler.codec.spdy; -import java.util.concurrent.Executor; -import io.netty.channel.ChannelFactory; -import io.netty.channel.socket.nio.NioClientSocketChannelFactory; -import io.netty.channel.socket.nio.NioServerSocketChannelFactory; +import io.netty.channel.ChannelBootstrap; +import io.netty.channel.ServerChannelBootstrap; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.nio.SelectorEventLoop; public class NioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override - protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor); + protected ChannelBootstrap newClientBootstrap() { + ChannelBootstrap b = new ChannelBootstrap(); + b.eventLoop(new SelectorEventLoop()); + b.channel(new NioSocketChannel()); + return b; } @Override - protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor); + protected ServerChannelBootstrap newServerBootstrap() { + ServerChannelBootstrap b = new ServerChannelBootstrap(); + b.eventLoop(new SelectorEventLoop(), new SelectorEventLoop()); + b.channel(new NioServerSocketChannel()); + return b; } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java index 38425d9ea9..72dc70a642 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.handler.codec.spdy; -import java.util.concurrent.Executor; import io.netty.channel.ChannelFactory; import io.netty.channel.socket.nio.NioClientSocketChannelFactory; @@ -24,12 +23,12 @@ import io.netty.channel.socket.oio.OioServerSocketChannelFactory; public class NioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override - protected ChannelFactory newClientSocketChannelFactory(Executor executor) { + protected ChannelFactory newClientBootstrap() { return new NioClientSocketChannelFactory(executor); } @Override - protected ChannelFactory newServerSocketChannelFactory(Executor executor) { + protected ChannelFactory newServerBootstrap() { return new OioServerSocketChannelFactory(executor, executor); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java index 4ed2b1a129..86b48521f9 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.handler.codec.spdy; -import java.util.concurrent.Executor; import io.netty.channel.ChannelFactory; import io.netty.channel.socket.nio.NioServerSocketChannelFactory; @@ -24,12 +23,12 @@ import io.netty.channel.socket.oio.OioClientSocketChannelFactory; public class OioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override - protected ChannelFactory newClientSocketChannelFactory(Executor executor) { + protected ChannelFactory newClientBootstrap() { return new OioClientSocketChannelFactory(executor); } @Override - protected ChannelFactory newServerSocketChannelFactory(Executor executor) { + protected ChannelFactory newServerBootstrap() { return new NioServerSocketChannelFactory(executor); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/OioOioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/OioOioSocketSpdyEchoTest.java index 3e89046407..5f19f8152e 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/OioOioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/OioOioSocketSpdyEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.handler.codec.spdy; -import java.util.concurrent.Executor; import io.netty.channel.ChannelFactory; import io.netty.channel.socket.oio.OioClientSocketChannelFactory; @@ -24,12 +23,12 @@ import io.netty.channel.socket.oio.OioServerSocketChannelFactory; public class OioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override - protected ChannelFactory newClientSocketChannelFactory(Executor executor) { + protected ChannelFactory newClientBootstrap() { return new OioClientSocketChannelFactory(executor); } @Override - protected ChannelFactory newServerSocketChannelFactory(Executor executor) { + protected ChannelFactory newServerBootstrap() { return new OioServerSocketChannelFactory(executor, executor); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java index b40b27b0f8..01576159a0 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -15,15 +15,13 @@ */ package io.netty.handler.codec.spdy; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.handler.codec.embedder.DecoderEmbedder; + import java.util.List; import java.util.Map; -import io.netty.channel.Channels; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.handler.codec.embedder.DecoderEmbedder; import org.junit.Assert; import org.junit.Test; @@ -36,7 +34,7 @@ public class SpdySessionHandlerTest { closeMessage.setValue(closeSignal, 0); } - private void assertHeaderBlock(SpdyHeaderBlock received, SpdyHeaderBlock expected) { + private static void assertHeaderBlock(SpdyHeaderBlock received, SpdyHeaderBlock expected) { for (String name: expected.getHeaderNames()) { List expectedValues = expected.getHeaders(name); List receivedValues = received.getHeaders(name); @@ -48,7 +46,7 @@ public class SpdySessionHandlerTest { Assert.assertTrue(received.getHeaders().isEmpty()); } - private void assertDataFrame(Object msg, int streamID, boolean last) { + private static void assertDataFrame(Object msg, int streamID, boolean last) { Assert.assertNotNull(msg); Assert.assertTrue(msg instanceof SpdyDataFrame); SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; @@ -56,7 +54,7 @@ public class SpdySessionHandlerTest { Assert.assertTrue(spdyDataFrame.isLast() == last); } - private void assertSynReply(Object msg, int streamID, boolean last, SpdyHeaderBlock headers) { + private static void assertSynReply(Object msg, int streamID, boolean last, SpdyHeaderBlock headers) { Assert.assertNotNull(msg); Assert.assertTrue(msg instanceof SpdySynReplyFrame); SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg; @@ -65,7 +63,7 @@ public class SpdySessionHandlerTest { assertHeaderBlock(spdySynReplyFrame, headers); } - private void assertRstStream(Object msg, int streamID, SpdyStreamStatus status) { + private static void assertRstStream(Object msg, int streamID, SpdyStreamStatus status) { Assert.assertNotNull(msg); Assert.assertTrue(msg instanceof SpdyRstStreamFrame); SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg; @@ -73,21 +71,21 @@ public class SpdySessionHandlerTest { Assert.assertTrue(spdyRstStreamFrame.getStatus().equals(status)); } - private void assertPing(Object msg, int ID) { + private static void assertPing(Object msg, int ID) { Assert.assertNotNull(msg); Assert.assertTrue(msg instanceof SpdyPingFrame); SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; Assert.assertTrue(spdyPingFrame.getID() == ID); } - private void assertGoAway(Object msg, int lastGoodStreamID) { + private static void assertGoAway(Object msg, int lastGoodStreamID) { Assert.assertNotNull(msg); Assert.assertTrue(msg instanceof SpdyGoAwayFrame); SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg; Assert.assertTrue(spdyGoAwayFrame.getLastGoodStreamID() == lastGoodStreamID); } - private void assertHeaders(Object msg, int streamID, SpdyHeaderBlock headers) { + private static void assertHeaders(Object msg, int streamID, SpdyHeaderBlock headers) { Assert.assertNotNull(msg); Assert.assertTrue(msg instanceof SpdyHeadersFrame); SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg; @@ -167,7 +165,12 @@ public class SpdySessionHandlerTest { // Check if session handler honors UNIDIRECTIONAL streams spdySynStreamFrame.setLast(false); sessionHandler.offer(spdySynStreamFrame); - Assert.assertNull(sessionHandler.peek()); + try { + sessionHandler.poll(); + Assert.fail(); + } catch (SpdyProtocolException e) { + // Expected + } spdySynStreamFrame.setUnidirectional(false); // Check if session handler returns PROTOCOL_ERROR if it receives @@ -262,9 +265,9 @@ public class SpdySessionHandlerTest { // Echo Handler opens 4 half-closed streams on session connection // and then sets the number of concurrent streams to 3 - private class EchoHandler extends SimpleChannelUpstreamHandler { - private int closeSignal; - private boolean server; + private class EchoHandler extends ChannelInboundMessageHandlerAdapter { + private final int closeSignal; + private final boolean server; EchoHandler(int closeSignal, boolean server) { super(); @@ -273,37 +276,34 @@ public class SpdySessionHandlerTest { } @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { - // Initiate 4 new streams int streamID = server ? 2 : 1; SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0); spdySynStreamFrame.setLast(true); - Channels.write(e.channel(), spdySynStreamFrame); + ctx.write(spdySynStreamFrame); spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2); - Channels.write(e.channel(), spdySynStreamFrame); + ctx.write(spdySynStreamFrame); spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2); - Channels.write(e.channel(), spdySynStreamFrame); + ctx.write(spdySynStreamFrame); spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2); - Channels.write(e.channel(), spdySynStreamFrame); + ctx.write(spdySynStreamFrame); // Limit the number of concurrent streams to 3 SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3); - Channels.write(e.channel(), spdySettingsFrame); + ctx.write(spdySettingsFrame); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - Object msg = e.getMessage(); - if ((msg instanceof SpdyDataFrame) || - (msg instanceof SpdyPingFrame) || - (msg instanceof SpdyHeadersFrame)) { + public void messageReceived(ChannelInboundHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof SpdyDataFrame || + msg instanceof SpdyPingFrame || + msg instanceof SpdyHeadersFrame) { - Channels.write(e.channel(), msg, e.getRemoteAddress()); + ctx.write(msg); return; } @@ -318,7 +318,7 @@ public class SpdySessionHandlerTest { spdySynReplyFrame.addHeader(entry.getKey(), entry.getValue()); } - Channels.write(e.channel(), spdySynReplyFrame, e.getRemoteAddress()); + ctx.write(spdySynReplyFrame); return; } @@ -326,7 +326,7 @@ public class SpdySessionHandlerTest { SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg; if (spdySettingsFrame.isSet(closeSignal)) { - Channels.close(e.channel()); + ctx.close(); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/UnsupportedMessageTypeException.java b/codec/src/main/java/io/netty/handler/codec/UnsupportedMessageTypeException.java index 5dcf2874fa..ca9c4a16ac 100644 --- a/codec/src/main/java/io/netty/handler/codec/UnsupportedMessageTypeException.java +++ b/codec/src/main/java/io/netty/handler/codec/UnsupportedMessageTypeException.java @@ -5,10 +5,9 @@ public class UnsupportedMessageTypeException extends CodecException { private static final long serialVersionUID = 2799598826487038726L; public UnsupportedMessageTypeException( - Object message, Class expectedType, Class... otherExpectedTypes) { + Object message, Class... expectedTypes) { super(message( - message == null? "null" : message.getClass().getName(), - expectedType, otherExpectedTypes)); + message == null? "null" : message.getClass().getName(), expectedTypes)); } public UnsupportedMessageTypeException() { @@ -28,23 +27,21 @@ public class UnsupportedMessageTypeException extends CodecException { } private static String message( - String actualType, Class expectedType, Class... otherExpectedTypes) { - if (expectedType == null) { - throw new NullPointerException("expectedType"); - } - + String actualType, Class... expectedTypes) { StringBuilder buf = new StringBuilder(actualType); - buf.append(" (expected: ").append(expectedType.getName()); - if (otherExpectedTypes != null) { - for (Class t: otherExpectedTypes) { + if (expectedTypes != null && expectedTypes.length > 0) { + buf.append(" (expected: ").append(expectedTypes[0].getName()); + for (int i = 1; i < expectedTypes.length; i ++) { + Class t = expectedTypes[i]; if (t == null) { break; } buf.append(", ").append(t.getName()); } + buf.append(')'); } - return buf.append(')').toString(); + return buf.toString(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java index e8f4b8a324..07f1d5bd21 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.embedder; import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; import io.netty.channel.AbstractChannel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; @@ -30,7 +31,7 @@ import java.util.Queue; class EmbeddedChannel extends AbstractChannel { private final ChannelConfig config = new DefaultChannelConfig(); - private final ChannelBufferHolder firstOut = ChannelBufferHolders.byteBuffer(); + private final ChannelBufferHolder firstOut; private final SocketAddress localAddress = new EmbeddedSocketAddress(); private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); private final Queue productQueue; @@ -50,6 +51,7 @@ class EmbeddedChannel extends AbstractChannel { EmbeddedChannel(Queue productQueue) { super(null, null); this.productQueue = productQueue; + firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer()); } @Override diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index d7ec361a08..e69d1e19b8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -43,6 +43,13 @@ public final class ChannelBufferHolder { this.byteBuf = byteBuf; } + ChannelBufferHolder(Queue msgBuf, ChannelBuffer byteBuf) { + ctx = null; + bypassDirection = 0; + this.msgBuf = msgBuf; + this.byteBuf = byteBuf; + } + public boolean isBypass() { return bypassDirection != 0; } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java index f9d7dc5637..01a3726481 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java @@ -33,6 +33,14 @@ public final class ChannelBufferHolders { return new ChannelBufferHolder(ctx, false); } + public static ChannelBufferHolder catchAllBuffer() { + return catchAllBuffer(new ArrayDeque(), ChannelBuffers.dynamicBuffer()); + } + + public static ChannelBufferHolder catchAllBuffer(Queue msgBuf, ChannelBuffer byteBuf) { + return new ChannelBufferHolder(msgBuf, byteBuf); + } + private ChannelBufferHolders() { // Utility class } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index 605b9f72cf..4b56a0f919 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -15,8 +15,6 @@ public class ChannelInboundMessageHandlerAdapter extends public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { Queue in = ctx.in().messageBuffer(); - Queue out = ctx.nextIn().messageBuffer(); - int oldOutSize = out.size(); for (;;) { I msg = in.poll(); if (msg == null) { @@ -24,14 +22,11 @@ public class ChannelInboundMessageHandlerAdapter extends } try { messageReceived(ctx, msg); + ctx.fireInboundBufferUpdated(); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } - - if (out.size() != oldOutSize) { - ctx.fireInboundBufferUpdated(); - } } public void messageReceived(ChannelInboundHandlerContext ctx, I msg) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java new file mode 100644 index 0000000000..fe99879f50 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java @@ -0,0 +1,10 @@ +package io.netty.channel; + + +public class ChannelInboundStreamHandlerAdapter extends ChannelInboundHandlerAdapter { + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.byteBuffer(); + } +}