From 8e2e22c2707292913f93de4397ecef16044042fd Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 23 Apr 2013 13:06:27 +0900 Subject: [PATCH] Change the thread model slightly for new/freeInbound/OutboundBuffer() for future improvement - Related: #1283 - Make ReplayingDecoder work with the modified thread model --- .../netty/handler/codec/ReplayingDecoder.java | 24 ++++++++----------- .../handler/codec/ReplayingDecoderBuffer.java | 16 ++++++++++--- .../channel/ChannelInboundByteHandler.java | 7 +++--- .../netty/channel/ChannelInboundHandler.java | 11 ++++++++- .../channel/ChannelInboundMessageHandler.java | 4 ---- .../channel/ChannelOutboundByteHandler.java | 7 +++--- .../netty/channel/ChannelOutboundHandler.java | 11 ++++++++- 7 files changed, 51 insertions(+), 29 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index ec3429ee00..9e5a945dd1 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -264,8 +264,8 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { static final Signal REPLAY = new Signal(ReplayingDecoder.class.getName() + ".REPLAY"); - private ByteBuf cumulation; - private ReplayingDecoderBuffer replayable; + private ChannelHandlerContext ctx; + private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(); private S state; private int checkpoint = -1; private boolean decodeWasNull; @@ -288,7 +288,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { * Stores the internal cumulative buffer's reader position. */ protected void checkpoint() { - checkpoint = cumulation.readerIndex(); + checkpoint = internalBuffer().readerIndex(); } /** @@ -334,18 +334,12 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { * Use it only when you must use it at your own risk. */ protected ByteBuf internalBuffer() { - return cumulation; + return ctx.inboundByteBuffer(); } @Override - public final ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { - cumulation = newInboundBuffer0(ctx); - replayable = new ReplayingDecoderBuffer(cumulation); - return cumulation; - } - - protected ByteBuf newInboundBuffer0(ChannelHandlerContext ctx) throws Exception { - return super.newInboundBuffer(ctx); + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; } @Override @@ -366,7 +360,8 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { OutputMessageBuf out = OutputMessageBuf.get(); try { replayable.terminate(); - ByteBuf in = cumulation; + ByteBuf in = internalBuffer(); + replayable.setCumulation(in); if (in.isReadable()) { callDecode(ctx, in); } @@ -391,7 +386,8 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { @Override protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) { boolean wasNull = false; - ByteBuf in = cumulation; + ByteBuf in = internalBuffer(); + replayable.setCumulation(in); OutputMessageBuf out = OutputMessageBuf.get(); try { while (in.isReadable()) { diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index 281a737768..edfa277d8a 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -38,9 +38,9 @@ final class ReplayingDecoderBuffer implements ByteBuf { private static final Signal REPLAY = ReplayingDecoder.REPLAY; - private final ByteBuf buffer; - private final SwappedByteBuf swapped; + private ByteBuf buffer; private boolean terminated; + private SwappedByteBuf swapped; static final ReplayingDecoderBuffer EMPTY_BUFFER = new ReplayingDecoderBuffer(Unpooled.EMPTY_BUFFER); @@ -48,9 +48,14 @@ final class ReplayingDecoderBuffer implements ByteBuf { EMPTY_BUFFER.terminate(); } + ReplayingDecoderBuffer() { } + ReplayingDecoderBuffer(ByteBuf buffer) { + setCumulation(buffer); + } + + void setCumulation(ByteBuf buffer) { this.buffer = buffer; - swapped = new SwappedByteBuf(this); } void terminate() { @@ -397,6 +402,11 @@ final class ReplayingDecoderBuffer implements ByteBuf { if (endianness == order()) { return this; } + + SwappedByteBuf swapped = this.swapped; + if (swapped == null) { + this.swapped = swapped = new SwappedByteBuf(this); + } return swapped; } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundByteHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundByteHandler.java index 9539984c5e..f924cd2fda 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundByteHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundByteHandler.java @@ -23,10 +23,11 @@ import io.netty.buffer.ByteBuf; */ public interface ChannelInboundByteHandler extends ChannelInboundHandler { /** - * Return the {@link ByteBuf} which will be used for inbound data for the given {@link ChannelHandlerContext}. - * Implementations should take {@link ChannelConfig#getDefaultHandlerByteBufType()} into account. + * {@inheritDoc} *

- * Use of {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)} is adviced. + * An implementation should respect the {@link ChannelConfig#getDefaultHandlerByteBufType()} setting unless + * there's a good reason to ignore it. If in doubt, use {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)}. + *

*/ @Override ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception; diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java index cbc5bd8d15..ffc5eef239 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java @@ -22,13 +22,22 @@ import io.netty.buffer.Buf; */ interface ChannelInboundHandler extends ChannelStateHandler { /** - * Return the {@link Buf} which will be used for inbound data for the given {@link ChannelHandlerContext}. + * Returns a new buffer which will be used to consume inbound data for the given {@link ChannelHandlerContext}. + *

+ * Please note that this method can be called from any thread repeatatively, and thus you should neither perform + * stateful operation nor keep the reference of the created buffer as a member variable. Get it always using + * {@link ChannelHandlerContext#inboundByteBuffer()} or {@link ChannelHandlerContext#inboundMessageBuffer()}. + *

*/ Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception; /** * Invoked when this handler is not going to receive any inbound message anymore and thus it's safe to * deallocate its inbound buffer. + *

+ * Please note that this method can be called from any thread repeatatively, and thus you should not perform + * stateful operation here. + *

*/ void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java index 9b55ceb8b9..d3e72ca069 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandler.java @@ -26,10 +26,6 @@ import io.netty.buffer.MessageBuf; * {@link ChannelHandlerUtil#addToNextInboundBuffer(ChannelHandlerContext, Object)}. */ public interface ChannelInboundMessageHandler extends ChannelInboundHandler { - - /** - * Return the {@link MessageBuf} which will be used for inbound data to store. - */ @Override MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandler.java index df2466f1ec..82b042fe0d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundByteHandler.java @@ -22,10 +22,11 @@ import io.netty.buffer.ByteBuf; */ public interface ChannelOutboundByteHandler extends ChannelOutboundHandler { /** - * Return the {@link ByteBuf} which will be used for outbound data for the given {@link ChannelHandlerContext}. - * Implementations should take {@link ChannelConfig#getDefaultHandlerByteBufType()} into account. + * {@inheritDoc} *

- * Use of {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)} is adviced. + * An implementation should respect the {@link ChannelConfig#getDefaultHandlerByteBufType()} setting unless + * there's a good reason to ignore it. If in doubt, use {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)}. + *

*/ @Override ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception; diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java index c6207277e6..873b15c59a 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java @@ -22,13 +22,22 @@ import io.netty.buffer.Buf; */ interface ChannelOutboundHandler extends ChannelOperationHandler { /** - * Return the {@link Buf} which will be used for outbound data for the given {@link ChannelHandlerContext}. + * Returns a new buffer which will be used to transfer outbound data for the given {@link ChannelHandlerContext}. + *

+ * Please note that this method can be called from any thread repeatatively, and thus you should neither perform + * stateful operation nor keep the reference of the created buffer as a member variable. Get it always using + * {@link ChannelHandlerContext#outboundByteBuffer()} or {@link ChannelHandlerContext#outboundMessageBuffer()}. + *

*/ Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception; /** * Invoked when this handler is not allowed to send any outbound message anymore and thus it's safe to * deallocate its outbound buffer. + *

+ * Please note that this method can be called from any thread repeatatively, and thus you should not perform + * stateful operation here. + *

*/ void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception; }