Ported codec-http to the new API

- Added ChannelBufferHolders.catchAllBuffer()
- Relaxed UnsupportedMessageTypeException constructor signature
- EmbeddedChannel now uses the catchAllBuffer
- ChanelInboundMessageHandlerAdapter.messageReceive() throws Exception
- Added ChannelInboundStreamHandlerAdapter
This commit is contained in:
Trustin Lee 2012-05-23 11:42:10 -07:00
parent 50b4894c36
commit c883b61503
36 changed files with 745 additions and 875 deletions

View File

@ -17,10 +17,8 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.embedder.DecoderEmbedder;
/**
@ -42,7 +40,7 @@ import io.netty.handler.codec.embedder.DecoderEmbedder;
* so that this handler can intercept HTTP requests after {@link HttpMessageDecoder}
* converts {@link ChannelBuffer}s into HTTP requests.
*/
public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler {
public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object, Object> {
private DecoderEmbedder<ChannelBuffer> decoder;
@ -53,11 +51,10 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage();
public Object decode(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) {
// 100-continue response must be passed through.
ctx.sendUpstream(e);
return msg;
} else if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg;
@ -94,9 +91,6 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler {
}
}
}
// Because HttpMessage is a mutable object, we can simply forward the received event.
ctx.sendUpstream(e);
} else if (msg instanceof HttpChunk) {
HttpChunk c = (HttpChunk) msg;
ChannelBuffer content = c.getContent();
@ -107,7 +101,6 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler {
content = decode(content);
if (content.readable()) {
c.setContent(content);
ctx.sendUpstream(e);
}
} else {
ChannelBuffer lastProduct = finishDecode();
@ -115,19 +108,14 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler {
// Generate an additional chunk if the decoder produced
// the last product on closure,
if (lastProduct.readable()) {
Channels.fireMessageReceived(
ctx, new DefaultHttpChunk(lastProduct), e.getRemoteAddress());
return new Object[] { new DefaultHttpChunk(lastProduct), c };
}
// Emit the last chunk.
ctx.sendUpstream(e);
}
} else {
ctx.sendUpstream(e);
}
} else {
ctx.sendUpstream(e);
}
// Because HttpMessage and HttpChunk is a mutable object, we can simply forward it.
return msg;
}
/**

View File

@ -15,17 +15,16 @@
*/
package io.netty.handler.codec.http;
import java.util.Queue;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import io.netty.util.internal.QueueFactory;
import java.util.Queue;
/**
* Encodes the content of the outbound {@link HttpResponse} and {@link HttpChunk}.
* The original content is replaced with the new content encoded by the
@ -48,7 +47,7 @@ import io.netty.util.internal.QueueFactory;
* so that this handler can intercept HTTP responses before {@link HttpMessageEncoder}
* converts them into {@link ChannelBuffer}s.
*/
public abstract class HttpContentEncoder extends SimpleChannelHandler {
public abstract class HttpContentEncoder extends MessageToMessageCodec<Object, Object, Object, Object> {
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue(String.class);
private volatile EncoderEmbedder<ChannelBuffer> encoder;
@ -60,12 +59,10 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public Object decode(ChannelInboundHandlerContext<Object> ctx, Object msg)
throws Exception {
Object msg = e.getMessage();
if (!(msg instanceof HttpMessage)) {
ctx.sendUpstream(e);
return;
return msg;
}
HttpMessage m = (HttpMessage) msg;
@ -75,18 +72,15 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
}
boolean offered = acceptEncodingQueue.offer(acceptedEncoding);
assert offered;
ctx.sendUpstream(e);
return m;
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
public Object encode(ChannelOutboundHandlerContext<Object> ctx, Object msg)
throws Exception {
Object msg = e.getMessage();
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) {
// 100-continue response must be passed through.
ctx.sendDownstream(e);
return msg;
} else if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg;
@ -100,14 +94,12 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
boolean hasContent = m.isChunked() || m.getContent().readable();
if (!hasContent) {
ctx.sendDownstream(e);
return;
return m;
}
Result result = beginEncode(m, acceptEncoding);
if (result == null) {
ctx.sendDownstream(e);
return;
return m;
}
encoder = result.getContentEncoder();
@ -132,9 +124,6 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
Integer.toString(content.readableBytes()));
}
}
// Because HttpMessage is a mutable object, we can simply forward the write request.
ctx.sendDownstream(e);
} else if (msg instanceof HttpChunk) {
HttpChunk c = (HttpChunk) msg;
ChannelBuffer content = c.getContent();
@ -145,7 +134,6 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
content = encode(content);
if (content.readable()) {
c.setContent(content);
ctx.sendDownstream(e);
}
} else {
ChannelBuffer lastProduct = finishEncode();
@ -153,19 +141,14 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
// Generate an additional chunk if the decoder produced
// the last product on closure,
if (lastProduct.readable()) {
Channels.write(
ctx, Channels.succeededFuture(e.channel()), new DefaultHttpChunk(lastProduct), e.getRemoteAddress());
return new Object[] { new DefaultHttpChunk(lastProduct), c };
}
// Emit the last chunk.
ctx.sendDownstream(e);
}
} else {
ctx.sendDownstream(e);
}
} else {
ctx.sendDownstream(e);
}
// Because HttpMessage and HttpChunk is a mutable object, we can simply forward it.
return msg;
}
/**

View File

@ -116,7 +116,6 @@ public abstract class HttpMessageEncoder extends MessageToStreamEncoder<Object>
out.writeBytes(chunkContent, chunkContent.readerIndex(), chunkContent.readableBytes());
}
}
} else {
throw new UnsupportedMessageTypeException(msg, HttpMessage.class, HttpChunk.class);
}

View File

@ -16,8 +16,7 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.VoidEnum;
@ -27,11 +26,11 @@ import io.netty.util.VoidEnum;
* <p>
* For the detailed instruction on adding add Web Socket support to your HTTP server, take a look into the
* <tt>WebSocketServer</tt> example located in the {@code io.netty.example.http.websocket} package.
*
*
* @apiviz.landmark
* @apiviz.uses io.netty.handler.codec.http.websocket.WebSocketFrame
*/
public class WebSocket00FrameDecoder extends ReplayingDecoder<VoidEnum> {
public class WebSocket00FrameDecoder extends ReplayingDecoder<WebSocketFrame, VoidEnum> {
private static final int DEFAULT_MAX_FRAME_SIZE = 16384;
@ -45,7 +44,7 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<VoidEnum> {
/**
* Creates a new instance of {@code WebSocketFrameDecoder} with the specified {@code maxFrameSize}. If the client
* sends a frame size larger than {@code maxFrameSize}, the channel will be closed.
*
*
* @param maxFrameSize
* the maximum frame size to decode
*/
@ -54,23 +53,21 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<VoidEnum> {
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state)
throws Exception {
public WebSocketFrame decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
// Discard all data received if closing handshake was received before.
if (receivedClosingHandshake) {
buffer.skipBytes(actualReadableBytes());
in.skipBytes(actualReadableBytes());
return null;
}
// Decode a frame otherwise.
byte type = buffer.readByte();
byte type = in.readByte();
if ((type & 0x80) == 0x80) {
// If the MSB on type is set, decode the frame length
return decodeBinaryFrame(type, buffer);
return decodeBinaryFrame(type, in);
} else {
// Decode a 0xff terminated UTF-8 string
return decodeTextFrame(buffer);
return decodeTextFrame(in);
}
}

View File

@ -16,82 +16,72 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToStreamEncoder;
/**
* Encodes a {@link WebSocketFrame} into a {@link ChannelBuffer}.
* <p>
* For the detailed instruction on adding add Web Socket support to your HTTP server, take a look into the
* <tt>WebSocketServer</tt> example located in the {@code io.netty.example.http.websocket} package.
*
*
* @apiviz.landmark
* @apiviz.uses io.netty.handler.codec.http.websocket.WebSocketFrame
*/
@Sharable
public class WebSocket00FrameEncoder extends OneToOneEncoder {
public class WebSocket00FrameEncoder extends MessageToStreamEncoder<WebSocketFrame> {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
// Text frame
ChannelBuffer data = frame.getBinaryData();
ChannelBuffer encoded = channel.getConfig().getBufferFactory()
.getBuffer(data.order(), data.readableBytes() + 2);
encoded.writeByte((byte) 0x00);
encoded.writeBytes(data, data.readerIndex(), data.readableBytes());
encoded.writeByte((byte) 0xFF);
return encoded;
} else if (frame instanceof CloseWebSocketFrame) {
// Close frame
ChannelBuffer data = frame.getBinaryData();
ChannelBuffer encoded = channel.getConfig().getBufferFactory().getBuffer(data.order(), 2);
encoded.writeByte((byte) 0xFF);
encoded.writeByte((byte) 0x00);
return encoded;
} else {
// Binary frame
ChannelBuffer data = frame.getBinaryData();
int dataLen = data.readableBytes();
ChannelBuffer encoded = channel.getConfig().getBufferFactory().getBuffer(data.order(), dataLen + 5);
public void encode(
ChannelOutboundHandlerContext<WebSocketFrame> ctx,
WebSocketFrame msg, ChannelBuffer out) throws Exception {
if (msg instanceof TextWebSocketFrame) {
// Text frame
ChannelBuffer data = msg.getBinaryData();
out.writeByte((byte) 0x00);
out.writeBytes(data, data.readerIndex(), data.readableBytes());
out.writeByte((byte) 0xFF);
} else if (msg instanceof CloseWebSocketFrame) {
// Close frame
out.writeByte((byte) 0xFF);
out.writeByte((byte) 0x00);
} else {
// Binary frame
ChannelBuffer data = msg.getBinaryData();
int dataLen = data.readableBytes();
out.ensureWritableBytes(dataLen + 5);
// Encode type.
encoded.writeByte((byte) 0x80);
// Encode type.
out.writeByte((byte) 0x80);
// Encode length.
int b1 = dataLen >>> 28 & 0x7F;
int b2 = dataLen >>> 14 & 0x7F;
int b3 = dataLen >>> 7 & 0x7F;
int b4 = dataLen & 0x7F;
if (b1 == 0) {
if (b2 == 0) {
if (b3 == 0) {
encoded.writeByte(b4);
} else {
encoded.writeByte(b3 | 0x80);
encoded.writeByte(b4);
}
// Encode length.
int b1 = dataLen >>> 28 & 0x7F;
int b2 = dataLen >>> 14 & 0x7F;
int b3 = dataLen >>> 7 & 0x7F;
int b4 = dataLen & 0x7F;
if (b1 == 0) {
if (b2 == 0) {
if (b3 == 0) {
out.writeByte(b4);
} else {
encoded.writeByte(b2 | 0x80);
encoded.writeByte(b3 | 0x80);
encoded.writeByte(b4);
out.writeByte(b3 | 0x80);
out.writeByte(b4);
}
} else {
encoded.writeByte(b1 | 0x80);
encoded.writeByte(b2 | 0x80);
encoded.writeByte(b3 | 0x80);
encoded.writeByte(b4);
out.writeByte(b2 | 0x80);
out.writeByte(b3 | 0x80);
out.writeByte(b4);
}
// Encode binary data.
encoded.writeBytes(data, data.readerIndex(), dataLen);
return encoded;
} else {
out.writeByte(b1 | 0x80);
out.writeByte(b2 | 0x80);
out.writeByte(b3 | 0x80);
out.writeByte(b4);
}
// Encode binary data.
out.writeBytes(data, data.readerIndex(), dataLen);
}
return msg;
}
}

View File

@ -55,9 +55,8 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -68,7 +67,7 @@ import io.netty.logging.InternalLoggerFactory;
* Decodes a web socket frame from wire protocol version 8 format. This code was forked from <a
* href="https://github.com/joewalnes/webbit">webbit</a> and modified.
*/
public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDecoder.State> {
public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocketFrame, WebSocket08FrameDecoder.State> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameDecoder.class);
@ -100,7 +99,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
/**
* Constructor
*
*
* @param maskedPayload
* Web socket servers must set this to true processed incoming masked payload. Client implementations
* must set this to false.
@ -114,23 +113,23 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, State state)
throws Exception {
public WebSocketFrame decode(
ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
// Discard all data received if closing handshake was received before.
if (receivedClosingHandshake) {
buffer.skipBytes(actualReadableBytes());
in.skipBytes(actualReadableBytes());
return null;
}
switch (state) {
switch (state()) {
case FRAME_START:
framePayloadBytesRead = 0;
framePayloadLength = -1;
framePayload = null;
// FIN, RSV, OPCODE
byte b = buffer.readByte();
byte b = in.readByte();
frameFinalFlag = (b & 0x80) != 0;
frameRsv = (b & 0x70) >> 4;
frameOpcode = b & 0x0F;
@ -140,36 +139,36 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
// MASK, PAYLOAD LEN 1
b = buffer.readByte();
b = in.readByte();
boolean frameMasked = (b & 0x80) != 0;
int framePayloadLen1 = b & 0x7F;
if (frameRsv != 0 && !allowExtensions) {
protocolViolation(channel, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
protocolViolation(ctx, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
return null;
}
if (maskedPayload && !frameMasked) {
protocolViolation(channel, "unmasked client to server frame");
protocolViolation(ctx, "unmasked client to server frame");
return null;
}
if (frameOpcode > 7) { // control frame (have MSB in opcode set)
// control frames MUST NOT be fragmented
if (!frameFinalFlag) {
protocolViolation(channel, "fragmented control frame");
protocolViolation(ctx, "fragmented control frame");
return null;
}
// control frames MUST have payload 125 octets or less
if (framePayloadLen1 > 125) {
protocolViolation(channel, "control frame with payload length > 125 octets");
protocolViolation(ctx, "control frame with payload length > 125 octets");
return null;
}
// check for reserved control frame opcodes
if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) {
protocolViolation(channel, "control frame using reserved opcode " + frameOpcode);
protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode);
return null;
}
@ -177,43 +176,43 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// body MUST be a 2-byte unsigned integer (in network byte
// order) representing a status code
if (frameOpcode == 8 && framePayloadLen1 == 1) {
protocolViolation(channel, "received close control frame with payload len 1");
protocolViolation(ctx, "received close control frame with payload len 1");
return null;
}
} else { // data frame
// check for reserved data frame opcodes
if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT || frameOpcode == OPCODE_BINARY)) {
protocolViolation(channel, "data frame using reserved opcode " + frameOpcode);
protocolViolation(ctx, "data frame using reserved opcode " + frameOpcode);
return null;
}
// check opcode vs message fragmentation state 1/2
if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {
protocolViolation(channel, "received continuation data frame outside fragmented message");
protocolViolation(ctx, "received continuation data frame outside fragmented message");
return null;
}
// check opcode vs message fragmentation state 2/2
if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
protocolViolation(channel, "received non-continuation data frame while inside fragmented message");
protocolViolation(ctx, "received non-continuation data frame while inside fragmented message");
return null;
}
}
// Read frame payload length
if (framePayloadLen1 == 126) {
framePayloadLength = buffer.readUnsignedShort();
framePayloadLength = in.readUnsignedShort();
if (framePayloadLength < 126) {
protocolViolation(channel, "invalid data frame length (not using minimal length encoding)");
protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
return null;
}
} else if (framePayloadLen1 == 127) {
framePayloadLength = buffer.readLong();
framePayloadLength = in.readLong();
// TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
// just check if it's negative?
if (framePayloadLength < 65536) {
protocolViolation(channel, "invalid data frame length (not using minimal length encoding)");
protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
return null;
}
} else {
@ -227,7 +226,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
checkpoint(State.MASKING_KEY);
case MASKING_KEY:
if (maskedPayload) {
maskingKey = buffer.readBytes(4);
maskingKey = in.readBytes(4);
}
checkpoint(State.PAYLOAD);
case PAYLOAD:
@ -242,13 +241,13 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// framePayloadLength);
if (willHaveReadByteCount == framePayloadLength) {
// We have all our content so proceed to process
payloadBuffer = buffer.readBytes(rbytes);
payloadBuffer = in.readBytes(rbytes);
} else if (willHaveReadByteCount < framePayloadLength) {
// We don't have all our content so accumulate payload.
// Returning null means we will get called back
payloadBuffer = buffer.readBytes(rbytes);
payloadBuffer = in.readBytes(rbytes);
if (framePayload == null) {
framePayload = channel.getConfig().getBufferFactory().getBuffer(toFrameLength(framePayloadLength));
framePayload = ChannelBuffers.buffer(toFrameLength(framePayloadLength));
}
framePayload.writeBytes(payloadBuffer);
framePayloadBytesRead += rbytes;
@ -258,7 +257,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else if (willHaveReadByteCount > framePayloadLength) {
// We have more than what we need so read up to the end of frame
// Leave the remainder in the buffer for next frame
payloadBuffer = buffer.readBytes(toFrameLength(framePayloadLength - framePayloadBytesRead));
payloadBuffer = in.readBytes(toFrameLength(framePayloadLength - framePayloadBytesRead));
}
// Now we have all the data, the next checkpoint must be the next
@ -284,7 +283,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else if (frameOpcode == OPCODE_PONG) {
return new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_CLOSE) {
checkCloseFrameBody(channel, framePayload);
checkCloseFrameBody(ctx, framePayload);
receivedClosingHandshake = true;
return new CloseWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
}
@ -301,7 +300,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Check text for UTF8 correctness
if (frameOpcode == OPCODE_TEXT || fragmentedFramesText != null) {
// Check UTF-8 correctness for this payload
checkUTF8String(channel, framePayload.array());
checkUTF8String(ctx, framePayload.array());
// This does a second check to make sure UTF-8
// correctness for entire text message
@ -317,12 +316,12 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// First text or binary frame for a fragmented set
fragmentedFramesText = null;
if (frameOpcode == OPCODE_TEXT) {
checkUTF8String(channel, framePayload.array());
checkUTF8String(ctx, framePayload.array());
}
} else {
// Subsequent frames - only check if init frame is text
if (fragmentedFramesText != null) {
checkUTF8String(channel, framePayload.array());
checkUTF8String(ctx, framePayload.array());
}
}
@ -343,7 +342,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
case CORRUPT:
// If we don't keep reading Netty will throw an exception saying
// we can't return null if no bytes read and state not changed.
buffer.readByte();
in.readByte();
return null;
default:
throw new Error("Shouldn't reach here.");
@ -357,16 +356,15 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
}
private void protocolViolation(Channel channel, String reason) throws CorruptedFrameException {
private void protocolViolation(ChannelInboundHandlerContext<Byte> ctx, String reason) throws CorruptedFrameException {
checkpoint(State.CORRUPT);
if (channel.isConnected()) {
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
channel.close().awaitUninterruptibly();
if (ctx.channel().isActive()) {
ctx.flush().addListener(ChannelFutureListener.CLOSE);
}
throw new CorruptedFrameException(reason);
}
private int toFrameLength(long l) throws TooLongFrameException {
private static int toFrameLength(long l) throws TooLongFrameException {
if (l > Integer.MAX_VALUE) {
throw new TooLongFrameException("Length:" + l);
} else {
@ -374,7 +372,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
}
private void checkUTF8String(Channel channel, byte[] bytes) throws CorruptedFrameException {
private void checkUTF8String(ChannelInboundHandlerContext<Byte> ctx, byte[] bytes) throws CorruptedFrameException {
try {
// StringBuilder sb = new StringBuilder("UTF8 " + bytes.length +
// " bytes: ");
@ -389,16 +387,16 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
fragmentedFramesText.write(bytes);
}
} catch (UTF8Exception ex) {
protocolViolation(channel, "invalid UTF-8 bytes");
protocolViolation(ctx, "invalid UTF-8 bytes");
}
}
protected void checkCloseFrameBody(Channel channel, ChannelBuffer buffer) throws CorruptedFrameException {
protected void checkCloseFrameBody(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws CorruptedFrameException {
if (buffer == null || buffer.capacity() == 0) {
return;
}
if (buffer.capacity() == 1) {
protocolViolation(channel, "Invalid close frame body");
protocolViolation(ctx, "Invalid close frame body");
}
// Save reader index
@ -407,9 +405,9 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
// Must have 2 byte integer within the valid range
int statusCode = buffer.readShort();
if ((statusCode >= 0 && statusCode <= 999) || (statusCode >= 1004 && statusCode <= 1006)
|| (statusCode >= 1012 && statusCode <= 2999)) {
protocolViolation(channel, "Invalid close frame status code: " + statusCode);
if (statusCode >= 0 && statusCode <= 999 || statusCode >= 1004 && statusCode <= 1006
|| statusCode >= 1012 && statusCode <= 2999) {
protocolViolation(ctx, "Invalid close frame status code: " + statusCode);
}
// May have UTF-8 message
@ -419,10 +417,10 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
try {
new UTF8Output(b);
} catch (UTF8Exception ex) {
protocolViolation(channel, "Invalid close frame reason text. Invalid UTF-8 bytes");
protocolViolation(ctx, "Invalid close frame reason text. Invalid UTF-8 bytes");
}
}
// Restore reader index
buffer.readerIndex(idx);
}

View File

@ -53,24 +53,23 @@
package io.netty.handler.codec.http.websocketx;
import java.nio.ByteBuffer;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToStreamEncoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.oneone.OneToOneEncoder;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
/**
* <p>
* Encodes a web socket frame into wire protocol version 8 format. This code was forked from <a
* href="https://github.com/joewalnes/webbit">webbit</a> and modified.
* </p>
*/
public class WebSocket08FrameEncoder extends OneToOneEncoder {
public class WebSocket08FrameEncoder extends MessageToStreamEncoder<WebSocketFrame> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameEncoder.class);
@ -85,7 +84,7 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
/**
* Constructor
*
*
* @param maskPayload
* Web socket clients must set this to true to mask payload. Server implementations must set this to
* false.
@ -95,94 +94,83 @@ public class WebSocket08FrameEncoder extends OneToOneEncoder {
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
public void encode(ChannelOutboundHandlerContext<WebSocketFrame> ctx,
WebSocketFrame msg, ChannelBuffer out) throws Exception {
byte[] mask;
if (msg instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) msg;
ChannelBuffer data = frame.getBinaryData();
if (data == null) {
data = ChannelBuffers.EMPTY_BUFFER;
}
byte opcode;
if (frame instanceof TextWebSocketFrame) {
opcode = OPCODE_TEXT;
} else if (frame instanceof PingWebSocketFrame) {
opcode = OPCODE_PING;
} else if (frame instanceof PongWebSocketFrame) {
opcode = OPCODE_PONG;
} else if (frame instanceof CloseWebSocketFrame) {
opcode = OPCODE_CLOSE;
} else if (frame instanceof BinaryWebSocketFrame) {
opcode = OPCODE_BINARY;
} else if (frame instanceof ContinuationWebSocketFrame) {
opcode = OPCODE_CONT;
} else {
throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName());
}
int length = data.readableBytes();
if (logger.isDebugEnabled()) {
logger.debug("Encoding WebSocket Frame opCode=" + opcode + " length=" + length);
}
int b0 = 0;
if (frame.isFinalFragment()) {
b0 |= 1 << 7;
}
b0 |= frame.getRsv() % 8 << 4;
b0 |= opcode % 128;
ChannelBuffer header;
ChannelBuffer body;
if (opcode == OPCODE_PING && length > 125) {
throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was "
+ length);
}
int maskLength = maskPayload ? 4 : 0;
if (length <= 125) {
header = ChannelBuffers.buffer(2 + maskLength);
header.writeByte(b0);
byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
header.writeByte(b);
} else if (length <= 0xFFFF) {
header = ChannelBuffers.buffer(4 + maskLength);
header.writeByte(b0);
header.writeByte(maskPayload ? 0xFE : 126);
header.writeByte(length >>> 8 & 0xFF);
header.writeByte(length & 0xFF);
} else {
header = ChannelBuffers.buffer(10 + maskLength);
header.writeByte(b0);
header.writeByte(maskPayload ? 0xFF : 127);
header.writeLong(length);
}
// Write payload
if (maskPayload) {
Integer random = (int) (Math.random() * Integer.MAX_VALUE);
mask = ByteBuffer.allocate(4).putInt(random).array();
header.writeBytes(mask);
body = ChannelBuffers.buffer(length);
int counter = 0;
while (data.readableBytes() > 0) {
byte byteData = data.readByte();
body.writeByte(byteData ^ mask[+counter++ % 4]);
}
} else {
body = data;
}
return ChannelBuffers.wrappedBuffer(header, body);
WebSocketFrame frame = msg;
ChannelBuffer data = frame.getBinaryData();
if (data == null) {
data = ChannelBuffers.EMPTY_BUFFER;
}
// If not websocket, then just return the message
return msg;
}
byte opcode;
if (frame instanceof TextWebSocketFrame) {
opcode = OPCODE_TEXT;
} else if (frame instanceof PingWebSocketFrame) {
opcode = OPCODE_PING;
} else if (frame instanceof PongWebSocketFrame) {
opcode = OPCODE_PONG;
} else if (frame instanceof CloseWebSocketFrame) {
opcode = OPCODE_CLOSE;
} else if (frame instanceof BinaryWebSocketFrame) {
opcode = OPCODE_BINARY;
} else if (frame instanceof ContinuationWebSocketFrame) {
opcode = OPCODE_CONT;
} else {
throw new UnsupportedOperationException("Cannot encode frame of type: " + frame.getClass().getName());
}
int length = data.readableBytes();
if (logger.isDebugEnabled()) {
logger.debug("Encoding WebSocket Frame opCode=" + opcode + " length=" + length);
}
int b0 = 0;
if (frame.isFinalFragment()) {
b0 |= 1 << 7;
}
b0 |= frame.getRsv() % 8 << 4;
b0 |= opcode % 128;
if (opcode == OPCODE_PING && length > 125) {
throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was "
+ length);
}
int maskLength = maskPayload ? 4 : 0;
if (length <= 125) {
out.ensureWritableBytes(2 + maskLength + length);
out.writeByte(b0);
byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
out.writeByte(b);
} else if (length <= 0xFFFF) {
out.ensureWritableBytes(4 + maskLength + length);
out.writeByte(b0);
out.writeByte(maskPayload ? 0xFE : 126);
out.writeByte(length >>> 8 & 0xFF);
out.writeByte(length & 0xFF);
} else {
out.ensureWritableBytes(10 + maskLength + length);
out.writeByte(b0);
out.writeByte(maskPayload ? 0xFF : 127);
out.writeLong(length);
}
// Write payload
if (maskPayload) {
int random = (int) (Math.random() * Integer.MAX_VALUE);
mask = ByteBuffer.allocate(4).putInt(random).array();
out.writeInt((int) (Math.random() * Integer.MAX_VALUE));
int counter = 0;
for (int i = data.readerIndex(); i < data.writerIndex(); i ++) {
byte byteData = data.getByte(i);
out.writeByte(byteData ^ mask[+counter++ % 4]);
}
} else {
out.writeBytes(data, data.readerIndex(), data.readableBytes());
}
}
}

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
@ -52,7 +51,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
/**
* Constructor specifying the destination web socket location
*
*
* @param webSocketURL
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
* sent to this URL.
@ -70,11 +69,11 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* is really a rehash of <a href="http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76" >hixie-76</a> and
* <a href="http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75" >hixie-75</a>.
* </p>
*
*
* <p>
* Browser request to the server:
* </p>
*
*
* <pre>
* GET /demo HTTP/1.1
* Upgrade: WebSocket
@ -84,14 +83,14 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* Sec-WebSocket-Protocol: chat, sample
* Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5
* Sec-WebSocket-Key2: 12998 5 Y3 1 .P00
*
*
* ^n:ds[4U
* </pre>
*
*
* <p>
* Server response:
* </p>
*
*
* <pre>
* HTTP/1.1 101 WebSocket Protocol Handshake
* Upgrade: WebSocket
@ -99,10 +98,10 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* Sec-WebSocket-Origin: http://example.com
* Sec-WebSocket-Location: ws://example.com/demo
* Sec-WebSocket-Protocol: sample
*
*
* 8jKS'y:G*Co,Wxa-
* </pre>
*
*
* @param channel
* Channel
* @param req
@ -112,7 +111,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
public ChannelFuture handshake(Channel channel, HttpRequest req) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.getId()));
logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.id()));
}
// Serve the WebSocket handshake request.
@ -178,7 +177,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
/**
* Echo back the closing frame
*
*
* @param channel
* Channel
* @param frame

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -51,7 +50,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
/**
* Constructor specifying the destination web socket location
*
*
* @param webSocketURL
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
* sent to this URL.
@ -71,11 +70,11 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
* "http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-08">HyBi version 8 to 10</a>. Version 8, 9 and
* 10 share the same wire protocol.
* </p>
*
*
* <p>
* Browser request to the server:
* </p>
*
*
* <pre>
* GET /chat HTTP/1.1
* Host: server.example.com
@ -86,11 +85,11 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
* Sec-WebSocket-Protocol: chat, superchat
* Sec-WebSocket-Version: 8
* </pre>
*
*
* <p>
* Server response:
* </p>
*
*
* <pre>
* HTTP/1.1 101 Switching Protocols
* Upgrade: websocket
@ -98,7 +97,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
* Sec-WebSocket-Protocol: chat
* </pre>
*
*
* @param channel
* Channel
* @param req
@ -108,7 +107,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
public ChannelFuture handshake(Channel channel, HttpRequest req) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.getId()));
logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.id()));
}
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
@ -150,7 +149,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
/**
* Echo back the closing frame and close the connection
*
*
* @param channel
* Channel
* @param frame

View File

@ -15,21 +15,20 @@
*/
package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.CharsetUtil;
@ -52,7 +51,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
/**
* Constructor specifying the destination web socket location
*
*
* @param webSocketURL
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
* sent to this URL.
@ -72,11 +71,11 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
* "http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17">HyBi versions 13-17</a>. Versions 13-17
* share the same wire protocol.
* </p>
*
*
* <p>
* Browser request to the server:
* </p>
*
*
* <pre>
* GET /chat HTTP/1.1
* Host: server.example.com
@ -87,11 +86,11 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
* Sec-WebSocket-Protocol: chat, superchat
* Sec-WebSocket-Version: 13
* </pre>
*
*
* <p>
* Server response:
* </p>
*
*
* <pre>
* HTTP/1.1 101 Switching Protocols
* Upgrade: websocket
@ -99,7 +98,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
* Sec-WebSocket-Protocol: chat
* </pre>
*
*
* @param channel
* Channel
* @param req
@ -109,7 +108,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
public ChannelFuture handshake(Channel channel, HttpRequest req) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.getId()));
logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.id()));
}
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
@ -151,7 +150,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
/**
* Echo back the closing frame and close the connection
*
*
* @param channel
* Channel
* @param frame

View File

@ -16,8 +16,7 @@
package io.netty.handler.codec.rtsp;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.handler.codec.http.HttpChunkAggregator;
@ -74,10 +73,10 @@ public abstract class RtspMessageDecoder extends HttpMessageDecoder {
aggregator = new DecoderEmbedder<HttpMessage>(new HttpChunkAggregator(maxContentLength));
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer, State state) throws Exception {
Object o = super.decode(ctx, channel, buffer, state);
public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws Exception {
Object o = super.decode(ctx, buffer);
if (o != null && aggregator.offer(o)) {
return aggregator.poll();
} else {

View File

@ -16,9 +16,9 @@
package io.netty.handler.codec.rtsp;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMessageEncoder;
@ -39,12 +39,13 @@ public abstract class RtspMessageEncoder extends HttpMessageEncoder {
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg,
ChannelBuffer out) throws Exception {
// Ignore unrelated message types such as HttpChunk.
if (!(msg instanceof HttpMessage)) {
return msg;
throw new UnsupportedMessageTypeException(msg, HttpMessage.class);
}
return super.encode(ctx, channel, msg);
super.encode(ctx, msg, out);
}
}

View File

@ -15,21 +15,14 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.CombinedChannelHandler;
/**
* A combination of {@link SpdyFrameDecoder} and {@link SpdyFrameEncoder}.
* @apiviz.has io.netty.handler.codec.spdy.SpdyFrameDecoder
* @apiviz.has io.netty.handler.codec.spdy.SpdyFrameEncoder
*/
public class SpdyFrameCodec implements ChannelUpstreamHandler,
ChannelDownstreamHandler {
private final SpdyFrameDecoder decoder;
private final SpdyFrameEncoder encoder;
public class SpdyFrameCodec extends CombinedChannelHandler {
/**
* Creates a new instance with the default decoder and encoder options
@ -47,17 +40,8 @@ public class SpdyFrameCodec implements ChannelUpstreamHandler,
public SpdyFrameCodec(
int maxChunkSize, int maxFrameSize, int maxHeaderSize,
int compressionLevel, int windowBits, int memLevel) {
decoder = new SpdyFrameDecoder(maxChunkSize, maxFrameSize, maxHeaderSize);
encoder = new SpdyFrameEncoder(compressionLevel, windowBits, memLevel);
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
decoder.handleUpstream(ctx, e);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
encoder.handleDownstream(ctx, e);
super(
new SpdyFrameDecoder(maxChunkSize, maxFrameSize, maxHeaderSize),
new SpdyFrameEncoder(compressionLevel, windowBits, memLevel));
}
}

View File

@ -15,18 +15,16 @@
*/
package io.netty.handler.codec.spdy;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.FrameDecoder;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.StreamToMessageDecoder;
/**
* Decodes {@link ChannelBuffer}s into SPDY Data and Control Frames.
*/
public class SpdyFrameDecoder extends FrameDecoder {
public class SpdyFrameDecoder extends StreamToMessageDecoder<Object> {
private final int maxChunkSize;
private final int maxFrameSize;
@ -48,7 +46,6 @@ public class SpdyFrameDecoder extends FrameDecoder {
*/
public SpdyFrameDecoder(
int maxChunkSize, int maxFrameSize, int maxHeaderSize) {
super(true); // Enable unfold for data frames
if (maxChunkSize <= 0) {
throw new IllegalArgumentException(
"maxChunkSize must be a positive integer: " + maxChunkSize);
@ -67,32 +64,27 @@ public class SpdyFrameDecoder extends FrameDecoder {
}
@Override
protected Object decodeLast(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
throws Exception {
public Object decodeLast(ChannelInboundHandlerContext<Byte> ctx,
ChannelBuffer in) throws Exception {
try {
Object frame = decode(ctx, channel, buffer);
return frame;
return decode(ctx, in);
} finally {
headerBlockDecompressor.end();
}
}
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
throws Exception {
public Object decode(ChannelInboundHandlerContext<Byte> ctx,
ChannelBuffer in) throws Exception {
// Must read common header to determine frame length
if (buffer.readableBytes() < SPDY_HEADER_SIZE) {
if (in.readableBytes() < SPDY_HEADER_SIZE) {
return null;
}
// Get frame length from common header
int frameOffset = buffer.readerIndex();
int frameOffset = in.readerIndex();
int lengthOffset = frameOffset + SPDY_HEADER_LENGTH_OFFSET;
int dataLength = getUnsignedMedium(buffer, lengthOffset);
int dataLength = getUnsignedMedium(in, lengthOffset);
int frameLength = SPDY_HEADER_SIZE + dataLength;
// Throw exception if frameLength exceeds maxFrameSize
@ -102,37 +94,37 @@ public class SpdyFrameDecoder extends FrameDecoder {
}
// Wait until entire frame is readable
if (buffer.readableBytes() < frameLength) {
if (in.readableBytes() < frameLength) {
return null;
}
// Read common header fields
boolean control = (buffer.getByte(frameOffset) & 0x80) != 0;
boolean control = (in.getByte(frameOffset) & 0x80) != 0;
int flagsOffset = frameOffset + SPDY_HEADER_FLAGS_OFFSET;
byte flags = buffer.getByte(flagsOffset);
byte flags = in.getByte(flagsOffset);
if (control) {
// Decode control frame common header
int version = getUnsignedShort(buffer, frameOffset) & 0x7FFF;
int version = getUnsignedShort(in, frameOffset) & 0x7FFF;
// Spdy versioning spec is broken
if (version != SPDY_VERSION) {
buffer.skipBytes(frameLength);
in.skipBytes(frameLength);
throw new SpdyProtocolException(
"Unsupported version: " + version);
}
int typeOffset = frameOffset + SPDY_HEADER_TYPE_OFFSET;
int type = getUnsignedShort(buffer, typeOffset);
buffer.skipBytes(SPDY_HEADER_SIZE);
int type = getUnsignedShort(in, typeOffset);
in.skipBytes(SPDY_HEADER_SIZE);
int readerIndex = buffer.readerIndex();
buffer.skipBytes(dataLength);
return decodeControlFrame(type, flags, buffer.slice(readerIndex, dataLength));
int readerIndex = in.readerIndex();
in.skipBytes(dataLength);
return decodeControlFrame(type, flags, in.slice(readerIndex, dataLength));
} else {
// Decode data frame common header
int streamID = getUnsignedInt(buffer, frameOffset);
buffer.skipBytes(SPDY_HEADER_SIZE);
int streamID = getUnsignedInt(in, frameOffset);
in.skipBytes(SPDY_HEADER_SIZE);
// Generate data frames that do not exceed maxChunkSize
int numFrames = dataLength / maxChunkSize;
@ -144,7 +136,7 @@ public class SpdyFrameDecoder extends FrameDecoder {
int chunkSize = Math.min(maxChunkSize, dataLength);
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID);
spdyDataFrame.setCompressed((flags & SPDY_DATA_FLAG_COMPRESS) != 0);
spdyDataFrame.setData(buffer.readBytes(chunkSize));
spdyDataFrame.setData(in.readBytes(chunkSize));
dataLength -= chunkSize;
if (dataLength == 0) {
spdyDataFrame.setLast((flags & SPDY_DATA_FLAG_FIN) != 0);
@ -224,8 +216,8 @@ public class SpdyFrameDecoder extends FrameDecoder {
// Each ID/Value entry is 8 bytes
// The number of entries cannot exceed SPDY_MAX_LENGTH / 8;
int numEntries = getUnsignedInt(data, data.readerIndex());
if ((numEntries > (SPDY_MAX_LENGTH - 4) / 8) ||
(data.readableBytes() != numEntries * 8 + 4)) {
if (numEntries > (SPDY_MAX_LENGTH - 4) / 8 ||
data.readableBytes() != numEntries * 8 + 4) {
throw new SpdyProtocolException(
"Received invalid SETTINGS control frame");
}
@ -240,14 +232,14 @@ public class SpdyFrameDecoder extends FrameDecoder {
// Chromium Issue 79156
// SPDY setting ids are not written in network byte order
// Read id assuming the architecture is little endian
int ID = (data.readByte() & 0xFF) |
int ID = data.readByte() & 0xFF |
(data.readByte() & 0xFF) << 8 |
(data.readByte() & 0xFF) << 16;
byte ID_flags = data.readByte();
int value = getSignedInt(data, data.readerIndex());
data.skipBytes(4);
if (!(spdySettingsFrame.isSet(ID))) {
if (!spdySettingsFrame.isSet(ID)) {
boolean persistVal = (ID_flags & SPDY_SETTINGS_PERSIST_VALUE) != 0;
boolean persisted = (ID_flags & SPDY_SETTINGS_PERSISTED) != 0;
spdySettingsFrame.setValue(ID, value, persistVal, persisted);
@ -321,8 +313,8 @@ public class SpdyFrameDecoder extends FrameDecoder {
private void decodeHeaderBlock(SpdyHeaderBlock headerFrame, ChannelBuffer headerBlock)
throws Exception {
if ((headerBlock.readableBytes() == 2) &&
(headerBlock.getShort(headerBlock.readerIndex()) == 0)) {
if (headerBlock.readableBytes() == 2 &&
headerBlock.getShort(headerBlock.readerIndex()) == 0) {
return;
}

View File

@ -16,22 +16,20 @@
package io.netty.handler.codec.spdy;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToStreamEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import java.nio.ByteOrder;
import java.util.Set;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.handler.codec.oneone.OneToOneEncoder;
/**
* Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}.
*/
public class SpdyFrameEncoder extends OneToOneEncoder {
public class SpdyFrameEncoder extends MessageToStreamEncoder<Object> {
private volatile boolean finished;
private final SpdyHeaderBlockCompressor headerBlockCompressor;
@ -52,48 +50,46 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
headerBlockCompressor = SpdyHeaderBlockCompressor.newInstance(compressionLevel, windowBits, memLevel);
}
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.state()) {
case OPEN:
case CONNECTED:
case BOUND:
if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
synchronized (headerBlockCompressor) {
finished = true;
headerBlockCompressor.end();
}
}
}
}
super.handleDownstream(ctx, evt);
public void disconnect(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
finish();
super.disconnect(ctx, future);
}
@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
public void close(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
finish();
super.close(ctx, future);
}
private void finish() {
synchronized (headerBlockCompressor) {
if (!finished) {
finished = true;
headerBlockCompressor.end();
}
}
}
@Override
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg,
ChannelBuffer out) throws Exception {
if (msg instanceof SpdyDataFrame) {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
ChannelBuffer data = spdyDataFrame.getData();
byte flags = spdyDataFrame.isLast() ? SPDY_DATA_FLAG_FIN : 0;
if (spdyDataFrame.isCompressed()) {
flags |= SPDY_DATA_FLAG_COMPRESS;
}
ChannelBuffer header = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE);
header.writeInt(spdyDataFrame.getStreamID() & 0x7FFFFFFF);
header.writeByte(flags);
header.writeMedium(data.readableBytes());
return ChannelBuffers.wrappedBuffer(header, data);
out.ensureWritableBytes(SPDY_HEADER_SIZE + data.readableBytes());
out.writeInt(spdyDataFrame.getStreamID() & 0x7FFFFFFF);
out.writeByte(flags);
out.writeMedium(data.readableBytes());
out.writeBytes(data, data.readerIndex(), data.readableBytes());
} else if (msg instanceof SpdySynStreamFrame) {
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
ChannelBuffer data = compressHeaderBlock(
encodeHeaderBlock(spdySynStreamFrame));
@ -102,70 +98,60 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
flags |= SPDY_FLAG_UNIDIRECTIONAL;
}
int headerBlockLength = data.readableBytes();
int length = (headerBlockLength == 0) ? 12 : 10 + headerBlockLength;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_SYN_STREAM_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdySynStreamFrame.getStreamID());
frame.writeInt(spdySynStreamFrame.getAssociatedToStreamID());
frame.writeShort(((short) spdySynStreamFrame.getPriority()) << 14);
int length = headerBlockLength == 0 ? 12 : 10 + headerBlockLength;
out.ensureWritableBytes(SPDY_HEADER_SIZE + length + data.readableBytes());
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_SYN_STREAM_FRAME);
out.writeByte(flags);
out.writeMedium(length);
out.writeInt(spdySynStreamFrame.getStreamID());
out.writeInt(spdySynStreamFrame.getAssociatedToStreamID());
out.writeShort(spdySynStreamFrame.getPriority() << 14);
if (data.readableBytes() == 0) {
frame.writeShort(0);
out.writeShort(0);
} else {
out.writeBytes(data, data.readerIndex(), data.readableBytes());
}
return ChannelBuffers.wrappedBuffer(frame, data);
} else if (msg instanceof SpdySynReplyFrame) {
SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
ChannelBuffer data = compressHeaderBlock(
encodeHeaderBlock(spdySynReplyFrame));
byte flags = spdySynReplyFrame.isLast() ? SPDY_FLAG_FIN : 0;
int headerBlockLength = data.readableBytes();
int length = (headerBlockLength == 0) ? 8 : 6 + headerBlockLength;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_SYN_REPLY_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdySynReplyFrame.getStreamID());
int length = headerBlockLength == 0 ? 8 : 6 + headerBlockLength;
out.ensureWritableBytes(SPDY_HEADER_SIZE + length + data.readableBytes());
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_SYN_REPLY_FRAME);
out.writeByte(flags);
out.writeMedium(length);
out.writeInt(spdySynReplyFrame.getStreamID());
if (data.readableBytes() == 0) {
frame.writeInt(0);
out.writeInt(0);
} else {
frame.writeShort(0);
out.writeShort(0);
out.writeBytes(data, data.readerIndex(), data.readableBytes());
}
return ChannelBuffers.wrappedBuffer(frame, data);
} else if (msg instanceof SpdyRstStreamFrame) {
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 8);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_RST_STREAM_FRAME);
frame.writeInt(8);
frame.writeInt(spdyRstStreamFrame.getStreamID());
frame.writeInt(spdyRstStreamFrame.getStatus().getCode());
return frame;
out.ensureWritableBytes(SPDY_HEADER_SIZE + 8);
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_RST_STREAM_FRAME);
out.writeInt(8);
out.writeInt(spdyRstStreamFrame.getStreamID());
out.writeInt(spdyRstStreamFrame.getStatus().getCode());
} else if (msg instanceof SpdySettingsFrame) {
SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
byte flags = spdySettingsFrame.clearPreviouslyPersistedSettings() ?
SPDY_SETTINGS_CLEAR : 0;
Set<Integer> IDs = spdySettingsFrame.getIDs();
int numEntries = IDs.size();
int length = 4 + numEntries * 8;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_SETTINGS_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(numEntries);
out.ensureWritableBytes(SPDY_HEADER_SIZE + length);
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_SETTINGS_FRAME);
out.writeByte(flags);
out.writeMedium(length);
out.writeInt(numEntries);
for (Integer ID: IDs) {
int id = ID.intValue();
byte ID_flags = (byte) 0;
@ -178,69 +164,53 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
// Chromium Issue 79156
// SPDY setting ids are not written in network byte order
// Write id assuming the architecture is little endian
frame.writeByte((id >> 0) & 0xFF);
frame.writeByte((id >> 8) & 0xFF);
frame.writeByte((id >> 16) & 0xFF);
frame.writeByte(ID_flags);
frame.writeInt(spdySettingsFrame.getValue(id));
out.writeByte(id >> 0 & 0xFF);
out.writeByte(id >> 8 & 0xFF);
out.writeByte(id >> 16 & 0xFF);
out.writeByte(ID_flags);
out.writeInt(spdySettingsFrame.getValue(id));
}
return frame;
} else if (msg instanceof SpdyNoOpFrame) {
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_NOOP_FRAME);
frame.writeInt(0);
return frame;
out.ensureWritableBytes(SPDY_HEADER_SIZE);
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_NOOP_FRAME);
out.writeInt(0);
} else if (msg instanceof SpdyPingFrame) {
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 4);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_PING_FRAME);
frame.writeInt(4);
frame.writeInt(spdyPingFrame.getID());
return frame;
out.ensureWritableBytes(SPDY_HEADER_SIZE + 4);
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_PING_FRAME);
out.writeInt(4);
out.writeInt(spdyPingFrame.getID());
} else if (msg instanceof SpdyGoAwayFrame) {
SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 4);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_GOAWAY_FRAME);
frame.writeInt(4);
frame.writeInt(spdyGoAwayFrame.getLastGoodStreamID());
return frame;
out.ensureWritableBytes(SPDY_HEADER_SIZE + 4);
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_GOAWAY_FRAME);
out.writeInt(4);
out.writeInt(spdyGoAwayFrame.getLastGoodStreamID());
} else if (msg instanceof SpdyHeadersFrame) {
SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
ChannelBuffer data = compressHeaderBlock(
encodeHeaderBlock(spdyHeadersFrame));
int headerBlockLength = data.readableBytes();
int length = (headerBlockLength == 0) ? 4 : 6 + headerBlockLength;
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(SPDY_VERSION | 0x8000);
frame.writeShort(SPDY_HEADERS_FRAME);
frame.writeInt(length);
frame.writeInt(spdyHeadersFrame.getStreamID());
int length = headerBlockLength == 0 ? 4 : 6 + headerBlockLength;
out.ensureWritableBytes(SPDY_HEADER_SIZE + length + data.readableBytes());
out.writeShort(SPDY_VERSION | 0x8000);
out.writeShort(SPDY_HEADERS_FRAME);
out.writeInt(length);
out.writeInt(spdyHeadersFrame.getStreamID());
if (data.readableBytes() != 0) {
frame.writeShort(0);
out.writeShort(0);
out.writeBytes(data, data.readerIndex(), data.readableBytes());
}
return ChannelBuffers.wrappedBuffer(frame, data);
} else {
// Unknown message type
throw new UnsupportedMessageTypeException(msg);
}
// Unknown message type
return msg;
}
private ChannelBuffer encodeHeaderBlock(SpdyHeaderBlock headerFrame)
private static ChannelBuffer encodeHeaderBlock(SpdyHeaderBlock headerFrame)
throws Exception {
Set<String> names = headerFrame.getHeaderNames();
int numHeaders = names.size();

View File

@ -15,35 +15,19 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.CombinedChannelHandler;
/**
* A combination of {@link SpdyHttpDecoder} and {@link SpdyHttpEncoder}
* @apiviz.has io.netty.handler.codec.sdpy.SpdyHttpDecoder
* @apiviz.has io.netty.handler.codec.spdy.SpdyHttpEncoder
*/
public class SpdyHttpCodec implements ChannelUpstreamHandler, ChannelDownstreamHandler {
private final SpdyHttpDecoder decoder;
private final SpdyHttpEncoder encoder = new SpdyHttpEncoder();
public class SpdyHttpCodec extends CombinedChannelHandler {
/**
* Creates a new instance with the specified decoder options.
*/
public SpdyHttpCodec(int maxContentLength) {
decoder = new SpdyHttpDecoder(maxContentLength);
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
decoder.handleUpstream(ctx, e);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
encoder.handleDownstream(ctx, e);
super(new SpdyHttpDecoder(maxContentLength), new SpdyHttpEncoder());
}
}

View File

@ -15,15 +15,10 @@
*/
package io.netty.handler.codec.spdy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
@ -34,13 +29,16 @@ import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.oneone.OneToOneDecoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Decodes {@link SpdySynStreamFrame}s, {@link SpdySynReplyFrame}s,
* and {@link SpdyDataFrame}s into {@link HttpRequest}s and {@link HttpResponse}s.
*/
public class SpdyHttpDecoder extends OneToOneDecoder {
public class SpdyHttpDecoder extends MessageToMessageDecoder<Object, Object> {
private final int maxContentLength;
private final Map<Integer, HttpMessage> messageMap = new HashMap<Integer, HttpMessage>();
@ -61,8 +59,9 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
this.maxContentLength = maxContentLength;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
public Object decode(ChannelInboundHandlerContext<Object> ctx, Object msg)
throws Exception {
if (msg instanceof SpdySynStreamFrame) {
@ -80,7 +79,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
if (associatedToStreamID == 0) {
SpdyRstStreamFrame spdyRstStreamFrame =
new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.INVALID_STREAM);
Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame);
ctx.write(spdyRstStreamFrame);
}
String URL = SpdyHeaders.getUrl(spdySynStreamFrame);
@ -90,7 +89,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
if (URL == null) {
SpdyRstStreamFrame spdyRstStreamFrame =
new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.PROTOCOL_ERROR);
Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame);
ctx.write(spdyRstStreamFrame);
}
try {
@ -112,7 +111,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
} catch (Exception e) {
SpdyRstStreamFrame spdyRstStreamFrame =
new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.PROTOCOL_ERROR);
Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame);
ctx.write(spdyRstStreamFrame);
}
} else {
@ -137,7 +136,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
spdySynReplyFrame.setLast(true);
SpdyHeaders.setStatus(spdySynReplyFrame, HttpResponseStatus.BAD_REQUEST);
SpdyHeaders.setVersion(spdySynReplyFrame, HttpVersion.HTTP_1_0);
Channels.write(ctx, Channels.future(channel), spdySynReplyFrame);
ctx.write(spdySynReplyFrame);
}
}
@ -164,7 +163,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
// the client must reply with a RST_STREAM frame indicating a PROTOCOL_ERROR
SpdyRstStreamFrame spdyRstStreamFrame =
new DefaultSpdyRstStreamFrame(streamID, SpdyStreamStatus.PROTOCOL_ERROR);
Channels.write(ctx, Channels.future(channel), spdyRstStreamFrame);
ctx.write(spdyRstStreamFrame);
}
} else if (msg instanceof SpdyHeadersFrame) {
@ -203,8 +202,9 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
}
if (content == ChannelBuffers.EMPTY_BUFFER) {
content = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
content.writeBytes(spdyDataFrame.getData());
ChannelBuffer data = spdyDataFrame.getData();
content = ChannelBuffers.dynamicBuffer(data.readableBytes());
content.writeBytes(data, data.readerIndex(), data.readableBytes());
httpMessage.setContent(content);
} else {
content.writeBytes(spdyDataFrame.getData());
@ -220,7 +220,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
return null;
}
private HttpRequest createHttpRequest(SpdyHeaderBlock requestFrame)
private static HttpRequest createHttpRequest(SpdyHeaderBlock requestFrame)
throws Exception {
// Create the first line of the request from the name/value pairs
HttpMethod method = SpdyHeaders.getMethod(requestFrame);
@ -250,7 +250,7 @@ public class SpdyHttpDecoder extends OneToOneDecoder {
return httpRequest;
}
private HttpResponse createHttpResponse(SpdyHeaderBlock responseFrame)
private static HttpResponse createHttpResponse(SpdyHeaderBlock responseFrame)
throws Exception {
// Create the first line of the response from the name/value pairs
HttpResponseStatus status = SpdyHeaders.getStatus(responseFrame);

View File

@ -15,16 +15,9 @@
*/
package io.netty.handler.codec.spdy;
import java.util.List;
import java.util.Map;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpChunk;
import io.netty.handler.codec.http.HttpChunkTrailer;
import io.netty.handler.codec.http.HttpHeaders;
@ -32,6 +25,9 @@ import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import java.util.List;
import java.util.Map;
/**
* Encodes {@link HttpRequest}s, {@link HttpResponse}s, and {@link HttpChunk}s
* into {@link SpdySynStreamFrame}s and {@link SpdySynReplyFrame}s.
@ -110,44 +106,31 @@ import io.netty.handler.codec.http.HttpResponse;
* All pushed resources should be sent before sending the response
* that corresponds to the initial request.
*/
public class SpdyHttpEncoder implements ChannelDownstreamHandler {
public class SpdyHttpEncoder extends MessageToMessageEncoder<Object, Object> {
private volatile int currentStreamID;
public SpdyHttpEncoder() {
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
@Override
public Object encode(ChannelOutboundHandlerContext<Object> ctx, Object msg)
throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object msg = e.getMessage();
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpRequest);
int streamID = spdySynStreamFrame.getStreamID();
ChannelFuture future = getContentFuture(ctx, e, streamID, httpRequest);
Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress());
return new Object[] { spdySynStreamFrame, dataFrame(streamID, httpRequest) };
} else if (msg instanceof HttpResponse) {
HttpResponse httpResponse = (HttpResponse) msg;
if (httpResponse.containsHeader(SpdyHttpHeaders.Names.ASSOCIATED_TO_STREAM_ID)) {
SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpResponse);
int streamID = spdySynStreamFrame.getStreamID();
ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse);
Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress());
return new Object[] { spdySynStreamFrame, dataFrame(streamID, httpResponse) };
} else {
SpdySynReplyFrame spdySynReplyFrame = createSynReplyFrame(httpResponse);
int streamID = spdySynReplyFrame.getStreamID();
ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse);
Channels.write(ctx, future, spdySynReplyFrame, e.getRemoteAddress());
return new Object[] { spdySynReplyFrame, dataFrame(streamID, httpResponse) };
}
} else if (msg instanceof HttpChunk) {
@ -161,7 +144,7 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
List<Map.Entry<String, String>> trailers = trailer.getHeaders();
if (trailers.isEmpty()) {
Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
return spdyDataFrame;
} else {
// Create SPDY HEADERS frame out of trailers
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(currentStreamID);
@ -170,23 +153,20 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
}
// Write HEADERS frame and append Data Frame
ChannelFuture future = Channels.future(e.channel());
future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame));
Channels.write(ctx, future, spdyHeadersFrame, e.getRemoteAddress());
return new Object[] { spdyHeadersFrame, spdyDataFrame };
}
} else {
Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
return spdyDataFrame;
}
} else {
// Unknown message type
ctx.sendDownstream(evt);
throw new UnsupportedMessageTypeException();
}
}
private ChannelFuture getContentFuture(
ChannelHandlerContext ctx, MessageEvent e, int streamID, HttpMessage httpMessage) {
private static SpdyDataFrame dataFrame(int streamID, HttpMessage httpMessage) {
if (httpMessage.getContent().readableBytes() == 0) {
return e.getFuture();
return null;
}
// Create SPDY Data Frame out of message content
@ -194,34 +174,7 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
spdyDataFrame.setData(httpMessage.getContent());
spdyDataFrame.setLast(true);
// Create new future and add listener
ChannelFuture future = Channels.future(e.channel());
future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame));
return future;
}
private class SpdyFrameWriter implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final MessageEvent e;
private final Object spdyFrame;
SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e, Object spdyFrame) {
this.ctx = ctx;
this.e = e;
this.spdyFrame = spdyFrame;
}
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
Channels.write(ctx, e.getFuture(), spdyFrame, e.getRemoteAddress());
} else if (future.isCancelled()) {
e.getFuture().cancel();
} else {
e.getFuture().setFailure(future.cause());
}
}
return spdyDataFrame;
}
private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage)

View File

@ -15,12 +15,14 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.handler.codec.CodecException;
/**
* An {@link Exception} which is thrown when the received frame cannot
* be decoded by the {@link SpdyFrameDecoder}.
* @apiviz.exclude
*/
public class SpdyProtocolException extends Exception {
public class SpdyProtocolException extends CodecException {
private static final long serialVersionUID = -1097979786367505658L;

View File

@ -53,7 +53,7 @@ final class SpdySession {
public boolean isRemoteSideClosed(int streamID) {
StreamState state = activeStreams.get(new Integer(streamID));
return (state == null) || state.isRemoteSideClosed();
return state == null || state.isRemoteSideClosed();
}
public void closeRemoteSide(int streamID) {
@ -69,7 +69,7 @@ final class SpdySession {
public boolean isLocalSideClosed(int streamID) {
StreamState state = activeStreams.get(new Integer(streamID));
return (state == null) || state.isLocalSideClosed();
return state == null || state.isLocalSideClosed();
}
public void closeLocalSide(int streamID) {
@ -85,7 +85,7 @@ final class SpdySession {
public boolean hasReceivedReply(int streamID) {
StreamState state = activeStreams.get(new Integer(streamID));
return (state != null) && state.hasReceivedReply();
return state != null && state.hasReceivedReply();
}
public void receivedReply(int streamID) {

View File

@ -15,26 +15,22 @@
*/
package io.netty.handler.codec.spdy;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Manages streams within a SPDY session.
*/
public class SpdySessionHandler extends SimpleChannelUpstreamHandler
implements ChannelDownstreamHandler {
public class SpdySessionHandler extends ChannelHandlerAdapter<Object, Object> {
private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
@ -68,10 +64,36 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelOutboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelInboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
Queue<Object> in = ctx.in().messageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
handleInboundMessage(ctx, msg);
}
ctx.fireInboundBufferUpdated();
}
private void handleInboundMessage(ChannelInboundHandlerContext<Object> ctx, Object msg)
throws Exception {
Object msg = e.getMessage();
if (msg instanceof SpdyDataFrame) {
/*
@ -99,14 +121,14 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Check if we received a data frame for a Stream-ID which is not open
if (spdySession.isRemoteSideClosed(streamID)) {
if (!sentGoAwayFrame) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM);
issueStreamError(ctx, streamID, SpdyStreamStatus.INVALID_STREAM);
}
return;
}
// Check if we received a data frame before receiving a SYN_REPLY
if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -134,13 +156,13 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
if (spdySynStreamFrame.isInvalid() ||
!isRemoteInitiatedID(streamID) ||
spdySession.isActiveStream(streamID)) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
// Stream-IDs must be monotonically increassing
if (streamID < lastGoodStreamID) {
issueSessionError(ctx, e.channel(), e.getRemoteAddress());
issueSessionError(ctx);
return;
}
@ -148,7 +170,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
boolean remoteSideClosed = spdySynStreamFrame.isLast();
boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
if (!acceptStream(streamID, remoteSideClosed, localSideClosed)) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.REFUSED_STREAM);
issueStreamError(ctx, streamID, SpdyStreamStatus.REFUSED_STREAM);
return;
}
@ -168,13 +190,13 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
if (spdySynReplyFrame.isInvalid() ||
isRemoteInitiatedID(streamID) ||
spdySession.isRemoteSideClosed(streamID)) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM);
issueStreamError(ctx, streamID, SpdyStreamStatus.INVALID_STREAM);
return;
}
// Check if we have received multiple frames for the same Stream-ID
if (spdySession.hasReceivedReply(streamID)) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -217,12 +239,12 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
*/
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getID())) {
Channels.write(ctx, Channels.future(e.channel()), spdyPingFrame, e.getRemoteAddress());
ctx.write(spdyPingFrame);
return;
}
// Note: only checks that there are outstanding pings since uniqueness is not inforced
if (pings.get() == 0) {
return;
@ -240,48 +262,69 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
// Check if we received a valid HEADERS frame
if (spdyHeadersFrame.isInvalid()) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
if (spdySession.isRemoteSideClosed(streamID)) {
issueStreamError(ctx, e, streamID, SpdyStreamStatus.INVALID_STREAM);
issueStreamError(ctx, streamID, SpdyStreamStatus.INVALID_STREAM);
return;
}
}
super.messageReceived(ctx, e);
ctx.nextIn().messageBuffer().add(msg);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.state()) {
case OPEN:
case CONNECTED:
case BOUND:
if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
sendGoAwayFrame(ctx, e);
return;
}
@Override
public void disconnect(final ChannelOutboundHandlerContext<Object> ctx,
final ChannelFuture future) throws Exception {
sendGoAwayFrame(ctx).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
ctx.disconnect(future);
}
}
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
});
}
@Override
public void close(final ChannelOutboundHandlerContext<Object> ctx,
final ChannelFuture future) throws Exception {
sendGoAwayFrame(ctx).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
ctx.close(future);
}
});
}
@Override
public void flush(ChannelOutboundHandlerContext<Object> ctx,
ChannelFuture future) throws Exception {
Queue<Object> in = ctx.prevOut().messageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
handleOutboundMessage(ctx, msg);
}
MessageEvent e = (MessageEvent) evt;
Object msg = e.getMessage();
ctx.flush(future);
}
private void handleOutboundMessage(ChannelOutboundHandlerContext<Object> ctx, Object msg)
throws Exception {
if (msg instanceof SpdyDataFrame) {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
int streamID = spdyDataFrame.getStreamID();
if (spdySession.isLocalSideClosed(streamID)) {
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -292,10 +335,11 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} else if (msg instanceof SpdySynStreamFrame) {
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
int streamID = spdySynStreamFrame.getStreamID();
boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
boolean localSideClosed = spdySynStreamFrame.isLast();
if (!acceptStream(spdySynStreamFrame.getStreamID(), remoteSideClosed, localSideClosed)) {
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
if (!acceptStream(streamID, remoteSideClosed, localSideClosed)) {
issueStreamError(ctx, streamID, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -305,7 +349,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int streamID = spdySynReplyFrame.getStreamID();
if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
ctx.fireExceptionCaught(PROTOCOL_EXCEPTION);
return;
}
@ -327,7 +371,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getID())) {
e.getFuture().setFailure(new IllegalArgumentException(
ctx.fireExceptionCaught(new IllegalArgumentException(
"invalid PING ID: " + spdyPingFrame.getID()));
return;
}
@ -336,7 +380,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} else if (msg instanceof SpdyGoAwayFrame) {
// Should send a CLOSE ChannelStateEvent
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
ctx.fireExceptionCaught(PROTOCOL_EXCEPTION);
return;
} else if (msg instanceof SpdyHeadersFrame) {
@ -345,33 +389,30 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
int streamID = spdyHeadersFrame.getStreamID();
if (spdySession.isLocalSideClosed(streamID)) {
e.getFuture().setFailure(PROTOCOL_EXCEPTION);
ctx.fireExceptionCaught(PROTOCOL_EXCEPTION);
return;
}
}
ctx.sendDownstream(evt);
ctx.out().messageBuffer().add(msg);
}
/*
* Error Handling
*/
private void issueSessionError(
ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress) {
ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress);
future.addListener(ChannelFutureListener.CLOSE);
private void issueSessionError(ChannelHandlerContext ctx) {
sendGoAwayFrame(ctx).addListener(ChannelFutureListener.CLOSE);
}
// Send a RST_STREAM frame in response to an incoming MessageEvent
// Only called in the upstream direction
private void issueStreamError(
ChannelHandlerContext ctx, MessageEvent e, int streamID, SpdyStreamStatus status) {
ChannelHandlerContext ctx, int streamID, SpdyStreamStatus status) {
removeStream(streamID);
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
Channels.write(ctx, Channels.future(e.channel()), spdyRstStreamFrame, e.getRemoteAddress());
ctx.write(spdyRstStreamFrame);
}
/*
@ -380,7 +421,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
private boolean isRemoteInitiatedID(int ID) {
boolean serverID = SpdyCodecUtil.isServerID(ID);
return (server && !serverID) || (!server && serverID);
return server && !serverID || !server && serverID;
}
private synchronized void updateConcurrentStreams(SpdySettingsFrame settings, boolean remote) {
@ -416,8 +457,8 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
if (receivedGoAwayFrame || sentGoAwayFrame) {
return false;
}
if ((maxConcurrentStreams != 0) &&
(spdySession.numActiveStreams() >= maxConcurrentStreams)) {
if (maxConcurrentStreams != 0 &&
spdySession.numActiveStreams() >= maxConcurrentStreams) {
return false;
}
spdySession.acceptStream(streamID, remoteSideClosed, localSideClosed);
@ -433,61 +474,23 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
} else {
spdySession.closeLocalSide(streamID);
}
if ((closeSessionFuture != null) && spdySession.noActiveStreams()) {
if (closeSessionFuture != null && spdySession.noActiveStreams()) {
closeSessionFuture.setSuccess();
}
}
private void removeStream(int streamID) {
spdySession.removeStream(streamID);
if ((closeSessionFuture != null) && spdySession.noActiveStreams()) {
if (closeSessionFuture != null && spdySession.noActiveStreams()) {
closeSessionFuture.setSuccess();
}
}
private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
// Avoid NotYetConnectedException
if (!e.channel().isConnected()) {
ctx.sendDownstream(e);
return;
}
ChannelFuture future = sendGoAwayFrame(ctx, e.channel(), null);
if (spdySession.noActiveStreams()) {
future.addListener(new ClosingChannelFutureListener(ctx, e));
} else {
closeSessionFuture = Channels.future(e.channel());
closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
}
}
private synchronized ChannelFuture sendGoAwayFrame(
ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress) {
private synchronized ChannelFuture sendGoAwayFrame(ChannelHandlerContext ctx) {
if (!sentGoAwayFrame) {
sentGoAwayFrame = true;
ChannelFuture future = Channels.future(channel);
Channels.write(ctx, future, new DefaultSpdyGoAwayFrame(lastGoodStreamID));
return future;
}
return Channels.succeededFuture(channel);
}
private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelStateEvent e;
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
this.ctx = ctx;
this.e = e;
}
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
if (!(sentGoAwayFuture.cause() instanceof ClosedChannelException)) {
Channels.close(ctx, e.getFuture());
} else {
e.getFuture().setSuccess();
}
return ctx.write(new DefaultSpdyGoAwayFrame(lastGoodStreamID));
}
return ctx.newSucceededFuture();
}
}

View File

@ -15,10 +15,9 @@
*/
package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.replay;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import static org.easymock.EasyMock.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
@ -41,43 +40,43 @@ import org.junit.Assert;
import org.junit.Test;
public class WebSocketServerHandshaker00Test {
private DefaultChannelPipeline createPipeline() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
private static DefaultChannelPipeline createPipeline(Channel ch) {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(ch);
pipeline.addLast("chunkAggregator", new HttpChunkAggregator(42));
pipeline.addLast("wsdecoder", new HttpRequestDecoder());
pipeline.addLast("wsencoder", new HttpResponseEncoder());
return pipeline;
}
@Test
public void testPerformOpeningHandshake() {
Channel channelMock = EasyMock.createMock(Channel.class);
DefaultChannelPipeline pipeline = createPipeline();
DefaultChannelPipeline pipeline = createPipeline(channelMock);
EasyMock.expect(channelMock.pipeline()).andReturn(pipeline);
// capture the http response in order to verify the headers
Capture<HttpResponse> res = new Capture<HttpResponse>();
EasyMock.expect(channelMock.write(capture(res))).andReturn(new DefaultChannelFuture(channelMock, true));
replay(channelMock);
HttpRequest req = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat");
req.setHeader(Names.HOST, "server.example.com");
req.setHeader(Names.UPGRADE, WEBSOCKET.toLowerCase());
req.setHeader(Names.CONNECTION, "Upgrade");
req.setHeader(Names.ORIGIN, "http://example.com");
req.setHeader(Names.SEC_WEBSOCKET_KEY1, "4 @1 46546xW%0l 1 5");
req.setHeader(Names.SEC_WEBSOCKET_KEY2, "12998 5 Y3 1 .P00");
req.setHeader(Names.SEC_WEBSOCKET_KEY2, "12998 5 Y3 1 .P00");
req.setHeader(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
ChannelBuffer buffer = ChannelBuffers.copiedBuffer("^n:ds[4U", Charset.defaultCharset());
req.setContent(buffer);
WebSocketServerHandshaker00 handsaker = new WebSocketServerHandshaker00("ws://example.com/chat", "chat");
handsaker.handshake(channelMock, req);
Assert.assertEquals("ws://example.com/chat", res.getValue().getHeader(Names.SEC_WEBSOCKET_LOCATION));
Assert.assertEquals("chat", res.getValue().getHeader(Names.SEC_WEBSOCKET_PROTOCOL));
Assert.assertEquals("8jKS'y:G*Co,Wxa-", res.getValue().getContent().toString(Charset.defaultCharset()));

View File

@ -15,10 +15,9 @@
*/
package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.replay;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import static org.easymock.EasyMock.*;
import io.netty.channel.Channel;
import io.netty.channel.DefaultChannelFuture;
import io.netty.channel.DefaultChannelPipeline;
@ -37,28 +36,28 @@ import org.junit.Assert;
import org.junit.Test;
public class WebSocketServerHandshaker08Test {
private DefaultChannelPipeline createPipeline() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
private static DefaultChannelPipeline createPipeline(Channel ch) {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(ch);
pipeline.addLast("chunkAggregator", new HttpChunkAggregator(42));
pipeline.addLast("requestDecoder", new HttpRequestDecoder());
pipeline.addLast("responseEncoder", new HttpResponseEncoder());
return pipeline;
}
@Test
public void testPerformOpeningHandshake() {
Channel channelMock = EasyMock.createMock(Channel.class);
DefaultChannelPipeline pipeline = createPipeline();
DefaultChannelPipeline pipeline = createPipeline(channelMock);
EasyMock.expect(channelMock.pipeline()).andReturn(pipeline);
// capture the http response in order to verify the headers
Capture<HttpResponse> res = new Capture<HttpResponse>();
EasyMock.expect(channelMock.write(capture(res))).andReturn(new DefaultChannelFuture(channelMock, true));
replay(channelMock);
HttpRequest req = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat");
req.setHeader(Names.HOST, "server.example.com");
req.setHeader(Names.UPGRADE, WEBSOCKET.toLowerCase());
@ -67,10 +66,10 @@ public class WebSocketServerHandshaker08Test {
req.setHeader(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
req.setHeader(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
req.setHeader(Names.SEC_WEBSOCKET_VERSION, "8");
WebSocketServerHandshaker08 handsaker = new WebSocketServerHandshaker08("ws://example.com/chat", "chat", false);
handsaker.handshake(channelMock, req);
Assert.assertEquals("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.getValue().getHeader(Names.SEC_WEBSOCKET_ACCEPT));
Assert.assertEquals("chat", res.getValue().getHeader(Names.SEC_WEBSOCKET_PROTOCOL));
}

View File

@ -15,10 +15,9 @@
*/
package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.replay;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import static org.easymock.EasyMock.*;
import io.netty.channel.Channel;
import io.netty.channel.DefaultChannelFuture;
import io.netty.channel.DefaultChannelPipeline;
@ -38,8 +37,8 @@ import org.junit.Test;
public class WebSocketServerHandshaker13Test {
private DefaultChannelPipeline createPipeline() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
private static DefaultChannelPipeline createPipeline(Channel ch) {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(ch);
pipeline.addLast("chunkAggregator", new HttpChunkAggregator(42));
pipeline.addLast("requestDecoder", new HttpRequestDecoder());
pipeline.addLast("responseEncoder", new HttpResponseEncoder());
@ -50,7 +49,7 @@ public class WebSocketServerHandshaker13Test {
public void testPerformOpeningHandshake() {
Channel channelMock = EasyMock.createMock(Channel.class);
DefaultChannelPipeline pipeline = createPipeline();
DefaultChannelPipeline pipeline = createPipeline(channelMock);
EasyMock.expect(channelMock.pipeline()).andReturn(pipeline);
// capture the http response in order to verify the headers

View File

@ -16,30 +16,26 @@
package io.netty.handler.codec.spdy;
import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.util.internal.ExecutorUtil;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.Channels;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -146,27 +142,38 @@ public abstract class AbstractSocketSpdyEchoTest {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
protected abstract ServerChannelBootstrap newServerBootstrap();
protected abstract ChannelBootstrap newClientBootstrap();
@Test(timeout = 10000)
public void testSpdyEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
ServerChannelBootstrap sb = newServerBootstrap();
ChannelBootstrap cb = newClientBootstrap();
EchoHandler sh = new EchoHandler(true);
EchoHandler ch = new EchoHandler(false);
final ServerHandler sh = new ServerHandler();
final ClientHandler ch = new ClientHandler();
sb.pipeline().addLast("decoder", new SpdyFrameDecoder());
sb.pipeline().addLast("encoder", new SpdyFrameEncoder());
sb.pipeline().addLast("handler", sh);
sb.childInitializer(new ChannelInitializer() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new SpdyFrameDecoder(),
new SpdyFrameEncoder(),
sh);
}
});
cb.pipeline().addLast("handler", ch);
cb.initializer(new ChannelInitializer() {
@Override
public void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(ch);
}
});
Channel sc = sb.bind(new InetSocketAddress(0));
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();
Channel sc = sb.localAddress(new InetSocketAddress(0)).bind().sync().channel();
int port = ((InetSocketAddress) sc.localAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
ChannelFuture ccf = cb.remoteAddress(new InetSocketAddress(InetAddress.getLocalHost(), port)).connect();
assertTrue(ccf.awaitUninterruptibly().isSuccess());
Channel cc = ccf.channel();
@ -205,47 +212,62 @@ public abstract class AbstractSocketSpdyEchoTest {
}
}
private class EchoHandler extends SimpleChannelUpstreamHandler {
private class ServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@Override
public void channelRegistered(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
channel = ctx.channel();
}
@Override
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg)
throws Exception {
ctx.write(msg);
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
ctx.close();
}
}
}
private class ClientHandler extends ChannelInboundStreamHandlerAdapter {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
final boolean server;
EchoHandler(boolean server) {
super();
this.server = server;
@Override
public void channelRegistered(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
channel = ctx.channel();
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
channel = e.channel();
}
ChannelBuffer m = ctx.in().byteBuffer().readBytes(ctx.in().byteBuffer().readableBytes());
byte[] actual = new byte[m.readableBytes()];
m.getBytes(0, actual);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
if (server) {
Channels.write(channel, e.getMessage(), e.getRemoteAddress());
} else {
ChannelBuffer m = (ChannelBuffer) e.getMessage();
byte[] actual = new byte[m.readableBytes()];
m.getBytes(0, actual);
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]);
}
counter += actual.length;
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(frames.getByte(ignoredBytes + i + lastIdx), actual[i]);
}
counter += actual.length;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
if (exception.compareAndSet(null, e.cause())) {
e.channel().close();
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
ctx.close();
}
}
}

View File

@ -15,22 +15,29 @@
*/
package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelFactory;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
import io.netty.channel.ChannelBootstrap;
import io.netty.channel.ServerChannelBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop;
public class NioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor);
protected ChannelBootstrap newClientBootstrap() {
ChannelBootstrap b = new ChannelBootstrap();
b.eventLoop(new SelectorEventLoop());
b.channel(new NioSocketChannel());
return b;
}
@Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor);
protected ServerChannelBootstrap newServerBootstrap() {
ServerChannelBootstrap b = new ServerChannelBootstrap();
b.eventLoop(new SelectorEventLoop(), new SelectorEventLoop());
b.channel(new NioServerSocketChannel());
return b;
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelFactory;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory;
@ -24,12 +23,12 @@ import io.netty.channel.socket.oio.OioServerSocketChannelFactory;
public class NioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
protected ChannelFactory newClientBootstrap() {
return new NioClientSocketChannelFactory(executor);
}
@Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
protected ChannelFactory newServerBootstrap() {
return new OioServerSocketChannelFactory(executor, executor);
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelFactory;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
@ -24,12 +23,12 @@ import io.netty.channel.socket.oio.OioClientSocketChannelFactory;
public class OioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
protected ChannelFactory newClientBootstrap() {
return new OioClientSocketChannelFactory(executor);
}
@Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
protected ChannelFactory newServerBootstrap() {
return new NioServerSocketChannelFactory(executor);
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelFactory;
import io.netty.channel.socket.oio.OioClientSocketChannelFactory;
@ -24,12 +23,12 @@ import io.netty.channel.socket.oio.OioServerSocketChannelFactory;
public class OioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
protected ChannelFactory newClientBootstrap() {
return new OioClientSocketChannelFactory(executor);
}
@Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
protected ChannelFactory newServerBootstrap() {
return new OioServerSocketChannelFactory(executor, executor);
}

View File

@ -15,15 +15,13 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import java.util.List;
import java.util.Map;
import io.netty.channel.Channels;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import org.junit.Assert;
import org.junit.Test;
@ -36,7 +34,7 @@ public class SpdySessionHandlerTest {
closeMessage.setValue(closeSignal, 0);
}
private void assertHeaderBlock(SpdyHeaderBlock received, SpdyHeaderBlock expected) {
private static void assertHeaderBlock(SpdyHeaderBlock received, SpdyHeaderBlock expected) {
for (String name: expected.getHeaderNames()) {
List<String> expectedValues = expected.getHeaders(name);
List<String> receivedValues = received.getHeaders(name);
@ -48,7 +46,7 @@ public class SpdySessionHandlerTest {
Assert.assertTrue(received.getHeaders().isEmpty());
}
private void assertDataFrame(Object msg, int streamID, boolean last) {
private static void assertDataFrame(Object msg, int streamID, boolean last) {
Assert.assertNotNull(msg);
Assert.assertTrue(msg instanceof SpdyDataFrame);
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
@ -56,7 +54,7 @@ public class SpdySessionHandlerTest {
Assert.assertTrue(spdyDataFrame.isLast() == last);
}
private void assertSynReply(Object msg, int streamID, boolean last, SpdyHeaderBlock headers) {
private static void assertSynReply(Object msg, int streamID, boolean last, SpdyHeaderBlock headers) {
Assert.assertNotNull(msg);
Assert.assertTrue(msg instanceof SpdySynReplyFrame);
SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
@ -65,7 +63,7 @@ public class SpdySessionHandlerTest {
assertHeaderBlock(spdySynReplyFrame, headers);
}
private void assertRstStream(Object msg, int streamID, SpdyStreamStatus status) {
private static void assertRstStream(Object msg, int streamID, SpdyStreamStatus status) {
Assert.assertNotNull(msg);
Assert.assertTrue(msg instanceof SpdyRstStreamFrame);
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
@ -73,21 +71,21 @@ public class SpdySessionHandlerTest {
Assert.assertTrue(spdyRstStreamFrame.getStatus().equals(status));
}
private void assertPing(Object msg, int ID) {
private static void assertPing(Object msg, int ID) {
Assert.assertNotNull(msg);
Assert.assertTrue(msg instanceof SpdyPingFrame);
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
Assert.assertTrue(spdyPingFrame.getID() == ID);
}
private void assertGoAway(Object msg, int lastGoodStreamID) {
private static void assertGoAway(Object msg, int lastGoodStreamID) {
Assert.assertNotNull(msg);
Assert.assertTrue(msg instanceof SpdyGoAwayFrame);
SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg;
Assert.assertTrue(spdyGoAwayFrame.getLastGoodStreamID() == lastGoodStreamID);
}
private void assertHeaders(Object msg, int streamID, SpdyHeaderBlock headers) {
private static void assertHeaders(Object msg, int streamID, SpdyHeaderBlock headers) {
Assert.assertNotNull(msg);
Assert.assertTrue(msg instanceof SpdyHeadersFrame);
SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
@ -167,7 +165,12 @@ public class SpdySessionHandlerTest {
// Check if session handler honors UNIDIRECTIONAL streams
spdySynStreamFrame.setLast(false);
sessionHandler.offer(spdySynStreamFrame);
Assert.assertNull(sessionHandler.peek());
try {
sessionHandler.poll();
Assert.fail();
} catch (SpdyProtocolException e) {
// Expected
}
spdySynStreamFrame.setUnidirectional(false);
// Check if session handler returns PROTOCOL_ERROR if it receives
@ -262,9 +265,9 @@ public class SpdySessionHandlerTest {
// Echo Handler opens 4 half-closed streams on session connection
// and then sets the number of concurrent streams to 3
private class EchoHandler extends SimpleChannelUpstreamHandler {
private int closeSignal;
private boolean server;
private class EchoHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private final int closeSignal;
private final boolean server;
EchoHandler(int closeSignal, boolean server) {
super();
@ -273,37 +276,34 @@ public class SpdySessionHandlerTest {
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
public void channelActive(ChannelInboundHandlerContext<Object> ctx)
throws Exception {
// Initiate 4 new streams
int streamID = server ? 2 : 1;
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(streamID, 0, (byte) 0);
spdySynStreamFrame.setLast(true);
Channels.write(e.channel(), spdySynStreamFrame);
ctx.write(spdySynStreamFrame);
spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2);
Channels.write(e.channel(), spdySynStreamFrame);
ctx.write(spdySynStreamFrame);
spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2);
Channels.write(e.channel(), spdySynStreamFrame);
ctx.write(spdySynStreamFrame);
spdySynStreamFrame.setStreamID(spdySynStreamFrame.getStreamID() + 2);
Channels.write(e.channel(), spdySynStreamFrame);
ctx.write(spdySynStreamFrame);
// Limit the number of concurrent streams to 3
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3);
Channels.write(e.channel(), spdySettingsFrame);
ctx.write(spdySettingsFrame);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Object msg = e.getMessage();
if ((msg instanceof SpdyDataFrame) ||
(msg instanceof SpdyPingFrame) ||
(msg instanceof SpdyHeadersFrame)) {
public void messageReceived(ChannelInboundHandlerContext<Object> ctx, Object msg) throws Exception {
if (msg instanceof SpdyDataFrame ||
msg instanceof SpdyPingFrame ||
msg instanceof SpdyHeadersFrame) {
Channels.write(e.channel(), msg, e.getRemoteAddress());
ctx.write(msg);
return;
}
@ -318,7 +318,7 @@ public class SpdySessionHandlerTest {
spdySynReplyFrame.addHeader(entry.getKey(), entry.getValue());
}
Channels.write(e.channel(), spdySynReplyFrame, e.getRemoteAddress());
ctx.write(spdySynReplyFrame);
return;
}
@ -326,7 +326,7 @@ public class SpdySessionHandlerTest {
SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
if (spdySettingsFrame.isSet(closeSignal)) {
Channels.close(e.channel());
ctx.close();
}
}
}

View File

@ -5,10 +5,9 @@ public class UnsupportedMessageTypeException extends CodecException {
private static final long serialVersionUID = 2799598826487038726L;
public UnsupportedMessageTypeException(
Object message, Class<?> expectedType, Class<?>... otherExpectedTypes) {
Object message, Class<?>... expectedTypes) {
super(message(
message == null? "null" : message.getClass().getName(),
expectedType, otherExpectedTypes));
message == null? "null" : message.getClass().getName(), expectedTypes));
}
public UnsupportedMessageTypeException() {
@ -28,23 +27,21 @@ public class UnsupportedMessageTypeException extends CodecException {
}
private static String message(
String actualType, Class<?> expectedType, Class<?>... otherExpectedTypes) {
if (expectedType == null) {
throw new NullPointerException("expectedType");
}
String actualType, Class<?>... expectedTypes) {
StringBuilder buf = new StringBuilder(actualType);
buf.append(" (expected: ").append(expectedType.getName());
if (otherExpectedTypes != null) {
for (Class<?> t: otherExpectedTypes) {
if (expectedTypes != null && expectedTypes.length > 0) {
buf.append(" (expected: ").append(expectedTypes[0].getName());
for (int i = 1; i < expectedTypes.length; i ++) {
Class<?> t = expectedTypes[i];
if (t == null) {
break;
}
buf.append(", ").append(t.getName());
}
buf.append(')');
}
return buf.append(')').toString();
return buf.toString();
}
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
@ -30,7 +31,7 @@ import java.util.Queue;
class EmbeddedChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig();
private final ChannelBufferHolder<?> firstOut = ChannelBufferHolders.byteBuffer();
private final ChannelBufferHolder<?> firstOut;
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> productQueue;
@ -50,6 +51,7 @@ class EmbeddedChannel extends AbstractChannel {
EmbeddedChannel(Queue<Object> productQueue) {
super(null, null);
this.productQueue = productQueue;
firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer());
}
@Override

View File

@ -43,6 +43,13 @@ public final class ChannelBufferHolder<E> {
this.byteBuf = byteBuf;
}
ChannelBufferHolder(Queue<E> msgBuf, ChannelBuffer byteBuf) {
ctx = null;
bypassDirection = 0;
this.msgBuf = msgBuf;
this.byteBuf = byteBuf;
}
public boolean isBypass() {
return bypassDirection != 0;
}

View File

@ -33,6 +33,14 @@ public final class ChannelBufferHolders {
return new ChannelBufferHolder<E>(ctx, false);
}
public static <E> ChannelBufferHolder<E> catchAllBuffer() {
return catchAllBuffer(new ArrayDeque<E>(), ChannelBuffers.dynamicBuffer());
}
public static <E> ChannelBufferHolder<E> catchAllBuffer(Queue<E> msgBuf, ChannelBuffer byteBuf) {
return new ChannelBufferHolder<E>(msgBuf, byteBuf);
}
private ChannelBufferHolders() {
// Utility class
}

View File

@ -15,8 +15,6 @@ public class ChannelInboundMessageHandlerAdapter<I> extends
public void inboundBufferUpdated(ChannelInboundHandlerContext<I> ctx)
throws Exception {
Queue<I> in = ctx.in().messageBuffer();
Queue<Object> out = ctx.nextIn().messageBuffer();
int oldOutSize = out.size();
for (;;) {
I msg = in.poll();
if (msg == null) {
@ -24,14 +22,11 @@ public class ChannelInboundMessageHandlerAdapter<I> extends
}
try {
messageReceived(ctx, msg);
ctx.fireInboundBufferUpdated();
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
if (out.size() != oldOutSize) {
ctx.fireInboundBufferUpdated();
}
}
public void messageReceived(ChannelInboundHandlerContext<I> ctx, I msg) throws Exception {

View File

@ -0,0 +1,10 @@
package io.netty.channel;
public class ChannelInboundStreamHandlerAdapter extends ChannelInboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
}