Remove ChannelBufferHolder / Add more handler interfaces for type safety

- ChannelInboundHandler and ChannelOutboundHandler does not have a type
  parameter anymore.  
- User should implement ChannelInboundMessageHandler or
  ChannelOutboundMessageHandler.
This commit is contained in:
Trustin Lee 2012-06-10 12:22:32 +09:00
parent a849d11877
commit 574d84e98e
39 changed files with 265 additions and 624 deletions

View File

@ -19,18 +19,17 @@ import java.util.Queue;
public final class MessageBufs {
public static <T> MessageBuf<T> messageBuffer() {
public static <T> MessageBuf<T> buffer() {
return new DefaultMessageBuf<T>();
}
public static <T> MessageBuf<T> messageBuffer(int initialCapacity) {
public static <T> MessageBuf<T> buffer(int initialCapacity) {
return new DefaultMessageBuf<T>(initialCapacity);
}
public static <T> MessageBuf<T> messageBuffer(Queue<T> queue) {
public static <T> MessageBuf<T> wrappedBuffer(Queue<T> queue) {
return new QueueBackedMessageBuf<T>(queue);
}
private MessageBufs() { }
}

View File

@ -15,14 +15,14 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class SpdySessionHandler
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
@ -87,15 +87,13 @@ public class SpdySessionHandler
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override

View File

@ -16,16 +16,15 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundByteHandler;
public abstract class ByteToByteCodec
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
implements ChannelInboundByteHandler, ChannelOutboundByteHandler {
private final ByteToByteEncoder encoder = new ByteToByteEncoder() {
@Override
@ -46,8 +45,7 @@ public abstract class ByteToByteCodec
};
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder.newInboundBuffer(ctx);
}
@ -57,8 +55,7 @@ public abstract class ByteToByteCodec
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return encoder.newOutboundBuffer(ctx);
}

View File

@ -16,16 +16,16 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelBufferHolder;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
public abstract class ByteToMessageCodec<INBOUND_OUT, OUTBOUND_IN>
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<OUTBOUND_IN> {
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<OUTBOUND_IN> {
private final MessageToByteEncoder<OUTBOUND_IN> encoder =
new MessageToByteEncoder<OUTBOUND_IN>() {
@ -47,7 +47,7 @@ public abstract class ByteToMessageCodec<INBOUND_OUT, OUTBOUND_IN>
};
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
public ByteBuf newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return decoder.newInboundBuffer(ctx);
}
@ -58,7 +58,7 @@ public abstract class ByteToMessageCodec<INBOUND_OUT, OUTBOUND_IN>
}
@Override
public ChannelBufferHolder<OUTBOUND_IN> newOutboundBuffer(
public MessageBuf<OUTBOUND_IN> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return encoder.newOutboundBuffer(ctx);
}

View File

@ -16,14 +16,14 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.ByteBufs;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
public abstract class ByteToMessageDecoder<O> extends ChannelInboundHandlerAdapter<Byte> {
public abstract class ByteToMessageDecoder<O>
extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler {
private ChannelHandlerContext ctx;
@ -34,8 +34,8 @@ public abstract class ByteToMessageDecoder<O> extends ChannelInboundHandlerAdapt
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override
@ -118,7 +118,7 @@ public abstract class ByteToMessageDecoder<O> extends ChannelInboundHandlerAdapt
* All remaining bytes in the inbound buffer will be forwarded to the new handler's
* inbound buffer.
*/
public void replace(String newHandlerName, ChannelInboundHandler<Byte> newHandler) {
public void replace(String newHandlerName, ChannelInboundByteHandler newHandler) {
if (!ctx.executor().inEventLoop()) {
throw new IllegalStateException("not in event loop");
}

View File

@ -17,8 +17,6 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
/**
@ -68,10 +66,9 @@ public class FixedLengthFrameDecoder extends ByteToMessageDecoder<Object> {
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
if (allocateFullBuffer) {
return ChannelBufferHolders.byteBuffer(ByteBufs.dynamicBuffer(frameLength));
return ByteBufs.dynamicBuffer(frameLength);
} else {
return super.newInboundBuffer(ctx);
}

View File

@ -15,16 +15,17 @@
*/
package io.netty.handler.codec;
import io.netty.channel.ChannelBufferHolder;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN, OUTBOUND_OUT>
extends ChannelHandlerAdapter
implements ChannelInboundHandler<INBOUND_IN>, ChannelOutboundHandler<OUTBOUND_IN> {
implements ChannelInboundMessageHandler<INBOUND_IN>,
ChannelOutboundMessageHandler<OUTBOUND_IN> {
private final MessageToMessageEncoder<OUTBOUND_IN, OUTBOUND_OUT> encoder =
new MessageToMessageEncoder<OUTBOUND_IN, OUTBOUND_OUT>() {
@ -53,7 +54,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
};
@Override
public ChannelBufferHolder<INBOUND_IN> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
public MessageBuf<INBOUND_IN> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder.newInboundBuffer(ctx);
}
@ -64,7 +65,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, INBOUND_OUT, OUTBOUND_IN
}
@Override
public ChannelBufferHolder<OUTBOUND_IN> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
public MessageBuf<OUTBOUND_IN> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return encoder.newOutboundBuffer(ctx);
}

View File

@ -15,18 +15,20 @@
*/
package io.netty.handler.codec;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandler;
import java.util.Queue;
public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandlerAdapter<I> {
public abstract class MessageToMessageDecoder<I, O>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {
@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override

View File

@ -16,9 +16,8 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -282,12 +281,10 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends ByteToMessa
static final Signal REPLAY = new Signal(ReplayingDecoder.class.getName() + ".REPLAY");
private final ChannelBufferHolder<Byte> in = ChannelBufferHolders.byteBuffer();
private final ByteBuf cumulation = in.byteBuffer();
private final ByteBuf cumulation = ByteBufs.dynamicBuffer();
private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(cumulation);
private S state;
private int checkpoint = -1;
private volatile boolean inUse;
/**
* Creates a new instance with no initial state (i.e: {@code null}).
@ -357,14 +354,9 @@ public abstract class ReplayingDecoder<O, S extends Enum<S>> extends ByteToMessa
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
public ByteBuf newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
if (inUse) {
throw new IllegalStateException(
ReplayingDecoder.class.getSimpleName() + " cannot be shared.");
}
inUse = true;
return in;
return cumulation;
}
@Override

View File

@ -60,6 +60,11 @@ class ReplayingDecoderBuffer implements ByteBuf {
}
}
@Override
public boolean isPooled() {
return false;
}
@Override
public boolean isDirect() {
return buffer.isDirect();

View File

@ -17,8 +17,8 @@ package io.netty.handler.codec.bytes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@ -53,8 +53,8 @@ import io.netty.handler.codec.MessageToMessageEncoder;
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[], ByteBuf> {
@Override
public ChannelBufferHolder<byte[]> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<byte[]> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override

View File

@ -16,8 +16,6 @@
package io.netty.example.http.snoop;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.HttpChunk;
@ -28,14 +26,6 @@ public class HttpSnoopClientHandler extends ChannelInboundMessageHandlerAdapter<
private boolean readingChunks;
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!readingChunks) {

View File

@ -21,8 +21,6 @@ import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@ -51,12 +49,6 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
/** Buffer that stores the response content */
private final StringBuilder buf = new StringBuilder();
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!readingChunks) {

View File

@ -16,16 +16,14 @@
package io.netty.handler.logging;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.ByteBufs;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundByteHandler;
public class ByteLoggingHandler
extends LoggingHandler
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
extends LoggingHandler implements ChannelInboundByteHandler, ChannelOutboundByteHandler {
private static final String NEWLINE = String.format("%n");
@ -111,18 +109,15 @@ public class ByteLoggingHandler
super(name);
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {

View File

@ -15,18 +15,18 @@
*/
package io.netty.handler.logging;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import java.util.Queue;
public class MessageLoggingHandler
extends LoggingHandler
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
public MessageLoggingHandler() {
super();
@ -52,18 +52,15 @@ public class MessageLoggingHandler
super(name);
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx)
throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {

View File

@ -16,10 +16,10 @@
package io.netty.handler.queue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.BlockingOperationException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline;
@ -96,10 +96,10 @@ public class BlockingReadHandler<E> extends ChannelInboundMessageHandlerAdapter<
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
public MessageBuf<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
return ChannelBufferHolders.messageBuffer(queue);
return MessageBufs.wrappedBuffer(queue);
}
/**

View File

@ -16,15 +16,14 @@
package io.netty.handler.ssl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelFuture;
import io.netty.logging.InternalLogger;
@ -144,7 +143,7 @@ import javax.net.ssl.SSLException;
*/
public class SslHandler
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Byte> {
implements ChannelInboundByteHandler, ChannelOutboundByteHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SslHandler.class);
@ -304,13 +303,13 @@ public class SslHandler
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override

View File

@ -16,23 +16,21 @@
package io.netty.handler.stream;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -69,23 +67,22 @@ import java.util.concurrent.atomic.AtomicInteger;
* @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from
*/
public class ChunkedWriteHandler
extends ChannelHandlerAdapter implements ChannelOutboundHandler<Object> {
extends ChannelHandlerAdapter implements ChannelOutboundMessageHandler<Object> {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
private static final int MAX_PENDING_WRITES = 4;
private final Queue<Object> queue = new LinkedList<Object>();
private final MessageBuf<Object> queue = MessageBufs.buffer();
private volatile ChannelHandlerContext ctx;
private final AtomicInteger pendingWrites = new AtomicInteger();
private Object currentEvent;
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(queue);
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return queue;
}
private boolean isWritable() {

View File

@ -18,8 +18,8 @@ package io.netty.handler.stream;
import static org.junit.Assert.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@ -135,11 +135,11 @@ public class ChunkedWriteHandlerTest {
}
};
ChannelOutboundHandlerAdapter<ByteBuf> testHandler = new ChannelOutboundHandlerAdapter<ByteBuf>() {
ChannelOutboundHandlerAdapter testHandler = new ChannelOutboundHandlerAdapter() {
@Override
public ChannelBufferHolder<ByteBuf> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<ByteBuf> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override

View File

@ -15,15 +15,16 @@
*/
package io.netty.bootstrap;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@ -230,10 +231,12 @@ public class ServerBootstrap {
validate();
}
private class Acceptor extends ChannelInboundHandlerAdapter<Channel> {
private class Acceptor
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<Channel> {
@Override
public ChannelBufferHolder<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override

View File

@ -1,98 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import java.util.Queue;
public final class ChannelBufferHolder<E> {
private final ChannelBufferType type;
private final Queue<E> msgBuf;
private final ByteBuf byteBuf;
ChannelBufferHolder(Queue<E> msgBuf) {
if (msgBuf == null) {
throw new NullPointerException("msgBuf");
}
this.msgBuf = msgBuf;
byteBuf = null;
type = ChannelBufferType.MESSAGE;
}
ChannelBufferHolder(ByteBuf byteBuf) {
if (byteBuf == null) {
throw new NullPointerException("byteBuf");
}
msgBuf = null;
this.byteBuf = byteBuf;
type = ChannelBufferType.BYTE;
}
public ChannelBufferType type() {
return type;
}
public Queue<E> messageBuffer() {
if (msgBuf == null) {
throw new NoSuchBufferException();
}
return msgBuf;
}
public ByteBuf byteBuffer() {
if (byteBuf == null) {
throw new NoSuchBufferException();
}
return byteBuf;
}
@Override
public String toString() {
switch (type) {
case MESSAGE:
return "MessageBuffer(" + msgBuf.size() + ')';
case BYTE:
return byteBuf.toString();
default:
throw new Error();
}
}
public int size() {
switch (type) {
case MESSAGE:
return msgBuf.size();
case BYTE:
return byteBuf.readableBytes();
default:
throw new Error();
}
}
public boolean isEmpty() {
switch (type) {
case MESSAGE:
return msgBuf.isEmpty();
case BYTE:
return !byteBuf.readable();
default:
throw new Error();
}
}
}

View File

@ -1,269 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.AbstractByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufFactory;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.HeapByteBufFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
public final class ChannelBufferHolders {
private static final ChannelBufferHolder<Object> DISCARD_MESSAGE_BUFFER =
new ChannelBufferHolder<Object>(new NoopQueue<Object>());
private static final ChannelBufferHolder<Byte> DISCARD_BYTE_BUFFER =
new ChannelBufferHolder<Byte>(new NoopByteBuf());
public static <E> ChannelBufferHolder<E> messageBuffer() {
return messageBuffer(new ArrayDeque<E>());
}
public static <E> ChannelBufferHolder<E> messageBuffer(Queue<E> buffer) {
return new ChannelBufferHolder<E>(buffer);
}
public static ChannelBufferHolder<Byte> byteBuffer() {
// TODO: Use more efficient implementation.
return byteBuffer(ByteBufs.dynamicBuffer());
}
public static ChannelBufferHolder<Byte> byteBuffer(ByteBuf buffer) {
return new ChannelBufferHolder<Byte>(buffer);
}
@SuppressWarnings("unchecked")
public static <E> ChannelBufferHolder<E> discardMessageBuffer() {
return (ChannelBufferHolder<E>) DISCARD_MESSAGE_BUFFER;
}
@SuppressWarnings("unchecked")
public static <E> ChannelBufferHolder<E> discardByteBuffer() {
return (ChannelBufferHolder<E>) DISCARD_BYTE_BUFFER;
}
private ChannelBufferHolders() {
// Utility class
}
private static class NoopQueue<E> extends AbstractQueue<E> {
@Override
public boolean offer(Object e) {
return false;
}
@Override
public E poll() {
return null;
}
@Override
public E peek() {
return null;
}
@Override
public Iterator<E> iterator() {
return (Iterator<E>) Collections.emptyList().iterator();
}
@Override
public int size() {
return 0;
}
}
private static class NoopByteBuf extends AbstractByteBuf {
@Override
public ByteBufFactory factory() {
return HeapByteBufFactory.getInstance();
}
@Override
public int capacity() {
return 0;
}
@Override
public ByteOrder order() {
return ByteOrder.BIG_ENDIAN;
}
@Override
public boolean isDirect() {
return false;
}
@Override
public byte getByte(int index) {
return 0;
}
@Override
public short getShort(int index) {
return 0;
}
@Override
public int getUnsignedMedium(int index) {
return 0;
}
@Override
public int getInt(int index) {
return 0;
}
@Override
public long getLong(int index) {
return 0;
}
@Override
public void getBytes(int index, ByteBuf dst, int dstIndex, int length) {
// NOOP
}
@Override
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
// NOOP
}
@Override
public void getBytes(int index, ByteBuffer dst) {
// NOOP
}
@Override
public void getBytes(int index, OutputStream out, int length)
throws IOException {
// NOOP
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length)
throws IOException {
return 0;
}
@Override
public void setByte(int index, int value) {
// NOOP
}
@Override
public void setShort(int index, int value) {
// NOOP
}
@Override
public void setMedium(int index, int value) {
// NOOP
}
@Override
public void setInt(int index, int value) {
// NOOP
}
@Override
public void setLong(int index, long value) {
// NOOP
}
@Override
public void setBytes(int index, ByteBuf src, int srcIndex,
int length) {
// NOOP
}
@Override
public void setBytes(int index, byte[] src, int srcIndex, int length) {
// NOOP
}
@Override
public void setBytes(int index, ByteBuffer src) {
// NOOP
}
@Override
public int setBytes(int index, InputStream in, int length)
throws IOException {
return 0;
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length)
throws IOException {
return 0;
}
@Override
public ByteBuf copy(int index, int length) {
return this;
}
@Override
public ByteBuf slice(int index, int length) {
return this;
}
@Override
public ByteBuf duplicate() {
return this;
}
@Override
public boolean hasNioBuffer() {
return true;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return ByteBuffer.allocate(0);
}
@Override
public boolean hasArray() {
return true;
}
@Override
public byte[] array() {
return new byte[0];
}
@Override
public int arrayOffset() {
return 0;
}
}
}

View File

@ -0,0 +1,8 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
public interface ChannelInboundByteHandler extends ChannelInboundHandler {
@Override
ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -16,13 +16,15 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
public abstract class ChannelInboundByteHandlerAdapter extends ChannelInboundHandlerAdapter<Byte> {
public abstract class ChannelInboundByteHandlerAdapter
extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override

View File

@ -15,7 +15,8 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuf;
public interface ChannelInboundHandler<T> extends ChannelStateHandler {
ChannelBufferHolder<T> newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
public interface ChannelInboundHandler extends ChannelStateHandler {
ChannelBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -16,8 +16,8 @@
package io.netty.channel;
public abstract class ChannelInboundHandlerAdapter<I> extends ChannelStateHandlerAdapter
implements ChannelInboundHandler<I> {
public abstract class ChannelInboundHandlerAdapter
extends ChannelStateHandlerAdapter implements ChannelInboundHandler {
@Override
public abstract void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -0,0 +1,8 @@
package io.netty.channel;
import io.netty.buffer.MessageBuf;
public interface ChannelInboundMessageHandler<I> extends ChannelInboundHandler {
@Override
MessageBuf<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -15,13 +15,17 @@
*/
package io.netty.channel;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import java.util.Queue;
public abstract class ChannelInboundMessageHandlerAdapter<I> extends ChannelInboundHandlerAdapter<I> {
public abstract class ChannelInboundMessageHandlerAdapter<I>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {
@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override

View File

@ -0,0 +1,8 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
public interface ChannelOutboundByteHandler extends ChannelOutboundHandler {
@Override
ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -15,9 +15,13 @@
*/
package io.netty.channel;
public abstract class ChannelOutboundByteHandlerAdapter extends ChannelOutboundHandlerAdapter<Byte> {
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
public abstract class ChannelOutboundByteHandlerAdapter
extends ChannelOutboundHandlerAdapter implements ChannelOutboundByteHandler {
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
public interface ChannelOutboundHandler<T> extends ChannelOperationHandler {
ChannelBufferHolder<T> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
import io.netty.buffer.ChannelBuf;
public interface ChannelOutboundHandler extends ChannelOperationHandler {
ChannelBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -15,8 +15,8 @@
*/
package io.netty.channel;
public abstract class ChannelOutboundHandlerAdapter<O>
extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler<O> {
public abstract class ChannelOutboundHandlerAdapter
extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler {
@Override
public abstract void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
}

View File

@ -0,0 +1,8 @@
package io.netty.channel;
import io.netty.buffer.MessageBuf;
public interface ChannelOutboundMessageHandler<I> extends ChannelOutboundHandler {
@Override
MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -15,10 +15,13 @@
*/
package io.netty.channel;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
public abstract class ChannelOutboundMessageHandlerAdapter<I> extends ChannelOutboundHandlerAdapter<I> {
public abstract class ChannelOutboundMessageHandlerAdapter<I>
extends ChannelOutboundHandlerAdapter implements ChannelOutboundMessageHandler<I> {
@Override
public ChannelBufferHolder<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
}

View File

@ -15,27 +15,27 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuf;
import java.net.SocketAddress;
public class CombinedChannelHandler
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
extends ChannelHandlerAdapter implements ChannelInboundHandler, ChannelOutboundHandler {
private ChannelOutboundHandler<Object> out;
private ChannelInboundHandler<Object> in;
private ChannelOutboundHandler out;
private ChannelInboundHandler in;
protected CombinedChannelHandler() {
// User will call init in the subclass constructor.
}
public CombinedChannelHandler(
ChannelInboundHandler<?> inboundHandler, ChannelOutboundHandler<?> outboundHandler) {
ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
init(inboundHandler, outboundHandler);
}
@SuppressWarnings("unchecked")
protected void init(ChannelInboundHandler<?> inboundHandler,
ChannelOutboundHandler<?> outboundHandler) {
protected void init(
ChannelInboundHandler inboundHandler, ChannelOutboundHandler outboundHandler) {
if (inboundHandler == null) {
throw new NullPointerException("inboundHandler");
}
@ -58,18 +58,18 @@ public class CombinedChannelHandler
throw new IllegalStateException("init() cannot be called more than once.");
}
in = (ChannelInboundHandler<Object>) inboundHandler;
out = (ChannelOutboundHandler<Object>) outboundHandler;
in = inboundHandler;
out = outboundHandler;
}
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
public ChannelBuf newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return in.newInboundBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
public ChannelBuf newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return out.newOutboundBuffer(ctx);
}

View File

@ -18,6 +18,8 @@ package io.netty.channel;
import static io.netty.channel.DefaultChannelPipeline.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.QueueFactory;
@ -174,7 +176,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
this.type = Collections.unmodifiableSet(type);
this.directions = typeValue;
directions = typeValue;
this.prev = prev;
this.next = next;
@ -200,27 +202,28 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
if (type.contains(ChannelHandlerType.INBOUND)) {
ChannelBufferHolder<Object> holder;
ChannelBuf buf;
try {
holder = ((ChannelInboundHandler<Object>) handler).newInboundBuffer(this);
buf = ((ChannelInboundHandler) handler).newInboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
}
switch (holder.type()) {
case BYTE:
inByteBuf = holder.byteBuffer();
if (buf == null) {
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
}
if (buf instanceof ByteBuf) {
inByteBuf = (ByteBuf) buf;
inByteBridge = new AtomicReference<ByteBridge>();
inMsgBuf = null;
inMsgBridge = null;
break;
case MESSAGE:
} else if (buf instanceof MessageBuf) {
inByteBuf = null;
inByteBridge = null;
inMsgBuf = holder.messageBuffer();
inMsgBuf = (Queue<Object>) buf;
inMsgBridge = new AtomicReference<MessageBridge>();
break;
default:
} else {
throw new Error();
}
} else {
@ -231,27 +234,28 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
if (type.contains(ChannelHandlerType.OUTBOUND)) {
ChannelBufferHolder<Object> holder;
ChannelBuf buf;
try {
holder = ((ChannelOutboundHandler<Object>) handler).newOutboundBuffer(this);
buf = ((ChannelOutboundHandler) handler).newOutboundBuffer(this);
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
}
switch (holder.type()) {
case BYTE:
outByteBuf = holder.byteBuffer();
if (buf == null) {
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
}
if (buf instanceof ByteBuf) {
outByteBuf = (ByteBuf) buf;
outByteBridge = new AtomicReference<ByteBridge>();
outMsgBuf = null;
outMsgBridge = null;
break;
case MESSAGE:
} else if (buf instanceof MessageBuf) {
outByteBuf = null;
outByteBridge = null;
outMsgBuf = holder.messageBuffer();
outMsgBuf = (Queue<Object>) buf;
outMsgBridge = new AtomicReference<MessageBridge>();
break;
default:
} else {
throw new Error();
}
} else {

View File

@ -17,8 +17,11 @@ package io.netty.channel;
import static io.netty.channel.DefaultChannelHandlerContext.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.DefaultChannelHandlerContext.MessageBridge;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.DefaultChannelHandlerContext.ByteBridge;
import io.netty.channel.DefaultChannelHandlerContext.MessageBridge;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -1429,19 +1432,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
@SuppressWarnings("rawtypes")
private final class HeadHandler implements ChannelOutboundHandler {
@Override
public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
public ChannelBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
switch (channel.bufferType()) {
case BYTE:
return ChannelBufferHolders.byteBuffer();
return ByteBufs.dynamicBuffer();
case MESSAGE:
if (channel instanceof ServerChannel) {
return ChannelBufferHolders.discardMessageBuffer();
} else {
return ChannelBufferHolders.messageBuffer();
}
return MessageBufs.buffer();
default:
throw new Error();
}

View File

@ -17,9 +17,10 @@ package io.netty.channel.embedded;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
@ -35,7 +36,6 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
public abstract class AbstractEmbeddedChannel extends AbstractChannel {
@ -46,7 +46,7 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig();
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> lastInboundMessageBuffer = new ArrayDeque<Object>();
private final MessageBuf<Object> lastInboundMessageBuffer = MessageBufs.buffer();
private final ByteBuf lastInboundByteBuffer = ByteBufs.dynamicBuffer();
protected final Object lastOutboundBuffer;
private Throwable lastException;
@ -198,10 +198,10 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
}
}
private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter<Object> {
private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public ChannelBufferHolder<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(lastInboundMessageBuffer);
public ChannelBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return lastInboundMessageBuffer;
}
@Override
@ -222,10 +222,10 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
}
}
private final class LastInboundByteHandler extends ChannelInboundHandlerAdapter<Byte> {
private final class LastInboundByteHandler extends ChannelInboundHandlerAdapter {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer(lastInboundByteBuffer);
public ChannelBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return lastInboundByteBuffer;
}
@Override

View File

@ -17,16 +17,19 @@ package io.netty.channel.local;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.DefaultEventExecutor;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
@ -330,7 +333,8 @@ public class LocalTransportThreadModelTest {
private static class ThreadNameAuditor
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
implements ChannelInboundMessageHandler<Object>,
ChannelOutboundMessageHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -338,15 +342,13 @@ public class LocalTransportThreadModelTest {
private final Queue<String> outboundThreadNames = QueueFactory.createQueue();
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
@ -379,7 +381,7 @@ public class LocalTransportThreadModelTest {
*/
private static class MessageForwarder1
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Integer>, ChannelOutboundHandler<Byte> {
implements ChannelInboundMessageHandler<Integer>, ChannelOutboundByteHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -387,15 +389,13 @@ public class LocalTransportThreadModelTest {
private volatile Thread t;
@Override
public ChannelBufferHolder<Integer> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Integer> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ByteBufs.dynamicBuffer();
}
@Override
@ -464,7 +464,7 @@ public class LocalTransportThreadModelTest {
*/
private static class MessageForwarder2
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Integer> {
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<Integer> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -472,15 +472,15 @@ public class LocalTransportThreadModelTest {
private volatile Thread t;
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
public ByteBuf newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
return ByteBufs.dynamicBuffer();
}
@Override
public ChannelBufferHolder<Integer> newOutboundBuffer(
public MessageBuf<Integer> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
return MessageBufs.buffer();
}
@Override
@ -541,7 +541,7 @@ public class LocalTransportThreadModelTest {
*/
private static class MessageForwarder3
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -549,20 +549,17 @@ public class LocalTransportThreadModelTest {
private volatile Thread t;
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public void inboundBufferUpdated(
ChannelHandlerContext ctx) throws Exception {
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
Thread t = this.t;
if (t == null) {
this.t = Thread.currentThread();
@ -619,7 +616,7 @@ public class LocalTransportThreadModelTest {
*/
private static class MessageDiscarder
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -628,20 +625,17 @@ public class LocalTransportThreadModelTest {
final CountDownLatch latch = new CountDownLatch(1);
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public void inboundBufferUpdated(
ChannelHandlerContext ctx) throws Exception {
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
Thread t = this.t;
if (t == null) {
this.t = Thread.currentThread();