diff --git a/codec/src/main/java/io/netty/handler/codec/marshalling/ChannelBufferByteOutput.java b/codec/src/main/java/io/netty/handler/codec/marshalling/ChannelBufferByteOutput.java index f1f448ef3c..e431a09c47 100644 --- a/codec/src/main/java/io/netty/handler/codec/marshalling/ChannelBufferByteOutput.java +++ b/codec/src/main/java/io/netty/handler/codec/marshalling/ChannelBufferByteOutput.java @@ -16,11 +16,10 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ByteBuf; +import org.jboss.marshalling.ByteOutput; import java.io.IOException; -import org.jboss.marshalling.ByteOutput; - /** * {@link ByteOutput} implementation which writes the data to a {@link ByteBuf} * @@ -39,7 +38,7 @@ class ChannelBufferByteOutput implements ByteOutput { @Override public void close() throws IOException { - // Nothing todo + // Nothing to do } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/marshalling/LimitingByteInput.java b/codec/src/main/java/io/netty/handler/codec/marshalling/LimitingByteInput.java index 68dc28d7df..47ae2bcf79 100644 --- a/codec/src/main/java/io/netty/handler/codec/marshalling/LimitingByteInput.java +++ b/codec/src/main/java/io/netty/handler/codec/marshalling/LimitingByteInput.java @@ -15,10 +15,10 @@ */ package io.netty.handler.codec.marshalling; -import java.io.IOException; - import org.jboss.marshalling.ByteInput; +import java.io.IOException; + /** * {@link ByteInput} implementation which wraps another {@link ByteInput} and throws a {@link TooBigObjectException} * if the read limit was reached. @@ -42,7 +42,7 @@ class LimitingByteInput implements ByteInput { @Override public void close() throws IOException { - // Nothing todo + // Nothing to do } @Override diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java index 5ba3073733..70485662ef 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServerHandler.java @@ -15,10 +15,6 @@ */ package io.netty.example.http.snoop; -import static io.netty.handler.codec.http.HttpHeaders.*; -import static io.netty.handler.codec.http.HttpHeaders.Names.*; -import static io.netty.handler.codec.http.HttpResponseStatus.*; -import static io.netty.handler.codec.http.HttpVersion.*; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; @@ -44,6 +40,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import static io.netty.handler.codec.http.HttpHeaders.Names.*; +import static io.netty.handler.codec.http.HttpHeaders.*; +import static io.netty.handler.codec.http.HttpResponseStatus.*; +import static io.netty.handler.codec.http.HttpVersion.*; + public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter { private HttpRequest request; diff --git a/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java b/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java index 526e4c88c1..f6129b4ee6 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java @@ -19,7 +19,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; public class LocalEchoClientHandler extends ChannelInboundMessageHandlerAdapter { - @Override public void messageReceived(ChannelHandlerContext ctx, String msg) { // Print as received diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index 0e9b7ade4c..57ef11a1a3 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -230,6 +230,11 @@ public class LoggingHandler extends ChannelHandlerAdapter { super.deregister(ctx, future); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index ee4848dfc3..f1d3fd7322 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -400,6 +400,11 @@ public class SslHandler closeOutboundAndChannel(ctx, future, false); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception { final ByteBuf in = ctx.outboundByteBuffer(); diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 50a4a43662..2aeea7d677 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -139,6 +139,11 @@ public class ChunkedWriteHandler } } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { queue.add(future); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 4319ee93e2..bf61593e65 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -261,6 +261,11 @@ public class IdleStateHandler extends ChannelStateHandlerAdapter implements Chan ctx.fireInboundBufferUpdated(); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception { future.addListener(new ChannelFutureListener() { diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index 13d479bcb9..2d63f12a8c 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -77,6 +77,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte */ protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s + private static final AttributeKey READ_SUSPENDED = new AttributeKey("readSuspended"); private static final AttributeKey REOPEN_TASK = new AttributeKey("reopenTask"); private static final AttributeKey BUFFER_UPDATE_TASK = new AttributeKey("bufferUpdateTask"); @@ -188,7 +189,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte /** * Class to implement setReadable at fix time */ - private static final class ReopenReadTimerTask implements Runnable { + private static final class ReopenReadTimerTask implements Runnable { final ChannelHandlerContext ctx; ReopenReadTimerTask(ChannelHandlerContext ctx) { this.ctx = ctx; @@ -196,9 +197,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte @Override public void run() { - if (ctx.channel().isActive()) { - ctx.readable(true); - } + ctx.attr(READ_SUSPENDED).set(false); + ctx.read(); } } @@ -259,8 +259,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal // time in order to // try to limit the traffic - if (ctx.isReadable()) { - ctx.readable(false); + if (!ctx.attr(READ_SUSPENDED).get()) { + ctx.attr(READ_SUSPENDED).set(true); // Create a Runnable to reactive the read if needed. If one was create before it will just be // reused to limit object creation @@ -294,6 +294,13 @@ public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapte ctx.fireInboundBufferUpdated(); } + @Override + public void read(ChannelHandlerContext ctx) { + if (!ctx.attr(READ_SUSPENDED).get()) { + ctx.read(); + } + } + @Override public void flush(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception { long curtime = System.currentTimeMillis(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java index ef3b59de00..614d99523c 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelOption; -import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -31,6 +30,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import static org.junit.Assert.*; + public class ServerSocketSuspendTest extends AbstractServerSocketTest { private static final int NUM_CHANNELS = 10; @@ -46,10 +47,10 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest { AcceptedChannelCounter counter = new AcceptedChannelCounter(NUM_CHANNELS); sb.option(ChannelOption.SO_BACKLOG, 1); + sb.option(ChannelOption.AUTO_READ, false); sb.childHandler(counter); Channel sc = sb.bind().sync().channel(); - sc.pipeline().firstContext().readable(false); List sockets = new ArrayList(); @@ -61,11 +62,13 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest { sockets.add(s); } - sc.pipeline().firstContext().readable(true); + sc.config().setAutoRead(true); + sc.read(); + counter.latch.await(); long endTime = System.nanoTime(); - Assert.assertTrue(endTime - startTime > TIMEOUT); + assertTrue(endTime - startTime > TIMEOUT); } finally { for (Socket s: sockets) { s.close(); @@ -83,7 +86,7 @@ public class ServerSocketSuspendTest extends AbstractServerSocketTest { } long endTime = System.nanoTime(); - Assert.assertTrue(endTime - startTime < TIMEOUT); + assertTrue(endTime - startTime < TIMEOUT); } finally { for (Socket s: sockets) { s.close(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 3c0eaf37da..35ed5da521 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.testsuite.transport.socket; -import static org.junit.Assert.*; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -23,12 +22,13 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; +import org.junit.Test; import java.io.IOException; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import static org.junit.Assert.*; public class SocketEchoTest extends AbstractSocketTest { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java index 704dc4e11f..d9b85f0810 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java @@ -32,8 +32,7 @@ import java.io.IOException; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class SocketFileRegionTest extends AbstractSocketTest { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java index edc5d746ec..de5cb0c7b1 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.testsuite.transport.socket; -import static org.junit.Assert.*; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -26,12 +25,13 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; +import org.junit.Test; import java.io.IOException; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import static org.junit.Assert.*; public class SocketFixedLengthEchoTest extends AbstractSocketTest { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java index 98ebc1b8f1..1356797077 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java @@ -15,14 +15,14 @@ */ package io.netty.testsuite.transport.socket; -import static org.junit.Assert.*; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.ChannelOption; +import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.SocketChannel; +import org.junit.Test; import java.net.Socket; import java.util.concurrent.BlockingQueue; @@ -30,7 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; +import static org.junit.Assert.*; public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java index e98951efb2..ec023ba21b 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.testsuite.transport.socket; -import static org.junit.Assert.*; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -30,13 +29,14 @@ import io.netty.handler.codec.spdy.SpdyConstants; import io.netty.handler.codec.spdy.SpdyFrameDecoder; import io.netty.handler.codec.spdy.SpdyFrameEncoder; import io.netty.util.NetUtil; +import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import static org.junit.Assert.*; public class SocketSpdyEchoTest extends AbstractSocketTest { diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index 6f93dd8c98..beb327673c 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -225,8 +225,8 @@ public class ServerBootstrap extends AbstractBootstrap { return Unpooled.messageBuffer(); } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public void inboundBufferUpdated(ChannelHandlerContext ctx) { MessageBuf in = ctx.inboundMessageBuffer(); for (;;) { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index bd891f2a22..3b8f6d76be 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -288,6 +288,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return (MessageBuf) pipeline.outboundMessageBuffer(); } + @Override + public void read() { + pipeline.read(); + } + @Override public ChannelFuture flush(ChannelFuture future) { return pipeline.flush(future); @@ -439,6 +444,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + private final Runnable beginReadTask = new Runnable() { + @Override + public void run() { + beginRead(); + } + }; + private final Runnable flushLaterTask = new Runnable() { @Override public void run() { @@ -725,6 +737,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + @Override + public void beginRead() { + if (eventLoop().inEventLoop()) { + try { + doBeginRead(); + } catch (Exception e) { + pipeline().fireExceptionCaught(e); + close(unsafe().voidFuture()); + } + } else { + eventLoop().execute(beginReadTask); + } + } + @Override public void flush(final ChannelFuture future) { if (eventLoop().inEventLoop()) { @@ -931,6 +957,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // NOOP } + /** + * Schedule a read operation. + */ + protected abstract void doBeginRead() throws Exception; + /** * Flush the content of the given {@link ByteBuf} to the remote peer. * diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 71d132b27c..3f857fda4c 100755 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -79,8 +79,12 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S return false; } - protected abstract class AbstractServerUnsafe extends AbstractUnsafe { + @Override + protected AbstractUnsafe newUnsafe() { + return new DefaultServerUnsafe(); + } + private final class DefaultServerUnsafe extends AbstractUnsafe { @Override public void flush(final ChannelFuture future) { if (eventLoop().inEventLoop()) { diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index a9e37ef7fc..ad713f9b32 100755 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.AttributeMap; @@ -155,7 +156,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr * {@code null} if this channel is not connected. * If this channel is not connected but it can receive messages * from arbitrary remote addresses (e.g. {@link DatagramChannel}, - * use {@link io.netty.channel.socket.DatagramPacket#remoteAddress()} to determine + * use {@link DatagramPacket#remoteAddress()} to determine * the origination of the received message as this method will * return {@code null}. */ @@ -245,6 +246,12 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr */ void deregister(ChannelFuture future); + /** + * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the + * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing. + */ + void beginRead(); + /** * Flush out all data that was buffered in the buffer of the {@link #directOutboundContext()} and was not * flushed out yet. After that is done the {@link ChannelFuture} will get notified @@ -256,18 +263,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr */ void flushNow(); - /** - * Suspend reads from the underlying transport, which basicly has the effect of no new data that will - * get dispatched. - */ - void suspendRead(); - - /** - * Resume reads from the underlying transport. If {@link #suspendRead()} was not called before, this - * has no effect. - */ - void resumeRead(); - /** * Send a {@link FileRegion} to the remote peer and notify the {@link ChannelFuture} once it completes * or an error was detected. Once the {@link FileRegion} was transfered or an error was thrown it will diff --git a/transport/src/main/java/io/netty/channel/ChannelConfig.java b/transport/src/main/java/io/netty/channel/ChannelConfig.java index 560c6af7b9..a52c8de73e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/ChannelConfig.java @@ -149,4 +149,16 @@ public interface ChannelConfig { * to allocate buffers. */ ChannelConfig setAllocator(ByteBufAllocator allocator); + + /** + * Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that + * a user application doesn't need to call it at all. The default value is {@code true}. + */ + boolean isAutoRead(); + + /** + * Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't + * need to call it at all. The default value is {@code true}. + */ + ChannelConfig setAutoRead(boolean autoRead); } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index b2315daed7..c2649f184d 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -86,6 +86,11 @@ public abstract class ChannelHandlerAdapter extends ChannelStateHandlerAdapter i ctx.deregister(future); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + /** * Calls {@link ChannelHandlerContext#flush(ChannelFuture)} to forward * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 255a837076..0caa563a42 100755 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -329,19 +329,4 @@ public interface ChannelHandlerContext * {@link UnsupportedOperationException} is thrown. */ MessageBuf nextOutboundMessageBuffer(); - - /** - * Return {@code true} if the {@link ChannelHandlerContext} was marked as readable. This basically means - * that once its not readable anymore no new data will be read from the transport and passed down the - * {@link ChannelPipeline}. - * - * Only if all {@link ChannelHandlerContext}'s {@link #isReadable()} return {@code true}, the data is - * passed again down the {@link ChannelPipeline}. - */ - boolean isReadable(); - - /** - * Mark the {@link ChannelHandlerContext} as readable or suspend it. See {@link #isReadable()} - */ - void readable(boolean readable); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java index 47c1cd69dd..cd802cfc1f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java @@ -83,4 +83,10 @@ public interface ChannelInboundInvoker { * {@link Channel}. */ void fireInboundBufferUpdated(); + + /** + * Triggers an {@link ChannelStateHandler#inboundBufferSuspended(ChannelHandlerContext) inboundBufferSuspended} + * event to the next {@link ChannelStateHandler} in the {@link ChannelPipeline}. + */ + void fireInboundBufferSuspended(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java index ebde7e8ae7..f81a13e7f9 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOperationHandler.java @@ -71,6 +71,11 @@ public interface ChannelOperationHandler extends ChannelHandler { */ void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception; + /** + * Intercepts {@link ChannelHandlerContext#read()}. + */ + void read(ChannelHandlerContext ctx); + /** * Called once a flush operation is made and so the outbound data should be written. * diff --git a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java index f7260db0d3..c6d1af15e5 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOperationHandlerAdapter.java @@ -135,6 +135,11 @@ public abstract class ChannelOperationHandlerAdapter implements ChannelOperation ctx.deregister(future); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + /** * Calls {@link ChannelHandlerContext#flush(ChannelFuture)} to forward * to the next {@link ChannelOperationHandler} in the {@link ChannelPipeline}. diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 6eb0dc2876..8ac4adb79f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -44,6 +44,8 @@ public class ChannelOption extends UniqueName { new ChannelOption("WRITE_SPIN_COUNT"); public static final ChannelOption ALLOW_HALF_CLOSURE = new ChannelOption("ALLOW_HALF_CLOSURE"); + public static final ChannelOption AUTO_READ = + new ChannelOption("AUTO_READ"); public static final ChannelOption SO_BROADCAST = new ChannelOption("SO_BROADCAST"); diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index 304dc0acd9..2ee4e81eb7 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -140,6 +140,15 @@ public interface ChannelOutboundInvoker { */ ChannelFuture deregister(ChannelFuture future); + /** + * Reads data from the {@link Channel} into the first inbound buffer, triggers an + * {@link ChannelStateHandler#inboundBufferUpdated(ChannelHandlerContext) inboundBufferUpdated} event if data was + * read, and triggers an + * {@link ChannelStateHandler#inboundBufferSuspended(ChannelHandlerContext) inboundBufferSuspended} event so the + * handler can decide to continue reading. If there's a pending read operation already, this method does nothing. + */ + void read(); + /** * Flush all pending data which belongs to this ChannelOutboundInvoker and notify the {@link ChannelFuture} * once the operation completes, either because the operation was successful or because of an error. diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java index 0356173eaa..f0166ed749 100755 --- a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java @@ -49,4 +49,10 @@ public interface ChannelStateHandler extends ChannelHandler { * to wait for more data and consume it later. */ void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception; + + /** + * Invoked when a {@link ChannelHandlerContext#read()} is finished and the inbound buffer of this handler will not + * be updated until another {@link ChannelHandlerContext#read()} request is issued. + */ + void inboundBufferSuspended(ChannelHandlerContext ctx) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java index 5dc6ec2ccd..a9e62187fb 100644 --- a/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelStateHandlerAdapter.java @@ -153,4 +153,9 @@ public class ChannelStateHandlerAdapter implements ChannelStateHandler { } ctx.fireInboundBufferUpdated(); } + + @Override + public void inboundBufferSuspended(ChannelHandlerContext ctx) throws Exception { + ctx.fireInboundBufferSuspended(); + } } diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java index 0a1d2f336d..4ff912f970 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java @@ -204,6 +204,11 @@ public class CombinedChannelHandler extends ChannelStateHandlerAdapter implement out.deregister(ctx, future); } + @Override + public void read(ChannelHandlerContext ctx) { + out.read(ctx); + } + @Override public void flush( ChannelHandlerContext ctx, ChannelFuture future) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 8367e31c79..89782182ce 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -36,10 +36,11 @@ public class DefaultChannelConfig implements ChannelConfig { private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR; private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; private volatile int writeSpinCount = 16; + private volatile boolean autoRead = true; @Override public Map, Object> getOptions() { - return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT, ALLOCATOR); + return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ); } protected Map, Object> getOptions( @@ -70,8 +71,8 @@ public class DefaultChannelConfig implements ChannelConfig { return setAllOptions; } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public T getOption(ChannelOption option) { if (option == null) { throw new NullPointerException("option"); @@ -86,6 +87,9 @@ public class DefaultChannelConfig implements ChannelConfig { if (option == ALLOCATOR) { return (T) getAllocator(); } + if (option == AUTO_READ) { + return (T) Boolean.valueOf(isAutoRead()); + } return null; } @@ -100,6 +104,8 @@ public class DefaultChannelConfig implements ChannelConfig { setWriteSpinCount((Integer) value); } else if (option == ALLOCATOR) { setAllocator((ByteBufAllocator) value); + } else if (option == AUTO_READ) { + setAutoRead((Boolean) value); } else { return false; } @@ -157,4 +163,15 @@ public class DefaultChannelConfig implements ChannelConfig { this.allocator = allocator; return this; } + + @Override + public boolean isAutoRead() { + return autoRead; + } + + @Override + public ChannelConfig setAutoRead(boolean autoRead) { + this.autoRead = autoRead; + return this; + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 76e8c52078..e59dffa39d 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -31,7 +31,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static io.netty.channel.DefaultChannelPipeline.*; @@ -40,10 +39,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private static final EnumSet EMPTY_TYPE = EnumSet.noneOf(ChannelHandlerType.class); - static final int DIR_INBOUND = 1; - static final int DIR_OUTBOUND = 2; - - private static final int FLAG_NEEDS_LAZY_INIT = 4; + private static final int FLAG_STATE_HANDLER = 1; + static final int FLAG_OPERATION_HANDLER = 2; + static final int FLAG_INBOUND_HANDLER = 4; + private static final int FLAG_OUTBOUND_HANDLER = 8; + private static final int FLAG_NEEDS_LAZY_INIT = 16; volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext prev; @@ -54,7 +54,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private final Set type; private final ChannelHandler handler; final int flags; - final AtomicBoolean readable = new AtomicBoolean(true); // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. @@ -143,7 +142,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void run() { DefaultChannelHandlerContext next = nextContext( - DefaultChannelHandlerContext.this.next, DIR_INBOUND); + DefaultChannelHandlerContext.this.next, FLAG_STATE_HANDLER); if (next != null) { next.fillBridge(); EventExecutor executor = next.executor(); @@ -155,6 +154,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } }; + private final Runnable fireInboundBufferSuspendedTask = new Runnable() { + @Override + public void run() { + DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + try { + ((ChannelStateHandler) ctx.handler).inboundBufferSuspended(ctx); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } + }; private final Runnable freeInboundBufferTask = new Runnable() { @Override public void run() { @@ -176,12 +186,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } - DefaultChannelHandlerContext nextCtx = nextContext(ctx.next, DIR_INBOUND); + DefaultChannelHandlerContext nextCtx = nextContext(ctx.next, FLAG_STATE_HANDLER); if (nextCtx != null) { nextCtx.callFreeInboundBuffer(); } else { // Freed all inbound buffers. Free all outbound buffers in a reverse order. - pipeline.firstContext(DIR_OUTBOUND).callFreeOutboundBuffer(); + pipeline.lastContext(FLAG_OPERATION_HANDLER).callFreeOutboundBuffer(); } } }; @@ -206,13 +216,20 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } - DefaultChannelHandlerContext nextCtx = nextContext(ctx.prev, DIR_OUTBOUND); + DefaultChannelHandlerContext nextCtx = prevContext(ctx.prev, FLAG_OPERATION_HANDLER); if (nextCtx != null) { nextCtx.callFreeOutboundBuffer(); } } }; + final Runnable read0Task = new Runnable() { + @Override + public void run() { + pipeline.read0(DefaultChannelHandlerContext.this); + } + }; + @SuppressWarnings("unchecked") DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutorGroup group, @@ -232,16 +249,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements EnumSet type = EMPTY_TYPE.clone(); if (handler instanceof ChannelStateHandler) { type.add(ChannelHandlerType.STATE); - flags |= DIR_INBOUND; + flags |= FLAG_STATE_HANDLER; if (handler instanceof ChannelInboundHandler) { type.add(ChannelHandlerType.INBOUND); + flags |= FLAG_INBOUND_HANDLER; } } if (handler instanceof ChannelOperationHandler) { type.add(ChannelHandlerType.OPERATION); - flags |= DIR_OUTBOUND; + flags |= FLAG_OPERATION_HANDLER; if (handler instanceof ChannelOutboundHandler) { type.add(ChannelHandlerType.OUTBOUND); + flags |= FLAG_OUTBOUND_HANDLER; } } this.type = Collections.unmodifiableSet(type); @@ -267,7 +286,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements executor = null; } - if (type.contains(ChannelHandlerType.INBOUND)) { + if ((flags & FLAG_INBOUND_HANDLER) != 0) { Buf buf; try { buf = ((ChannelInboundHandler) handler).newInboundBuffer(this); @@ -299,7 +318,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements inMsgBridge = null; } - if (type.contains(ChannelHandlerType.OUTBOUND)) { + if ((flags & FLAG_OUTBOUND_HANDLER) != 0) { if (prev == null) { // Special case: if pref == null, it means this context for HeadHandler. // HeadHandler is an outbound handler instantiated by the constructor of DefaultChannelPipeline. @@ -930,7 +949,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void fireChannelRegistered() { - DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND); + DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER); if (next != null) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -943,7 +962,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void fireChannelUnregistered() { - DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND); + DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER); if (next != null) { EventExecutor executor = next.executor(); if (executor.inEventLoop() && prev != null) { @@ -956,7 +975,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void fireChannelActive() { - DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND); + DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER); if (next != null) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -969,7 +988,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void fireChannelInactive() { - DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND); + DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER); if (next != null) { EventExecutor executor = next.executor(); if (executor.inEventLoop() && prev != null) { @@ -1062,6 +1081,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } + @Override + public void fireInboundBufferSuspended() { + DefaultChannelHandlerContext next = nextContext(this.next, FLAG_STATE_HANDLER); + if (next != null) { + EventExecutor executor = next.executor(); + if (executor.inEventLoop() && prev != null) { + next.fireInboundBufferSuspendedTask.run(); + } else { + executor.execute(next.fireInboundBufferSuspendedTask); + } + } + } + @Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newFuture()); @@ -1104,7 +1136,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { - return pipeline.bind(nextContext(prev, DIR_OUTBOUND), localAddress, future); + return pipeline.bind(prevContext(prev, FLAG_OPERATION_HANDLER), localAddress, future); } @Override @@ -1114,29 +1146,34 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - return pipeline.connect(nextContext(prev, DIR_OUTBOUND), remoteAddress, localAddress, future); + return pipeline.connect(prevContext(prev, FLAG_OPERATION_HANDLER), remoteAddress, localAddress, future); } @Override public ChannelFuture disconnect(ChannelFuture future) { - return pipeline.disconnect(nextContext(prev, DIR_OUTBOUND), future); + return pipeline.disconnect(prevContext(prev, FLAG_OPERATION_HANDLER), future); } @Override public ChannelFuture close(ChannelFuture future) { - return pipeline.close(nextContext(prev, DIR_OUTBOUND), future); + return pipeline.close(prevContext(prev, FLAG_OPERATION_HANDLER), future); } @Override public ChannelFuture deregister(ChannelFuture future) { - return pipeline.deregister(nextContext(prev, DIR_OUTBOUND), future); + return pipeline.deregister(prevContext(prev, FLAG_OPERATION_HANDLER), future); + } + + @Override + public void read() { + pipeline.read(prevContext(prev, FLAG_OPERATION_HANDLER)); } @Override public ChannelFuture flush(final ChannelFuture future) { EventExecutor executor = executor(); if (executor.inEventLoop()) { - DefaultChannelHandlerContext prev = nextContext(this.prev, DIR_OUTBOUND); + DefaultChannelHandlerContext prev = prevContext(this.prev, FLAG_OPERATION_HANDLER); prev.fillBridge(); pipeline.flush(prev, future); } else { @@ -1269,23 +1306,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } - @Override - public boolean isReadable() { - return readable.get(); - } - - @Override - public void readable(boolean readable) { - pipeline.readable(this, readable); - } - @Override public ChannelFuture sendFile(FileRegion region) { - return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, newFuture()); + return pipeline.sendFile(prevContext(prev, FLAG_OPERATION_HANDLER), region, newFuture()); } @Override public ChannelFuture sendFile(FileRegion region, ChannelFuture future) { - return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, future); + return pipeline.sendFile(prevContext(prev, FLAG_OPERATION_HANDLER), region, future); } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 61fcea6f9a..dea990330a 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import static io.netty.channel.DefaultChannelHandlerContext.*; @@ -56,7 +55,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final Map childExecutors = new IdentityHashMap(); - private final AtomicInteger suspendRead = new AtomicInteger(); public DefaultChannelPipeline(Channel channel) { if (channel == null) { @@ -102,14 +100,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { // in order to avoid deadlock newCtx.executeOnEventLoop(new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - checkDuplicateName(name); - addFirst0(name, nextCtx, newCtx); - } + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + checkDuplicateName(name); + addFirst0(name, nextCtx, newCtx); } - }); + } + }); return this; } @@ -257,14 +255,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { // in order to avoid deadlock newCtx.executeOnEventLoop(new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - checkDuplicateName(name); - addAfter0(name, ctx, newCtx); - } + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + checkDuplicateName(name); + addAfter0(name, ctx, newCtx); } - }); + } + }); return this; } @@ -423,9 +421,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { name2ctx.remove(ctx.name()); callAfterRemove(ctx); - - // make sure the it's set back to readable - ctx.readable(true); } @Override @@ -455,13 +450,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { // in order to avoid deadlock oldTail.executeOnEventLoop(new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - removeLast0(oldTail); - } + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + removeLast0(oldTail); } - }); + } + }); return oldTail.handler(); } @@ -474,9 +469,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { name2ctx.remove(oldTail.name()); callBeforeRemove(oldTail); - - // make sure the it's set back to readable - oldTail.readable(true); } @Override @@ -586,10 +578,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { boolean removed = false; try { callAfterRemove(ctx); - - // clear readable suspend if necessary - ctx.readable(true); - removed = true; } catch (ChannelPipelineException e) { removeException = e; @@ -955,6 +943,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { public void fireChannelActive() { firedChannelActive = true; head.fireChannelActive(); + + if (channel.config().isAutoRead()) { + channel.read(); + } + if (fireInboundBufferUpdatedOnActivation) { fireInboundBufferUpdatedOnActivation = false; head.fireInboundBufferUpdated(); @@ -989,6 +982,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { head.fireInboundBufferUpdated(); } + @Override + public void fireInboundBufferSuspended() { + head.fireInboundBufferSuspended(); + if (channel.config().isAutoRead()) { + channel.read(); + } + } + @Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, channel.newFuture()); @@ -1031,7 +1032,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { - return bind(firstContext(DIR_OUTBOUND), localAddress, future); + return bind(lastContext(FLAG_OPERATION_HANDLER), localAddress, future); } ChannelFuture bind( @@ -1066,7 +1067,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - return connect(firstContext(DIR_OUTBOUND), remoteAddress, localAddress, future); + return connect(lastContext(FLAG_OPERATION_HANDLER), remoteAddress, localAddress, future); } ChannelFuture connect( @@ -1098,7 +1099,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture disconnect(ChannelFuture future) { - return disconnect(firstContext(DIR_OUTBOUND), future); + return disconnect(lastContext(FLAG_OPERATION_HANDLER), future); } ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { @@ -1130,7 +1131,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture close(ChannelFuture future) { - return close(firstContext(DIR_OUTBOUND), future); + return close(lastContext(FLAG_OPERATION_HANDLER), future); } ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { @@ -1156,7 +1157,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture deregister(final ChannelFuture future) { - return deregister(firstContext(DIR_OUTBOUND), future); + return deregister(lastContext(FLAG_OPERATION_HANDLER), future); } ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { @@ -1187,7 +1188,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelFuture sendFile(FileRegion region, ChannelFuture future) { - return sendFile(firstContext(DIR_OUTBOUND), region, future); + return sendFile(lastContext(FLAG_OPERATION_HANDLER), region, future); } ChannelFuture sendFile(final DefaultChannelHandlerContext ctx, final FileRegion region, @@ -1216,9 +1217,32 @@ final class DefaultChannelPipeline implements ChannelPipeline { return future; } + + @Override + public void read() { + read(lastContext(FLAG_OPERATION_HANDLER)); + } + + void read(final DefaultChannelHandlerContext ctx) { + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + read0(ctx); + } else { + executor.execute(ctx.read0Task); + } + } + + void read0(DefaultChannelHandlerContext ctx) { + try { + ((ChannelOperationHandler) ctx.handler()).read(ctx); + } catch (Throwable t) { + notifyHandlerException(t); + } + } + @Override public ChannelFuture flush(ChannelFuture future) { - return flush(firstContext(DIR_OUTBOUND), future); + return flush(lastContext(FLAG_OPERATION_HANDLER), future); } ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { @@ -1230,7 +1254,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { executor.execute(new Runnable() { @Override public void run() { - flush(ctx, future); + flush0(ctx, future); } }); } @@ -1356,36 +1380,35 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - DefaultChannelHandlerContext firstContext(int direction) { - assert direction == DIR_INBOUND || direction == DIR_OUTBOUND; - if (direction == DIR_INBOUND) { - return nextContext(head.next, DIR_INBOUND); - } else { // DIR_OUTBOUND - return nextContext(tail, DIR_OUTBOUND); - } + DefaultChannelHandlerContext lastContext(int flag) { + return prevContext(tail, flag); } - static DefaultChannelHandlerContext nextContext( - DefaultChannelHandlerContext ctx, int direction) { - assert direction == DIR_INBOUND || direction == DIR_OUTBOUND; + static DefaultChannelHandlerContext nextContext(DefaultChannelHandlerContext ctx, int flag) { if (ctx == null) { return null; } DefaultChannelHandlerContext realCtx = ctx; - if (direction == DIR_INBOUND) { - while ((realCtx.flags & DIR_INBOUND) == 0) { - realCtx = realCtx.next; - if (realCtx == null) { - return null; - } + while ((realCtx.flags & flag) == 0) { + realCtx = realCtx.next; + if (realCtx == null) { + return null; } - } else { // DIR_OUTBOUND - while ((realCtx.flags & DIR_OUTBOUND) == 0) { - realCtx = realCtx.prev; - if (realCtx == null) { - return null; - } + } + return realCtx; + } + + static DefaultChannelHandlerContext prevContext(DefaultChannelHandlerContext ctx, int flag) { + if (ctx == null) { + return null; + } + + DefaultChannelHandlerContext realCtx = ctx; + while ((realCtx.flags & flag) == 0) { + realCtx = realCtx.prev; + if (realCtx == null) { + return null; } } return realCtx; @@ -1460,20 +1483,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - void readable(DefaultChannelHandlerContext ctx, boolean readable) { - if (ctx.readable.compareAndSet(!readable, readable)) { - if (!readable) { - if (suspendRead.incrementAndGet() == 1) { - unsafe.suspendRead(); - } - } else { - if (suspendRead.decrementAndGet() == 0) { - unsafe.resumeRead(); - } - } - } - } - private final class HeadHandler implements ChannelOutboundHandler { @Override public Buf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { @@ -1542,6 +1551,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { unsafe.deregister(future); } + @Override + public void read(ChannelHandlerContext ctx) { + unsafe.beginRead(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { unsafe.flush(future); diff --git a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java index 55a6198bb7..c8338e122c 100755 --- a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java @@ -233,6 +233,11 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel { // NOOP } + @Override + protected void doBeginRead() throws Exception { + // NOOP + } + @Override protected AbstractUnsafe newUnsafe() { return new DefaultUnsafe(); @@ -317,16 +322,6 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel { SocketAddress localAddress, ChannelFuture future) { future.setSuccess(); } - - @Override - public void suspendRead() { - // NOOP - } - - @Override - public void resumeRead() { - // NOOP - } } private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java index 53e2e04e75..ecb9b447c4 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java @@ -24,7 +24,7 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelPipeline; /** - * Embedded {@@link Channel} which operates on messages which can be of any time. + * Embedded {@link Channel} which operates on messages which can be of any time. */ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { @@ -47,6 +47,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { return pipeline().inboundMessageBuffer(); } + @Override @SuppressWarnings("unchecked") public MessageBuf lastOutboundBuffer() { return (MessageBuf) lastOutboundBuffer; diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 22f836f3f8..bf5c249440 100755 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -56,6 +56,7 @@ public class LocalChannel extends AbstractChannel { private volatile LocalAddress localAddress; private volatile LocalAddress remoteAddress; private volatile ChannelFuture connectFuture; + private volatile boolean readInProgress; public LocalChannel() { this(null); @@ -207,6 +208,23 @@ public class LocalChannel extends AbstractChannel { ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); } + @Override + protected void doBeginRead() throws Exception { + if (readInProgress) { + return; + } + + ChannelPipeline pipeline = pipeline(); + MessageBuf buf = pipeline.inboundMessageBuffer(); + if (buf.isEmpty()) { + readInProgress = true; + return; + } + + pipeline.fireInboundBufferUpdated(); + pipeline.fireInboundBufferSuspended(); + } + @Override protected void doFlushMessageBuffer(MessageBuf buf) throws Exception { if (state < 2) { @@ -222,7 +240,7 @@ public class LocalChannel extends AbstractChannel { if (peerLoop == eventLoop()) { buf.drainTo(peerPipeline.inboundMessageBuffer()); - peerPipeline.fireInboundBufferUpdated(); + finishPeerRead(peer, peerPipeline); } else { final Object[] msgs = buf.toArray(); buf.clear(); @@ -231,12 +249,20 @@ public class LocalChannel extends AbstractChannel { public void run() { MessageBuf buf = peerPipeline.inboundMessageBuffer(); Collections.addAll(buf, msgs); - peerPipeline.fireInboundBufferUpdated(); + finishPeerRead(peer, peerPipeline); } }); } } + private static void finishPeerRead(LocalChannel peer, ChannelPipeline peerPipeline) { + if (peer.readInProgress) { + peer.readInProgress = false; + peerPipeline.fireInboundBufferUpdated(); + peerPipeline.fireInboundBufferSuspended(); + } + } + @Override protected boolean isFlushPending() { return false; @@ -305,15 +331,5 @@ public class LocalChannel extends AbstractChannel { }); } } - - @Override - public void suspendRead() { - // TODO: Implement me - } - - @Override - public void resumeRead() { - // TODO: Implement me - } } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index e5c7672da4..266c2422bc 100755 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -15,8 +15,10 @@ */ package io.netty.channel.local; +import io.netty.buffer.MessageBuf; import io.netty.channel.AbstractServerChannel; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; @@ -40,6 +42,7 @@ public class LocalServerChannel extends AbstractServerChannel { private volatile int state; // 0 - open, 1 - active, 2 - closed private volatile LocalAddress localAddress; + private volatile boolean acceptInProgress; /** * Creates a new instance @@ -127,6 +130,23 @@ public class LocalServerChannel extends AbstractServerChannel { ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); } + @Override + protected void doBeginRead() throws Exception { + if (acceptInProgress) { + return; + } + + ChannelPipeline pipeline = pipeline(); + MessageBuf buf = pipeline.inboundMessageBuffer(); + if (buf.isEmpty()) { + acceptInProgress = true; + return; + } + + pipeline.fireInboundBufferUpdated(); + pipeline.fireInboundBufferSuspended(); + } + LocalChannel serve(final LocalChannel peer) { LocalChannel child = new LocalChannel(this, peer); serve0(child); @@ -135,8 +155,13 @@ public class LocalServerChannel extends AbstractServerChannel { private void serve0(final LocalChannel child) { if (eventLoop().inEventLoop()) { - pipeline().inboundMessageBuffer().add(child); - pipeline().fireInboundBufferUpdated(); + final ChannelPipeline pipeline = pipeline(); + pipeline.inboundMessageBuffer().add(child); + if (acceptInProgress) { + acceptInProgress = false; + pipeline.fireInboundBufferUpdated(); + pipeline.fireInboundBufferSuspended(); + } } else { eventLoop().execute(new Runnable() { @Override @@ -146,22 +171,4 @@ public class LocalServerChannel extends AbstractServerChannel { }); } } - - @Override - protected AbstractUnsafe newUnsafe() { - return new LocalServerUnsafe(); - } - - private final class LocalServerUnsafe extends AbstractServerUnsafe { - - @Override - public void suspendRead() { - // TODO: Implement me - } - - @Override - public void resumeRead() { - // TODO: Implement me - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java index b1bd8fcc6f..48dee5b94f 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramChannelConfig.java @@ -163,4 +163,7 @@ public interface DatagramChannelConfig extends ChannelConfig { @Override DatagramChannelConfig setAllocator(ByteBufAllocator allocator); + + @Override + DatagramChannelConfig setAutoRead(boolean autoRead); } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java index 34fbe7905f..730941123e 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java @@ -376,4 +376,9 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement public DatagramChannelConfig setAllocator(ByteBufAllocator allocator) { return (DatagramChannelConfig) super.setAllocator(allocator); } + + @Override + public DatagramChannelConfig setAutoRead(boolean autoRead) { + return (DatagramChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java index 3cf47ad81f..af5842a19a 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSctpChannelConfig.java @@ -172,4 +172,9 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc public SctpChannelConfig setAllocator(ByteBufAllocator allocator) { return (SctpChannelConfig) super.setAllocator(allocator); } + + @Override + public SctpChannelConfig setAutoRead(boolean autoRead) { + return (SctpChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java index 1ab3bbe7cf..c9365e05db 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSctpServerChannelConfig.java @@ -26,9 +26,7 @@ import io.netty.util.NetUtil; import java.io.IOException; import java.util.Map; -import static com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS; -import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF; -import static com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; /** * The default {@link SctpServerChannelConfig} implementation for SCTP. @@ -169,4 +167,9 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme public SctpServerChannelConfig setAllocator(ByteBufAllocator allocator) { return (SctpServerChannelConfig) super.setAllocator(allocator); } + + @Override + public SctpServerChannelConfig setAutoRead(boolean autoRead) { + return (SctpServerChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java index b754e0b87c..49d73d91d5 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java @@ -15,8 +15,6 @@ */ package io.netty.channel.socket; -import static io.netty.channel.ChannelOption.*; - import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; @@ -27,6 +25,8 @@ import java.net.ServerSocket; import java.net.SocketException; import java.util.Map; +import static io.netty.channel.ChannelOption.*; + /** * The default {@link ServerSocketChannelConfig} implementation. */ @@ -156,4 +156,9 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { return (ServerSocketChannelConfig) super.setAllocator(allocator); } + + @Override + public ServerSocketChannelConfig setAutoRead(boolean autoRead) { + return (ServerSocketChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java index 31663215aa..b51270c871 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java @@ -15,8 +15,6 @@ */ package io.netty.channel.socket; -import static io.netty.channel.ChannelOption.*; - import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; @@ -26,6 +24,8 @@ import java.net.Socket; import java.net.SocketException; import java.util.Map; +import static io.netty.channel.ChannelOption.*; + /** * The default {@link SocketChannelConfig} implementation. */ @@ -280,4 +280,9 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig public SocketChannelConfig setAllocator(ByteBufAllocator allocator) { return (SocketChannelConfig) super.setAllocator(allocator); } + + @Override + public SocketChannelConfig setAutoRead(boolean autoRead) { + return (SocketChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/SctpChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SctpChannelConfig.java index a2bc80fa2c..da8f2359ff 100644 --- a/transport/src/main/java/io/netty/channel/socket/SctpChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/SctpChannelConfig.java @@ -18,7 +18,7 @@ package io.netty.channel.socket; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelConfig; -import static com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; /** * A {@link ChannelConfig} for a {@link SctpChannel}. @@ -101,4 +101,7 @@ public interface SctpChannelConfig extends ChannelConfig { @Override SctpChannelConfig setAllocator(ByteBufAllocator allocator); + + @Override + SctpChannelConfig setAutoRead(boolean autoRead); } diff --git a/transport/src/main/java/io/netty/channel/socket/SctpServerChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SctpServerChannelConfig.java index ddddb5e620..926d631908 100644 --- a/transport/src/main/java/io/netty/channel/socket/SctpServerChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/SctpServerChannelConfig.java @@ -18,7 +18,7 @@ package io.netty.channel.socket; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelConfig; -import static com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams; +import static com.sun.nio.sctp.SctpStandardSocketOptions.*; /** * A {@link ChannelConfig} for a {@link SctpServerChannelConfig}. @@ -100,4 +100,7 @@ public interface SctpServerChannelConfig extends ChannelConfig { @Override SctpServerChannelConfig setAllocator(ByteBufAllocator allocator); + + @Override + SctpServerChannelConfig setAutoRead(boolean autoRead); } diff --git a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java index a299f1c015..f341e12c47 100644 --- a/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/ServerSocketChannelConfig.java @@ -90,4 +90,7 @@ public interface ServerSocketChannelConfig extends ChannelConfig { @Override ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator); + + @Override + ServerSocketChannelConfig setAutoRead(boolean autoRead); } diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java index cec4db31c7..5a00d3c11d 100644 --- a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java @@ -153,4 +153,7 @@ public interface SocketChannelConfig extends ChannelConfig { @Override SocketChannelConfig setAllocator(ByteBufAllocator allocator); + + @Override + SocketChannelConfig setAutoRead(boolean autoRead); } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index 198e0abfc7..7d84799c4b 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -94,7 +94,12 @@ abstract class AbstractAioChannel extends AbstractChannel { return loop instanceof AioEventLoop; } - protected abstract class AbstractAioUnsafe extends AbstractUnsafe { + @Override + protected AbstractUnsafe newUnsafe() { + return new DefaultAioUnsafe(); + } + + protected final class DefaultAioUnsafe extends AbstractUnsafe { @Override public void connect(final SocketAddress remoteAddress, @@ -144,13 +149,13 @@ abstract class AbstractAioChannel extends AbstractChannel { } } - protected final void connectFailed(Throwable t) { + protected void connectFailed(Throwable t) { connectFuture.setFailure(t); pipeline().fireExceptionCaught(t); closeIfClosed(); } - protected final void connectSuccess() { + protected void connectSuccess() { assert eventLoop().inEventLoop(); assert connectFuture != null; try { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 2818880719..e97792b803 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -31,7 +31,6 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; -import java.util.concurrent.atomic.AtomicBoolean; /** * {@link ServerSocketChannel} implementation which uses NIO2. @@ -47,16 +46,8 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server InternalLoggerFactory.getInstance(AioServerSocketChannel.class); private final AioServerSocketChannelConfig config; + private boolean acceptInProgress; private boolean closed; - private final AtomicBoolean readSuspended = new AtomicBoolean(); - - private final Runnable acceptTask = new Runnable() { - - @Override - public void run() { - doAccept(); - } - }; private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) { try { @@ -122,13 +113,15 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server protected void doBind(SocketAddress localAddress) throws Exception { AsynchronousServerSocketChannel ch = javaChannel(); ch.bind(localAddress, config.getBacklog()); - doAccept(); } - private void doAccept() { - if (readSuspended.get()) { + @Override + protected void doBeginRead() { + if (acceptInProgress) { return; } + + acceptInProgress = true; javaChannel().accept(this, ACCEPT_HANDLER); } @@ -172,17 +165,17 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) { - // register again this handler to accept new connections - channel.doAccept(); - + channel.acceptInProgress = false; // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add( new AioSocketChannel(channel, null, ch)); channel.pipeline().fireInboundBufferUpdated(); + channel.pipeline().fireInboundBufferSuspended(); } @Override protected void failed0(Throwable t, AioServerSocketChannel channel) { + channel.acceptInProgress = false; boolean asyncClosed = false; if (t instanceof AsynchronousCloseException) { asyncClosed = true; @@ -200,28 +193,4 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server public ServerSocketChannelConfig config() { return config; } - - @Override - protected AbstractUnsafe newUnsafe() { - return new AioServerSocketUnsafe(); - } - - private final class AioServerSocketUnsafe extends AbstractAioUnsafe { - - @Override - public void suspendRead() { - readSuspended.set(true); - } - - @Override - public void resumeRead() { - if (readSuspended.compareAndSet(true, false)) { - if (eventLoop().inEventLoop()) { - doAccept(); - } else { - eventLoop().execute(acceptTask); - } - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java index b72250098e..a77ff2579e 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java @@ -15,8 +15,6 @@ */ package io.netty.channel.socket.aio; -import static io.netty.channel.ChannelOption.*; - import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; @@ -32,6 +30,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import static io.netty.channel.ChannelOption.*; + /** * The Async {@link ServerSocketChannelConfig} implementation. */ @@ -213,4 +213,9 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { return (ServerSocketChannelConfig) super.setAllocator(allocator); } + + @Override + public ServerSocketChannelConfig setAutoRead(boolean autoRead) { + return (ServerSocketChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 67dc5c4480..72eeda7ec3 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -20,11 +20,11 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFlushFutureNotifier; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; +import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.SocketChannel; import java.io.IOException; @@ -38,7 +38,6 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.WritableByteChannel; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** @@ -68,19 +67,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private volatile boolean inputShutdown; private volatile boolean outputShutdown; - private boolean asyncWriteInProgress; + private boolean readInProgress; + private boolean writeInProgress; private boolean inDoFlushByteBuffer; - private boolean asyncReadInProgress; - private boolean inBeginRead; - - private final AtomicBoolean readSuspended = new AtomicBoolean(); - - private final Runnable readTask = new Runnable() { - @Override - public void run() { - beginRead(); - } - }; /** * Create a new instance which has not yet attached an {@link AsynchronousSocketChannel}. The @@ -207,16 +196,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne config.assign(javaChannel()); } - if (remoteAddress() == null) { - return null; - } - - return new Runnable() { - @Override - public void run() { - beginRead(); - } - }; + return null; } private static void expandReadBuffer(ByteBuf byteBuf) { @@ -267,7 +247,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void doFlushByteBuffer(ByteBuf buf) throws Exception { - if (inDoFlushByteBuffer || asyncWriteInProgress) { + if (inDoFlushByteBuffer || writeInProgress) { return; } @@ -284,7 +264,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne // discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly. buf.discardReadBytes(); - asyncWriteInProgress = true; + writeInProgress = true; if (buf.nioBufferCount() == 1) { javaChannel().write( buf.nioBuffer(), config.getWriteTimeout(), TimeUnit.MILLISECONDS, this, WRITE_HANDLER); @@ -300,7 +280,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - if (asyncWriteInProgress) { + if (writeInProgress) { // JDK decided to write data (or notify handler) later. buf.suspendIntermediaryDeallocations(); break; @@ -328,53 +308,35 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne region.transferTo(new WritableByteChannelAdapter(region, future), 0); } - private void beginRead() { - if (inBeginRead || asyncReadInProgress || readSuspended.get()) { + @Override + protected void doBeginRead() { + if (readInProgress || inputShutdown) { return; } - inBeginRead = true; + ByteBuf byteBuf = pipeline().inboundByteBuffer(); + if (!byteBuf.readable()) { + byteBuf.discardReadBytes(); + } - try { - for (;;) { - ByteBuf byteBuf = pipeline().inboundByteBuffer(); - if (inputShutdown) { - break; - } + expandReadBuffer(byteBuf); - if (!byteBuf.readable()) { - byteBuf.discardReadBytes(); - } - - expandReadBuffer(byteBuf); - - asyncReadInProgress = true; - if (byteBuf.nioBufferCount() == 1) { - // Get a ByteBuffer view on the ByteBuf - ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); - javaChannel().read( - buffer, config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER); - } else { - ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); - if (buffers.length == 1) { - javaChannel().read( - buffers[0], config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER); - } else { - javaChannel().read( - buffers, 0, buffers.length, config.getReadTimeout(), TimeUnit.MILLISECONDS, - this, SCATTERING_READ_HANDLER); - } - } - - if (asyncReadInProgress) { - // JDK decided to read data (or notify handler) later. - break; - } - - // The read operation has been finished immediately - schedule another read operation. + readInProgress = true; + if (byteBuf.nioBufferCount() == 1) { + // Get a ByteBuffer view on the ByteBuf + ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); + javaChannel().read( + buffer, config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER); + } else { + ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); + if (buffers.length == 1) { + javaChannel().read( + buffers[0], config.getReadTimeout(), TimeUnit.MILLISECONDS, this, READ_HANDLER); + } else { + javaChannel().read( + buffers, 0, buffers.length, config.getReadTimeout(), TimeUnit.MILLISECONDS, + this, SCATTERING_READ_HANDLER); } - } finally { - inBeginRead = false; } } @@ -382,7 +344,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void completed0(T result, AioSocketChannel channel) { - channel.asyncWriteInProgress = false; + channel.writeInProgress = false; ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); buf.resumeIntermediaryDeallocations(); @@ -419,7 +381,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void failed0(Throwable cause, AioSocketChannel channel) { - channel.asyncWriteInProgress = false; + channel.writeInProgress = false; channel.flushFutureNotifier.notifyFlushFutures(cause); // Check if the exception was raised because of an InterruptedByTimeoutException which means that the @@ -444,7 +406,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void completed0(T result, AioSocketChannel channel) { - channel.asyncReadInProgress = false; + channel.readInProgress = false; if (channel.inputShutdown) { // Channel has been closed during read. Because the inbound buffer has been deallocated already, @@ -457,6 +419,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne boolean closed = false; boolean read = false; + boolean firedInboundBufferSuspended = false; try { int localReadAmount = result.intValue(); if (localReadAmount > 0) { @@ -477,11 +440,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne pipeline.fireInboundBufferUpdated(); } + firedInboundBufferSuspended = true; + pipeline.fireInboundBufferSuspended(); pipeline.fireExceptionCaught(t); } finally { if (read) { pipeline.fireInboundBufferUpdated(); } + if (!firedInboundBufferSuspended) { + pipeline.fireInboundBufferSuspended(); + } // Double check because fireInboundBufferUpdated() might have triggered the closure by a user handler. if (closed || !channel.isOpen()) { @@ -493,16 +461,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne channel.unsafe().close(channel.unsafe().voidFuture()); } } - } else { - // Schedule another read operation. - channel.beginRead(); } } } @Override protected void failed0(Throwable t, AioSocketChannel channel) { - channel.asyncReadInProgress = false; + channel.readInProgress = false; if (t instanceof ClosedChannelException) { channel.inputShutdown = true; return; @@ -516,9 +481,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne // See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html if (t instanceof IOException || t instanceof InterruptedByTimeoutException) { channel.unsafe().close(channel.unsafe().voidFuture()); - } else { - // Schedule another read operation. - channel.beginRead(); } } } @@ -527,14 +489,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void completed0(Void result, AioSocketChannel channel) { - ((AbstractAioUnsafe) channel.unsafe()).connectSuccess(); + ((DefaultAioUnsafe) channel.unsafe()).connectSuccess(); channel.pipeline().fireChannelActive(); - channel.beginRead(); } @Override protected void failed0(Throwable exc, AioSocketChannel channel) { - ((AbstractAioUnsafe) channel.unsafe()).connectFailed(exc); + ((DefaultAioUnsafe) channel.unsafe()).connectFailed(exc); } } @@ -543,34 +504,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne return config; } - @Override - protected AbstractUnsafe newUnsafe() { - return new AioSocketChannelAsyncUnsafe(); - } - - private final class AioSocketChannelAsyncUnsafe extends AbstractAioUnsafe { - - @Override - public void suspendRead() { - readSuspended.set(true); - } - - @Override - public void resumeRead() { - if (readSuspended.compareAndSet(true, false)) { - if (inputShutdown) { - return; - } - - if (eventLoop().inEventLoop()) { - beginRead(); - } else { - eventLoop().execute(readTask); - } - } - } - } - private final class WritableByteChannelAdapter implements WritableByteChannel { private final FileRegion region; private final ChannelFuture future; diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index 93f38dc7c9..3e871faa5b 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.aio; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.socket.SocketChannelConfig; + import java.nio.channels.InterruptedByTimeoutException; @@ -61,6 +62,9 @@ public interface AioSocketChannelConfig extends SocketChannelConfig { @Override AioSocketChannelConfig setAllocator(ByteBufAllocator allocator); + @Override + AioSocketChannelConfig setAutoRead(boolean autoRead); + /** * Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown. * Once such an exception was detected it will get propagated to the handlers first. After that the channel diff --git a/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioSocketChannelConfig.java index 0ef9bd4c82..7a4e4c0829 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioSocketChannelConfig.java @@ -336,4 +336,9 @@ final class DefaultAioSocketChannelConfig extends DefaultChannelConfig public AioSocketChannelConfig setAllocator(ByteBufAllocator allocator) { return (AioSocketChannelConfig) super.setAllocator(allocator); } + + @Override + public AioSocketChannelConfig setAutoRead(boolean autoRead) { + return (AioSocketChannelConfig) super.setAutoRead(autoRead); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index cd280d595e..f933c5d2c8 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -18,10 +18,10 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.FileRegion; +import io.netty.channel.socket.ChannelInputShutdownEvent; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -55,11 +55,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override public void read() { assert eventLoop().inEventLoop(); + final SelectionKey key = selectionKey(); + key.interestOps(key.interestOps() & ~readInterestOp); final ChannelPipeline pipeline = pipeline(); final ByteBuf byteBuf = pipeline.inboundByteBuffer(); boolean closed = false; boolean read = false; + boolean firedInboundBufferSuspended = false; try { expandReadBuffer(byteBuf); loop: for (;;) { @@ -96,6 +99,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { read = false; pipeline.fireInboundBufferUpdated(); } + + firedInboundBufferSuspended = true; + pipeline.fireInboundBufferSuspended(); + pipeline().fireExceptionCaught(t); if (t instanceof IOException) { close(voidFuture()); @@ -104,11 +111,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (read) { pipeline.fireInboundBufferUpdated(); } + if (!firedInboundBufferSuspended) { + pipeline.fireInboundBufferSuspended(); + } + if (closed) { setInputShutdown(); if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - suspendReadTask.run(); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidFuture()); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index a0a813bd15..39b6f43cae 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -43,25 +43,11 @@ public abstract class AbstractNioChannel extends AbstractChannel { InternalLoggerFactory.getInstance(AbstractNioChannel.class); private final SelectableChannel ch; - private final int readInterestOp; + protected final int readInterestOp; private volatile SelectionKey selectionKey; private volatile boolean inputShutdown; final Queue> writableTasks = new ConcurrentLinkedQueue>(); - final Runnable suspendReadTask = new Runnable() { - @Override - public void run() { - selectionKey().interestOps(selectionKey().interestOps() & ~readInterestOp); - } - }; - - final Runnable resumeReadTask = new Runnable() { - @Override - public void run() { - selectionKey().interestOps(selectionKey().interestOps() | readInterestOp); - } - }; - /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. @@ -249,30 +235,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { connectFuture = null; } } - - @Override - public void suspendRead() { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - suspendReadTask.run(); - } else { - loop.execute(suspendReadTask); - } - } - - @Override - public void resumeRead() { - if (inputShutdown) { - return; - } - - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - resumeReadTask.run(); - } else { - loop.execute(resumeReadTask); - } - } } @Override @@ -288,9 +250,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - NioEventLoop loop = eventLoop(); - selectionKey = javaChannel().register( - loop.selector, isActive() && !inputShutdown ? readInterestOp : 0, this); + selectionKey = javaChannel().register(eventLoop().selector, 0, this); return null; } @@ -299,6 +259,25 @@ public abstract class AbstractNioChannel extends AbstractChannel { eventLoop().cancel(selectionKey()); } + @Override + protected void doBeginRead() throws Exception { + if (inputShutdown) { + return; + } + + final SelectionKey selectionKey = this.selectionKey; + if (!selectionKey.isValid()) { + return; + } + + final int interestOps = selectionKey.interestOps(); + if ((interestOps & readInterestOp) != 0) { + return; + } + + this.selectionKey.interestOps(interestOps | readInterestOp); + } + /** * Conect to the remote peer */ diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 7d25a0c868..c9165131a0 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelPipeline; import java.io.IOException; import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; /** * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages. @@ -44,11 +45,14 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { @Override public void read() { assert eventLoop().inEventLoop(); + final SelectionKey key = selectionKey(); + key.interestOps(key.interestOps() & ~readInterestOp); final ChannelPipeline pipeline = pipeline(); final MessageBuf msgBuf = pipeline.inboundMessageBuffer(); boolean closed = false; boolean read = false; + boolean firedInboundBufferSuspended = false; try { for (;;) { int localReadAmount = doReadMessages(msgBuf); @@ -66,6 +70,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { read = false; pipeline.fireInboundBufferUpdated(); } + + firedInboundBufferSuspended = true; + pipeline.fireInboundBufferSuspended(); + pipeline().fireExceptionCaught(t); if (t instanceof IOException) { close(voidFuture()); @@ -74,6 +82,9 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { if (read) { pipeline.fireInboundBufferUpdated(); } + if (!firedInboundBufferSuspended) { + pipeline.fireInboundBufferSuspended(); + } if (closed && isOpen()) { close(voidFuture()); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java index 54eda716b7..ad65b72710 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSctpServerChannel.java @@ -118,8 +118,6 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().bind(localAddress, config.getBacklog()); - SelectionKey selectionKey = selectionKey(); - selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index e475170b17..18f09ea6be 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -90,8 +90,6 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); - SelectionKey selectionKey = selectionKey(); - selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index f937cb85bf..0fd6ffaa05 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -213,10 +213,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); - - // FIXME: This is not as efficient as Netty 3's SendBufferPool if heap buffer is used - // because of potentially unwanted repetitive memory copy in case of - // a slow connection or a large output buffer that triggers OP_WRITE. final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes); final SelectionKey key = selectionKey(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index 91ab8c01d8..44f297d497 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -17,9 +17,9 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelInputShutdownEvent; import java.io.IOException; @@ -42,88 +42,85 @@ abstract class AbstractOioByteChannel extends AbstractOioChannel { } @Override - protected AbstractOioUnsafe newUnsafe() { - return new OioByteUnsafe(); - } - - private final class OioByteUnsafe extends AbstractOioUnsafe { - @Override - public void read() { - assert eventLoop().inEventLoop(); - - if (inputShutdown) { - try { - Thread.sleep(SO_TIMEOUT); - } catch (InterruptedException e) { - // ignore - } - return; - } - - final ChannelPipeline pipeline = pipeline(); - final ByteBuf byteBuf = pipeline.inboundByteBuffer(); - boolean closed = false; - boolean read = false; + protected void doRead() { + if (inputShutdown) { try { - for (;;) { - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - } + Thread.sleep(SO_TIMEOUT); + } catch (InterruptedException e) { + // ignore + } + return; + } - final int available = available(); - if (available <= 0) { - break; - } + final ChannelPipeline pipeline = pipeline(); + final ByteBuf byteBuf = pipeline.inboundByteBuffer(); + boolean closed = false; + boolean read = false; + boolean firedInboundBufferSuspeneded = false; + try { + for (;;) { + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } - if (byteBuf.writable()) { - continue; - } + final int available = available(); + if (available <= 0) { + break; + } - final int capacity = byteBuf.capacity(); - final int maxCapacity = byteBuf.maxCapacity(); - if (capacity == maxCapacity) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - if (!byteBuf.writable()) { - throw new IllegalStateException( - "an inbound handler whose buffer is full must consume at " + - "least one byte."); - } + if (byteBuf.writable()) { + continue; + } + + final int capacity = byteBuf.capacity(); + final int maxCapacity = byteBuf.maxCapacity(); + if (capacity == maxCapacity) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + if (!byteBuf.writable()) { + throw new IllegalStateException( + "an inbound handler whose buffer is full must consume at " + + "least one byte."); } + } + } else { + final int writerIndex = byteBuf.writerIndex(); + if (writerIndex + available > maxCapacity) { + byteBuf.capacity(maxCapacity); } else { - final int writerIndex = byteBuf.writerIndex(); - if (writerIndex + available > maxCapacity) { - byteBuf.capacity(maxCapacity); - } else { - byteBuf.ensureWritableBytes(available); - } + byteBuf.ensureWritableBytes(available); } } - } catch (Throwable t) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - } - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); - } - } finally { - if (read) { - pipeline.fireInboundBufferUpdated(); - } - if (closed) { - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidFuture()); - } + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + firedInboundBufferSuspeneded = true; + pipeline.fireInboundBufferSuspended(); + pipeline.fireExceptionCaught(t); + if (t instanceof IOException) { + unsafe().close(unsafe().voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (!firedInboundBufferSuspeneded) { + pipeline.fireInboundBufferSuspended(); + } + if (closed) { + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + unsafe().close(unsafe().voidFuture()); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index fc10a27a89..60c44045cd 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -30,7 +30,15 @@ abstract class AbstractOioChannel extends AbstractChannel { static final int SO_TIMEOUT = 1000; - protected volatile boolean readSuspended; + private boolean readInProgress; + + private final Runnable readTask = new Runnable() { + @Override + public void run() { + readInProgress = false; + doRead(); + } + }; /** * @see AbstractChannel#AbstractChannel(Channel, Integer) @@ -50,15 +58,11 @@ abstract class AbstractOioChannel extends AbstractChannel { } @Override - public OioUnsafe unsafe() { - return (OioUnsafe) super.unsafe(); + protected AbstractUnsafe newUnsafe() { + return new DefaultOioUnsafe(); } - public interface OioUnsafe extends Unsafe { - void read(); - } - - abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe { + private final class DefaultOioUnsafe extends AbstractUnsafe { @Override public void connect( final SocketAddress remoteAddress, @@ -88,16 +92,6 @@ abstract class AbstractOioChannel extends AbstractChannel { }); } } - - @Override - public void suspendRead() { - readSuspended = true; - } - - @Override - public void resumeRead() { - readSuspended = false; - } } @Override @@ -115,4 +109,16 @@ abstract class AbstractOioChannel extends AbstractChannel { */ protected abstract void doConnect( SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; + + @Override + protected void doBeginRead() throws Exception { + if (readInProgress) { + return; + } + + readInProgress = true; + eventLoop().execute(readTask); + } + + protected abstract void doRead(); } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index 61f2b3343b..1357c29c67 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -34,42 +34,39 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel { } @Override - protected AbstractOioUnsafe newUnsafe() { - return new OioMessageUnsafe(); - } - - private final class OioMessageUnsafe extends AbstractOioUnsafe { - @Override - public void read() { - assert eventLoop().inEventLoop(); - - final ChannelPipeline pipeline = pipeline(); - final MessageBuf msgBuf = pipeline.inboundMessageBuffer(); - boolean closed = false; - boolean read = false; - try { - int localReadAmount = doReadMessages(msgBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - } - } catch (Throwable t) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - } - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); - } - } finally { - if (read) { - pipeline.fireInboundBufferUpdated(); - } - if (closed && isOpen()) { - close(voidFuture()); - } + protected void doRead() { + final ChannelPipeline pipeline = pipeline(); + final MessageBuf msgBuf = pipeline.inboundMessageBuffer(); + boolean closed = false; + boolean read = false; + boolean firedInboundBufferSuspended = false; + try { + int localReadAmount = doReadMessages(msgBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + firedInboundBufferSuspended = true; + pipeline.fireInboundBufferSuspended(); + pipeline.fireExceptionCaught(t); + if (t instanceof IOException) { + unsafe().close(unsafe().voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (!firedInboundBufferSuspended) { + pipeline.fireInboundBufferSuspended(); + } + if (closed && isOpen()) { + unsafe().close(unsafe().voidFuture()); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 0bfb3c47bb..f2adcd7f3a 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -181,15 +181,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel @Override protected int doReadMessages(MessageBuf buf) throws Exception { - if (readSuspended) { - try { - Thread.sleep(SO_TIMEOUT); - } catch (InterruptedException e) { - // ignore; - } - return 0; - } - int packetSize = config().getReceivePacketSize(); byte[] data = new byte[packetSize]; tmpPacket.setData(data); @@ -202,11 +193,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel buf.add(new DatagramPacket(Unpooled.wrappedBuffer( data, tmpPacket.getOffset(), tmpPacket.getLength()), remoteAddr)); - if (readSuspended) { - return 0; - } else { - return 1; - } + return 1; } catch (SocketTimeoutException e) { // Expected return 0; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 9eb5eebf8b..2c92447b12 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -78,8 +78,6 @@ class OioEventLoop extends SingleThreadEventLoop { } } - ch.unsafe().read(); - // Handle deregistration if (!ch.isRegistered()) { runAllTasks(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java index ca8da2a4b4..45598472d8 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpChannel.java @@ -148,7 +148,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel @Override protected int doReadMessages(MessageBuf buf) throws Exception { - if (readSuspended || !readSelector.isOpen()) { + if (!readSelector.isOpen()) { return 0; } @@ -174,10 +174,6 @@ public class OioSctpChannel extends AbstractOioMessageChannel buf.add(new SctpMessage(messageInfo, Unpooled.wrappedBuffer(data))); readMessages ++; - - if (readSuspended) { - return readMessages; - } } } finally { reableKeys.clear(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java index adbea540ed..13952ac78d 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSctpServerChannel.java @@ -185,10 +185,6 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel return -1; } - if (readSuspended) { - return 0; - } - SctpChannel s = null; try { final int selectedKeys = selector.select(SO_TIMEOUT); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index b8760027a3..07c142da8a 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -157,15 +157,6 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel return -1; } - if (readSuspended) { - try { - Thread.sleep(SO_TIMEOUT); - } catch (InterruptedException e) { - // ignore - } - return 0; - } - Socket s = null; try { s = socket.accept(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index a9e0f501f3..afac55de15 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -222,15 +222,6 @@ public class OioSocketChannel extends AbstractOioByteChannel return -1; } - if (readSuspended) { - try { - Thread.sleep(SO_TIMEOUT); - } catch (InterruptedException e) { - // ignore - } - return 0; - } - try { return buf.writeBytes(is, buf.writableBytes()); } catch (SocketTimeoutException e) { diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 1b58f654aa..c95a65e1c8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -366,6 +366,11 @@ public class LocalTransportThreadModelTest { ctx.fireInboundBufferUpdated(); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { @@ -446,6 +451,11 @@ public class LocalTransportThreadModelTest { ctx.fireInboundBufferUpdated(); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { @@ -537,6 +547,11 @@ public class LocalTransportThreadModelTest { ctx.fireInboundBufferUpdated(); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception { @@ -630,6 +645,11 @@ public class LocalTransportThreadModelTest { ctx.fireInboundBufferUpdated(); } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { @@ -711,6 +731,11 @@ public class LocalTransportThreadModelTest { } } + @Override + public void read(ChannelHandlerContext ctx) { + ctx.read(); + } + @Override public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {