diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 43060cd01b..b6fc131d9c 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -18,11 +18,12 @@ package io.netty.handler.codec; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelStateHandlerAdapter; import java.util.Queue; -public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter { +public abstract class MessageToMessageDecoder extends ChannelStateHandlerAdapter implements ChannelInboundHandler { @Override public ChannelBufferHolder newInboundBuffer( diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 4790d9cedd..a9dda3f891 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -19,11 +19,13 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelOutboundHandler; import java.util.Queue; -public abstract class MessageToMessageEncoder extends ChannelOutboundHandlerAdapter { +public abstract class MessageToMessageEncoder + extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler { @Override public ChannelBufferHolder newOutboundBuffer( diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java index 2cd4f01e95..d18356a278 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToStreamEncoder.java @@ -20,11 +20,13 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelOutboundHandler; import java.util.Queue; -public abstract class MessageToStreamEncoder extends ChannelOutboundHandlerAdapter { +public abstract class MessageToStreamEncoder + extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler { @Override public ChannelBufferHolder newOutboundBuffer( diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index c878f0f425..c210aa6c00 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -20,10 +20,11 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelStateHandlerAdapter; -public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAdapter { +public abstract class StreamToMessageDecoder + extends ChannelStateHandlerAdapter implements ChannelInboundHandler { private ChannelHandlerContext ctx; diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java index 978d69fd21..837e3f62ca 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamDecoder.java @@ -19,9 +19,11 @@ import io.netty.buffer.ChannelBuffer; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelStateHandlerAdapter; -public abstract class StreamToStreamDecoder extends ChannelInboundHandlerAdapter { +public abstract class StreamToStreamDecoder + extends ChannelStateHandlerAdapter implements ChannelInboundHandler { @Override public ChannelBufferHolder newInboundBuffer( diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java index b711892bd1..95da778f14 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToStreamEncoder.java @@ -20,9 +20,11 @@ import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelOutboundHandler; -public abstract class StreamToStreamEncoder extends ChannelOutboundHandlerAdapter { +public abstract class StreamToStreamEncoder + extends ChannelOperationHandlerAdapter implements ChannelOutboundHandler { @Override public ChannelBufferHolder newOutboundBuffer( diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index 4d84250b81..93f6ca171e 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -232,4 +232,16 @@ public class LoggingHandler extends ChannelHandlerAdapter { } super.deregister(ctx, future); } + + @Override + public void flush(ChannelHandlerContext ctx, ChannelFuture future) + throws Exception { + ctx.flush(future); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) + throws Exception { + ctx.fireInboundBufferUpdated(); + } } diff --git a/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java b/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java index 1889028366..438e93938b 100644 --- a/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java +++ b/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java @@ -21,8 +21,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelStateHandlerAdapter; import io.netty.util.internal.QueueFactory; import java.io.IOException; @@ -69,7 +70,8 @@ import java.util.concurrent.TimeUnit; * * @param the type of the received messages */ -public class BlockingReadHandler extends ChannelInboundHandlerAdapter { +public class BlockingReadHandler + extends ChannelStateHandlerAdapter implements ChannelInboundHandler { private static final Object INACTIVE = new Object(); diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 0c7fc35603..a1a24ca550 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -23,8 +23,9 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -67,7 +68,8 @@ import java.util.concurrent.atomic.AtomicInteger; * @apiviz.landmark * @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from */ -public class ChunkedWriteHandler extends ChannelOutboundHandlerAdapter { +public class ChunkedWriteHandler + extends ChannelHandlerAdapter implements ChannelOutboundHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index 6f9cab7e95..36043b4e8a 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -23,10 +23,11 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelStateHandlerAdapter; import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; import io.netty.logging.InternalLogger; @@ -230,9 +231,11 @@ public class ServerBootstrap { validate(); } - private class Acceptor extends ChannelInboundHandlerAdapter { + private class Acceptor extends ChannelStateHandlerAdapter implements ChannelInboundHandler { + @Override - public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) { + public ChannelBufferHolder newInboundBuffer( + ChannelHandlerContext ctx) throws Exception { return ChannelBufferHolders.messageBuffer(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index df1ac23efc..cb720c9a93 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -15,7 +15,10 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; + import java.net.SocketAddress; +import java.util.Queue; public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler { @@ -46,10 +49,27 @@ public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - if (ctx.type().contains(ChannelHandlerType.OUTBOUND)) { - ChannelOutboundHandlerAdapter.flush0(ctx, future); - } else { - ctx.flush(future); + flush0(ctx, future); + } + + static void flush0(ChannelHandlerContext ctx, ChannelFuture future) { + if (ctx.hasOutboundMessageBuffer()) { + Queue out = ctx.outboundMessageBuffer(); + Queue nextOut = ctx.nextOutboundMessageBuffer(); + for (;;) { + O msg = out.poll(); + if (msg == null) { + break; + } + nextOut.add(msg); + } + } else if (ctx.hasOutboundByteBuffer()) { + ChannelBuffer out = ctx.outboundByteBuffer(); + ChannelBuffer nextOut = ctx.nextOutboundByteBuffer(); + nextOut.writeBytes(out); + out.discardReadBytes(); } + + ctx.flush(future); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java deleted file mode 100644 index efb54bf991..0000000000 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import io.netty.buffer.ChannelBuffer; - -import java.util.Queue; - -public abstract class ChannelInboundHandlerAdapter extends ChannelStateHandlerAdapter - implements ChannelInboundHandler { - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { - inboundBufferUpdated0(ctx); - } - - static void inboundBufferUpdated0(ChannelHandlerContext ctx) { - if (ctx.hasInboundMessageBuffer()) { - Queue in = ctx.inboundMessageBuffer(); - Queue nextIn = ctx.nextInboundMessageBuffer(); - for (;;) { - I msg = in.poll(); - if (msg == null) { - break; - } - nextIn.add(msg); - } - } else { - ChannelBuffer in = ctx.inboundByteBuffer(); - ChannelBuffer nextIn = ctx.nextInboundByteBuffer(); - nextIn.writeBytes(in); - in.discardReadBytes(); - } - ctx.fireInboundBufferUpdated(); - } -} diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index 10c41bfc67..ad5dbce1dd 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -17,8 +17,8 @@ package io.netty.channel; import java.util.Queue; -public class ChannelInboundMessageHandlerAdapter extends - ChannelInboundHandlerAdapter { +public class ChannelInboundMessageHandlerAdapter + extends ChannelStateHandlerAdapter implements ChannelInboundHandler { @Override public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java index 75f62e9b2a..9e515075f6 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundStreamHandlerAdapter.java @@ -18,7 +18,9 @@ package io.netty.channel; import io.netty.buffer.ChannelBuffer; -public class ChannelInboundStreamHandlerAdapter extends ChannelInboundHandlerAdapter { +public class ChannelInboundStreamHandlerAdapter + extends ChannelStateHandlerAdapter implements ChannelInboundHandler { + @Override public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception { return ChannelBufferHolders.byteBuffer(); diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java index 88d9ea47fd..e3dd918b7d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java @@ -69,6 +69,6 @@ public class ChannelOperationHandlerAdapter implements ChannelOperationHandler { @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - ctx.flush(future); + ChannelHandlerAdapter.flush0(ctx, future); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java deleted file mode 100644 index 4303dae0c6..0000000000 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import io.netty.buffer.ChannelBuffer; - -import java.util.Queue; - -public abstract class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter - implements ChannelOutboundHandler { - - @Override - public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - flush0(ctx, future); - } - - static void flush0(ChannelHandlerContext ctx, ChannelFuture future) { - if (ctx.hasOutboundMessageBuffer()) { - Queue out = ctx.outboundMessageBuffer(); - Queue nextOut = ctx.nextOutboundMessageBuffer(); - for (;;) { - O msg = out.poll(); - if (msg == null) { - break; - } - nextOut.add(msg); - } - } else { - ChannelBuffer out = ctx.outboundByteBuffer(); - ChannelBuffer nextOut = ctx.nextOutboundByteBuffer(); - nextOut.writeBytes(out); - out.discardReadBytes(); - } - ctx.flush(future); - } -} diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java index fd24571619..9646bcde7f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java @@ -15,6 +15,10 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; + +import java.util.Queue; + public class ChannelStateHandlerAdapter implements ChannelStateHandler { // Not using volatile because it's used only for a sanity check. @@ -78,10 +82,28 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler { @Override public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { - if (ctx.type().contains(ChannelHandlerType.INBOUND)) { - ChannelInboundHandlerAdapter.inboundBufferUpdated0(ctx); - } else { - ctx.fireInboundBufferUpdated(); - } + inboundBufferUpdated0(ctx); } + + static void inboundBufferUpdated0(ChannelHandlerContext ctx) { + if (ctx.hasInboundMessageBuffer()) { + Queue in = ctx.inboundMessageBuffer(); + Queue nextIn = ctx.nextInboundMessageBuffer(); + for (;;) { + I msg = in.poll(); + if (msg == null) { + break; + } + nextIn.add(msg); + } + } else if (ctx.hasInboundByteBuffer()){ + ChannelBuffer in = ctx.inboundByteBuffer(); + ChannelBuffer nextIn = ctx.nextInboundByteBuffer(); + nextIn.writeBytes(in); + in.discardReadBytes(); + } + + ctx.fireInboundBufferUpdated(); + } + }