Give a handler more control over how its buffers' read bytes are discarded.

This pull request adds two new handler methods: discardInboundReadBytes(ctx) and discardOutboundReadBytes(ctx) to ChannelInboundByteHandler and ChannelOutboundByteHandler respectively. They are called between every inboundBufferUpdated() and flush() respectively. Their default implementation is to call discardSomeReadBytes() on their buffers and a user can override this behavior easily. For example, ReplayingDecoder.discardInboundReadBytes() looks like the following:

    @Override
    public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
        ByteBuf in = ctx.inboundByteBuffer();
        final int oldReaderIndex = in.readerIndex();
        super.discardInboundReadBytes(ctx);
        final int newReaderIndex = in.readerIndex();
        checkpoint -= oldReaderIndex - newReaderIndex;
    }

If a handler, which has its own buffer index variable, extends ReplayingDecoder or ByteToMessageDecoder, the handler can also override discardInboundReadBytes() and adjust its index variable accordingly.
This commit is contained in:
Trustin Lee 2013-01-05 15:04:25 +09:00
parent 7277536ca6
commit dd6b7969b7
45 changed files with 321 additions and 233 deletions

View File

@ -66,6 +66,16 @@ public abstract class SpdyOrHttpChooser extends ChannelHandlerAdapter implements
return ctx.alloc().buffer();
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
// No need to discard anything because this handler will be replaced with something else very quickly.
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
if (initPipeline(ctx)) {

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.buffer.Buf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
@ -91,19 +90,19 @@ public class SpdySessionHandler
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
}
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
}
@Override

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ -97,13 +96,23 @@ public abstract class ByteToByteCodec
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
decoder.freeInboundBuffer(ctx, buf);
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
decoder.discardInboundReadBytes(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
encoder.freeOutboundBuffer(ctx, buf);
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
encoder.discardOutboundReadBytes(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoder.freeInboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
encoder.freeOutboundBuffer(ctx);
}
/**

View File

@ -96,7 +96,6 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
}
}
in.discardSomeReadBytes();
if (out.readableBytes() > oldOutSize) {
ctx.fireInboundBufferUpdated();
}

View File

@ -75,7 +75,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
break;
}
}
in.discardSomeReadBytes();
ctx.flush(promise);
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerAdapter;
@ -71,13 +70,13 @@ public abstract class ByteToMessageCodec<INBOUND_OUT, OUTBOUND_IN>
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
decoder.freeInboundBuffer(ctx, buf);
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
decoder.freeInboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
encoder.freeOutboundBuffer(ctx, buf);
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
encoder.freeOutboundBuffer(ctx);
}
protected abstract void encode(

View File

@ -55,6 +55,11 @@ public abstract class ByteToMessageDecoder<O>
return ctx.alloc().buffer();
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().discardSomeReadBytes();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
callDecode(ctx);
@ -108,8 +113,6 @@ public abstract class ByteToMessageDecoder<O>
break;
}
} catch (Throwable t) {
in.discardSomeReadBytes();
if (decoded) {
decoded = false;
ctx.fireInboundBufferUpdated();
@ -123,8 +126,6 @@ public abstract class ByteToMessageDecoder<O>
}
}
in.discardSomeReadBytes();
if (decoded) {
ctx.fireInboundBufferUpdated();
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.Buf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ -99,8 +98,8 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundMessageBuffer().free();
}
@Override
@ -115,8 +114,8 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
}
@Override

View File

@ -348,6 +348,15 @@ public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
return cumulation;
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ByteBuf in = ctx.inboundByteBuffer();
final int oldReaderIndex = in.readerIndex();
super.discardInboundReadBytes(ctx);
final int newReaderIndex = in.readerIndex();
checkpoint -= oldReaderIndex - newReaderIndex;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
replayable.terminate();
@ -358,7 +367,7 @@ public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
try {
if (ChannelHandlerUtil.unfoldAndAdd(ctx, decodeLast(ctx, replayable), true)) {
fireInboundBufferUpdated(ctx, in);
ctx.fireInboundBufferUpdated();
}
} catch (Signal replay) {
// Ignore
@ -427,7 +436,7 @@ public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
} catch (Throwable t) {
if (decoded) {
decoded = false;
fireInboundBufferUpdated(ctx, in);
ctx.fireInboundBufferUpdated();
}
if (t instanceof CodecException) {
@ -439,15 +448,7 @@ public abstract class ReplayingDecoder<O, S> extends ByteToMessageDecoder<O> {
}
if (decoded) {
fireInboundBufferUpdated(ctx, in);
ctx.fireInboundBufferUpdated();
}
}
private void fireInboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
final int oldReaderIndex = in.readerIndex();
in.discardSomeReadBytes();
final int newReaderIndex = in.readerIndex();
checkpoint -= oldReaderIndex - newReaderIndex;
ctx.fireInboundBufferUpdated();
}
}

View File

@ -181,7 +181,6 @@ public class JdkZlibEncoder extends ZlibEncoder {
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished.get()) {
out.writeBytes(in);
in.discardReadBytes();
return;
}

View File

@ -95,9 +95,6 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
writtenObjects ++;
if (writtenObjects % resetInterval == 0) {
oos.reset();
// Also discard the byproduct to avoid OOM on the sending side.
out.discardSomeReadBytes();
}
}

View File

@ -87,7 +87,6 @@ public class DiscardClientHandler extends ChannelInboundByteHandlerAdapter {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ctx.nextOutboundByteBuffer().discardReadBytes();
generateTraffic();
}
}

View File

@ -56,7 +56,6 @@ public class EchoClientHandler extends ChannelInboundByteHandlerAdapter {
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
ctx.flush();
}

View File

@ -35,7 +35,6 @@ public class EchoServerHandler extends ChannelInboundByteHandlerAdapter {
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
ctx.flush();
}

View File

@ -36,9 +36,7 @@ public class HexDumpProxyBackendHandler extends ChannelInboundByteHandlerAdapter
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf out = inboundChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();
inboundChannel.flush();
}

View File

@ -68,9 +68,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundByteHandlerAdapte
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf out = outboundChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();
if (outboundChannel.isActive()) {
outboundChannel.flush();
}

View File

@ -42,9 +42,7 @@ public final class RelayHandler extends ChannelInboundByteHandlerAdapter {
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf out = relayChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
in.clear();
if (relayChannel.isActive()) {
relayChannel.flush();
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.logging;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler;
@ -106,10 +105,6 @@ public class ByteLoggingHandler
public ByteLoggingHandler(String name) {
super(name);
}
@Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
@ -117,13 +112,28 @@ public class ByteLoggingHandler
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
}
@Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free();
}
@Override

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.logging;
import io.netty.buffer.Buf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -48,10 +47,6 @@ public class MessageLoggingHandler
public MessageLoggingHandler(String name) {
super(name);
}
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
@ -59,12 +54,17 @@ public class MessageLoggingHandler
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Nothing to free
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Nothing to free
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.ssl;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
@ -369,24 +368,34 @@ public class SslHandler
return sslCloseFuture;
}
@Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
}
@Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free();
}
@Override
@ -415,8 +424,6 @@ public class SslHandler
final ByteBuf in = ctx.outboundByteBuffer();
final ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardSomeReadBytes();
// Do not encrypt the first write request if this handler is
// created with startTLS flag turned on.
if (!internal && startTls && !sentFirstMessage) {
@ -487,7 +494,6 @@ public class SslHandler
setHandshakeFailure(e);
throw e;
} finally {
in.discardSomeReadBytes();
flush0(ctx, bytesConsumed);
}
}
@ -783,7 +789,6 @@ public class SslHandler
assert packetLength > 0;
final ByteBuf out = ctx.nextInboundByteBuffer();
out.discardReadBytes();
boolean wrapLater = false;
int bytesProduced = 0;
@ -835,7 +840,6 @@ public class SslHandler
throw e;
} finally {
if (bytesProduced > 0) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.stream;
import io.netty.buffer.Buf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@ -98,8 +97,8 @@ public class ChunkedWriteHandler
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Nothing to free
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
queue.free();
}
private boolean isWritable() {

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.traffic;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ -223,7 +222,12 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// do nothing
}
@ -233,7 +237,12 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
// do nothing
}

View File

@ -51,7 +51,6 @@ public class SctpOutboundByteStreamHandler extends ChannelOutboundByteHandlerAda
ByteBuf payload = Unpooled.buffer(in.readableBytes());
payload.writeBytes(in);
out.add(new SctpMessage(streamIdentifier, protocolIdentifier, payload));
in.discardReadBytes();
} catch (Throwable t) {
ctx.fireExceptionCaught(new EncoderException(t));
}

View File

@ -350,10 +350,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
if (byteBuf.readerIndex() != 0) {
byteBuf.discardReadBytes();
return 0;
}
return 2;
}
@ -877,14 +873,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) {
cause = t;
} finally {
final int newSize = out.readableBytes();
final int writtenBytes = oldSize - newSize;
if (writtenBytes > 0) {
flushFutureNotifier.increaseWriteCounter(writtenBytes);
if (newSize == 0) {
out.discardReadBytes();
}
}
flushFutureNotifier.increaseWriteCounter(oldSize - out.readableBytes());
}
} else {
MessageBuf<Object> out = ctx.outboundMessageBuffer();

View File

@ -24,4 +24,13 @@ import io.netty.buffer.ByteBuf;
public interface ChannelInboundByteHandler extends ChannelInboundHandler {
@Override
ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
/**
* Discards the read bytes of the inbound buffer and optionally trims its unused portion to reduce memory
* consumption. The most common implementation of this method will look like the following:
* <pre>
* ctx.inboundByteBuffer().discardSomeReadBytes();
* </pre>
*/
void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -36,16 +36,19 @@ public abstract class ChannelInboundByteHandlerAdapter
return ctx.alloc().buffer();
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
}
@Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ByteBuf in = ctx.inboundByteBuffer();
try {
inboundBufferUpdated(ctx, in);
} finally {
if (!in.readable()) {
in.discardReadBytes();
}
}
inboundBufferUpdated(ctx, ctx.inboundByteBuffer());
}
/**

View File

@ -30,5 +30,5 @@ public interface ChannelInboundHandler extends ChannelStateHandler {
* Invoked when this handler is not going to receive any inbound message anymore and thus it's safe to
* deallocate its inbound buffer.
*/
void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception;
void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -33,7 +33,11 @@ public abstract class ChannelInboundHandlerAdapter
* When doing so be aware that you will need to handle all the resource management by your own.
*/
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (ctx.hasInboundByteBuffer()) {
ctx.inboundByteBuffer().free();
} else {
ctx.inboundMessageBuffer().free();
}
}
}

View File

@ -85,7 +85,7 @@ public interface ChannelInboundInvoker {
void fireInboundBufferUpdated();
/**
* Triggers an {@link ChannelStateHandler#inboundBufferSuspended(ChannelHandlerContext) inboundBufferSuspended}
* Triggers an {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) inboundBufferSuspended}
* event to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}.
*/
void fireInboundBufferSuspended();

View File

@ -58,6 +58,11 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundMessageBuffer().free();
}
@Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
if (!beginMessageReceived(ctx)) {

View File

@ -23,4 +23,13 @@ import io.netty.buffer.ByteBuf;
public interface ChannelOutboundByteHandler extends ChannelOutboundHandler {
@Override
ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
/**
* Discards the read bytes of the outbound buffer and optionally trims its unused portion to reduce memory
* consumption. The most common implementation of this method will look like the following:
* <pre>
* ctx.outboundByteBuffer().discardSomeReadBytes();
* </pre>
*/
void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -26,4 +26,14 @@ public abstract class ChannelOutboundByteHandlerAdapter
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free();
}
}

View File

@ -30,5 +30,5 @@ public interface ChannelOutboundHandler extends ChannelOperationHandler {
* Invoked when this handler is not allowed to send any outbound message anymore and thus it's safe to
* deallocate its outbound buffer.
*/
void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception;
void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -33,7 +33,11 @@ public abstract class ChannelOutboundHandlerAdapter
* When doing so be aware that you will need to handle all the resource management by your own.
*/
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (ctx.hasOutboundByteBuffer()) {
ctx.outboundByteBuffer().free();
} else {
ctx.outboundMessageBuffer().free();
}
}
}

View File

@ -157,7 +157,7 @@ public interface ChannelOutboundInvoker {
* Reads data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelStateHandler#inboundBufferUpdated(ChannelHandlerContext) inboundBufferUpdated} event if data was
* read, and triggers an
* {@link ChannelStateHandler#inboundBufferSuspended(ChannelHandlerContext) inboundBufferSuspended} event so the
* {@link ChannelStateHandler#channelReadSuspended(ChannelHandlerContext) inboundBufferSuspended} event so the
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
*/
void read();

View File

@ -30,4 +30,9 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
}
}

View File

@ -42,6 +42,12 @@ public interface ChannelStateHandler extends ChannelHandler {
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when a {@link ChannelHandlerContext#read()} is finished and the inbound buffer of this handler will not
* be updated until another {@link ChannelHandlerContext#read()} request is issued.
*/
void channelReadSuspended(ChannelHandlerContext ctx) throws Exception;
/**
* The inbound buffer of the {@link ChannelHandlerContext} was updated with new data.
* This means something may be ready to get processed by the actual {@link ChannelStateHandler}
@ -49,10 +55,4 @@ public interface ChannelStateHandler extends ChannelHandler {
* to wait for more data and consume it later.
*/
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when a {@link ChannelHandlerContext#read()} is finished and the inbound buffer of this handler will not
* be updated until another {@link ChannelHandlerContext#read()} request is issued.
*/
void inboundBufferSuspended(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -155,7 +155,7 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler {
}
@Override
public void inboundBufferSuspended(ChannelHandlerContext ctx) throws Exception {
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
ctx.fireInboundBufferSuspended();
}
}

View File

@ -46,8 +46,7 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement
* Otherwise it will trigger a {@link IllegalStateException} later.
*
*/
protected void init(
ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
protected void init(ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
if (inboundHandler == null) {
throw new NullPointerException("inboundHandler");
}
@ -74,25 +73,23 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement
}
@Override
public Buf newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
public Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return in.newInboundBuffer(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
in.freeInboundBuffer(ctx, buf);
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
in.freeInboundBuffer(ctx);
}
@Override
public Buf newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
public Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return out.newOutboundBuffer(ctx);
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
out.freeOutboundBuffer(ctx, buf);
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
out.freeOutboundBuffer(ctx);
}
@Override
@ -169,6 +166,9 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
in.inboundBufferUpdated(ctx);
if (in instanceof ChannelInboundByteHandler) {
((ChannelInboundByteHandler) in).discardInboundReadBytes(ctx);
}
}
@Override
@ -213,6 +213,9 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement
public void flush(
ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
out.flush(ctx, promise);
if (out instanceof ChannelOutboundByteHandler) {
((ChannelOutboundByteHandler) out).discardOutboundReadBytes(ctx);
}
}
@Override

View File

@ -123,16 +123,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
ChannelStateHandler handler = (ChannelStateHandler) ctx.handler;
flushBridge();
try {
((ChannelStateHandler) ctx.handler).inboundBufferUpdated(ctx);
handler.inboundBufferUpdated(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
} finally {
ByteBuf buf = inByteBuf;
if (buf != null) {
if (!buf.readable()) {
buf.discardReadBytes();
if (handler instanceof ChannelInboundByteHandler) {
try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
}
}
@ -159,7 +161,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
try {
((ChannelStateHandler) ctx.handler).inboundBufferSuspended(ctx);
((ChannelStateHandler) ctx.handler).channelReadSuspended(ctx);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
@ -174,11 +176,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
if (ctx.hasInboundByteBuffer()) {
if (ctx.inByteBuf != null) {
h.freeInboundBuffer(ctx, ctx.inByteBuf);
h.freeInboundBuffer(ctx);
}
} else {
if (ctx.inMsgBuf != null) {
h.freeInboundBuffer(ctx, ctx.inMsgBuf);
h.freeInboundBuffer(ctx);
}
}
} catch (Throwable t) {
@ -204,11 +206,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
if (ctx.hasOutboundByteBuffer()) {
if (ctx.outByteBuf != null) {
h.freeOutboundBuffer(ctx, ctx.outByteBuf);
h.freeOutboundBuffer(ctx);
}
} else {
if (ctx.outMsgBuf != null) {
h.freeOutboundBuffer(ctx, ctx.outMsgBuf);
h.freeOutboundBuffer(ctx);
}
}
} catch (Throwable t) {

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Freeable;
import io.netty.buffer.MessageBuf;
@ -67,9 +66,21 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
this.channel = channel;
HeadHandler headHandler = new HeadHandler();
tailCtx = new DefaultChannelHandlerContext(
this, null, null, null, generateName(TAIL_HANDLER), TAIL_HANDLER);
HeadHandler headHandler;
switch (channel.metadata().bufferType()) {
case BYTE:
headHandler = new ByteHeadHandler();
break;
case MESSAGE:
headHandler = new MessageHeadHandler();
break;
default:
throw new Error("unknown buffer type: " + channel.metadata().bufferType());
}
head = new DefaultChannelHandlerContext(
this, null, null, tailCtx, generateName(headHandler), headHandler);
tailCtx.prev = head;
@ -1300,16 +1311,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return;
}
ChannelOperationHandler handler = (ChannelOperationHandler) ctx.handler();
try {
ctx.flushBridge();
((ChannelOperationHandler) ctx.handler()).flush(ctx, promise);
handler.flush(ctx, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
if (ctx.hasOutboundByteBuffer()) {
ByteBuf buf = ctx.outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
if (handler instanceof ChannelOutboundByteHandler) {
try {
((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(ctx);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
}
@ -1529,53 +1542,36 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
private final class HeadHandler implements ChannelOutboundHandler {
private abstract class HeadHandler implements ChannelOutboundHandler {
@Override
public Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
switch (channel.metadata().bufferType()) {
case BYTE:
return ctx.alloc().ioBuffer();
case MESSAGE:
return Unpooled.messageBuffer();
default:
throw new Error();
}
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) {
buf.free();
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
public final void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
public final void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
public final void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
public final void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void bind(
public final void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
public final void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
@ -1583,43 +1579,75 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
public final void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
public final void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
public final void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
public final void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.flush(promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
public final void sendFile(
ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
unsafe.sendFile(region, promise);
}
}
private final class ByteHeadHandler extends HeadHandler implements ChannelOutboundByteHandler {
@Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().ioBuffer();
}
@Override
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
if (ctx.hasOutboundByteBuffer()) {
ctx.outboundByteBuffer().discardSomeReadBytes();
}
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
ctx.outboundByteBuffer().free();
}
}
private final class MessageHeadHandler extends HeadHandler implements ChannelOutboundMessageHandler<Object> {
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
ctx.outboundMessageBuffer().free();
}
}
}

View File

@ -331,7 +331,7 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Do NOT free the buffer.
}
@ -354,7 +354,7 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Do NOT free the buffer.
}

View File

@ -77,9 +77,6 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel<ByteBuf> {
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
if (!lastOutboundBuffer().readable()) {
lastOutboundBuffer().discardReadBytes();
}
lastOutboundBuffer().writeBytes(buf);
}
}

View File

@ -306,10 +306,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
ByteBuf byteBuf = pipeline().inboundByteBuffer();
if (!byteBuf.readable()) {
byteBuf.discardReadBytes();
}
expandReadBuffer(byteBuf);
readInProgress = true;
@ -382,8 +378,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (buf.readable()) {
channel.unsafe().flushNow();
} else {
buf.discardReadBytes();
}
}
@ -398,14 +392,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture());
return;
}
if (!channel.inDoFlushByteBuffer) {
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
}
}
}
}

View File

@ -16,7 +16,6 @@
package io.netty.channel.local;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
@ -343,18 +342,18 @@ public class LocalTransportThreadModelTest {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Nothing to free
}
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Nothing to free
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) {
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
// Nothing to free
}
@ -405,19 +404,24 @@ public class LocalTransportThreadModelTest {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Nothing to free
}
@Override
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Nothing to free
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) {
buf.free();
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
ctx.outboundByteBuffer().free();
}
@Override
@ -509,6 +513,16 @@ public class LocalTransportThreadModelTest {
return ctx.alloc().buffer();
}
@Override
public void discardInboundReadBytes(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().discardSomeReadBytes();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
}
@Override
public MessageBuf<Integer> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
@ -516,12 +530,7 @@ public class LocalTransportThreadModelTest {
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
buf.free();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) {
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
// Nothing to free
}
@ -606,18 +615,18 @@ public class LocalTransportThreadModelTest {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Nothing to free
}
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Nothing to free
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) {
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
// Nothing to free
}
@ -696,18 +705,18 @@ public class LocalTransportThreadModelTest {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Nothing to free
}
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception {
// Nothing to free
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) {
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
// Nothing to free
}