Remove MessageList from public API and change ChannelInbound/OutboundHandler accordingly

I must admit MesageList was pain in the ass.  Instead of forcing a
handler always loop over the list of messages, this commit splits
messageReceived(ctx, list) into two event handlers:

- messageReceived(ctx, msg)
- mmessageReceivedLast(ctx)

When Netty reads one or more messages, messageReceived(ctx, msg) event
is triggered for each message.  Once the current read operation is
finished, messageReceivedLast() is triggered to tell the handler that
the last messageReceived() was the last message in the current batch.

Similarly, for outbound, write(ctx, list) has been split into two:

- write(ctx, msg)
- flush(ctx, promise)

Instead of writing a list of message with a promise, a user is now
supposed to call write(msg) multiple times and then call flush() to
actually flush the buffered messages.

Please note that write() doesn't have a promise with it.  You must call
flush() to get notified on completion. (or you can use writeAndFlush())

Other changes:

- Because MessageList is completely hidden, codec framework uses
  List<Object> instead of MessageList as an output parameter.
This commit is contained in:
Trustin Lee 2013-07-08 19:03:40 +09:00
parent 1d196c5b59
commit cbd8817905
198 changed files with 1791 additions and 1808 deletions

View File

@ -19,10 +19,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.channel.MessageList;
import io.netty.handler.codec.PrematureChannelClosureException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
@ -86,7 +86,7 @@ public final class HttpClientCodec
@Override
protected void encode(
ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
if (msg instanceof HttpRequest && !done) {
queue.offer(((HttpRequest) msg).getMethod());
}
@ -110,7 +110,7 @@ public final class HttpClientCodec
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf buffer, MessageList<Object> out) throws Exception {
ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
if (done) {
int readable = actualReadableBytes();
if (readable == 0) {

View File

@ -19,11 +19,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
/**
* Decodes the content of the received {@link HttpRequest} and {@link HttpContent}.
* The original content is replaced with the new content decoded by the
@ -51,7 +52,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
private boolean continueResponse;
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) {
if (!(msg instanceof LastHttpContent)) {

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.HttpHeaders.Names;
@ -26,6 +25,7 @@ import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
/**
@ -69,7 +69,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpRequest msg, MessageList<Object> out)
protected void decode(ChannelHandlerContext ctx, HttpRequest msg, List<Object> out)
throws Exception {
String acceptedEncoding = msg.headers().get(HttpHeaders.Names.ACCEPT_ENCODING);
if (acceptedEncoding == null) {
@ -80,7 +80,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
}
@Override
protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
final boolean isFull = msg instanceof HttpResponse && msg instanceof LastHttpContent;
switch (state) {
case AWAIT_HEADERS: {
@ -163,7 +163,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
case AWAIT_CONTENT: {
ensureContent(msg);
encodeContent((HttpContent) msg, out);
if (out.last() instanceof LastHttpContent) {
if (msg instanceof LastHttpContent) {
state = State.AWAIT_HEADERS;
}
break;
@ -196,7 +196,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
}
}
private void encodeContent(HttpContent c, MessageList<Object> out) {
private void encodeContent(HttpContent c, List<Object> out) {
ByteBuf content = c.content();
encode(content, out);
@ -256,20 +256,20 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
}
}
private void encode(ByteBuf in, MessageList<Object> out) {
private void encode(ByteBuf in, List<Object> out) {
// call retain here as it will call release after its written to the channel
encoder.writeOutbound(in.retain());
fetchEncoderOutput(out);
}
private void finishEncode(MessageList<Object> out) {
private void finishEncode(List<Object> out) {
if (encoder.finish()) {
fetchEncoderOutput(out);
}
encoder = null;
}
private void fetchEncoderOutput(MessageList<Object> out) {
private void fetchEncoderOutput(List<Object> out) {
for (;;) {
ByteBuf buf = (ByteBuf) encoder.readOutbound();
if (buf == null) {

View File

@ -15,20 +15,21 @@
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaders.is100ContinueExpected;
import static io.netty.handler.codec.http.HttpHeaders.removeTransferEncodingChunked;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import static io.netty.handler.codec.http.HttpHeaders.*;
import java.util.List;
/**
* A {@link ChannelHandler} that aggregates an {@link HttpMessage}
@ -108,7 +109,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
FullHttpMessage currentMessage = this.currentMessage;
if (msg instanceof HttpMessage) {

View File

@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
@ -168,7 +167,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
switch (state()) {
case SKIP_CONTROL_CHARS: {
try {
@ -421,7 +420,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
}
@Override
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decode(ctx, in, out);
// Handle the last unfinished message.
@ -488,7 +487,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
reset(null);
}
private void reset(MessageList<Object> out) {
private void reset(List<Object> out) {
if (out != null) {
HttpMessage message = this.message;
ByteBuf content = this.content;
@ -539,7 +538,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
}
}
private void readFixedLengthContent(ByteBuf buffer, MessageList<Object> out) {
private void readFixedLengthContent(ByteBuf buffer, List<Object> out) {
//we have a content-length so we just read the correct number of bytes
long length = HttpHeaders.getContentLength(message, -1);
assert length <= Integer.MAX_VALUE;

View File

@ -15,16 +15,18 @@
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpConstants.COLON;
import static io.netty.handler.codec.http.HttpConstants.CR;
import static io.netty.handler.codec.http.HttpConstants.LF;
import static io.netty.handler.codec.http.HttpConstants.SP;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import java.util.Map;
import static io.netty.handler.codec.http.HttpConstants.*;
/**
* Encodes an {@link HttpMessage} or an {@link HttpContent} into
* a {@link ByteBuf}.
@ -52,7 +54,7 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
private int state = ST_INIT;
@Override
protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
if (msg instanceof HttpMessage) {
if (state != ST_INIT) {
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());

View File

@ -15,8 +15,9 @@
*/
package io.netty.handler.codec.http.multipart;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.MessageList;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpContent;
@ -28,7 +29,7 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedMessageInput;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.internal.ThreadLocalRandom;
import java.io.File;
@ -43,12 +44,10 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.regex.Pattern;
import static io.netty.buffer.Unpooled.*;
/**
* This encoder will help to encode Request for a FORM as POST.
*/
public class HttpPostRequestEncoder implements ChunkedMessageInput<HttpContent> {
public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
/**
* Different modes to use to encode form data.
@ -942,12 +941,11 @@ public class HttpPostRequestEncoder implements ChunkedMessageInput<HttpContent>
* if the encoding is in error
*/
@Override
public boolean readChunk(MessageList<HttpContent> buffer) throws ErrorDataEncoderException {
public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
if (isLastChunkSent) {
return false;
return null;
} else {
buffer.add(nextChunk());
return true;
return nextChunk();
}
}

View File

@ -17,10 +17,11 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into {@link WebSocketFrame}s.
* <p>
@ -50,7 +51,7 @@ public class WebSocket00FrameDecoder extends ReplayingDecoder<Void> implements W
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// Discard all data received if closing handshake was received before.
if (receivedClosingHandshake) {
in.skipBytes(actualReadableBytes());

View File

@ -54,16 +54,16 @@
package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.List;
/**
* Decodes a web socket frame from wire protocol version 8 format. This code was forked from <a
* href="https://github.com/joewalnes/webbit">webbit</a> and modified.
@ -121,7 +121,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// Discard all data received if closing handshake was received before.
if (receivedClosingHandshake) {
@ -403,7 +403,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
private void protocolViolation(ChannelHandlerContext ctx, String reason) {
checkpoint(State.CORRUPT);
if (ctx.channel().isActive()) {
ctx.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
ctx.flush().addListener(ChannelFutureListener.CLOSE);
}
throw new CorruptedFrameException(reason);
}

View File

@ -159,7 +159,7 @@ public abstract class WebSocketClientHandshaker {
} else {
decoder.setSingleDecode(true);
}
channel.write(request).addListener(new ChannelFutureListener() {
channel.write(request).flush().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
@ -264,6 +264,6 @@ public abstract class WebSocketClientHandshaker {
if (channel == null) {
throw new NullPointerException("channel");
}
return channel.write(frame, promise);
return channel.write(frame).flush(promise);
}
}

View File

@ -18,10 +18,10 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.HttpHeaders;
import java.net.URI;
import java.util.List;
/**
* This handler does all the heavy lifting for you to run a websocket client.
@ -129,7 +129,7 @@ public class WebSocketClientProtocolHandler extends WebSocketProtocolHandler {
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
if (handleCloseFrames && frame instanceof CloseWebSocketFrame) {
ctx.close();
return;

View File

@ -18,10 +18,10 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpResponse;
class WebSocketClientProtocolHandshakeHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
class WebSocketClientProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
private final WebSocketClientHandshaker handshaker;
public WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker) {
@ -45,9 +45,14 @@ class WebSocketClientProtocolHandshakeHandler extends SimpleChannelInboundHandle
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof FullHttpResponse)) {
ctx.fireMessageReceived(msg);
return;
}
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ctx.channel(), msg);
handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
ctx.fireUserEventTriggered(
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.pipeline().remove(this);

View File

@ -18,10 +18,11 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import java.util.List;
/**
* Handler that aggregate fragmented WebSocketFrame's.
*
@ -47,7 +48,7 @@ public class WebSocketFrameAggregator extends MessageToMessageDecoder<WebSocketF
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
if (currentFrame == null) {
tooLongFrameFound = false;
if (msg.isFinalFragment()) {

View File

@ -17,15 +17,16 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocketFrame> {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
if (frame instanceof PingWebSocketFrame) {
frame.content().retain();
ctx.channel().write(new PongWebSocketFrame(frame.content()));
ctx.channel().write(new PongWebSocketFrame(frame.content())).flush();
return;
}
if (frame instanceof PongWebSocketFrame) {

View File

@ -158,7 +158,7 @@ public abstract class WebSocketServerHandshaker {
logger.debug(String.format("%s WS Version %s server handshake", channel, version()));
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
channel.write(response).addListener(new ChannelFutureListener() {
channel.write(response).flush().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
@ -225,7 +225,7 @@ public abstract class WebSocketServerHandshaker {
if (channel == null) {
throw new NullPointerException("channel");
}
return channel.write(frame, promise).addListener(ChannelFutureListener.CLOSE);
return channel.write(frame).flush(promise).addListener(ChannelFutureListener.CLOSE);
}
/**

View File

@ -177,7 +177,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
*/
@Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
return channel.write(frame, promise);
return channel.write(frame).flush(promise);
}
@Override

View File

@ -15,21 +15,21 @@
*/
package io.netty.handler.codec.http.websocketx;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.AttributeKey;
import static io.netty.handler.codec.http.HttpVersion.*;
import java.util.List;
/**
* This handler does all the heavy lifting for you to run a websocket server.
@ -91,7 +91,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
if (frame instanceof CloseWebSocketFrame) {
WebSocketServerHandshaker handshaker = getHandshaker(ctx);
frame.retain();
@ -106,7 +106,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
if (cause instanceof WebSocketHandshakeException) {
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(cause.getMessage().getBytes()));
ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
ctx.channel().write(response).flush().addListener(ChannelFutureListener.CLOSE);
} else {
ctx.close();
}
@ -121,12 +121,16 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
}
static ChannelHandler forbiddenHttpRequestResponder() {
return new SimpleChannelInboundHandler<FullHttpRequest>() {
return new ChannelInboundHandlerAdapter() {
@Override
protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN);
ctx.channel().write(response);
ctx.channel().write(response).flush();
} else {
ctx.fireMessageReceived(msg);
}
}
};
}

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
@ -51,10 +50,8 @@ class WebSocketServerProtocolHandshakeHandler
}
@Override
public void messageReceived(final ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
MessageList<FullHttpRequest> requests = msgs.cast();
for (int i = 0; i < requests.size(); i++) {
FullHttpRequest req = requests.get(i);
public void messageReceived(final ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest req = (FullHttpRequest) msg;
if (req.getMethod() != GET) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
return;
@ -83,10 +80,9 @@ class WebSocketServerProtocolHandshakeHandler
WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());
}
}
}
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
ChannelFuture f = ctx.channel().write(res);
ChannelFuture f = ctx.channel().write(res).flush();
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}

View File

@ -15,12 +15,34 @@
*/
package io.netty.handler.codec.spdy;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_DATA_FLAG_FIN;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_DATA_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_FLAG_FIN;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_FLAG_UNIDIRECTIONAL;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_GOAWAY_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_HEADERS_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_HEADER_FLAGS_OFFSET;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_HEADER_LENGTH_OFFSET;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_HEADER_SIZE;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_HEADER_TYPE_OFFSET;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_PING_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_RST_STREAM_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SETTINGS_CLEAR;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SETTINGS_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SETTINGS_PERSISTED;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SETTINGS_PERSIST_VALUE;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SYN_REPLY_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SYN_STREAM_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_WINDOW_UPDATE_FRAME;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.getSignedInt;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.getUnsignedInt;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.getUnsignedMedium;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.getUnsignedShort;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into SPDY Frames.
@ -89,7 +111,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
}
@Override
public void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
public void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
decode(ctx, in, out);
} finally {
@ -98,7 +120,7 @@ public class SpdyFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
switch(state) {
case READ_COMMON_HEADER:
state = readCommonHeader(buffer);

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
@ -31,6 +30,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -91,7 +91,7 @@ public class SpdyHttpDecoder extends MessageToMessageDecoder<SpdyFrame> {
}
@Override
protected void decode(ChannelHandlerContext ctx, SpdyFrame msg, MessageList<Object> out)
protected void decode(ChannelHandlerContext ctx, SpdyFrame msg, List<Object> out)
throws Exception {
if (msg instanceof SpdySynStreamFrame) {

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.FullHttpRequest;
@ -139,7 +138,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
}
@Override
protected void encode(ChannelHandlerContext ctx, HttpObject msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
boolean valid = false;

View File

@ -16,12 +16,12 @@
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.util.ReferenceCountUtil;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
/**
@ -40,7 +40,7 @@ public class SpdyHttpResponseStreamIdHandler extends
}
@Override
protected void encode(ChannelHandlerContext ctx, HttpMessage msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, HttpMessage msg, List<Object> out) throws Exception {
Integer id = ids.poll();
if (id != null && id.intValue() != NO_ID && !msg.headers().contains(SpdyHttpHeaders.Names.STREAM_ID)) {
SpdyHttpHeaders.setStreamId(msg, id);
@ -50,7 +50,7 @@ public class SpdyHttpResponseStreamIdHandler extends
}
@Override
protected void decode(ChannelHandlerContext ctx, Object msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
if (msg instanceof HttpMessage) {
boolean contains = ((HttpMessage) msg).headers().contains(SpdyHttpHeaders.Names.STREAM_ID);
if (!contains) {

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
@ -61,13 +60,13 @@ public abstract class SpdyOrHttpChooser extends ChannelInboundHandlerAdapter {
protected abstract SelectedProtocol getProtocol(SSLEngine engine);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> in) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (initPipeline(ctx)) {
// When we reached here we can remove this handler as its now clear what protocol we want to use
// from this point on.
ctx.pipeline().remove(this);
ctx.fireMessageReceived(in);
ctx.fireMessageReceived(msg);
}
}

View File

@ -20,9 +20,10 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.internal.EmptyArrays;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -54,6 +55,8 @@ public class SpdySessionHandler
private final AtomicInteger pings = new AtomicInteger();
private final Queue<Object> outboundBuffer = new ArrayDeque<Object>();
private boolean sentGoAwayFrame;
private boolean receivedGoAwayFrame;
@ -81,34 +84,7 @@ public class SpdySessionHandler
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> in) throws Exception {
boolean handled = false;
MessageList<Object> out = MessageList.newInstance();
for (int i = 0 ; i < in.size(); i++) {
Object msg = in.get(i);
if (msg == null) {
break;
}
if (msg instanceof SpdySynStreamFrame) {
// Let the next handlers handle the buffered messages before SYN_STREAM message updates the
// lastGoodStreamId.
if (handled) {
ctx.fireMessageReceived(out);
out = MessageList.newInstance();
}
}
handleInboundMessage(ctx, msg, out);
handled = true;
}
in.recycle();
ctx.fireMessageReceived(out);
}
private void handleInboundMessage(ChannelHandlerContext ctx, Object msg, MessageList<Object> out) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SpdyDataFrame) {
/*
@ -133,7 +109,6 @@ public class SpdySessionHandler
* If an endpoint receives a data frame after the stream is closed, it must send
* a RST_STREAM frame with the getStatus PROTOCOL_ERROR.
*/
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
int streamId = spdyDataFrame.getStreamId();
@ -141,9 +116,9 @@ public class SpdySessionHandler
if (!spdySession.isActiveStream(streamId)) {
if (streamId <= lastGoodStreamId) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
} else if (!sentGoAwayFrame) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
}
return;
}
@ -151,13 +126,13 @@ public class SpdySessionHandler
// Check if we received a data frame for a stream which is half-closed
if (spdySession.isRemoteSideClosed(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
return;
}
// Check if we received a data frame before receiving a SYN_REPLY
if (!isRemoteInitiatedID(streamId) && !spdySession.hasReceivedReply(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -178,7 +153,7 @@ public class SpdySessionHandler
// This difference is stored for the session when writing the SETTINGS frame
// and is cleared once we send a WINDOW_UPDATE frame.
if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
return;
}
@ -188,7 +163,7 @@ public class SpdySessionHandler
while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
spdyDataFrame.content().readSlice(initialReceiveWindowSize).retain());
ctx.write(partialDataFrame);
ctx.write(partialDataFrame).flush();
}
}
@ -198,7 +173,7 @@ public class SpdySessionHandler
spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
SpdyWindowUpdateFrame spdyWindowUpdateFrame =
new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize);
ctx.write(spdyWindowUpdateFrame);
ctx.write(spdyWindowUpdateFrame).flush();
}
}
@ -230,7 +205,7 @@ public class SpdySessionHandler
if (spdySynStreamFrame.isInvalid() ||
!isRemoteInitiatedID(streamId) ||
spdySession.isActiveStream(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
@ -245,7 +220,7 @@ public class SpdySessionHandler
boolean remoteSideClosed = spdySynStreamFrame.isLast();
boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
return;
}
@ -265,13 +240,13 @@ public class SpdySessionHandler
if (spdySynReplyFrame.isInvalid() ||
isRemoteInitiatedID(streamId) ||
spdySession.isRemoteSideClosed(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
return;
}
// Check if we have received multiple frames for the same Stream-ID
if (spdySession.hasReceivedReply(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
return;
}
@ -336,7 +311,7 @@ public class SpdySessionHandler
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
if (isRemoteInitiatedID(spdyPingFrame.getId())) {
ctx.write(spdyPingFrame);
ctx.write(spdyPingFrame).flush();
return;
}
@ -357,12 +332,12 @@ public class SpdySessionHandler
// Check if we received a valid HEADERS frame
if (spdyHeadersFrame.isInvalid()) {
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
return;
}
if (spdySession.isRemoteSideClosed(streamId)) {
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
return;
}
@ -395,15 +370,15 @@ public class SpdySessionHandler
// Check for numerical overflow
if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR, out);
issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
return;
}
updateSendWindowSize(streamId, deltaWindowSize, out);
updateSendWindowSize(ctx, streamId, deltaWindowSize);
}
}
out.add(msg);
ctx.fireMessageReceived(msg);
}
@Override
@ -421,13 +396,19 @@ public class SpdySessionHandler
}
@Override
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception {
MessageList<Object> out = MessageList.newInstance();
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
outboundBuffer.add(msg);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
try {
for (;;) {
Object msg = outboundBuffer.poll();
if (msg == null) {
break;
}
if (msg instanceof SpdyDataFrame ||
msg instanceof SpdySynStreamFrame ||
msg instanceof SpdySynReplyFrame ||
@ -438,26 +419,26 @@ public class SpdySessionHandler
msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyWindowUpdateFrame) {
try {
handleOutboundMessage(ctx, msg, out);
handleOutboundMessage(ctx, msg);
} catch (SpdyProtocolException e) {
if (e == PROTOCOL_EXCEPTION) {
// on the case of PROTOCOL_EXCEPTION, fail the promise directly
// On the case of PROTOCOL_EXCEPTION, fail the promise directly
// See #1211
promise.setFailure(PROTOCOL_EXCEPTION);
return;
}
}
} else {
out.add(msg);
ctx.write(msg);
}
}
ctx.flush(promise);
} finally {
outboundBuffer.clear();
}
}
msgs.recycle();
ctx.write(out, promise);
}
private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, MessageList<Object> out)
throws Exception {
private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SpdyDataFrame) {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
@ -647,7 +628,7 @@ public class SpdySessionHandler
throw PROTOCOL_EXCEPTION;
}
out.add(msg);
ctx.write(msg);
}
/*
@ -676,17 +657,14 @@ public class SpdySessionHandler
*
* Note: this is only called by the worker thread
*/
private void issueStreamError(
ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status, MessageList<Object> in) {
private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamId);
removeStream(ctx, streamId);
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
ctx.write(spdyRstStreamFrame);
ctx.write(spdyRstStreamFrame).flush();
if (fireMessageReceived) {
in.add(spdyRstStreamFrame);
ctx.fireMessageReceived(in.copy());
in.clear();
ctx.fireMessageReceived(spdyRstStreamFrame);
}
}
@ -783,7 +761,7 @@ public class SpdySessionHandler
}
}
private void updateSendWindowSize(final int streamId, int deltaWindowSize, MessageList<Object> out) {
private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamId, int deltaWindowSize) {
synchronized (flowControlLock) {
int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
@ -822,7 +800,7 @@ public class SpdySessionHandler
halfCloseStream(streamId, false);
}
out.add(spdyDataFrame);
ctx.fireMessageReceived(spdyDataFrame);
} else {
// We can send a partial frame
spdySession.updateSendWindowSize(streamId, -1 * newWindowSize);
@ -848,7 +826,7 @@ public class SpdySessionHandler
// }
//});
out.add(partialDataFrame);
ctx.fireMessageReceived(partialDataFrame);
newWindowSize = 0;
}
@ -878,7 +856,7 @@ public class SpdySessionHandler
if (!sentGoAwayFrame) {
sentGoAwayFrame = true;
SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
return ctx.write(spdyGoAwayFrame);
return ctx.write(spdyGoAwayFrame).flush();
} else {
return ctx.newSucceededFuture();
}

View File

@ -17,10 +17,9 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
@ -147,22 +146,23 @@ public class WebSocketServerProtocolHandlerTest {
private class MockOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise)
throws Exception {
for (int i = 0; i < msgs.size(); i++) {
responses.add((FullHttpResponse) msgs.get(i));
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
responses.add((FullHttpResponse) msg);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
promise.setSuccess();
}
}
private static class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static class CustomTextFrameHandler extends ChannelInboundHandlerAdapter {
private String content;
@Override
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
assertNull(content);
content = "processed: " + msg.text();
content = "processed: " + ((TextWebSocketFrame) msg).text();
}
String getContent() {

View File

@ -19,9 +19,9 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -103,7 +103,7 @@ public class SpdyFrameDecoderTest {
}
private static void sendAndWaitForFrame(Channel cc, SpdyFrame frame, CaptureHandler handler) {
cc.write(frame);
cc.write(frame).flush();
long theFuture = System.currentTimeMillis() + 3000;
while (handler.message == null && System.currentTimeMillis() < theFuture) {
try {
@ -127,7 +127,7 @@ public class SpdyFrameDecoderTest {
frame.headers().add(headerName.toString(), headerValue.toString());
}
private static class CaptureHandler extends SimpleChannelInboundHandler<Object> {
private static class CaptureHandler extends ChannelInboundHandlerAdapter {
public volatile Object message;
@Override

View File

@ -17,8 +17,6 @@ package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -274,7 +272,7 @@ public class SpdySessionHandlerTest {
// Echo Handler opens 4 half-closed streams on session connection
// and then sets the number of concurrent streams to 3
private static class EchoHandler extends SimpleChannelInboundHandler<Object> {
private static class EchoHandler extends ChannelInboundHandlerAdapter {
private final int closeSignal;
private final boolean server;
@ -290,18 +288,18 @@ public class SpdySessionHandlerTest {
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(streamId, 0, (byte) 0);
spdySynStreamFrame.setLast(true);
ctx.write(spdySynStreamFrame);
ctx.write(spdySynStreamFrame).flush();
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
ctx.write(spdySynStreamFrame);
ctx.write(spdySynStreamFrame).flush();
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
ctx.write(spdySynStreamFrame);
ctx.write(spdySynStreamFrame).flush();
spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2);
ctx.write(spdySynStreamFrame);
ctx.write(spdySynStreamFrame).flush();
// Limit the number of concurrent streams to 3
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3);
ctx.write(spdySettingsFrame);
ctx.write(spdySettingsFrame).flush();
}
@Override
@ -317,7 +315,7 @@ public class SpdySessionHandlerTest {
spdySynReplyFrame.headers().add(entry.getKey(), entry.getValue());
}
ctx.write(spdySynReplyFrame);
ctx.write(spdySynReplyFrame).flush();
}
return;
}
@ -330,7 +328,7 @@ public class SpdySessionHandlerTest {
msg instanceof SpdyPingFrame ||
msg instanceof SpdyHeadersFrame) {
ctx.write(msg);
ctx.write(msg).flush();
return;
}

View File

@ -17,10 +17,11 @@ package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into {@link SocksAuthRequest}.
* Before returning SocksRequest decoder removes itself from pipeline.
@ -43,7 +44,7 @@ public class SocksAuthRequestDecoder extends ReplayingDecoder<SocksAuthRequestDe
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksSubnegotiationVersion.fromByte(byteBuf.readByte());

View File

@ -17,9 +17,10 @@ package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into {@link SocksAuthResponse}.
* Before returning SocksResponse decoder removes itself from pipeline.
@ -40,7 +41,7 @@ public class SocksAuthResponseDecoder extends ReplayingDecoder<SocksAuthResponse
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, MessageList<Object> out)
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out)
throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {

View File

@ -17,10 +17,11 @@ package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into {@link SocksCmdRequest}.
* Before returning SocksRequest decoder removes itself from pipeline.
@ -46,7 +47,7 @@ public class SocksCmdRequestDecoder extends ReplayingDecoder<SocksCmdRequestDeco
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());

View File

@ -17,10 +17,11 @@ package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into {@link SocksCmdResponse}.
* Before returning SocksResponse decoder removes itself from pipeline.
@ -46,7 +47,7 @@ public class SocksCmdResponseDecoder extends ReplayingDecoder<SocksCmdResponseDe
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.ArrayList;
@ -44,7 +43,7 @@ public class SocksInitRequestDecoder extends ReplayingDecoder<SocksInitRequestDe
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());

View File

@ -17,9 +17,10 @@ package io.netty.handler.codec.socks;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* Decodes {@link ByteBuf}s into {@link SocksInitResponse}.
* Before returning SocksResponse decoder removes itself from pipeline.
@ -41,7 +42,7 @@ public class SocksInitResponseDecoder extends ReplayingDecoder<SocksInitResponse
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.fromByte(byteBuf.readByte());

View File

@ -18,10 +18,10 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.internal.TypeParameterMatcher;
import java.util.List;
/**
* A Codec for on-the-fly encoding/decoding of bytes to messages and vise-versa.
*
@ -47,12 +47,12 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ByteToMessageCodec.this.decode(ctx, in, out);
}
@Override
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ByteToMessageCodec.this.decodeLast(ctx, in, out);
}
};
@ -78,13 +78,13 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
decoder.messageReceived(ctx, msgs);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
decoder.messageReceived(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception {
encoder.write(ctx, msgs, promise);
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
encoder.write(ctx, msg);
}
/**
@ -93,14 +93,14 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
/**
* @see ByteToMessageDecoder#decode(ChannelHandlerContext, ByteBuf, MessageList)
* @see ByteToMessageDecoder#decode(ChannelHandlerContext, ByteBuf, List)
*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
/**
* @see ByteToMessageDecoder#decodeLast(ChannelHandlerContext, ByteBuf, MessageList)
* @see ByteToMessageDecoder#decodeLast(ChannelHandlerContext, ByteBuf, List)
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decode(ctx, in, out);
}
}

View File

@ -19,9 +19,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.util.internal.StringUtil;
import java.util.List;
/**
* {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other Message type.
@ -32,7 +33,7 @@ import io.netty.util.internal.StringUtil;
* <pre>
* public class SquareDecoder extends {@link ByteToMessageDecoder} {
* {@code @Override}
* public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, {@link MessageList} out)
* public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
* throws {@link Exception} {
* out.add(in.readBytes(in.readableBytes()));
* }
@ -47,7 +48,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
protected ByteBuf cumulation;
private boolean singleDecode;
private boolean decodeWasNull;
private MessageList<Object> out;
protected ByteToMessageDecoder() {
if (getClass().isAnnotationPresent(Sharable.class)) {
@ -56,7 +56,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
/**
* If set then only one message is decoded on each {@link #messageReceived(ChannelHandlerContext, MessageList)}
* If set then only one message is decoded on each {@link #messageReceived(ChannelHandlerContext, Object)}
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
*
* Default is {@code false} as this has performance impacts.
@ -67,7 +67,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
/**
* If {@code true} then only one message is decoded on each
* {@link #messageReceived(ChannelHandlerContext, MessageList)} call.
* {@link #messageReceived(ChannelHandlerContext, Object)} call.
*
* Default is {@code false} as this has performance impacts.
*/
@ -102,13 +102,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = internalBuffer();
if (buf.isReadable()) {
if (out == null) {
ctx.fireMessageReceived(buf);
} else {
out.add(buf.copy());
}
buf.clear();
}
ctx.fireMessageReceivedLast();
handlerRemoved0(ctx);
}
@ -119,19 +115,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
out = MessageList.newInstance();
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutput out = CodecOutput.newInstance();
try {
int size = msgs.size();
for (int i = 0; i < size; i ++) {
Object m = msgs.get(i);
// handler was removed in the loop so now copy over all remaining messages
if (ctx.isRemoved()) {
out.add(msgs, i, size - i);
return;
}
if (m instanceof ByteBuf) {
ByteBuf data = (ByteBuf) m;
if (msg instanceof ByteBuf) {
ByteBuf data = (ByteBuf) msg;
if (cumulation == null) {
cumulation = data;
try {
@ -163,28 +151,22 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
}
} else {
out.add(m);
}
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
// release the cumulation if the handler was removed while messages are processed
if (ctx.isRemoved()) {
if (cumulation != null) {
cumulation.release();
cumulation = null;
}
}
MessageList<Object> out = this.out;
this.out = null;
if (out.isEmpty()) {
decodeWasNull = true;
}
msgs.recycle();
ctx.fireMessageReceived(out);
for (int i = 0; i < out.size(); i ++) {
ctx.fireMessageReceived(out.get(i));
}
out.recycle();
}
}
@ -201,7 +183,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
MessageList<Object> out = MessageList.newInstance();
CodecOutput out = CodecOutput.newInstance();
try {
if (cumulation != null) {
callDecode(ctx, cumulation, out);
@ -219,12 +201,14 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
cumulation = null;
}
ctx.fireMessageReceived(out);
for (int i = 0; i < out.size(); i ++) {
ctx.fireMessageReceived(out.get(i));
}
ctx.fireChannelInactive();
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) {
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, CodecOutput out) {
try {
while (in.isReadable()) {
int outSize = out.size();
@ -262,20 +246,20 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link MessageList} to which decoded messages should be added
* @param out the {@link CodecOutput} to which decoded messages should be added
* @throws Exception is thrown if an error accour
*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, MessageList)} but sub-classes may
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, CodecOutput)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decode(ctx, in, out);
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
/**
* A simple list that holds the output of a codec.
*/
final class CodecOutput extends ArrayList<Object> {
private static final long serialVersionUID = -8605125654176467947L;
private static final int DEFAULT_INITIAL_CAPACITY = 8;
private static final Recycler<CodecOutput> RECYCLER = new Recycler<CodecOutput>() {
@Override
protected CodecOutput newObject(Handle handle) {
return new CodecOutput(handle);
}
};
/**
* Create a new empty {@link CodecOutput} instance
*/
public static CodecOutput newInstance() {
return newInstance(DEFAULT_INITIAL_CAPACITY);
}
/**
* Create a new empty {@link CodecOutput} instance with the given capacity.
*/
public static CodecOutput newInstance(int minCapacity) {
CodecOutput ret = (CodecOutput) RECYCLER.get();
ret.ensureCapacity(minCapacity);
return ret;
}
private final Handle handle;
CodecOutput(Handle handle) {
this(handle, DEFAULT_INITIAL_CAPACITY);
}
CodecOutput(Handle handle, int initialCapacity) {
super(initialCapacity);
this.handle = handle;
}
/**
* Clear and recycle this instance.
*/
boolean recycle() {
clear();
return RECYCLER.recycle(this, handle);
}
}

View File

@ -17,7 +17,8 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import java.util.List;
/**
* A decoder that splits the received {@link ByteBuf}s by one or more
@ -211,7 +212,7 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);

View File

@ -17,7 +17,8 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import java.util.List;
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
@ -53,7 +54,7 @@ public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);

View File

@ -18,10 +18,10 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.serialization.ObjectDecoder;
import java.nio.ByteOrder;
import java.util.List;
/**
* A decoder that splits the received {@link ByteBuf}s dynamically by the
@ -348,7 +348,7 @@ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);

View File

@ -17,7 +17,8 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import java.util.List;
/**
* A decoder that splits the received {@link ByteBuf}s on line endings.
@ -69,7 +70,7 @@ public class LineBasedFrameDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);

View File

@ -18,8 +18,6 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
@ -69,26 +67,12 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
}
@Override
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception {
MessageList<Object> out = MessageList.newInstance();
boolean success = false;
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = null;
try {
int size = msgs.size();
for (int i = 0; i < size; i ++) {
// handler was removed in the loop so now copy over all remaining messages
if (ctx.isRemoved()) {
if (buf != null && buf.isReadable()) {
out.add(buf);
buf = null;
}
out.add(msgs, i, size - i);
break;
}
Object m = msgs.get(i);
if (acceptOutboundMessage(m)) {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) m;
I cast = (I) msg;
if (buf == null) {
if (preferDirect) {
buf = ctx.alloc().ioBuffer();
@ -102,40 +86,26 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
ReferenceCountUtil.release(cast);
}
} else {
if (buf != null && buf.isReadable()) {
out.add(buf);
buf = null;
}
out.add(m);
}
ctx.write(msg);
}
if (buf != null && buf.isReadable()) {
out.add(buf);
ctx.write(buf);
buf = null;
}
success = true;
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
msgs.recycle();
if (buf != null) {
buf.release();
}
if (success) {
ctx.write(out, promise);
} else {
out.releaseAllAndRecycle();
}
}
}
/**
* Encode a message into a {@link ByteBuf}. This method will be called till the {@link MessageList} has
* Encode a message into a {@link ByteBuf}. This method will be called till the {@link CodecOutput} has
* nothing left.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to

View File

@ -17,11 +17,11 @@ package io.netty.handler.codec;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.TypeParameterMatcher;
import java.util.List;
/**
* A Codec for on-the-fly encoding/decoding of message.
*
@ -62,7 +62,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends Cha
@Override
@SuppressWarnings("unchecked")
protected void encode(ChannelHandlerContext ctx, Object msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
}
};
@ -76,7 +76,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends Cha
@Override
@SuppressWarnings("unchecked")
protected void decode(ChannelHandlerContext ctx, Object msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
}
};
@ -96,13 +96,13 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends Cha
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
decoder.messageReceived(ctx, msgs);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
decoder.messageReceived(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception {
encoder.write(ctx, msgs, promise);
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
encoder.write(ctx, msg);
}
/**
@ -124,14 +124,14 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends Cha
}
/**
* @see MessageToMessageEncoder#encode(ChannelHandlerContext, Object, MessageList)
* @see MessageToMessageEncoder#encode(ChannelHandlerContext, Object, CodecOutput)
*/
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, MessageList<Object> out)
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
/**
* @see MessageToMessageDecoder#decode(ChannelHandlerContext, Object, MessageList)
* @see MessageToMessageDecoder#decode(ChannelHandlerContext, Object, CodecOutput)
*/
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, MessageList<Object> out)
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
}

View File

@ -17,11 +17,12 @@ package io.netty.handler.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.TypeParameterMatcher;
import java.util.List;
/**
* {@link ChannelInboundHandlerAdapter} which decodes from one message to an other message.
*
@ -35,7 +36,7 @@ import io.netty.util.internal.TypeParameterMatcher;
*
* {@code @Override}
* public void decode({@link ChannelHandlerContext} ctx, {@link String} message,
* {@link MessageList} out) throws {@link Exception} {
* List&lt;Object&gt; out) throws {@link Exception} {
* out.add(message.length());
* }
* }
@ -63,48 +64,40 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAd
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
MessageList<Object> out = MessageList.newInstance();
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutput out = CodecOutput.newInstance();
try {
int size = msgs.size();
for (int i = 0; i < size; i ++) {
// handler was removed in the loop so now copy over all remaining messages
if (ctx.isRemoved()) {
out.add(msgs, i, size - i);
return;
}
Object m = msgs.get(i);
if (acceptInboundMessage(m)) {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) m;
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(m);
}
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
msgs.recycle();
ctx.fireMessageReceived(out);
for (int i = 0; i < out.size(); i ++) {
ctx.fireMessageReceived(out.get(i));
}
out.recycle();
}
}
/**
* Decode from one message to an other. This method will be called till either the {@link MessageList} has
* Decode from one message to an other. This method will be called till either the {@link CodecOutput} has
* nothing left or till this method returns {@code null}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
* @param msg the message to decode to an other one
* @param out the {@link MessageList} to which decoded messages should be added
* @param out the {@link CodecOutput} to which decoded messages should be added
* @throws Exception is thrown if an error accour
*/
protected abstract void decode(ChannelHandlerContext ctx, I msg, MessageList<Object> out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

View File

@ -17,12 +17,12 @@ package io.netty.handler.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.TypeParameterMatcher;
import java.util.List;
/**
* {@link ChannelOutboundHandlerAdapter} which encodes from one message to an other message
*
@ -33,7 +33,7 @@ import io.netty.util.internal.TypeParameterMatcher;
* {@link MessageToMessageEncoder}&lt;{@link Integer}&gt; {
*
* {@code @Override}
* public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, {@link MessageList} out)
* public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, List&lt;Object&gt; out)
* throws {@link Exception} {
* out.add(message.toString());
* }
@ -61,54 +61,41 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
}
@Override
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception {
MessageList<Object> out = MessageList.newInstance();
boolean success = false;
public void write(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutput out = CodecOutput.newInstance();
try {
int size = msgs.size();
for (int i = 0; i < size; i ++) {
// handler was removed in the loop so now copy over all remaining messages
if (ctx.isRemoved()) {
out.add(msgs, i, size - i);
break;
}
Object m = msgs.get(i);
if (acceptOutboundMessage(m)) {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) m;
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(m);
out.add(msg);
}
}
success = true;
} catch (CodecException e) {
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
msgs.recycle();
if (success) {
ctx.write(out, promise);
} else {
out.releaseAllAndRecycle();
for (int i = 0; i < out.size(); i ++) {
ctx.write(out.get(i));
}
out.recycle();
}
}
/**
* Encode from one message to an other. This method will be called till either the {@link MessageList} has nothing
* Encode from one message to an other. This method will be called till either the {@link CodecOutput} has nothing
* left or till this method returns {@code null}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
* @param msg the message to encode to an other one
* @param out the {@link MessageList} into which the encoded msg should be added
* @param out the {@link CodecOutput} into which the encoded msg should be added
* needs to do some kind of aggragation
* @throws Exception is thrown if an error accour
*/
protected abstract void encode(ChannelHandlerContext ctx, I msg, MessageList<Object> out) throws Exception;
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

View File

@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.util.Signal;
import io.netty.util.internal.StringUtil;
@ -106,7 +105,7 @@ import io.netty.util.internal.StringUtil;
* private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
*
* {@code @Override}
* public void decode(.., {@link ByteBuf} in, {@link MessageList} out) throws Exception {
* public void decode(.., {@link ByteBuf} in, List&lt;Object&gt; out) throws Exception {
*
* // A message contains 2 integers.
* values.offer(buffer.readInt());
@ -126,7 +125,7 @@ import io.netty.util.internal.StringUtil;
* private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
*
* {@code @Override}
* public void decode(.., {@link ByteBuf} buffer, {@link MessageList} out) throws Exception {
* public void decode(.., {@link ByteBuf} buffer, List&lt;Object&gt; out) throws Exception {
*
* // Revert the state of the variable that might have been changed
* // since the last partial decode.
@ -179,7 +178,7 @@ import io.netty.util.internal.StringUtil;
*
* {@code @Override}
* protected void decode({@link ChannelHandlerContext} ctx,
* {@link ByteBuf} in, {@link MessageList} out) throws Exception {
* {@link ByteBuf} in, List&lt;Object&gt; out) throws Exception {
* switch (state()) {
* case READ_LENGTH:
* length = buf.readInt();
@ -207,7 +206,7 @@ import io.netty.util.internal.StringUtil;
*
* {@code @Override}
* protected void decode({@link ChannelHandlerContext} ctx,
* {@link ByteBuf} in, {@link MessageList} out) throws Exception {
* {@link ByteBuf} in, List&lt;Object&gt; out) throws Exception {
* if (!readLength) {
* length = buf.readInt();
* <strong>readLength = true;</strong>
@ -238,7 +237,7 @@ import io.netty.util.internal.StringUtil;
*
* {@code @Override}
* protected Object decode({@link ChannelHandlerContext} ctx,
* {@link ByteBuf} in, {@link MessageList} out) {
* {@link ByteBuf} in, List&lt;Object&gt; out) {
* ...
* // Decode the first message
* Object firstMessage = ...;
@ -320,7 +319,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
MessageList<Object> out = MessageList.newInstance();
CodecOutput out = CodecOutput.newInstance();
try {
replayable.terminate();
callDecode(ctx, internalBuffer(), out);
@ -338,13 +337,15 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
cumulation = null;
}
ctx.fireMessageReceived(out);
for (int i = 0; i < out.size(); i ++) {
ctx.fireMessageReceived(out.get(i));
}
ctx.fireChannelInactive();
}
}
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) {
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, CodecOutput out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {

View File

@ -19,12 +19,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
/**
* Decodes a Base64-encoded {@link ByteBuf} or US-ASCII {@link String}
* into a {@link ByteBuf}. Please note that this decoder must be used
@ -59,7 +60,7 @@ public class Base64Decoder extends MessageToMessageDecoder<ByteBuf> {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(Base64.decode(msg, msg.readerIndex(), msg.readableBytes(), dialect));
}
}

View File

@ -19,11 +19,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* Encodes a {@link ByteBuf} into a Base64-encoded {@link ByteBuf}.
* A typical setup for TCP/IP would be:
@ -62,7 +63,7 @@ public class Base64Encoder extends MessageToMessageEncoder<ByteBuf> {
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(Base64.encode(msg, msg.readerIndex(), msg.readableBytes(), breakLines, dialect));
}
}

View File

@ -18,11 +18,12 @@ package io.netty.handler.codec.bytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
/**
* Decodes a received {@link ByteBuf} into an array of bytes.
* A typical setup for TCP/IP would be:
@ -49,7 +50,7 @@ import io.netty.handler.codec.MessageToMessageDecoder;
*/
public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
byte[] array;
if (msg.hasArray()) {
if (msg.arrayOffset() == 0 && msg.readableBytes() == msg.capacity()) {

View File

@ -20,11 +20,12 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
/**
* Encodes the requested array of bytes into a {@link ByteBuf}.
* A typical setup for TCP/IP would be:
@ -52,7 +53,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
@Sharable
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, MessageList<Object> out) throws Exception {
protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
out.add(Unpooled.wrappedBuffer(msg));
}
}

View File

@ -15,11 +15,13 @@
*/
package io.netty.handler.codec.compression;
import com.jcraft.jzlib.Inflater;
import com.jcraft.jzlib.JZlib;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import java.util.List;
import com.jcraft.jzlib.Inflater;
import com.jcraft.jzlib.JZlib;
public class JZlibDecoder extends ZlibDecoder {
@ -82,7 +84,7 @@ public class JZlibDecoder extends ZlibDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (!in.isReadable()) {
return;

View File

@ -343,10 +343,10 @@ public class JZlibEncoder extends ZlibEncoder {
}
}
private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise future) {
private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
if (!finished.compareAndSet(false, true)) {
future.setSuccess();
return future;
promise.setSuccess();
return promise;
}
ByteBuf footer;
@ -366,8 +366,8 @@ public class JZlibEncoder extends ZlibEncoder {
// Write the ADLER32 checksum (stream footer).
int resultCode = z.deflate(JZlib.Z_FINISH);
if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
future.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
return future;
promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
return promise;
} else if (z.next_out_index != 0) {
footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
} else {
@ -385,8 +385,7 @@ public class JZlibEncoder extends ZlibEncoder {
}
}
ctx.write(footer, future);
return future;
return ctx.write(footer).flush(promise);
}
@Override

View File

@ -255,7 +255,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
deflater.end();
}
return ctx.write(footer, promise);
return ctx.write(footer).flush(promise);
}
@Override

View File

@ -15,15 +15,14 @@
*/
package io.netty.handler.codec.compression;
import static io.netty.handler.codec.compression.Snappy.validateChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.Arrays;
import static io.netty.handler.codec.compression.Snappy.*;
import java.util.List;
/**
* Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
@ -76,7 +75,7 @@ public class SnappyFramedDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (corrupted) {
in.skipBytes(in.readableBytes());
return;

View File

@ -18,13 +18,14 @@ package io.netty.handler.codec.marshalling;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;
import java.io.ObjectStreamConstants;
import java.util.List;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;
/**
* {@link ReplayingDecoder} which use an {@link Unmarshaller} to read the Object out of the {@link ByteBuf}.
@ -55,7 +56,7 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void> {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
if (discardingTooLongFrame) {
buffer.skipBytes(actualReadableBytes());
checkpoint();
@ -83,7 +84,7 @@ public class CompatibleMarshallingDecoder extends ReplayingDecoder<Void> {
}
@Override
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf buffer, MessageList<Object> out) throws Exception {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
switch (buffer.readableBytes()) {
case 0:
return;

View File

@ -15,19 +15,21 @@
*/
package io.netty.handler.codec.protobuf;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
/**
* Decodes a received {@link ByteBuf} into a
* <a href="http://code.google.com/p/protobuf/">Google Protocol Buffers</a>
@ -95,7 +97,7 @@ public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
final byte[] array;
final int offset;
final int length = msg.readableBytes();

View File

@ -15,19 +15,20 @@
*/
package io.netty.handler.codec.protobuf;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageEncoder;
import static io.netty.buffer.Unpooled.*;
import java.util.List;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
/**
* Encodes the requested <a href="http://code.google.com/p/protobuf/">Google
@ -60,7 +61,7 @@ import static io.netty.buffer.Unpooled.*;
public class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {
@Override
protected void encode(
ChannelHandlerContext ctx, MessageLiteOrBuilder msg, MessageList<Object> out) throws Exception {
ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
if (msg instanceof MessageLite) {
out.add(wrappedBuffer(((MessageLite) msg).toByteArray()));
return;

View File

@ -15,13 +15,15 @@
*/
package io.netty.handler.codec.protobuf;
import com.google.protobuf.CodedInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import java.util.List;
import com.google.protobuf.CodedInputStream;
/**
* A decoder that splits the received {@link ByteBuf}s dynamically by the
* value of the Google Protocol Buffers
@ -43,7 +45,7 @@ public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
// (just like LengthFieldBasedFrameDecoder)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.markReaderIndex();
final byte[] buf = new byte[5];
for (int i = 0; i < buf.length; i ++) {

View File

@ -19,13 +19,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.Charset;
import java.util.List;
/**
* Decodes a received {@link ByteBuf} into a {@link String}. Please
@ -75,7 +75,7 @@ public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
}

View File

@ -15,15 +15,17 @@
*/
package io.netty.handler.codec;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.List;
import org.junit.Test;
public class ReplayingDecoderTest {
@ -55,7 +57,7 @@ public class ReplayingDecoderTest {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
ByteBuf msg = in.readBytes(in.bytesBefore((byte) '\n'));
out.add(msg);
in.skipBytes(1);
@ -80,9 +82,9 @@ public class ReplayingDecoderTest {
private static final class BloatedLineDecoder extends ChannelInboundHandlerAdapter {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.pipeline().replace(this, "less-bloated", new LineDecoder());
ctx.pipeline().fireMessageReceived(msgs);
ctx.pipeline().fireMessageReceived(msg);
}
}

View File

@ -75,7 +75,7 @@ public class AppletDiscardServer extends JApplet {
private static final class DiscardServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Received: " + msg.toString(CharsetUtil.UTF_8));
}

View File

@ -19,8 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -28,7 +27,7 @@ import java.util.logging.Logger;
/**
* Handles a client-side channel.
*/
public class DiscardClientHandler extends ChannelInboundHandlerAdapter {
public class DiscardClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = Logger.getLogger(
DiscardClientHandler.class.getName());
@ -63,9 +62,8 @@ public class DiscardClientHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Server is supposed to send nothing, but if it sends something, discard it.
msgs.releaseAllAndRecycle();
}
@Override
@ -84,7 +82,7 @@ public class DiscardClientHandler extends ChannelInboundHandlerAdapter {
private void generateTraffic() {
// Flush the outbound buffer to the socket.
// Once flushed, generate the same amount of traffic again.
ctx.write(content.duplicate().retain()).addListener(trafficGenerator);
ctx.write(content.duplicate().retain()).flush().addListener(trafficGenerator);
}
private final ChannelFutureListener trafficGenerator = new ChannelFutureListener() {

View File

@ -16,8 +16,7 @@
package io.netty.example.discard;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -25,14 +24,14 @@ import java.util.logging.Logger;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
public class DiscardServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = Logger.getLogger(
DiscardServerHandler.class.getName());
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
msgs.releaseAllAndRecycle();
public void messageReceived0(ChannelHandlerContext ctx, Object msg) throws Exception {
// discard
}
@Override

View File

@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -51,12 +50,17 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.write(firstMessage);
ctx.write(firstMessage).flush();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
ctx.write(msgs);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override

View File

@ -18,7 +18,6 @@ package io.netty.example.echo;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -33,8 +32,13 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
EchoServerHandler.class.getName());
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
ctx.write(msgs);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override

View File

@ -17,11 +17,11 @@ package io.netty.example.factorial;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import java.math.BigInteger;
import java.util.List;
/**
* Decodes the binary representation of a {@link BigInteger} prepended
@ -32,7 +32,7 @@ import java.math.BigInteger;
public class BigIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// Wait until the length prefix is available.
if (in.readableBytes() < 5) {
return;

View File

@ -18,7 +18,6 @@ package io.netty.example.factorial;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import java.math.BigInteger;
@ -71,7 +70,7 @@ public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteg
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final BigInteger msg) {
public void messageReceived0(ChannelHandlerContext ctx, final BigInteger msg) {
receivedMessages ++;
if (receivedMessages == count) {
// Offer the answer after closing the connection.
@ -97,10 +96,9 @@ public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteg
private void sendNumbers() {
// Do not send more than 4096 numbers.
boolean finished = false;
MessageList<Object> out = MessageList.newInstance(4096);
while (out.size() < 4096) {
for (int i = 0; i < 4096; i++) {
if (i <= count) {
out.add(Integer.valueOf(i));
ctx.write(Integer.valueOf(i));
i ++;
} else {
finished = true;
@ -108,7 +106,7 @@ public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteg
}
}
ChannelFuture f = ctx.write(out);
ChannelFuture f = ctx.flush();
if (!finished) {
f.addListener(numberSender);
}

View File

@ -16,8 +16,7 @@
package io.netty.example.factorial;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import java.math.BigInteger;
import java.util.Formatter;
@ -31,7 +30,7 @@ import java.util.logging.Logger;
* to create a new handler instance whenever you create a new channel and insert
* this handler to avoid a race condition.
*/
public class FactorialServerHandler extends ChannelInboundHandlerAdapter {
public class FactorialServerHandler extends SimpleChannelInboundHandler<BigInteger> {
private static final Logger logger = Logger.getLogger(
FactorialServerHandler.class.getName());
@ -40,29 +39,21 @@ public class FactorialServerHandler extends ChannelInboundHandlerAdapter {
private BigInteger factorial = new BigInteger("1");
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
MessageList<BigInteger> ints = msgs.cast();
for (int i = 0; i < ints.size(); i++) {
BigInteger msg = ints.get(i);
public void messageReceived0(ChannelHandlerContext ctx, BigInteger msg) throws Exception {
// Calculate the cumulative factorial and send it to the client.
lastMultiplier = msg;
factorial = factorial.multiply(msg);
ctx.write(factorial);
}
msgs.recycle();
ctx.write(factorial).flush();
}
@Override
public void channelInactive(
ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info(new Formatter().format(
"Factorial of %,d is: %,d", lastMultiplier, factorial).toString());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.", cause);

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FileRegion;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
@ -94,7 +93,7 @@ public class FileServer {
private static final class FileHandler extends SimpleChannelInboundHandler<String> {
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, String msg) throws Exception {
File file = new File(msg);
if (file.exists()) {
if (!file.isFile()) {
@ -102,11 +101,10 @@ public class FileServer {
return;
}
ctx.write(file + " " + file.length() + '\n');
MessageList<Object> out = MessageList.newInstance();
FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length());
out.add(region);
out.add("\n");
ctx.write(out);
ctx.write(region);
ctx.write("\n");
ctx.flush();
} else {
ctx.write("File not found: " + file + '\n');
}

View File

@ -19,7 +19,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
@ -105,7 +104,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
public static final int HTTP_CACHE_SECONDS = 60;
@Override
public void messageReceived(
public void messageReceived0(
ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (!request.getDecoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
@ -177,15 +176,14 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
MessageList<Object> out = MessageList.newInstance();
// Write the initial line and the header.
out.add(response);
ctx.write(response);
// Write the content.
out.add(new ChunkedFile(raf, 0, fileLength, 8192));
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192));
// Write the end marker
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
ctx.write(LastHttpContent.EMPTY_LAST_CONTENT);
ChannelFuture writeFuture = ctx.write(out);
ChannelFuture writeFuture = ctx.flush();
// Decide whether to close the connection or not.
if (!isKeepAlive(request)) {
// Close the connection when the whole content is written out.
@ -279,7 +277,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
response.content().writeBytes(Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
ctx.write(response).flush().addListener(ChannelFutureListener.CLOSE);
}
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
@ -287,7 +285,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
response.headers().set(LOCATION, newUri);
// Close the connection as soon as the error message is sent.
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
ctx.write(response).flush().addListener(ChannelFutureListener.CLOSE);
}
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
@ -296,7 +294,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
ctx.write(response).flush().addListener(ChannelFutureListener.CLOSE);
}
/**
@ -310,7 +308,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
setDateHeader(response);
// Close the connection as soon as the error message is sent.
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
ctx.write(response).flush().addListener(ChannelFutureListener.CLOSE);
}
/**

View File

@ -19,8 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
@ -31,20 +30,13 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<Object> {
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf CONTENT =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII));
private MessageList<Object> out;
@Override
protected void beginMessageReceived(ChannelHandlerContext ctx) {
out = MessageList.newInstance();
}
@Override
protected void endMessageReceived(ChannelHandlerContext ctx) {
ctx.write(out);
out = null;
public void messageReceivedLast(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
@ -53,7 +45,7 @@ public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<Obj
HttpRequest req = (HttpRequest) msg;
if (is100ContinueExpected(req)) {
out.add(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)).flush();
}
boolean keepAlive = isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, CONTENT.duplicate());
@ -61,11 +53,9 @@ public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<Obj
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
out.add(response);
ctx.write(out).addListener(ChannelFutureListener.CLOSE);
out = MessageList.newInstance();
ctx.write(response).flush().addListener(ChannelFutureListener.CLOSE);
} else {
out.add(response);
ctx.write(response);
response.headers().set(CONNECTION, Values.KEEP_ALIVE);
}
}

View File

@ -27,7 +27,7 @@ import io.netty.util.CharsetUtil;
public class HttpSnoopClientHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;

View File

@ -18,8 +18,7 @@ package io.netty.example.http.snoop;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.CookieDecoder;
@ -44,34 +43,24 @@ import static io.netty.handler.codec.http.HttpHeaders.*;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter {
public class HttpSnoopServerHandler extends SimpleChannelInboundHandler<Object> {
private HttpRequest request;
/** Buffer that stores the response content */
private final StringBuilder buf = new StringBuilder();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
MessageList<Object> out = MessageList.newInstance();
int size = msgs.size();
try {
for (int i = 0; i < size; i ++) {
if (!messageReceived(ctx, msgs.get(i), out)) {
break;
}
}
} finally {
msgs.releaseAllAndRecycle();
ctx.write(out);
}
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private boolean messageReceived(ChannelHandlerContext ctx, Object msg, MessageList<Object> out) {
@Override
protected void messageReceived0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest request = this.request = (HttpRequest) msg;
if (is100ContinueExpected(request)) {
send100Continue(out);
send100Continue(ctx);
}
buf.setLength(0);
@ -134,10 +123,9 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter {
buf.append("\r\n");
}
return writeResponse(trailer, out);
writeResponse(trailer, ctx);
}
}
return true;
}
private static void appendDecoderResult(StringBuilder buf, HttpObject o) {
@ -151,7 +139,7 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter {
buf.append("\r\n");
}
private boolean writeResponse(HttpObject currentObj, MessageList<Object> out) {
private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) {
// Decide whether to close the connection or not.
boolean keepAlive = isKeepAlive(request);
// Build the response object.
@ -186,14 +174,14 @@ public class HttpSnoopServerHandler extends ChannelInboundHandlerAdapter {
}
// Write the response.
out.add(response);
ctx.write(response);
return keepAlive;
}
private static void send100Continue(MessageList<Object> out) {
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE);
out.add(response);
ctx.write(response);
}
@Override

View File

@ -202,7 +202,7 @@ public class HttpUploadClient {
// send request
List<Entry<String, String>> entries = headers.entries();
channel.write(request).sync();
channel.write(request).flush().sync();
// Wait for the server to close the connection.
channel.closeFuture().sync();
@ -276,7 +276,7 @@ public class HttpUploadClient {
if (bodyRequestEncoder.isChunked()) {
// could do either request.isChunked()
// either do it through ChunkedWriteHandler
channel.write(bodyRequestEncoder).awaitUninterruptibly();
channel.write(bodyRequestEncoder).flush().awaitUninterruptibly();
}
// Do not clear here since we will reuse the InterfaceHttpData on the
@ -351,7 +351,7 @@ public class HttpUploadClient {
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) {
channel.write(bodyRequestEncoder).awaitUninterruptibly();
channel.write(bodyRequestEncoder).flush().awaitUninterruptibly();
}
// Now no more use of file representation (and list of HttpData)

View File

@ -36,7 +36,7 @@ public class HttpUploadClientHandler extends SimpleChannelInboundHandler<HttpObj
private boolean readingChunks;
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;

View File

@ -95,7 +95,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObj
}
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = this.request = (HttpRequest) msg;
URI uri = new URI(request.getUri());
@ -313,7 +313,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObj
}
}
// Write the response.
ChannelFuture future = channel.write(response);
ChannelFuture future = channel.write(response).flush();
// Close the connection after the write operation is done if necessary.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@ -53,18 +52,17 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
MessageList<Object> out = MessageList.newInstance(msgs.size());
for (int i = 0; i < msgs.size(); i++) {
Object msg = msgs.get(i);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg, out);
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
msgs.recycle();
ctx.write(out);
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
@ -92,7 +90,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame, MessageList<Object> out) {
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format(
"Channel %s received %s", ctx.channel().hashCode(), frame.getClass().getSimpleName()));
@ -101,13 +99,13 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
out.add(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
} else if (frame instanceof TextWebSocketFrame) {
out.add(frame);
ctx.write(frame);
} else if (frame instanceof BinaryWebSocketFrame) {
out.add(frame);
ctx.write(frame);
} else if (frame instanceof ContinuationWebSocketFrame) {
out.add(frame);
ctx.write(frame);
} else if (frame instanceof PongWebSocketFrame) {
frame.release();
// Ignore
@ -126,7 +124,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().write(res);
ChannelFuture f = ctx.channel().write(res).flush();
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}

View File

@ -79,7 +79,7 @@ public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object>
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);

View File

@ -21,7 +21,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
protected void messageReceived0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
String request = frame.text();
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}

View File

@ -53,7 +53,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
@ -135,7 +135,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().write(res);
ChannelFuture f = ctx.channel().write(res).flush();
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}

View File

@ -54,7 +54,7 @@ public class WebSocketSslServerHandler extends SimpleChannelInboundHandler<Objec
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
@ -137,7 +137,7 @@ public class WebSocketSslServerHandler extends SimpleChannelInboundHandler<Objec
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().write(res);
ChannelFuture f = ctx.channel().write(res).flush();
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}

View File

@ -97,7 +97,7 @@ public class LocalEcho {
}
// Sends the received line to the server.
lastWriteFuture = ch.write(line);
lastWriteFuture = ch.write(line).flush();
}
// Wait until all messages are flushed before closing the channel.

View File

@ -21,7 +21,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
public class LocalEchoClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Print as received
System.out.println(msg);
}

View File

@ -17,14 +17,19 @@ package io.netty.example.localecho;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
public class LocalEchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// Write back as received
ctx.write(msgs);
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.example.objectecho;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import java.util.ArrayList;
import java.util.List;
@ -57,9 +56,14 @@ public class ObjectEchoClientHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// Echo back the received object to the server.
ctx.write(msgs);
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.example.objectecho;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -33,9 +32,14 @@ public class ObjectEchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
ChannelHandlerContext ctx, Object msg) throws Exception {
// Echo back the received object to the client.
ctx.write(msgs);
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override

View File

@ -18,7 +18,6 @@ package io.netty.example.portunification;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
import io.netty.example.factorial.BigIntegerDecoder;
import io.netty.example.factorial.FactorialServerHandler;
import io.netty.example.factorial.NumberEncoder;
@ -33,6 +32,7 @@ import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
import java.util.List;
/**
* Manipulates the current pipeline dynamically to switch protocols or enable
@ -53,7 +53,7 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// Will use the first five bytes to detect a protocol.
if (in.readableBytes() < 5) {
return;

View File

@ -21,7 +21,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {
@ -38,8 +37,8 @@ public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(final ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
inboundChannel.write(msgs).addListener(new ChannelFutureListener() {
public void messageReceived(final ChannelHandlerContext ctx, Object msg) throws Exception {
inboundChannel.write(msg).flush().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageList;
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
@ -64,9 +63,9 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(final ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
public void messageReceived(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (outboundChannel.isActive()) {
outboundChannel.write(msgs).addListener(new ChannelFutureListener() {
outboundChannel.write(msg).flush().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
@ -98,7 +97,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
*/
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
ch.write(Unpooled.EMPTY_BUFFER).flush().addListener(ChannelFutureListener.CLOSE);
}
}
}

View File

@ -55,7 +55,7 @@ public class QuoteOfTheMomentClient {
// Broadcast the QOTM request to port 8080.
ch.write(new DatagramPacket(
Unpooled.copiedBuffer("QOTM?", CharsetUtil.UTF_8),
new InetSocketAddress("255.255.255.255", port))).sync();
new InetSocketAddress("255.255.255.255", port))).flush().sync();
// QuoteOfTheMomentClientHandler will close the DatagramChannel when a
// response is received. If the channel is not closed within 5 seconds,

View File

@ -23,7 +23,7 @@ import io.netty.util.CharsetUtil;
public class QuoteOfTheMomentClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String response = msg.content().toString(CharsetUtil.UTF_8);
if (response.startsWith("QOTM: ")) {
System.out.println("Quote of the Moment: " + response.substring(6));

View File

@ -44,7 +44,7 @@ public class QuoteOfTheMomentServerHandler extends SimpleChannelInboundHandler<D
}
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
System.err.println(packet);
if ("QOTM?".equals(packet.content().toString(CharsetUtil.UTF_8))) {
ctx.write(new DatagramPacket(

View File

@ -26,7 +26,7 @@ public class RxtxClientHandler extends SimpleChannelInboundHandler<String> {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, String msg) throws Exception {
if ("OK".equals(msg)) {
System.out.println("Serial port responded to AT");
} else {

View File

@ -16,11 +16,9 @@
package io.netty.example.sctp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.channel.sctp.SctpMessage;
import java.util.logging.Level;
@ -57,8 +55,13 @@ public class SctpEchoClientHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> messages) throws Exception {
ctx.write(messages);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override

View File

@ -18,7 +18,6 @@ package io.netty.example.sctp;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -40,7 +39,12 @@ public class SctpEchoServerHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
ctx.write(msgs);
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

View File

@ -60,7 +60,7 @@ public class SecureChatClient {
}
// Sends the received line to the server.
lastWriteFuture = ch.write(line + "\r\n");
lastWriteFuture = ch.write(line + "\r\n").flush();
// If user typed the 'bye' command, wait until the server closes
// the connection.

View File

@ -30,7 +30,7 @@ public class SecureChatClientHandler extends SimpleChannelInboundHandler<String>
SecureChatClientHandler.class.getName());
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, String msg) throws Exception {
System.err.println(msg);
}

View File

@ -61,7 +61,7 @@ public class SecureChatServerHandler extends SimpleChannelInboundHandler<String>
}
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, String msg) throws Exception {
// Send the received message to all channels but the current one.
for (Channel c: channels) {
if (c != ctx.channel()) {

View File

@ -19,7 +19,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
public final class RelayHandler extends ChannelInboundHandlerAdapter {
@ -41,9 +41,11 @@ public final class RelayHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception {
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (relayChannel.isActive()) {
relayChannel.write(msgs);
relayChannel.write(msg).flush();
} else {
ReferenceCountUtil.release(msg);
}
}

View File

@ -39,11 +39,11 @@ public final class SocksServerConnectHandler extends SimpleChannelInboundHandler
private final Bootstrap b = new Bootstrap();
@Override
public void messageReceived(final ChannelHandlerContext ctx, final SocksCmdRequest request) throws Exception {
public void messageReceived0(final ChannelHandlerContext ctx, final SocksCmdRequest request) throws Exception {
CallbackNotifier cb = new CallbackNotifier() {
@Override
public void onSuccess(final ChannelHandlerContext outboundCtx) {
ctx.channel().write(new SocksCmdResponse(SocksCmdStatus.SUCCESS, request.addressType()))
ctx.channel().write(new SocksCmdResponse(SocksCmdStatus.SUCCESS, request.addressType())).flush()
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {

View File

@ -37,7 +37,7 @@ public final class SocksServerHandler extends SimpleChannelInboundHandler<SocksR
}
@Override
public void messageReceived(ChannelHandlerContext ctx, SocksRequest socksRequest) throws Exception {
public void messageReceived0(ChannelHandlerContext ctx, SocksRequest socksRequest) throws Exception {
switch (socksRequest.requestType()) {
case INIT: {
// auth support example

Some files were not shown because too many files have changed in this diff Show More