Remove freeInboundBuffer() and freeOutboundBuffer() which has no value
- Fixes #1308 freeInboundBuffer() and freeOutboundBuffer() were introduced in the early days of the new API when we did not have reference counting mechanism in the buffer. A user did not want Netty to free the handler buffers had to override these methods. However, now that we have reference counting mechanism built into the buffer, a user who wants to retain the buffers beyond handler's life cycle can simply return the buffer whose reference count is greater than 1 in newInbound/OutboundBuffer().
This commit is contained in:
parent
1b3d7f5325
commit
7884574c7b
@ -104,21 +104,11 @@ public final class HttpClientCodec
|
||||
decoder().discardInboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder().freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<HttpObject> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return encoder().newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder().freeOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
private final class Encoder extends HttpRequestEncoder {
|
||||
|
||||
@Override
|
||||
|
@ -67,18 +67,8 @@ public final class HttpServerCodec
|
||||
decoder().discardInboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder().freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<HttpObject> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return encoder().newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder().freeOutboundBuffer(ctx);
|
||||
}
|
||||
}
|
||||
|
@ -70,18 +70,8 @@ public final class SpdyFrameCodec
|
||||
decoder().discardInboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder().freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<SpdyDataOrControlFrame> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return encoder().newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder().freeOutboundBuffer(ctx);
|
||||
}
|
||||
}
|
||||
|
@ -50,18 +50,8 @@ public final class SpdyHttpCodec
|
||||
return decoder().newInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder().freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<HttpObject> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return encoder().newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder().freeOutboundBuffer(ctx);
|
||||
}
|
||||
}
|
||||
|
@ -72,11 +72,6 @@ public abstract class SpdyOrHttpChooser extends ChannelInboundByteHandlerAdapter
|
||||
// No need to discard anything because this handler will be replaced with something else very quickly.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
if (initPipeline(ctx)) {
|
||||
|
@ -89,21 +89,11 @@ public class SpdySessionHandler
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
MessageBuf<Object> in = ctx.inboundMessageBuffer();
|
||||
@ -459,7 +449,7 @@ public class SpdySessionHandler
|
||||
msg instanceof SpdyHeadersFrame ||
|
||||
msg instanceof SpdyWindowUpdateFrame) {
|
||||
try {
|
||||
handleOutboundMessage(ctx, msg, promise);
|
||||
handleOutboundMessage(ctx, msg);
|
||||
} catch (SpdyProtocolException e) {
|
||||
if (e == PROTOCOL_EXCEPTION) {
|
||||
// on the case of PROTOCOL_EXCEPTION faile the promise directly
|
||||
@ -475,8 +465,7 @@ public class SpdySessionHandler
|
||||
ctx.flush(promise);
|
||||
}
|
||||
|
||||
private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||||
throws Exception {
|
||||
private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
|
||||
if (msg instanceof SpdyDataFrame) {
|
||||
|
||||
|
@ -145,11 +145,6 @@ public class WebSocketServerProtocolHandlerTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
|
||||
//NoOp
|
||||
|
@ -108,16 +108,6 @@ public abstract class ByteToByteCodec
|
||||
encoder.discardOutboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder.freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder.freeOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see {@link ByteToByteEncoder#encode(ChannelHandlerContext, ByteBuf, ByteBuf)}
|
||||
*/
|
||||
|
@ -70,21 +70,11 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
|
||||
decoder.discardInboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder.freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return encoder.newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder.freeOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder.inboundBufferUpdated(ctx);
|
||||
|
@ -102,22 +102,12 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
|
||||
return (MessageBuf<INBOUND_IN>) decoder.newInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
decoder.freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public MessageBuf<OUTBOUND_IN> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return (MessageBuf<OUTBOUND_IN>) encoder.newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
encoder.freeOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(
|
||||
ChannelHandlerContext ctx) throws Exception {
|
||||
|
@ -117,11 +117,6 @@ public class ByteLoggingHandler
|
||||
ctx.inboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return ChannelHandlerUtil.allocate(ctx);
|
||||
@ -132,11 +127,6 @@ public class ByteLoggingHandler
|
||||
ctx.outboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx)
|
||||
throws Exception {
|
||||
|
@ -54,21 +54,11 @@ public class MessageLoggingHandler
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx)
|
||||
throws Exception {
|
||||
|
@ -356,11 +356,6 @@ public class SslHandler
|
||||
ctx.inboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return ChannelHandlerUtil.allocate(ctx);
|
||||
@ -371,11 +366,6 @@ public class SslHandler
|
||||
ctx.outboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(final ChannelHandlerContext ctx,
|
||||
final ChannelPromise promise) throws Exception {
|
||||
|
@ -91,13 +91,20 @@ public class ChunkedWriteHandler
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
return queue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
queue.release();
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
// Fail all promised that are queued. This is needed because otherwise we would never notify the
|
||||
// ChannelFuture and the registered FutureListener. See #304
|
||||
discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline."));
|
||||
}
|
||||
|
||||
private boolean isWritable() {
|
||||
@ -347,12 +354,4 @@ public class ChunkedWriteHandler
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
// Fail all promised that are queued. This is needed because otherwise we would never notify the
|
||||
// ChannelFuture and the registered FutureListener. See #304
|
||||
discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline."));
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package io.netty.testsuite.transport.socket;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.MessageBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
@ -79,16 +78,6 @@ public class SocketBufReleaseTest extends AbstractSocketTest {
|
||||
channelFuture.setSuccess(ctx.channel());
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return super.newInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
super.freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
||||
byte[] data = new byte[1024];
|
||||
|
@ -267,11 +267,6 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundMessageBuffer().release();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -41,11 +41,6 @@ public abstract class ChannelInboundByteHandlerAdapter
|
||||
ctx.inboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
inboundBufferUpdated(ctx, ctx.inboundByteBuffer());
|
||||
|
@ -30,14 +30,4 @@ interface ChannelInboundHandler extends ChannelStateHandler {
|
||||
* </p>
|
||||
*/
|
||||
Buf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
|
||||
|
||||
/**
|
||||
* Invoked when this handler is not going to receive any inbound message anymore and thus it's safe to
|
||||
* deallocate its inbound buffer.
|
||||
* <p>
|
||||
* Please note that this method can be called from any thread repeatatively, and thus you should not perform
|
||||
* stateful operation here.
|
||||
* </p>
|
||||
*/
|
||||
void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception;
|
||||
}
|
||||
|
@ -95,11 +95,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
ChannelHandlerUtil.handleInboundBufferUpdated(ctx, this);
|
||||
|
@ -32,11 +32,6 @@ public abstract class ChannelOutboundByteHandlerAdapter
|
||||
ctx.outboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundByteBuffer().release();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method merely delegates the flush request to {@link #flush(ChannelHandlerContext, ByteBuf, ChannelPromise)}.
|
||||
*/
|
||||
|
@ -30,14 +30,4 @@ interface ChannelOutboundHandler extends ChannelOperationHandler {
|
||||
* </p>
|
||||
*/
|
||||
Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
|
||||
|
||||
/**
|
||||
* Invoked when this handler is not allowed to send any outbound message anymore and thus it's safe to
|
||||
* deallocate its outbound buffer.
|
||||
* <p>
|
||||
* Please note that this method can be called from any thread repeatatively, and thus you should not perform
|
||||
* stateful operation here.
|
||||
* </p>
|
||||
*/
|
||||
void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
|
||||
}
|
||||
|
@ -89,11 +89,6 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
ChannelHandlerUtil.handleFlush(ctx, promise, isCloseOnFailedFlush(), this);
|
||||
|
@ -433,28 +433,27 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
private void freeHandlerBuffersAfterRemoval() {
|
||||
int flags = this.flags;
|
||||
if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet
|
||||
final ChannelHandler handler = handler();
|
||||
try {
|
||||
if (handler instanceof ChannelInboundHandler) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler).freeInboundBuffer(this);
|
||||
} catch (Exception e) {
|
||||
notifyHandlerException(e);
|
||||
}
|
||||
}
|
||||
if (handler instanceof ChannelOutboundHandler) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler).freeOutboundBuffer(this);
|
||||
} catch (Exception e) {
|
||||
notifyHandlerException(e);
|
||||
}
|
||||
}
|
||||
freeBuffer(inByteBuf);
|
||||
freeBuffer(inMsgBuf);
|
||||
freeBuffer(outByteBuf);
|
||||
freeBuffer(outMsgBuf);
|
||||
} finally {
|
||||
free();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void freeBuffer(Buf buf) {
|
||||
if (buf != null) {
|
||||
try {
|
||||
buf.release();
|
||||
} catch (Exception e) {
|
||||
notifyHandlerException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void free() {
|
||||
flags |= FLAG_FREED;
|
||||
freeInbound();
|
||||
@ -1477,13 +1476,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
private void invokeFreeInboundBuffer0() {
|
||||
ChannelHandler handler = handler();
|
||||
try {
|
||||
if (handler instanceof ChannelInboundHandler) {
|
||||
((ChannelInboundHandler) handler).freeInboundBuffer(this);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
freeBuffer(inByteBuf);
|
||||
freeBuffer(inMsgBuf);
|
||||
} finally {
|
||||
freeInbound();
|
||||
}
|
||||
@ -1526,13 +1521,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
private void invokeFreeOutboundBuffer0() {
|
||||
ChannelHandler handler = handler();
|
||||
try {
|
||||
if (handler instanceof ChannelOutboundHandler) {
|
||||
((ChannelOutboundHandler) handler).freeOutboundBuffer(this);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
freeBuffer(outByteBuf);
|
||||
freeBuffer(outMsgBuf);
|
||||
} finally {
|
||||
freeOutbound();
|
||||
}
|
||||
|
@ -1011,12 +1011,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
byteSink.release();
|
||||
msgSink.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
int byteSinkSize = byteSink.readableBytes();
|
||||
@ -1133,12 +1127,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
msgSink.release();
|
||||
byteSink.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
ctx.fireExceptionCaught(cause);
|
||||
|
@ -52,8 +52,8 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
|
||||
private final ChannelConfig config = new DefaultChannelConfig(this);
|
||||
private final SocketAddress localAddress = new EmbeddedSocketAddress();
|
||||
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
|
||||
private final MessageBuf<Object> lastInboundMessageBuffer = Unpooled.messageBuffer();
|
||||
private final ByteBuf lastInboundByteBuffer = Unpooled.buffer();
|
||||
private final MessageBuf<Object> lastInboundMessageBuffer = Unpooled.messageBuffer().retain(2);
|
||||
private final ByteBuf lastInboundByteBuffer = Unpooled.buffer().retain(2);
|
||||
protected final Object lastOutboundBuffer;
|
||||
private Throwable lastException;
|
||||
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
|
||||
@ -331,11 +331,6 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
|
||||
return lastInboundMessageBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Do NOT free the buffer.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
// Do nothing.
|
||||
@ -360,11 +355,6 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
|
||||
// nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Do NOT free the buffer.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
// No nothing
|
||||
|
@ -801,11 +801,6 @@ public class DefaultChannelPipelineTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
updated = true;
|
||||
@ -820,11 +815,6 @@ public class DefaultChannelPipelineTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.outboundMessageBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
promise.setSuccess();
|
||||
@ -848,11 +838,6 @@ public class DefaultChannelPipelineTest {
|
||||
((ChannelInboundByteHandler) stateHandler()).discardInboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
((ChannelInboundHandler) stateHandler()).freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return ((ChannelOutboundByteHandler) operationHandler()).newOutboundBuffer(ctx);
|
||||
@ -862,11 +847,6 @@ public class DefaultChannelPipelineTest {
|
||||
public void discardOutboundReadBytes(ChannelHandlerContext ctx) throws Exception {
|
||||
((ChannelOutboundByteHandler) operationHandler()).discardOutboundReadBytes(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
((ChannelOutboundHandler) operationHandler()).freeOutboundBuffer(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class MessageHandlerImpl extends CombinedChannelDuplexHandler
|
||||
@ -881,23 +861,11 @@ public class DefaultChannelPipelineTest {
|
||||
return ((ChannelInboundMessageHandler<Object>) stateHandler()).newInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
((ChannelInboundHandler) stateHandler()).freeInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return ((ChannelOutboundMessageHandler<Object>) operationHandler()).newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
((ChannelOutboundHandler) operationHandler()).freeOutboundBuffer(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
/** Test handler to validate life-cycle aware behavior. */
|
||||
|
@ -344,21 +344,11 @@ public class LocalTransportThreadModelTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(
|
||||
ChannelHandlerContext ctx) throws Exception {
|
||||
@ -406,11 +396,6 @@ public class LocalTransportThreadModelTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return ChannelHandlerUtil.allocate(ctx);
|
||||
@ -421,11 +406,6 @@ public class LocalTransportThreadModelTest {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
|
||||
ctx.outboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(
|
||||
ChannelHandlerContext ctx) throws Exception {
|
||||
@ -520,22 +500,12 @@ public class LocalTransportThreadModelTest {
|
||||
ctx.inboundByteBuffer().discardSomeReadBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ctx.inboundByteBuffer().release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Integer> newOutboundBuffer(
|
||||
ChannelHandlerContext ctx) throws Exception {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(
|
||||
ChannelHandlerContext ctx) throws Exception {
|
||||
@ -617,21 +587,11 @@ public class LocalTransportThreadModelTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
Thread t = this.t;
|
||||
@ -707,21 +667,11 @@ public class LocalTransportThreadModelTest {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return Unpooled.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
|
||||
// Nothing to free
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
|
||||
Thread t = this.t;
|
||||
|
Loading…
Reference in New Issue
Block a user