Clean up abstract ChannelHandler impls / Remove ChannelHandlerContext.hasNext*()

- Rename ChannelHandlerAdapter to ChannelDuplexHandler
- Add ChannelHandlerAdapter that implements only ChannelHandler
- Rename CombinedChannelHandler to CombinedChannelDuplexHandler and
  improve runtime validation
- Remove ChannelInbound/OutboundHandlerAdapter which are not useful
- Make ChannelOutboundByteHandlerAdapter similar to
  ChannelInboundByteHandlerAdapter
- Make the tail and head handler of DefaultChannelPipeline accept both
  bytes and messages.  ChannelHandlerContext.hasNext*() were removed
  because they always return true now.
- Removed various unnecessary null checks.
- Correct method/field names:
  inboundBufferSuspended -> channelReadSuspended
This commit is contained in:
Trustin Lee 2013-02-06 12:55:42 +09:00
parent 7eed272e57
commit d4742bbe16
53 changed files with 1121 additions and 1061 deletions

View File

@ -33,6 +33,9 @@ final class QueueBackedMessageBuf<T> extends AbstractMessageBuf<T> {
@Override @Override
public boolean offer(T e) { public boolean offer(T e) {
if (e == null) {
throw new NullPointerException("e");
}
checkUnfreed(); checkUnfreed();
return isWritable() && queue.offer(e); return isWritable() && queue.offer(e);
} }

View File

@ -16,9 +16,12 @@
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.PrematureChannelClosureException; import io.netty.handler.codec.PrematureChannelClosureException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -42,13 +45,15 @@ import java.util.concurrent.atomic.AtomicLong;
* @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder
* @apiviz.has io.netty.handler.codec.http.HttpRequestEncoder * @apiviz.has io.netty.handler.codec.http.HttpRequestEncoder
*/ */
public class HttpClientCodec extends CombinedChannelHandler { public final class HttpClientCodec
extends CombinedChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<HttpObject> {
/** A queue that is used for correlating a request and a response. */ /** A queue that is used for correlating a request and a response. */
final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>(); private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
/** If true, decoding stops (i.e. pass-through) */ /** If true, decoding stops (i.e. pass-through) */
volatile boolean done; private volatile boolean done;
private final AtomicLong requestResponseCounter = new AtomicLong(); private final AtomicLong requestResponseCounter = new AtomicLong();
private final boolean failOnMissingResponse; private final boolean failOnMissingResponse;
@ -65,25 +70,53 @@ public class HttpClientCodec extends CombinedChannelHandler {
/** /**
* Creates a new instance with the specified decoder options. * Creates a new instance with the specified decoder options.
*/ */
public HttpClientCodec( public HttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false); this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
} }
public HttpClientCodec( public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
boolean failOnMissingResponse) { init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new Encoder());
init(
new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize),
new Encoder());
this.failOnMissingResponse = failOnMissingResponse; this.failOnMissingResponse = failOnMissingResponse;
} }
private Decoder decoder() {
return (Decoder) stateHandler();
}
private Encoder encoder() {
return (Encoder) operationHandler();
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder().newInboundBuffer(ctx);
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
decoder().discardInboundReadBytes(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoder().freeInboundBuffer(ctx);
}
@Override
public MessageBuf<HttpObject> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return encoder().newOutboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
encoder().freeOutboundBuffer(ctx);
}
private final class Encoder extends HttpRequestEncoder { private final class Encoder extends HttpRequestEncoder {
@Override @Override
protected void encode( protected void encode(
ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception {
if (msg instanceof HttpRequest && !done) { if (msg instanceof HttpRequest && !done) {
queue.offer(((HttpRequest) msg).getMethod()); queue.offer(((HttpRequest) msg).getMethod());
} }

View File

@ -39,7 +39,7 @@ import static io.netty.handler.codec.http.HttpConstants.*;
* implement all abstract methods properly. * implement all abstract methods properly.
* @apiviz.landmark * @apiviz.landmark
*/ */
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToByteEncoder<Object> { public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToByteEncoder<HttpObject> {
private static final int ST_INIT = 0; private static final int ST_INIT = 0;
private static final int ST_CONTENT_NON_CHUNK = 1; private static final int ST_CONTENT_NON_CHUNK = 1;
@ -56,17 +56,17 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
} }
@Override @Override
@SuppressWarnings("unchecked") protected void encode(ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (msg instanceof HttpMessage) { if (msg instanceof HttpMessage) {
if (state != ST_INIT) { if (state != ST_INIT) {
throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName()); throw new IllegalStateException("unexpected message type: " + msg.getClass().getSimpleName());
} }
HttpMessage m = (HttpMessage) msg; @SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
H m = (H) msg;
// Encode the message. // Encode the message.
encodeInitialLine(out, (H) m); encodeInitialLine(out, m);
encodeHeaders(out, m); encodeHeaders(out, m);
out.writeByte(CR); out.writeByte(CR);
out.writeByte(LF); out.writeByte(LF);

View File

@ -15,7 +15,12 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.channel.CombinedChannelHandler; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.CombinedChannelDuplexHandler;
/** /**
@ -26,7 +31,9 @@ import io.netty.channel.CombinedChannelHandler;
* @apiviz.has io.netty.handler.codec.http.HttpRequestDecoder * @apiviz.has io.netty.handler.codec.http.HttpRequestDecoder
* @apiviz.has io.netty.handler.codec.http.HttpResponseEncoder * @apiviz.has io.netty.handler.codec.http.HttpResponseEncoder
*/ */
public class HttpServerCodec extends CombinedChannelHandler { public final class HttpServerCodec
extends CombinedChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<HttpObject> {
/** /**
* Creates a new instance with the default decoder options * Creates a new instance with the default decoder options
@ -40,10 +47,40 @@ public class HttpServerCodec extends CombinedChannelHandler {
/** /**
* Creates a new instance with the specified decoder options. * Creates a new instance with the specified decoder options.
*/ */
public HttpServerCodec( public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { super(new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new HttpResponseEncoder());
super( }
new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize),
new HttpResponseEncoder()); private HttpRequestDecoder decoder() {
return (HttpRequestDecoder) stateHandler();
}
private HttpResponseEncoder encoder() {
return (HttpResponseEncoder) operationHandler();
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder().newInboundBuffer(ctx);
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
decoder().discardInboundReadBytes(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoder().freeInboundBuffer(ctx);
}
@Override
public MessageBuf<HttpObject> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return encoder().newOutboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
encoder().freeOutboundBuffer(ctx);
} }
} }

View File

@ -18,7 +18,6 @@
* Encoder, decoder and their related message types for HTTP. * Encoder, decoder and their related message types for HTTP.
* *
* @apiviz.exclude ^java\.lang\. * @apiviz.exclude ^java\.lang\.
* @apiviz.exclude OneToOne(Encoder|Decoder)$
* @apiviz.exclude \.HttpHeaders\. * @apiviz.exclude \.HttpHeaders\.
* @apiviz.exclude \.codec\.replay\. * @apiviz.exclude \.codec\.replay\.
* @apiviz.exclude \.(Simple)?Channel[A-Za-z]*Handler$ * @apiviz.exclude \.(Simple)?Channel[A-Za-z]*Handler$

View File

@ -15,7 +15,12 @@
*/ */
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.channel.CombinedChannelHandler; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.CombinedChannelDuplexHandler;
/** /**
@ -24,7 +29,9 @@ import io.netty.channel.CombinedChannelHandler;
* @apiviz.has io.netty.handler.codec.spdy.SpdyFrameDecoder * @apiviz.has io.netty.handler.codec.spdy.SpdyFrameDecoder
* @apiviz.has io.netty.handler.codec.spdy.SpdyFrameEncoder * @apiviz.has io.netty.handler.codec.spdy.SpdyFrameEncoder
*/ */
public class SpdyFrameCodec extends CombinedChannelHandler { public final class SpdyFrameCodec
extends CombinedChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<Object> {
/** /**
* Creates a new instance with the specified {@code version} and * Creates a new instance with the specified {@code version} and
@ -47,4 +54,37 @@ public class SpdyFrameCodec extends CombinedChannelHandler {
new SpdyFrameDecoder(version, maxChunkSize, maxHeaderSize), new SpdyFrameDecoder(version, maxChunkSize, maxHeaderSize),
new SpdyFrameEncoder(version, compressionLevel, windowBits, memLevel)); new SpdyFrameEncoder(version, compressionLevel, windowBits, memLevel));
} }
private SpdyFrameDecoder decoder() {
return (SpdyFrameDecoder) stateHandler();
}
private SpdyFrameEncoder encoder() {
return (SpdyFrameEncoder) operationHandler();
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder().newInboundBuffer(ctx);
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
decoder().discardInboundReadBytes(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoder().freeInboundBuffer(ctx);
}
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return encoder().newOutboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
encoder().freeOutboundBuffer(ctx);
}
} }

View File

@ -15,7 +15,12 @@
*/ */
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.channel.CombinedChannelHandler; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.http.HttpObject;
/** /**
@ -23,14 +28,42 @@ import io.netty.channel.CombinedChannelHandler;
* @apiviz.has io.netty.handler.codec.sdpy.SpdyHttpDecoder * @apiviz.has io.netty.handler.codec.sdpy.SpdyHttpDecoder
* @apiviz.has io.netty.handler.codec.spdy.SpdyHttpEncoder * @apiviz.has io.netty.handler.codec.spdy.SpdyHttpEncoder
*/ */
public class SpdyHttpCodec extends CombinedChannelHandler { public final class SpdyHttpCodec
extends CombinedChannelDuplexHandler
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<HttpObject> {
/** /**
* Creates a new instance with the specified decoder options. * Creates a new instance with the specified decoder options.
*/ */
public SpdyHttpCodec(int version, int maxContentLength) { public SpdyHttpCodec(int version, int maxContentLength) {
super( super(new SpdyHttpDecoder(version, maxContentLength), new SpdyHttpEncoder(version));
new SpdyHttpDecoder(version, maxContentLength), }
new SpdyHttpEncoder(version));
private SpdyHttpDecoder decoder() {
return (SpdyHttpDecoder) stateHandler();
}
private SpdyHttpEncoder encoder() {
return (SpdyHttpEncoder) operationHandler();
}
@Override
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder().newInboundBuffer(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoder().freeInboundBuffer(ctx);
}
@Override
public MessageBuf<HttpObject> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return encoder().newOutboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
encoder().freeOutboundBuffer(ctx);
} }
} }

View File

@ -120,7 +120,7 @@ import java.util.Map;
* All pushed resources should be sent before sending the response * All pushed resources should be sent before sending the response
* that corresponds to the initial request. * that corresponds to the initial request.
*/ */
public class SpdyHttpEncoder extends MessageToMessageEncoder<Object> { public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
private final int spdyVersion; private final int spdyVersion;
private volatile int currentStreamId; private volatile int currentStreamId;
@ -141,7 +141,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object> {
} }
@Override @Override
public Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { public Object encode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
List<Object> out = new ArrayList<Object>(); List<Object> out = new ArrayList<Object>();
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
@ -288,7 +288,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object> {
} }
@Override @Override
protected void freeOutboundMessage(Object msg) throws Exception { protected void freeOutboundMessage(HttpObject msg) throws Exception {
if (msg instanceof HttpContent) { if (msg instanceof HttpContent) {
// Will be freed later as the content of them is just reused here // Will be freed later as the content of them is just reused here
return; return;

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
@ -35,7 +35,7 @@ import javax.net.ssl.SSLEngine;
* much about the low-level details. * much about the low-level details.
* *
*/ */
public abstract class SpdyOrHttpChooser extends ChannelHandlerAdapter implements ChannelInboundByteHandler { public abstract class SpdyOrHttpChooser extends ChannelDuplexHandler implements ChannelInboundByteHandler {
public enum SelectedProtocol { public enum SelectedProtocol {
SpdyVersion2, SpdyVersion2,

View File

@ -17,9 +17,9 @@ package io.netty.handler.codec.spdy;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* Manages streams within a SPDY session. * Manages streams within a SPDY session.
*/ */
public class SpdySessionHandler public class SpdySessionHandler
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> { implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException(); private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();

View File

@ -16,7 +16,7 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler;
@ -53,7 +53,7 @@ import io.netty.channel.ChannelPromise;
* </pre> * </pre>
*/ */
public abstract class ByteToByteCodec public abstract class ByteToByteCodec
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundByteHandler { implements ChannelInboundByteHandler, ChannelOutboundByteHandler {
private final ByteToByteEncoder encoder = new ByteToByteEncoder() { private final ByteToByteEncoder encoder = new ByteToByteEncoder() {

View File

@ -47,8 +47,7 @@ import io.netty.channel.PartialFlushException;
public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter { public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter {
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception {
ByteBuf in = ctx.outboundByteBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
boolean encoded = false; boolean encoded = false;

View File

@ -17,14 +17,14 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<I> { implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<I> {
private final Class<?>[] encodableMessageTypes; private final Class<?>[] encodableMessageTypes;

View File

@ -16,7 +16,7 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
@ -49,7 +49,7 @@ import io.netty.channel.ChannelPromise;
* </pre> * </pre>
*/ */
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<INBOUND_IN>, implements ChannelInboundMessageHandler<INBOUND_IN>,
ChannelOutboundMessageHandler<OUTBOUND_IN> { ChannelOutboundMessageHandler<OUTBOUND_IN> {

View File

@ -67,6 +67,7 @@ public abstract class MessageToMessageDecoder<I>
public void inboundBufferUpdated(ChannelHandlerContext ctx) public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception { throws Exception {
MessageBuf<I> in = ctx.inboundMessageBuffer(); MessageBuf<I> in = ctx.inboundMessageBuffer();
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
boolean notify = false; boolean notify = false;
for (;;) { for (;;) {
try { try {
@ -75,7 +76,7 @@ public abstract class MessageToMessageDecoder<I>
break; break;
} }
if (!isDecodable(msg)) { if (!isDecodable(msg)) {
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); out.add(msg);
notify = true; notify = true;
continue; continue;
} }

View File

@ -15,9 +15,9 @@
*/ */
package io.netty.handler.logging; package io.netty.handler.logging;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.logging.InternalLogLevel; import io.netty.logging.InternalLogLevel;
@ -32,7 +32,7 @@ import java.net.SocketAddress;
* @apiviz.landmark * @apiviz.landmark
*/ */
@Sharable @Sharable
public class LoggingHandler extends ChannelHandlerAdapter { public class LoggingHandler extends ChannelDuplexHandler {
private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG; private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG;

View File

@ -18,10 +18,10 @@ package io.netty.handler.ssl;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFlushPromiseNotifier; import io.netty.channel.ChannelFlushPromiseNotifier;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler;
@ -142,7 +142,7 @@ import java.util.regex.Pattern;
* @apiviz.uses io.netty.handler.ssl.SslBufferPool * @apiviz.uses io.netty.handler.ssl.SslBufferPool
*/ */
public class SslHandler public class SslHandler
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundByteHandler { implements ChannelInboundByteHandler, ChannelOutboundByteHandler {
private static final InternalLogger logger = private static final InternalLogger logger =

View File

@ -18,11 +18,11 @@ package io.netty.handler.stream;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from * @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from
*/ */
public class ChunkedWriteHandler public class ChunkedWriteHandler
extends ChannelHandlerAdapter implements ChannelOutboundMessageHandler<Object> { extends ChannelDuplexHandler implements ChannelOutboundMessageHandler<Object> {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);

View File

@ -17,9 +17,9 @@ package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOperationHandler; import io.netty.channel.ChannelOperationHandler;
@ -75,7 +75,7 @@ import java.util.concurrent.TimeUnit;
* } * }
* *
* // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}. * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
* public class MyHandler extends {@link ChannelHandlerAdapter} { * public class MyHandler extends {@link ChannelDuplexHandler} {
* {@code @Override} * {@code @Override}
* public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} { * public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
* if (evt instanceof {@link IdleState}} { * if (evt instanceof {@link IdleState}} {

View File

@ -17,7 +17,7 @@ package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelStateHandlerAdapter; import io.netty.channel.ChannelStateHandlerAdapter;
@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit;
* } * }
* *
* // Handler should handle the {@link ReadTimeoutException}. * // Handler should handle the {@link ReadTimeoutException}.
* public class MyHandler extends {@link ChannelHandlerAdapter} { * public class MyHandler extends {@link ChannelDuplexHandler} {
* {@code @Override} * {@code @Override}
* public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause) * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
* throws {@link Exception} { * throws {@link Exception} {

View File

@ -17,9 +17,9 @@ package io.netty.handler.timeout;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOperationHandlerAdapter; import io.netty.channel.ChannelOperationHandlerAdapter;
@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
* } * }
* *
* // Handler should handle the {@link WriteTimeoutException}. * // Handler should handle the {@link WriteTimeoutException}.
* public class MyHandler extends {@link ChannelHandlerAdapter} { * public class MyHandler extends {@link ChannelDuplexHandler} {
* {@code @Override} * {@code @Override}
* public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause) * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
* throws {@link Exception} { * throws {@link Exception} {

View File

@ -16,7 +16,7 @@
package io.netty.handler.traffic; package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler;
@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit;
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li> * or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
* </ul> * </ul>
*/ */
public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapter public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundByteHandler { implements ChannelInboundByteHandler, ChannelOutboundByteHandler {
/** /**

View File

@ -43,9 +43,7 @@ public class SctpOutboundByteStreamHandler extends ChannelOutboundByteHandlerAda
} }
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception {
ByteBuf in = ctx.outboundByteBuffer();
try { try {
MessageBuf<Object> out = ctx.nextOutboundMessageBuffer(); MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
ByteBuf payload = Unpooled.buffer(in.readableBytes()); ByteBuf payload = Unpooled.buffer(in.readableBytes());

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
@ -822,7 +823,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private int outboundBufSize() { private int outboundBufSize() {
final int bufSize; final int bufSize;
final ChannelHandlerContext ctx = directOutboundContext(); final ChannelHandlerContext ctx = directOutboundContext();
if (ctx.hasOutboundByteBuffer()) { if (metadata().bufferType() == BufType.BYTE) {
bufSize = ctx.outboundByteBuffer().readableBytes(); bufSize = ctx.outboundByteBuffer().readableBytes();
} else { } else {
bufSize = ctx.outboundMessageBuffer().size(); bufSize = ctx.outboundMessageBuffer().size();
@ -869,7 +870,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
ChannelHandlerContext ctx = directOutboundContext(); ChannelHandlerContext ctx = directOutboundContext();
Throwable cause = null; Throwable cause = null;
try { try {
if (ctx.hasOutboundByteBuffer()) { if (metadata().bufferType() == BufType.BYTE) {
ByteBuf out = ctx.outboundByteBuffer(); ByteBuf out = ctx.outboundByteBuffer();
int oldSize = out.readableBytes(); int oldSize = out.readableBytes();
try { try {
@ -877,7 +878,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) { } catch (Throwable t) {
cause = t; cause = t;
} finally { } finally {
flushFutureNotifier.increaseWriteCounter(oldSize - out.readableBytes()); int delta = oldSize - out.readableBytes();
out.discardSomeReadBytes();
flushFutureNotifier.increaseWriteCounter(delta);
} }
} else { } else {
MessageBuf<Object> out = ctx.outboundMessageBuffer(); MessageBuf<Object> out = ctx.outboundMessageBuffer();

View File

@ -0,0 +1,124 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.net.SocketAddress;
/**
* {@link ChannelHandler} implementation which represents a combination out of a {@link ChannelStateHandler} and
* the {@link ChannelOperationHandler}.
*
* It is a good starting point if your {@link ChannelHandler} implementation needs to intercept operations and also
* state updates.
*/
public abstract class ChannelDuplexHandler extends ChannelStateHandlerAdapter implements ChannelOperationHandler {
/**
* Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise future) throws Exception {
ctx.bind(localAddress, future);
}
/**
* Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise future) throws Exception {
ctx.connect(remoteAddress, localAddress, future);
}
/**
* Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
ctx.disconnect(future);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
ctx.close(future);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
ctx.deregister(future);
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
/**
* Calls {@link ChannelHandlerContext#flush(ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*
* Be aware that if your class also implement {@link ChannelOutboundHandler} it need to {@code @Override} this
* method and provide some proper implementation. Fail to do so, will result in an {@link IllegalStateException}!
*/
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
if (this instanceof ChannelOutboundHandler) {
throw new IllegalStateException(
"flush(...) must be overridden by " + getClass().getName() +
", which implements " + ChannelOutboundHandler.class.getSimpleName());
}
ctx.flush(future);
}
/**
* Calls {@link ChannelHandlerContext#sendFile(FileRegion, ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise future) throws Exception {
ctx.sendFile(region, future);
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012 The Netty Project * Copyright 2013 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, * The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance * version 2.0 (the "License"); you may not use this file except in compliance
@ -13,112 +13,75 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.channel; package io.netty.channel;
import java.net.SocketAddress; public abstract class ChannelHandlerAdapter implements ChannelHandler {
// Not using volatile because it's used only for a sanity check.
boolean added;
/** /**
* {@link ChannelHandler} implementation which represents a combination out of a {@link ChannelStateHandler} and * Return {@code true} if the implementation is {@link Sharable} and so can be added
* the {@link ChannelOperationHandler}. * to different {@link ChannelPipeline}s.
*
* It is a good starting point if your {@link ChannelHandler} implementation needs to intercept operations and also
* state updates.
*/ */
public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler { final boolean isSharable() {
return getClass().isAnnotationPresent(Sharable.class);
/**
* Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise future) throws Exception {
ctx.bind(localAddress, future);
} }
/** /**
* Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward * Do nothing by default, sub-classes may override this method.
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/ */
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
SocketAddress localAddress, ChannelPromise future) throws Exception { // NOOP
ctx.connect(remoteAddress, localAddress, future);
} }
/** /**
* Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward * Do nothing by default, sub-classes may override this method.
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. */
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
* *
* Sub-classes may override this method to change behavior. * Sub-classes may override this method to change behavior.
*/ */
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception { throws Exception {
ctx.disconnect(future); ctx.fireExceptionCaught(cause);
} }
/** /**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
* *
* Sub-classes may override this method to change behavior. * Sub-classes may override this method to change behavior.
*/ */
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception { throws Exception {
ctx.close(future); ctx.fireUserEventTriggered(evt);
}
/**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
ctx.deregister(future);
}
@Override
public void read(ChannelHandlerContext ctx) {
ctx.read();
}
/**
* Calls {@link ChannelHandlerContext#flush(ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*
* Be aware that if your class also implement {@link ChannelOutboundHandler} it need to {@code @Override} this
* method and provide some proper implementation. Fail to do so, will result in an {@link IllegalStateException}!
*/
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
if (this instanceof ChannelOutboundHandler) {
throw new IllegalStateException(
"flush(...) must be overridden by " + getClass().getName() +
", which implements " + ChannelOutboundHandler.class.getSimpleName());
}
ctx.flush(future);
}
/**
* Calls {@link ChannelHandlerContext#sendFile(FileRegion, ChannelPromise)} to forward
* to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise future) throws Exception {
ctx.sendFile(region, future);
} }
} }

View File

@ -47,7 +47,7 @@ import java.util.Set;
* You can keep the {@link ChannelHandlerContext} for later use, such as * You can keep the {@link ChannelHandlerContext} for later use, such as
* triggering an event outside the handler methods, even from a different thread. * triggering an event outside the handler methods, even from a different thread.
* <pre> * <pre>
* public class MyHandler extends {@link ChannelHandlerAdapter} { * public class MyHandler extends {@link ChannelDuplexHandler} {
* *
* <b>private {@link ChannelHandlerContext} ctx;</b> * <b>private {@link ChannelHandlerContext} ctx;</b>
* *
@ -293,52 +293,22 @@ public interface ChannelHandlerContext
<T> MessageBuf<T> replaceOutboundMessageBuffer(MessageBuf<T> newOutboundMsgBuf); <T> MessageBuf<T> replaceOutboundMessageBuffer(MessageBuf<T> newOutboundMsgBuf);
/** /**
* Return {@code true} if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext}.
* inbound data.
*/
boolean hasNextInboundByteBuffer();
/**
* Return {@code true} if the next {@link ChannelHandlerContext} has a {@link MessageBuf} for handling
* inbound data.
*/
boolean hasNextInboundMessageBuffer();
/**
* Return the {@link ByteBuf} of the next {@link ChannelHandlerContext} if {@link #hasNextInboundByteBuffer()}
* returned {@code true}, otherwise a {@link UnsupportedOperationException} is thrown.
*/ */
ByteBuf nextInboundByteBuffer(); ByteBuf nextInboundByteBuffer();
/** /**
* Return the {@link MessageBuf} of the next {@link ChannelHandlerContext} if * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext}.
* {@link #hasNextInboundMessageBuffer()} returned {@code true}, otherwise a
* {@link UnsupportedOperationException} is thrown.
*/ */
MessageBuf<Object> nextInboundMessageBuffer(); MessageBuf<Object> nextInboundMessageBuffer();
/** /**
* Return {@code true} if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling outbound * Return the {@link ByteBuf} of the next {@link ChannelHandlerContext}.
* data.
*/
boolean hasNextOutboundByteBuffer();
/**
* Return {@code true} if the next {@link ChannelHandlerContext} has a {@link MessageBuf} for handling
* outbound data.
*/
boolean hasNextOutboundMessageBuffer();
/**
* Return the {@link ByteBuf} of the next {@link ChannelHandlerContext} if {@link #hasNextOutboundByteBuffer()}
* returned {@code true}, otherwise a {@link UnsupportedOperationException} is thrown.
*/ */
ByteBuf nextOutboundByteBuffer(); ByteBuf nextOutboundByteBuffer();
/** /**
* Return the {@link MessageBuf} of the next {@link ChannelHandlerContext} if * Return the {@link MessageBuf} of the next {@link ChannelHandlerContext}.
* {@link #hasNextOutboundMessageBuffer()} returned {@code true}, otherwise a
* {@link UnsupportedOperationException} is thrown.
*/ */
MessageBuf<Object> nextOutboundMessageBuffer(); MessageBuf<Object> nextOutboundMessageBuffer();
} }

View File

@ -64,39 +64,14 @@ public final class ChannelHandlerUtil {
} }
if (inbound) { if (inbound) {
if (ctx.hasNextInboundMessageBuffer()) {
ctx.nextInboundMessageBuffer().add(msg); ctx.nextInboundMessageBuffer().add(msg);
return true; return true;
} }
if (msg instanceof ByteBuf && ctx.hasNextInboundByteBuffer()) {
ByteBuf altDst = ctx.nextInboundByteBuffer();
ByteBuf src = (ByteBuf) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
} else {
if (ctx.hasNextOutboundMessageBuffer()) {
ctx.nextOutboundMessageBuffer().add(msg); ctx.nextOutboundMessageBuffer().add(msg);
return true; return true;
} }
if (msg instanceof ByteBuf && ctx.hasNextOutboundByteBuffer()) {
ByteBuf altDst = ctx.nextOutboundByteBuffer();
ByteBuf src = (ByteBuf) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
}
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s which accepts a %s.",
ctx.name(),
inbound? ChannelInboundHandler.class.getSimpleName()
: ChannelOutboundHandler.class.getSimpleName(),
msg.getClass().getSimpleName()));
}
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0]; private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
/** /**

View File

@ -19,13 +19,13 @@ import io.netty.buffer.ByteBuf;
/** /**
* Abstract base class for {@link ChannelInboundHandlerAdapter} which should be extended by the user to * Abstract base class for {@link ChannelInboundByteHandler} which should be extended by the user to
* get notified once more data is ready to get consumed from the inbound {@link ByteBuf}. * get notified once more data is ready to get consumed from the inbound {@link ByteBuf}.
* *
* This implementation is a good starting point for most users. * This implementation is a good starting point for most users.
*/ */
public abstract class ChannelInboundByteHandlerAdapter public abstract class ChannelInboundByteHandlerAdapter
extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler { extends ChannelStateHandlerAdapter implements ChannelInboundByteHandler {
/** /**
* Create a new unpooled {@link ByteBuf} by default. Sub-classes may override this to offer a more * Create a new unpooled {@link ByteBuf} by default. Sub-classes may override this to offer a more

View File

@ -1,43 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.Buf;
/**
* Abstract base class for a {@link ChannelHandler} that handles inbound data.
*
* Please either extend {@link ChannelInboundByteHandlerAdapter} or
* {@link ChannelInboundMessageHandlerAdapter}.
*/
abstract class ChannelInboundHandlerAdapter
extends ChannelStateHandlerAdapter implements ChannelInboundHandler {
/**
* Calls {@link Buf#free()} to free the buffer, sub-classes may override this.
*
* When doing so be aware that you will need to handle all the resource management by your own.
*/
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (ctx.hasInboundByteBuffer()) {
ctx.inboundByteBuffer().free();
} else {
ctx.inboundMessageBuffer().free();
}
}
}

View File

@ -85,8 +85,8 @@ public interface ChannelInboundInvoker {
void fireInboundBufferUpdated(); void fireInboundBufferUpdated();
/** /**
* Triggers an {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) inboundBufferSuspended} * Triggers an {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) channelReadSuspended}
* event to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}. * event to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*/ */
void fireInboundBufferSuspended(); void fireChannelReadSuspended();
} }

View File

@ -41,7 +41,7 @@ import io.netty.buffer.Unpooled;
* @param <I> The type of the messages to handle * @param <I> The type of the messages to handle
*/ */
public abstract class ChannelInboundMessageHandlerAdapter<I> public abstract class ChannelInboundMessageHandlerAdapter<I>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> { extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<I> {
private final Class<?>[] acceptedMsgTypes; private final Class<?>[] acceptedMsgTypes;
@ -73,6 +73,7 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
try { try {
MessageBuf<I> in = ctx.inboundMessageBuffer(); MessageBuf<I> in = ctx.inboundMessageBuffer();
MessageBuf<Object> out = null;
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -80,7 +81,10 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
} }
try { try {
if (!isSupported(msg)) { if (!isSupported(msg)) {
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); if (out == null) {
out = ctx.nextOutboundMessageBuffer();
}
out.add(msg);
unsupportedFound = true; unsupportedFound = true;
continue; continue;
} }

View File

@ -17,63 +17,7 @@ package io.netty.channel;
import java.net.SocketAddress; import java.net.SocketAddress;
public abstract class ChannelOperationHandlerAdapter implements ChannelOperationHandler { public abstract class ChannelOperationHandlerAdapter extends ChannelHandlerAdapter implements ChannelOperationHandler {
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
/**
* Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
ctx.fireUserEventTriggered(evt);
}
/** /**
* Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward

View File

@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf;
* Abstract base class which handles outgoing bytes. * Abstract base class which handles outgoing bytes.
*/ */
public abstract class ChannelOutboundByteHandlerAdapter public abstract class ChannelOutboundByteHandlerAdapter
extends ChannelOutboundHandlerAdapter implements ChannelOutboundByteHandler { extends ChannelOperationHandlerAdapter implements ChannelOutboundByteHandler {
@Override @Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer(); return ctx.alloc().buffer();
@ -36,4 +36,21 @@ public abstract class ChannelOutboundByteHandlerAdapter
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free(); ctx.outboundByteBuffer().free();
} }
/**
* This method merely delegates the flush request to {@link #flush(ChannelHandlerContext, ByteBuf, ChannelPromise)}.
*/
@Override
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
flush(ctx, ctx.outboundByteBuffer(), promise);
}
/**
* Invoked when a flush request has been issued.
*
* @param ctx the current context
* @param in this handler's outbound buffer
* @param promise the promise associate with the current flush request
*/
protected abstract void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception;
} }

View File

@ -1,43 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.Buf;
/**
* Abstract base class for a {@link ChannelHandler} that handles outbound data.
*
* Please extend {@link ChannelOutboundByteHandlerAdapter} or
* {@link ChannelOutboundMessageHandlerAdapter}.
*/
abstract class ChannelOutboundHandlerAdapter
extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler {
/**
* Calls {@link Buf#free()} to free the buffer, sub-classes may override this.
*
* When doing so be aware that you will need to handle all the resource management by your own.
*/
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (ctx.hasOutboundByteBuffer()) {
ctx.outboundByteBuffer().free();
} else {
ctx.outboundMessageBuffer().free();
}
}
}

View File

@ -157,7 +157,7 @@ public interface ChannelOutboundInvoker {
* Reads data from the {@link Channel} into the first inbound buffer, triggers an * Reads data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelStateHandler#inboundBufferUpdated(ChannelHandlerContext) inboundBufferUpdated} event if data was * {@link ChannelStateHandler#inboundBufferUpdated(ChannelHandlerContext) inboundBufferUpdated} event if data was
* read, and triggers an * read, and triggers an
* {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) inboundBufferSuspended} event so the * {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) channelReadSuspended} event so the
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing. * handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
*/ */
void read(); void read();

View File

@ -24,7 +24,7 @@ import io.netty.buffer.Unpooled;
* @param <I> The type of the messages to handle * @param <I> The type of the messages to handle
*/ */
public abstract class ChannelOutboundMessageHandlerAdapter<I> public abstract class ChannelOutboundMessageHandlerAdapter<I>
extends ChannelOutboundHandlerAdapter implements ChannelOutboundMessageHandler<I> { extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> {
@Override @Override
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {

View File

@ -23,73 +23,7 @@ package io.netty.channel;
* This implementation just forward the operation to the next {@link ChannelHandler} in the * This implementation just forward the operation to the next {@link ChannelHandler} in the
* {@link ChannelPipeline}. Sub-classes may override a method implementation to change this. * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this.
*/ */
public class ChannelStateHandlerAdapter implements ChannelStateHandler { public abstract class ChannelStateHandlerAdapter extends ChannelHandlerAdapter implements ChannelStateHandler {
// Not using volatile because it's used only for a sanity check.
boolean added;
/**
* Return {@code true} if the implementation is {@link Sharable} and so can be added
* to different {@link ChannelPipeline}s.
*/
final boolean isSharable() {
return getClass().isAnnotationPresent(Sharable.class);
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
/**
* Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
ctx.fireUserEventTriggered(evt);
}
/** /**
* Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
@ -156,6 +90,6 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler {
@Override @Override
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
ctx.fireInboundBufferSuspended(); ctx.fireChannelReadSuspended();
} }
} }

View File

@ -0,0 +1,244 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.net.SocketAddress;
/**
* Combines a {@link ChannelStateHandler} and a {@link ChannelOperationHandler} into one {@link ChannelHandler}.
*
*/
public class CombinedChannelDuplexHandler extends ChannelDuplexHandler {
private ChannelStateHandler stateHandler;
private ChannelOperationHandler operationHandler;
/**
* Creates a new uninitialized instance. A class that extends this handler must invoke
* {@link #init(ChannelStateHandler, ChannelOperationHandler)} before adding this handler into a
* {@link ChannelPipeline}.
*/
protected CombinedChannelDuplexHandler() { }
/**
* Creates a new instance that combines the specified two handlers into one.
*/
public CombinedChannelDuplexHandler(ChannelStateHandler stateHandler, ChannelOperationHandler operationHandler) {
init(stateHandler, operationHandler);
}
/**
* Initialized this handler with the specified handlers.
*
* @throws IllegalStateException if this handler was not constructed via the default constructor or
* if this handler does not implement all required handler interfaces
* @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
* in the type hierarchy
*/
protected final void init(ChannelStateHandler stateHandler, ChannelOperationHandler operationHandler) {
validate(stateHandler, operationHandler);
this.stateHandler = stateHandler;
this.operationHandler = operationHandler;
}
@SuppressWarnings("InstanceofIncompatibleInterface")
private void validate(ChannelStateHandler stateHandler, ChannelOperationHandler operationHandler) {
if (this.stateHandler != null) {
throw new IllegalStateException(
"init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
" was constructed with non-default constructor.");
}
if (stateHandler == null) {
throw new NullPointerException("stateHandler");
}
if (operationHandler == null) {
throw new NullPointerException("operationHandler");
}
if (stateHandler instanceof ChannelOperationHandler) {
throw new IllegalArgumentException(
"stateHandler must not implement " +
ChannelOperationHandler.class.getSimpleName() + " to get combined.");
}
if (operationHandler instanceof ChannelStateHandler) {
throw new IllegalArgumentException(
"operationHandler must not implement " +
ChannelStateHandler.class.getSimpleName() + " to get combined.");
}
if (stateHandler instanceof ChannelInboundByteHandler && !(this instanceof ChannelInboundByteHandler)) {
throw new IllegalStateException(
getClass().getSimpleName() + " must implement " + ChannelInboundByteHandler.class.getSimpleName() +
" if stateHandler implements " + ChannelInboundByteHandler.class.getSimpleName());
}
if (stateHandler instanceof ChannelInboundMessageHandler && !(this instanceof ChannelInboundMessageHandler)) {
throw new IllegalStateException(
getClass().getSimpleName() + " must implement " +
ChannelInboundMessageHandler.class.getSimpleName() + " if stateHandler implements " +
ChannelInboundMessageHandler.class.getSimpleName());
}
if (operationHandler instanceof ChannelOutboundByteHandler && !(this instanceof ChannelOutboundByteHandler)) {
throw new IllegalStateException(
getClass().getSimpleName() + " must implement " +
ChannelOutboundByteHandler.class.getSimpleName() + " if operationHandler implements " +
ChannelOutboundByteHandler.class.getSimpleName());
}
if (operationHandler instanceof ChannelOutboundMessageHandler &&
!(this instanceof ChannelOutboundMessageHandler)) {
throw new IllegalStateException(
getClass().getSimpleName() + " must implement " +
ChannelOutboundMessageHandler.class.getSimpleName() + " if operationHandler implements " +
ChannelOutboundMessageHandler.class.getSimpleName());
}
}
protected final ChannelStateHandler stateHandler() {
return stateHandler;
}
protected final ChannelOperationHandler operationHandler() {
return operationHandler;
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (stateHandler == null) {
throw new IllegalStateException(
"init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
" if " + CombinedChannelDuplexHandler.class.getSimpleName() +
" was constructed with the default constructor.");
}
try {
stateHandler.beforeAdd(ctx);
} finally {
operationHandler.beforeAdd(ctx);
}
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
try {
stateHandler.afterAdd(ctx);
} finally {
operationHandler.afterAdd(ctx);
}
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
try {
stateHandler.beforeRemove(ctx);
} finally {
operationHandler.beforeRemove(ctx);
}
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
try {
stateHandler.afterRemove(ctx);
} finally {
operationHandler.afterRemove(ctx);
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
stateHandler.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
stateHandler.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
stateHandler.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
stateHandler.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
stateHandler.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
stateHandler.userEventTriggered(ctx, evt);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
stateHandler.inboundBufferUpdated(ctx);
if (stateHandler instanceof ChannelInboundByteHandler) {
((ChannelInboundByteHandler) stateHandler).discardInboundReadBytes(ctx);
}
}
@Override
public void bind(
ChannelHandlerContext ctx,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
operationHandler.bind(ctx, localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
operationHandler.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
operationHandler.disconnect(ctx, promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
operationHandler.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
operationHandler.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
operationHandler.read(ctx);
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
operationHandler.flush(ctx, promise);
}
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
operationHandler.sendFile(ctx, region, promise);
}
}

View File

@ -1,225 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.Buf;
import java.net.SocketAddress;
/**
* Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
*
*/
public class CombinedChannelHandler extends ChannelStateHandlerAdapter implements ChannelInboundHandler,
ChannelOutboundHandler {
private ChannelOutboundHandler out;
private ChannelInboundHandler in;
protected CombinedChannelHandler() {
// User will call init in the subclass constructor.
}
/**
* Combine the given {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
*/
public CombinedChannelHandler(
ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
init(inboundHandler, outboundHandler);
}
/**
* Needs to get called before the handler can be added to the {@link ChannelPipeline}.
* Otherwise it will trigger a {@link IllegalStateException} later.
*
*/
protected void init(ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
if (inboundHandler == null) {
throw new NullPointerException("inboundHandler");
}
if (outboundHandler == null) {
throw new NullPointerException("outboundHandler");
}
if (inboundHandler instanceof ChannelOperationHandler) {
throw new IllegalArgumentException(
"inboundHandler must not implement " +
ChannelOperationHandler.class.getSimpleName() + " to get combined.");
}
if (outboundHandler instanceof ChannelStateHandler) {
throw new IllegalArgumentException(
"outboundHandler must not implement " +
ChannelStateHandler.class.getSimpleName() + " to get combined.");
}
if (in != null) {
throw new IllegalStateException("init() cannot be called more than once.");
}
in = inboundHandler;
out = outboundHandler;
}
@Override
public Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return in.newInboundBuffer(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
in.freeInboundBuffer(ctx);
}
@Override
public Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return out.newOutboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
out.freeOutboundBuffer(ctx);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (in == null) {
throw new IllegalStateException(
"not initialized yet - call init() in the constructor of the subclass");
}
try {
in.beforeAdd(ctx);
} finally {
out.beforeAdd(ctx);
}
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
try {
in.afterAdd(ctx);
} finally {
out.afterAdd(ctx);
}
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
try {
in.beforeRemove(ctx);
} finally {
out.beforeRemove(ctx);
}
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
try {
in.afterRemove(ctx);
} finally {
out.afterRemove(ctx);
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
in.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
in.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
in.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
in.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
in.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
in.userEventTriggered(ctx, evt);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
in.inboundBufferUpdated(ctx);
if (in instanceof ChannelInboundByteHandler) {
((ChannelInboundByteHandler) in).discardInboundReadBytes(ctx);
}
}
@Override
public void bind(
ChannelHandlerContext ctx,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
out.bind(ctx, localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
out.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void disconnect(
ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
out.disconnect(ctx, promise);
}
@Override
public void close(
ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
out.close(ctx, promise);
}
@Override
public void deregister(
ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
out.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
out.read(ctx);
}
@Override
public void flush(
ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
out.flush(ctx, promise);
if (out instanceof ChannelOutboundByteHandler) {
((ChannelOutboundByteHandler) out).discardOutboundReadBytes(ctx);
}
}
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
out.sendFile(ctx, region, promise);
}
}

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultChannelPipeline.*;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -67,9 +68,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
// //
// Note we use an AtomicReferenceFieldUpdater for atomic operations on these to safe memory. This will safe us // Note we use an AtomicReferenceFieldUpdater for atomic operations on these to safe memory. This will safe us
// 64 bytes per Bridge. // 64 bytes per Bridge.
@SuppressWarnings("UnusedDeclaration")
private volatile MessageBridge inMsgBridge; private volatile MessageBridge inMsgBridge;
@SuppressWarnings("UnusedDeclaration")
private volatile MessageBridge outMsgBridge; private volatile MessageBridge outMsgBridge;
@SuppressWarnings("UnusedDeclaration")
private volatile ByteBridge inByteBridge; private volatile ByteBridge inByteBridge;
@SuppressWarnings("UnusedDeclaration")
private volatile ByteBridge outByteBridge; private volatile ByteBridge outByteBridge;
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, MessageBridge> IN_MSG_BRIDGE_UPDATER private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, MessageBridge> IN_MSG_BRIDGE_UPDATER
@ -94,22 +99,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private Runnable invokeChannelInactiveTask; private Runnable invokeChannelInactiveTask;
private Runnable invokeInboundBufferUpdatedTask; private Runnable invokeInboundBufferUpdatedTask;
private Runnable fireInboundBufferUpdated0Task; private Runnable fireInboundBufferUpdated0Task;
private Runnable invokeInboundBufferSuspendedTask; private Runnable invokeChannelReadSuspendedTask;
private Runnable invokeFreeInboundBuffer0Task; private Runnable invokeFreeInboundBuffer0Task;
private Runnable invokeFreeOutboundBuffer0Task; private Runnable invokeFreeOutboundBuffer0Task;
private Runnable invokeRead0Task; private Runnable invokeRead0Task;
volatile boolean removed; volatile boolean removed;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutorGroup group,
String name, ChannelHandler handler) {
this(pipeline, group, name, handler, false);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DefaultChannelHandlerContext( DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutorGroup group, DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
String name, ChannelHandler handler, boolean needsLazyBufInit) {
if (name == null) { if (name == null) {
throw new NullPointerException("name"); throw new NullPointerException("name");
@ -157,44 +155,63 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try { try {
buf = ((ChannelInboundHandler) handler).newInboundBuffer(this); buf = ((ChannelInboundHandler) handler).newInboundBuffer(this);
} catch (Exception e) { } catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e); throw new ChannelPipelineException(
} handler.getClass().getSimpleName() + ".newInboundBuffer() raised an exception.", e);
if (buf == null) {
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
} }
if (buf instanceof ByteBuf) { if (buf instanceof ByteBuf) {
inByteBuf = (ByteBuf) buf; inByteBuf = (ByteBuf) buf;
inByteBridge = null;
inMsgBuf = null;
inMsgBridge = null;
} else if (buf instanceof MessageBuf) { } else if (buf instanceof MessageBuf) {
inByteBuf = null;
inByteBridge = null;
inMsgBuf = (MessageBuf<Object>) buf; inMsgBuf = (MessageBuf<Object>) buf;
inMsgBridge = null;
} else { } else {
throw new Error(); throw new ChannelPipelineException(
handler.getClass().getSimpleName() + ".newInboundBuffer() returned neither " +
ByteBuf.class.getSimpleName() + " nor " + MessageBuf.class.getSimpleName() + ": " + buf);
} }
} else {
inByteBridge = null;
inMsgBridge = null;
} }
if (handler instanceof ChannelOutboundHandler) { if (handler instanceof ChannelOutboundHandler) {
if (needsLazyBufInit) { Buf buf;
// Special case: it means this context is for HeadHandler. try {
// HeadHandler is an outbound handler instantiated by the constructor of DefaultChannelPipeline. buf = ((ChannelOutboundHandler) handler).newOutboundBuffer(this);
// Because Channel is not really fully initialized at this point, we should not call } catch (Exception e) {
// newOutboundBuffer() yet because it will usually lead to NPE. throw new ChannelPipelineException(
// To work around this problem, we lazily initialize the outbound buffer for this special case. handler.getClass().getSimpleName() + ".newOutboundBuffer() raised an exception.", e);
}
if (buf instanceof ByteBuf) {
outByteBuf = (ByteBuf) buf;
} else if (buf instanceof MessageBuf) {
@SuppressWarnings("unchecked")
MessageBuf<Object> msgBuf = (MessageBuf<Object>) buf;
outMsgBuf = msgBuf;
} else { } else {
initOutboundBuffer(); throw new ChannelPipelineException(
handler.getClass().getSimpleName() + ".newOutboundBuffer() returned neither " +
ByteBuf.class.getSimpleName() + " nor " + MessageBuf.class.getSimpleName() + ": " + buf);
}
} }
} }
this.needsLazyBufInit = needsLazyBufInit; DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, HeadHandler handler) {
type = null;
channel = pipeline.channel;
this.pipeline = pipeline;
this.name = name;
this.handler = handler;
executor = null;
needsLazyBufInit = true;
}
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, TailHandler handler) {
type = null;
channel = pipeline.channel;
this.pipeline = pipeline;
this.name = name;
this.handler = handler;
executor = null;
inByteBuf = handler.byteSink;
inMsgBuf = handler.msgSink;
} }
void forwardBufferContent() { void forwardBufferContent() {
@ -233,19 +250,23 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
private void lazyInitOutboundBuffer() { private void lazyInitHeadHandler() {
if (needsLazyBufInit) { if (needsLazyBufInit) {
if (outByteBuf == null && outMsgBuf == null) {
needsLazyBufInit = false;
EventExecutor exec = executor(); EventExecutor exec = executor();
if (exec.inEventLoop()) { if (exec.inEventLoop()) {
initOutboundBuffer(); if (needsLazyBufInit) {
needsLazyBufInit = false;
HeadHandler headHandler = (HeadHandler) handler;
headHandler.init(this);
outByteBuf = headHandler.byteSink;
outMsgBuf = headHandler.msgSink;
}
} else { } else {
try { try {
getFromFuture(exec.submit(new Runnable() { getFromFuture(exec.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
lazyInitOutboundBuffer(); lazyInitHeadHandler();
} }
})); }));
} catch (Exception e) { } catch (Exception e) {
@ -254,36 +275,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
} }
}
private void initOutboundBuffer() {
Buf buf;
try {
buf = ((ChannelOutboundHandler) handler()).newOutboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
}
if (buf == null) {
throw new ChannelPipelineException("A user handler's newOutboundBuffer() returned null");
}
if (buf instanceof ByteBuf) {
outByteBuf = (ByteBuf) buf;
outByteBridge = null;
outMsgBuf = null;
outMsgBridge = null;
} else if (buf instanceof MessageBuf) {
outByteBuf = null;
outByteBridge = null;
@SuppressWarnings("unchecked")
MessageBuf<Object> msgBuf = (MessageBuf<Object>) buf;
outMsgBuf = msgBuf;
outMsgBridge = null;
} else {
throw new Error();
}
}
private void fillBridge() { private void fillBridge() {
if (inMsgBridge != null) { if (inMsgBridge != null) {
@ -708,88 +699,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return currentOutboundMsgBuf; return currentOutboundMsgBuf;
} }
@Override
public boolean hasNextInboundByteBuffer() {
DefaultChannelHandlerContext ctx = next;
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.hasInboundByteBuffer()) {
return true;
}
ctx = ctx.next;
}
}
@Override
public boolean hasNextInboundMessageBuffer() {
DefaultChannelHandlerContext ctx = next;
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.hasInboundMessageBuffer()) {
return true;
}
ctx = ctx.next;
}
}
@Override
public boolean hasNextOutboundByteBuffer() {
DefaultChannelHandlerContext ctx = prev;
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.hasOutboundByteBuffer()) {
return true;
}
ctx = ctx.prev;
}
}
@Override
public boolean hasNextOutboundMessageBuffer() {
DefaultChannelHandlerContext ctx = prev;
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.hasOutboundMessageBuffer()) {
return true;
}
ctx = ctx.prev;
}
}
@Override @Override
public ByteBuf nextInboundByteBuffer() { public ByteBuf nextInboundByteBuffer() {
DefaultChannelHandlerContext ctx = next; DefaultChannelHandlerContext ctx = next;
for (;;) { for (;;) {
if (ctx == null) {
if (prev != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose inbound buffer is %s.",
name, ChannelInboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose inbound buffer is %s.",
ChannelInboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
}
}
if (ctx.hasInboundByteBuffer()) { if (ctx.hasInboundByteBuffer()) {
if (ctx.executor().inEventLoop()) { Thread currentThread = Thread.currentThread();
return ctx.inboundByteBuffer(); if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inByteBuf;
} }
if (executor().inEventLoop()) { if (executor().inEventLoop(currentThread)) {
ByteBridge bridge = ctx.inByteBridge; ByteBridge bridge = ctx.inByteBridge;
if (bridge == null) { if (bridge == null) {
bridge = new ByteBridge(ctx); bridge = new ByteBridge(ctx);
@ -809,25 +728,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public MessageBuf<Object> nextInboundMessageBuffer() { public MessageBuf<Object> nextInboundMessageBuffer() {
DefaultChannelHandlerContext ctx = next; DefaultChannelHandlerContext ctx = next;
for (;;) { for (;;) {
if (ctx == null) {
if (prev != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose inbound buffer is %s.",
name, ChannelInboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose inbound buffer is %s.",
ChannelInboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
}
}
if (ctx.hasInboundMessageBuffer()) { if (ctx.hasInboundMessageBuffer()) {
if (ctx.executor().inEventLoop()) { Thread currentThread = Thread.currentThread();
return ctx.inboundMessageBuffer(); if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inMsgBuf;
} }
if (executor().inEventLoop()) { if (executor().inEventLoop(currentThread)) {
MessageBridge bridge = ctx.inMsgBridge; MessageBridge bridge = ctx.inMsgBridge;
if (bridge == null) { if (bridge == null) {
bridge = new MessageBridge(); bridge = new MessageBridge();
@ -846,13 +752,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override @Override
public ByteBuf nextOutboundByteBuffer() { public ByteBuf nextOutboundByteBuffer() {
DefaultChannelHandlerContext ctx = prev; DefaultChannelHandlerContext ctx = prev;
final DefaultChannelHandlerContext initialCtx = ctx;
for (;;) { for (;;) {
if (ctx.hasOutboundByteBuffer()) { if (ctx.hasOutboundByteBuffer()) {
if (ctx.executor().inEventLoop()) { Thread currentThread = Thread.currentThread();
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.outboundByteBuffer(); return ctx.outboundByteBuffer();
} }
if (executor().inEventLoop()) { if (executor().inEventLoop(currentThread)) {
ByteBridge bridge = ctx.outByteBridge; ByteBridge bridge = ctx.outByteBridge;
if (bridge == null) { if (bridge == null) {
bridge = new ByteBridge(ctx); bridge = new ByteBridge(ctx);
@ -865,33 +771,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new IllegalStateException("nextOutboundByteBuffer() called from outside the eventLoop"); throw new IllegalStateException("nextOutboundByteBuffer() called from outside the eventLoop");
} }
ctx = ctx.prev; ctx = ctx.prev;
if (ctx == null) {
if (initialCtx != null && initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose outbound buffer is %s.",
initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose outbound buffer is %s.",
ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
}
}
} }
} }
@Override @Override
public MessageBuf<Object> nextOutboundMessageBuffer() { public MessageBuf<Object> nextOutboundMessageBuffer() {
DefaultChannelHandlerContext ctx = prev; DefaultChannelHandlerContext ctx = prev;
final DefaultChannelHandlerContext initialCtx = ctx;
for (;;) { for (;;) {
if (ctx.hasOutboundMessageBuffer()) { if (ctx.hasOutboundMessageBuffer()) {
if (ctx.executor().inEventLoop()) { Thread currentThread = Thread.currentThread();
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.outboundMessageBuffer(); return ctx.outboundMessageBuffer();
} }
if (executor().inEventLoop()) { if (executor().inEventLoop(currentThread)) {
MessageBridge bridge = ctx.outMsgBridge; MessageBridge bridge = ctx.outMsgBridge;
if (bridge == null) { if (bridge == null) {
bridge = new MessageBridge(); bridge = new MessageBridge();
@ -904,28 +796,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
throw new IllegalStateException("nextOutboundMessageBuffer() called from outside the eventLoop"); throw new IllegalStateException("nextOutboundMessageBuffer() called from outside the eventLoop");
} }
ctx = ctx.prev; ctx = ctx.prev;
if (ctx == null) {
if (initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose outbound buffer is %s.",
initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose outbound buffer is %s.",
ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
}
}
} }
} }
@Override @Override
public void fireChannelRegistered() { public void fireChannelRegistered() {
lazyInitOutboundBuffer(); lazyInitHeadHandler();
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelRegistered(); next.invokeChannelRegistered();
@ -942,7 +819,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
executor.execute(task); executor.execute(task);
} }
} }
}
private void invokeChannelRegistered() { private void invokeChannelRegistered() {
try { try {
@ -957,7 +833,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override @Override
public void fireChannelUnregistered() { public void fireChannelUnregistered() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (prev != null && executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeChannelUnregistered(); next.invokeChannelUnregistered();
@ -974,7 +849,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
executor.execute(task); executor.execute(task);
} }
} }
}
private void invokeChannelUnregistered() { private void invokeChannelUnregistered() {
try { try {
@ -986,8 +860,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override @Override
public void fireChannelActive() { public void fireChannelActive() {
lazyInitHeadHandler();
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelActive(); next.invokeChannelActive();
@ -1004,7 +878,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
executor.execute(task); executor.execute(task);
} }
} }
}
private void invokeChannelActive() { private void invokeChannelActive() {
try { try {
@ -1019,7 +892,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override @Override
public void fireChannelInactive() { public void fireChannelInactive() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (prev != null && executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeChannelInactive(); next.invokeChannelInactive();
@ -1036,7 +908,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
executor.execute(task); executor.execute(task);
} }
} }
}
private void invokeChannelInactive() { private void invokeChannelInactive() {
try { try {
@ -1055,7 +926,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
final DefaultChannelHandlerContext next = this.next; final DefaultChannelHandlerContext next = this.next;
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (prev != null && executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeExceptionCaught(cause); next.invokeExceptionCaught(cause);
@ -1074,12 +944,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
} }
} else {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the end of the " +
"pipeline. It usually means the last inbound handler in the pipeline did not " +
"handle the exception.", cause);
}
} }
private void invokeExceptionCaught(Throwable cause) { private void invokeExceptionCaught(Throwable cause) {
@ -1103,7 +967,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
final DefaultChannelHandlerContext next = this.next; final DefaultChannelHandlerContext next = this.next;
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event); next.invokeUserEventTriggered(event);
@ -1116,7 +979,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}); });
} }
} }
}
private void invokeUserEventTriggered(Object event) { private void invokeUserEventTriggered(Object event) {
try { try {
@ -1149,7 +1011,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void fireInboundBufferUpdated0() { private void fireInboundBufferUpdated0() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (next != null && !next.isInboundBufferFreed()) { if (!next.isInboundBufferFreed()) {
next.fillBridge(); next.fillBridge();
// This comparison is safe because this method is always executed from the executor. // This comparison is safe because this method is always executed from the executor.
if (next.executor == executor) { if (next.executor == executor) {
@ -1191,28 +1053,26 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
@Override @Override
public void fireInboundBufferSuspended() { public void fireChannelReadSuspended() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (next != null) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (prev != null && executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeInboundBufferSuspended(); next.invokeChannelReadSuspended();
} else { } else {
Runnable task = next.invokeInboundBufferSuspendedTask; Runnable task = next.invokeChannelReadSuspendedTask;
if (task == null) { if (task == null) {
next.invokeInboundBufferSuspendedTask = task = new Runnable() { next.invokeChannelReadSuspendedTask = task = new Runnable() {
@Override @Override
public void run() { public void run() {
next.invokeInboundBufferSuspended(); next.invokeChannelReadSuspended();
} }
}; };
} }
executor.execute(task); executor.execute(task);
} }
} }
}
private void invokeInboundBufferSuspended() { private void invokeChannelReadSuspended() {
try { try {
((ChannelStateHandler) handler()).channelReadSuspended(this); ((ChannelStateHandler) handler()).channelReadSuspended(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -1701,12 +1561,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
if (next != null) {
DefaultChannelHandlerContext nextCtx = findContextInbound(); DefaultChannelHandlerContext nextCtx = findContextInbound();
if (nextCtx != null) {
nextCtx.invokeFreeInboundBuffer(); nextCtx.invokeFreeInboundBuffer();
} else { } else {
// Freed all inbound buffers. Free all outbound buffers in a reverse order. // Freed all inbound buffers. Free all outbound buffers in a reverse order.
pipeline.tail.findContextOutbound().invokeFreeOutboundBuffer(); findContextOutbound().invokeFreeOutboundBuffer();
} }
} }
@ -1741,9 +1601,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
DefaultChannelHandlerContext nextCtx = findContextOutbound(); if (prev != null) {
if (nextCtx != null) { findContextOutbound().invokeFreeOutboundBuffer();
nextCtx.invokeFreeOutboundBuffer();
} }
} }
@ -1790,7 +1649,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
DefaultChannelHandlerContext ctx = this; DefaultChannelHandlerContext ctx = this;
do { do {
ctx = ctx.next; ctx = ctx.next;
} while (ctx != null && !(ctx.handler() instanceof ChannelStateHandler)); } while (!(ctx.handler() instanceof ChannelStateHandler));
return ctx; return ctx;
} }
@ -1798,7 +1657,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
DefaultChannelHandlerContext ctx = this; DefaultChannelHandlerContext ctx = this;
do { do {
ctx = ctx.prev; ctx = ctx.prev;
} while (ctx != null && !(ctx.handler() instanceof ChannelOperationHandler)); } while (!(ctx.handler() instanceof ChannelOperationHandler));
return ctx; return ctx;
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Freeable; import io.netty.buffer.Freeable;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
@ -56,7 +57,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutorGroup, EventExecutor> childExecutors = final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>(); new IdentityHashMap<EventExecutorGroup, EventExecutor>();
private static final TailHandler TAIL_HANDLER = new TailHandler();
volatile boolean inboundBufferFreed; volatile boolean inboundBufferFreed;
volatile boolean outboundBufferFreed; volatile boolean outboundBufferFreed;
@ -66,7 +66,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
this.channel = channel; this.channel = channel;
tail = new DefaultChannelHandlerContext(this, null, generateName(TAIL_HANDLER), TAIL_HANDLER); TailHandler tailHandler = new TailHandler();
tail = new DefaultChannelHandlerContext(this, generateName(tailHandler), tailHandler);
HeadHandler headHandler; HeadHandler headHandler;
switch (channel.metadata().bufferType()) { switch (channel.metadata().bufferType()) {
@ -80,7 +81,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
throw new Error("unknown buffer type: " + channel.metadata().bufferType()); throw new Error("unknown buffer type: " + channel.metadata().bufferType());
} }
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler, true); head = new DefaultChannelHandlerContext(this, generateName(headHandler), headHandler);
head.next = tail; head.next = tail;
tail.prev = head; tail.prev = head;
@ -583,8 +584,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static void callBeforeAdd(ChannelHandlerContext ctx) { private static void callBeforeAdd(ChannelHandlerContext ctx) {
ChannelHandler handler = ctx.handler(); ChannelHandler handler = ctx.handler();
if (handler instanceof ChannelStateHandlerAdapter) { if (handler instanceof ChannelHandlerAdapter) {
ChannelStateHandlerAdapter h = (ChannelStateHandlerAdapter) handler; ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) { if (!h.isSharable() && h.added) {
throw new ChannelPipelineException( throw new ChannelPipelineException(
h.getClass().getName() + h.getClass().getName() +
@ -904,8 +905,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public void fireInboundBufferSuspended() { public void fireChannelReadSuspended() {
head.fireInboundBufferSuspended(); head.fireChannelReadSuspended();
if (channel.config().isAutoRead()) { if (channel.config().isAutoRead()) {
read(); read();
} }
@ -1075,22 +1076,110 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private static final class TailHandler extends ChannelInboundMessageHandlerAdapter<Freeable> { // A special catch-all handler that handles both bytes and messages.
public TailHandler() { static final class TailHandler implements ChannelInboundHandler {
super(Freeable.class);
final ByteBuf byteSink = Unpooled.buffer(0);
final MessageBuf<Object> msgSink = Unpooled.messageBuffer(0);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { }
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { }
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception { }
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the end of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.", cause);
} }
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, Freeable msg) throws Exception { public Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (logger.isWarnEnabled()) { throw new Error();
logger.warn("Freeable reached end-of-pipeline, call " + msg + ".free() to" + }
" guard against resource leakage!");
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
byteSink.free();
msgSink.free();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
int byteSinkSize = byteSink.readableBytes();
if (byteSinkSize != 0) {
byteSink.clear();
logger.warn(
"Discarded {} inbound byte(s) that reached at the end of the pipeline. " +
"Please check your pipeline configuration.", byteSinkSize);
}
int msgSinkSize = msgSink.size();
if (msgSinkSize != 0) {
MessageBuf<Object> in = msgSink;
for (;;) {
Object m = in.poll();
if (m == null) {
break;
}
if (m instanceof Freeable) {
((Freeable) m).free();
}
}
logger.warn(
"Discarded {} inbound message(s) that reached at the end of the pipeline. " +
"Please check your pipeline configuration.", msgSinkSize);
} }
msg.free();
} }
} }
private abstract class HeadHandler implements ChannelOutboundHandler { abstract class HeadHandler implements ChannelOutboundHandler {
ByteBuf byteSink;
MessageBuf<Object> msgSink;
void init(ChannelHandlerContext ctx) {
switch (ctx.channel().metadata().bufferType()) {
case BYTE:
byteSink = ctx.alloc().ioBuffer();
msgSink = Unpooled.messageBuffer(0);
break;
case MESSAGE:
byteSink = Unpooled.buffer(0);
msgSink = Unpooled.messageBuffer();
break;
default:
throw new Error();
}
}
@Override @Override
public final void beforeAdd(ChannelHandlerContext ctx) throws Exception { public final void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
@ -1146,11 +1235,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
unsafe.beginRead(); unsafe.beginRead();
} }
@Override
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.flush(promise);
}
@Override @Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause); ctx.fireExceptionCaught(cause);
@ -1166,36 +1250,54 @@ final class DefaultChannelPipeline implements ChannelPipeline {
ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception { ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
unsafe.sendFile(region, promise); unsafe.sendFile(region, promise);
} }
}
private final class ByteHeadHandler extends HeadHandler implements ChannelOutboundByteHandler {
@Override @Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public final Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().ioBuffer(); throw new Error();
} }
@Override @Override
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception { public final void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (ctx.hasOutboundByteBuffer()) { msgSink.free();
ctx.outboundByteBuffer().discardSomeReadBytes(); byteSink.free();
} }
} }
private final class ByteHeadHandler extends HeadHandler {
@Override @Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) { public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.outboundByteBuffer().free(); int msgSinkSize = msgSink.size();
if (msgSinkSize != 0) {
MessageBuf<Object> in = msgSink;
for (;;) {
Object m = in.poll();
if (m == null) {
break;
}
if (m instanceof Freeable) {
((Freeable) m).free();
}
}
logger.warn(
"Discarded {} outbound message(s) that reached at the end of the pipeline. " +
"Please check your pipeline configuration.", msgSinkSize);
}
unsafe.flush(promise);
} }
} }
private final class MessageHeadHandler extends HeadHandler implements ChannelOutboundMessageHandler<Object> { private final class MessageHeadHandler extends HeadHandler {
@Override @Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
return Unpooled.messageBuffer(); int byteSinkSize = byteSink.readableBytes();
if (byteSinkSize != 0) {
byteSink.clear();
logger.warn(
"Discarded {} outbound byte(s) that reached at the end of the pipeline. " +
"Please check your pipeline configuration.", byteSinkSize);
} }
unsafe.flush(promise);
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
ctx.outboundMessageBuffer().free();
} }
} }
} }

View File

@ -222,7 +222,7 @@ public class LocalChannel extends AbstractChannel {
} }
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
@Override @Override
@ -259,7 +259,7 @@ public class LocalChannel extends AbstractChannel {
if (peer.readInProgress) { if (peer.readInProgress) {
peer.readInProgress = false; peer.readInProgress = false;
peerPipeline.fireInboundBufferUpdated(); peerPipeline.fireInboundBufferUpdated();
peerPipeline.fireInboundBufferSuspended(); peerPipeline.fireChannelReadSuspended();
} }
} }

View File

@ -144,7 +144,7 @@ public class LocalServerChannel extends AbstractServerChannel {
} }
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
LocalChannel serve(final LocalChannel peer) { LocalChannel serve(final LocalChannel peer) {
@ -160,7 +160,7 @@ public class LocalServerChannel extends AbstractServerChannel {
if (acceptInProgress) { if (acceptInProgress) {
acceptInProgress = false; acceptInProgress = false;
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
} else { } else {
eventLoop().execute(new Runnable() { eventLoop().execute(new Runnable() {

View File

@ -65,7 +65,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
final ByteBuf byteBuf = pipeline.inboundByteBuffer(); final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
boolean firedInboundBufferSuspended = false; boolean firedChannelReadSuspended = false;
try { try {
expandReadBuffer(byteBuf); expandReadBuffer(byteBuf);
loop: for (;;) { loop: for (;;) {
@ -106,8 +106,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (t instanceof IOException) { if (t instanceof IOException) {
closed = true; closed = true;
} else if (!closed) { } else if (!closed) {
firedInboundBufferSuspended = true; firedChannelReadSuspended = true;
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
pipeline().fireExceptionCaught(t); pipeline().fireExceptionCaught(t);
} finally { } finally {
@ -124,8 +124,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
close(voidFuture()); close(voidFuture());
} }
} }
} else if (!firedInboundBufferSuspended) { } else if (!firedChannelReadSuspended) {
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
} }
} }

View File

@ -55,7 +55,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer(); final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
boolean firedInboundBufferSuspended = false; boolean firedChannelReadSuspended = false;
try { try {
for (;;) { for (;;) {
int localReadAmount = doReadMessages(msgBuf); int localReadAmount = doReadMessages(msgBuf);
@ -77,8 +77,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
if (t instanceof IOException) { if (t instanceof IOException) {
closed = true; closed = true;
} else if (!closed) { } else if (!closed) {
firedInboundBufferSuspended = true; firedChannelReadSuspended = true;
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
pipeline().fireExceptionCaught(t); pipeline().fireExceptionCaught(t);
@ -88,8 +88,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
} }
if (closed && isOpen()) { if (closed && isOpen()) {
close(voidFuture()); close(voidFuture());
} else if (!firedInboundBufferSuspended) { } else if (!firedChannelReadSuspended) {
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
} }
} }

View File

@ -126,7 +126,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} else { } else {
firedInboundBufferSuspeneded = true; firedInboundBufferSuspeneded = true;
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
unsafe().close(unsafe().voidFuture()); unsafe().close(unsafe().voidFuture());
} }
@ -144,7 +144,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
} }
} }
} else if (!firedInboundBufferSuspeneded) { } else if (!firedInboundBufferSuspeneded) {
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
} }
} }

View File

@ -39,7 +39,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer(); final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
boolean firedInboundBufferSuspended = false; boolean firedChannelReadSuspended = false;
try { try {
int localReadAmount = doReadMessages(msgBuf); int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) { if (localReadAmount > 0) {
@ -52,8 +52,8 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
read = false; read = false;
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
} }
firedInboundBufferSuspended = true; firedChannelReadSuspended = true;
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
if (t instanceof IOException) { if (t instanceof IOException) {
unsafe().close(unsafe().voidFuture()); unsafe().close(unsafe().voidFuture());
@ -62,8 +62,8 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
if (read) { if (read) {
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
} }
if (!firedInboundBufferSuspended) { if (!firedChannelReadSuspended) {
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
if (closed && isOpen()) { if (closed && isOpen()) {
unsafe().close(unsafe().voidFuture()); unsafe().close(unsafe().voidFuture());

View File

@ -185,7 +185,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
channel.pipeline().inboundMessageBuffer().add( channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, ch)); new AioSocketChannel(channel, null, ch));
channel.pipeline().fireInboundBufferUpdated(); channel.pipeline().fireInboundBufferUpdated();
channel.pipeline().fireInboundBufferSuspended(); channel.pipeline().fireChannelReadSuspended();
} }
@Override @Override

View File

@ -436,7 +436,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
boolean firedInboundBufferSuspended = false; boolean firedChannelReadSuspended = false;
try { try {
int localReadAmount = result.intValue(); int localReadAmount = result.intValue();
if (localReadAmount > 0) { if (localReadAmount > 0) {
@ -458,8 +458,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
if (!closed && channel.isOpen()) { if (!closed && channel.isOpen()) {
firedInboundBufferSuspended = true; firedChannelReadSuspended = true;
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
@ -478,8 +478,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} }
} }
} else if (!firedInboundBufferSuspended) { } else if (!firedChannelReadSuspended) {
pipeline.fireInboundBufferSuspended(); pipeline.fireChannelReadSuspended();
} }
} }
} }

View File

@ -59,11 +59,11 @@ public abstract class AbstractEventLoopTest {
assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor()); assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor());
} }
private static final class TestChannelHandler extends ChannelHandlerAdapter { private static final class TestChannelHandler extends ChannelDuplexHandler {
} }
private static final class TestChannelHandler2 extends ChannelHandlerAdapter { private static final class TestChannelHandler2 extends ChannelDuplexHandler {
} }

View File

@ -16,6 +16,7 @@
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Freeable; import io.netty.buffer.Freeable;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalChannel;
@ -24,10 +25,98 @@ import org.junit.Test;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class DefaultChannelPipelineTest { public class DefaultChannelPipelineTest {
@Test
public void testMessageCatchAllInboundSink() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final AtomicBoolean forwarded = new AtomicBoolean();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
pipeline.addLast(new ChannelInboundMessageHandlerAdapter<Object>() {
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
forwarded.set(ctx.nextInboundMessageBuffer().add(msg));
}
@Override
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
ctx.fireInboundBufferUpdated();
}
});
channel.eventLoop().submit(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
pipeline.inboundMessageBuffer().add(new Object());
pipeline.fireInboundBufferUpdated();
}
}).get();
assertTrue(forwarded.get());
}
@Test
public void testByteCatchAllInboundSink() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final AtomicBoolean forwarded = new AtomicBoolean();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
pipeline.addLast(new ChannelInboundByteHandlerAdapter() {
@Override
protected void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf out = ctx.nextInboundByteBuffer();
out.writeBytes(in);
forwarded.set(true);
ctx.fireInboundBufferUpdated();
}
});
channel.eventLoop().submit(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
pipeline.inboundByteBuffer().writeByte(0);
pipeline.fireInboundBufferUpdated();
}
}).get();
assertTrue(forwarded.get());
}
@Test
public void testByteCatchAllOutboundSink() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final AtomicBoolean forwarded = new AtomicBoolean();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
pipeline.addLast(new ChannelOutboundByteHandlerAdapter() {
@Override
protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception {
ByteBuf out = ctx.nextOutboundByteBuffer();
out.writeBytes(in);
forwarded.set(true);
ctx.flush(promise);
}
});
channel.eventLoop().submit(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
pipeline.outboundByteBuffer().writeByte(0);
pipeline.flush();
}
}).get();
Thread.sleep(1000);
assertTrue(forwarded.get());
}
@Test @Test
public void testFreeCalled() throws InterruptedException{ public void testFreeCalled() throws InterruptedException{
final CountDownLatch free = new CountDownLatch(1); final CountDownLatch free = new CountDownLatch(1);
@ -242,7 +331,7 @@ public class DefaultChannelPipelineTest {
} }
@Sharable @Sharable
private static class TestHandler extends ChannelHandlerAdapter { private static class TestHandler extends ChannelDuplexHandler {
// Dummy // Dummy
} }
} }

View File

@ -20,7 +20,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
@ -331,7 +331,7 @@ public class LocalTransportThreadModelTest {
} }
private static class ThreadNameAuditor private static class ThreadNameAuditor
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<Object>, implements ChannelInboundMessageHandler<Object>,
ChannelOutboundMessageHandler<Object> { ChannelOutboundMessageHandler<Object> {
@ -394,7 +394,7 @@ public class LocalTransportThreadModelTest {
* Converts integers into a binary stream. * Converts integers into a binary stream.
*/ */
private static class MessageForwarder1 private static class MessageForwarder1
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<Integer>, ChannelOutboundByteHandler { implements ChannelInboundMessageHandler<Integer>, ChannelOutboundByteHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -502,7 +502,7 @@ public class LocalTransportThreadModelTest {
* Converts a binary stream into integers. * Converts a binary stream into integers.
*/ */
private static class MessageForwarder2 private static class MessageForwarder2
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<Integer> { implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<Integer> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -605,7 +605,7 @@ public class LocalTransportThreadModelTest {
* Simply forwards the received object to the next handler. * Simply forwards the received object to the next handler.
*/ */
private static class MessageForwarder3 private static class MessageForwarder3
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> { implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -695,7 +695,7 @@ public class LocalTransportThreadModelTest {
* Discards all received messages. * Discards all received messages.
*/ */
private static class MessageDiscarder private static class MessageDiscarder
extends ChannelHandlerAdapter extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> { implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();