Change the thread model slightly for new/freeInbound/OutboundBuffer() for future improvement

- Related: #1283
- Make ReplayingDecoder work with the modified thread model
This commit is contained in:
Trustin Lee 2013-04-23 13:06:27 +09:00
parent c6f936f265
commit 8e2e22c270
7 changed files with 51 additions and 29 deletions

View File

@ -264,8 +264,8 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
static final Signal REPLAY = new Signal(ReplayingDecoder.class.getName() + ".REPLAY");
private ByteBuf cumulation;
private ReplayingDecoderBuffer replayable;
private ChannelHandlerContext ctx;
private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer();
private S state;
private int checkpoint = -1;
private boolean decodeWasNull;
@ -288,7 +288,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
* Stores the internal cumulative buffer's reader position.
*/
protected void checkpoint() {
checkpoint = cumulation.readerIndex();
checkpoint = internalBuffer().readerIndex();
}
/**
@ -334,18 +334,12 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
* Use it only when you must use it at your own risk.
*/
protected ByteBuf internalBuffer() {
return cumulation;
return ctx.inboundByteBuffer();
}
@Override
public final ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
cumulation = newInboundBuffer0(ctx);
replayable = new ReplayingDecoderBuffer(cumulation);
return cumulation;
}
protected ByteBuf newInboundBuffer0(ChannelHandlerContext ctx) throws Exception {
return super.newInboundBuffer(ctx);
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
@ -366,7 +360,8 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
OutputMessageBuf out = OutputMessageBuf.get();
try {
replayable.terminate();
ByteBuf in = cumulation;
ByteBuf in = internalBuffer();
replayable.setCumulation(in);
if (in.isReadable()) {
callDecode(ctx, in);
}
@ -391,7 +386,8 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) {
boolean wasNull = false;
ByteBuf in = cumulation;
ByteBuf in = internalBuffer();
replayable.setCumulation(in);
OutputMessageBuf out = OutputMessageBuf.get();
try {
while (in.isReadable()) {

View File

@ -38,9 +38,9 @@ final class ReplayingDecoderBuffer implements ByteBuf {
private static final Signal REPLAY = ReplayingDecoder.REPLAY;
private final ByteBuf buffer;
private final SwappedByteBuf swapped;
private ByteBuf buffer;
private boolean terminated;
private SwappedByteBuf swapped;
static final ReplayingDecoderBuffer EMPTY_BUFFER = new ReplayingDecoderBuffer(Unpooled.EMPTY_BUFFER);
@ -48,9 +48,14 @@ final class ReplayingDecoderBuffer implements ByteBuf {
EMPTY_BUFFER.terminate();
}
ReplayingDecoderBuffer() { }
ReplayingDecoderBuffer(ByteBuf buffer) {
setCumulation(buffer);
}
void setCumulation(ByteBuf buffer) {
this.buffer = buffer;
swapped = new SwappedByteBuf(this);
}
void terminate() {
@ -397,6 +402,11 @@ final class ReplayingDecoderBuffer implements ByteBuf {
if (endianness == order()) {
return this;
}
SwappedByteBuf swapped = this.swapped;
if (swapped == null) {
this.swapped = swapped = new SwappedByteBuf(this);
}
return swapped;
}

View File

@ -23,10 +23,11 @@ import io.netty.buffer.ByteBuf;
*/
public interface ChannelInboundByteHandler extends ChannelInboundHandler {
/**
* Return the {@link ByteBuf} which will be used for inbound data for the given {@link ChannelHandlerContext}.
* Implementations should take {@link ChannelConfig#getDefaultHandlerByteBufType()} into account.
* {@inheritDoc}
* <p>
* Use of {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)} is adviced.
* An implementation should respect the {@link ChannelConfig#getDefaultHandlerByteBufType()} setting unless
* there's a good reason to ignore it. If in doubt, use {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)}.
* </p>
*/
@Override
ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;

View File

@ -22,13 +22,22 @@ import io.netty.buffer.Buf;
*/
interface ChannelInboundHandler extends ChannelStateHandler {
/**
* Return the {@link Buf} which will be used for inbound data for the given {@link ChannelHandlerContext}.
* Returns a new buffer which will be used to consume inbound data for the given {@link ChannelHandlerContext}.
* <p>
* Please note that this method can be called from any thread repeatatively, and thus you should neither perform
* stateful operation nor keep the reference of the created buffer as a member variable. Get it always using
* {@link ChannelHandlerContext#inboundByteBuffer()} or {@link ChannelHandlerContext#inboundMessageBuffer()}.
* </p>
*/
Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when this handler is not going to receive any inbound message anymore and thus it's safe to
* deallocate its inbound buffer.
* <p>
* Please note that this method can be called from any thread repeatatively, and thus you should not perform
* stateful operation here.
* </p>
*/
void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -26,10 +26,6 @@ import io.netty.buffer.MessageBuf;
* {@link ChannelHandlerUtil#addToNextInboundBuffer(ChannelHandlerContext, Object)}.
*/
public interface ChannelInboundMessageHandler<I> extends ChannelInboundHandler {
/**
* Return the {@link MessageBuf} which will be used for inbound data to store.
*/
@Override
MessageBuf<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -22,10 +22,11 @@ import io.netty.buffer.ByteBuf;
*/
public interface ChannelOutboundByteHandler extends ChannelOutboundHandler {
/**
* Return the {@link ByteBuf} which will be used for outbound data for the given {@link ChannelHandlerContext}.
* Implementations should take {@link ChannelConfig#getDefaultHandlerByteBufType()} into account.
* {@inheritDoc}
* <p>
* Use of {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)} is adviced.
* An implementation should respect the {@link ChannelConfig#getDefaultHandlerByteBufType()} setting unless
* there's a good reason to ignore it. If in doubt, use {@link ChannelHandlerUtil#allocate(ChannelHandlerContext)}.
* </p>
*/
@Override
ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;

View File

@ -22,13 +22,22 @@ import io.netty.buffer.Buf;
*/
interface ChannelOutboundHandler extends ChannelOperationHandler {
/**
* Return the {@link Buf} which will be used for outbound data for the given {@link ChannelHandlerContext}.
* Returns a new buffer which will be used to transfer outbound data for the given {@link ChannelHandlerContext}.
* <p>
* Please note that this method can be called from any thread repeatatively, and thus you should neither perform
* stateful operation nor keep the reference of the created buffer as a member variable. Get it always using
* {@link ChannelHandlerContext#outboundByteBuffer()} or {@link ChannelHandlerContext#outboundMessageBuffer()}.
* </p>
*/
Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when this handler is not allowed to send any outbound message anymore and thus it's safe to
* deallocate its outbound buffer.
* <p>
* Please note that this method can be called from any thread repeatatively, and thus you should not perform
* stateful operation here.
* </p>
*/
void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
}