From e48281471bcefc9fda036bf98cfdb5b3b8c223c7 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 28 May 2012 05:05:49 -0700 Subject: [PATCH] Limit future notification stack depth / Robost writeCounter management - Also ported the discard example while testing this commit --- .../netty/example/discard/DiscardClient.java | 51 ++++---- .../example/discard/DiscardClientHandler.java | 98 +++++++--------- .../netty/example/discard/DiscardServer.java | 48 ++++---- .../example/discard/DiscardServerHandler.java | 41 ++----- .../io/netty/channel/AbstractChannel.java | 109 +++++++++++++----- .../netty/channel/CompleteChannelFuture.java | 30 +---- .../netty/channel/DefaultChannelFuture.java | 56 ++++++--- .../netty/channel/SingleThreadEventLoop.java | 49 +++++--- .../socket/nio/AbstractNioMessageChannel.java | 2 - .../socket/nio/AbstractNioStreamChannel.java | 2 - .../socket/oio/AbstractOioMessageChannel.java | 6 +- .../socket/oio/AbstractOioStreamChannel.java | 6 +- 12 files changed, 258 insertions(+), 240 deletions(-) diff --git a/example/src/main/java/io/netty/example/discard/DiscardClient.java b/example/src/main/java/io/netty/example/discard/DiscardClient.java index 99527397b5..3dac1a6bb8 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClient.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClient.java @@ -15,15 +15,12 @@ */ package io.netty.example.discard; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - -import io.netty.bootstrap.ClientBootstrap; +import io.netty.channel.ChannelBootstrap; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.socket.nio.NioClientSocketChannelFactory; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioSocketChannel; /** * Keeps sending random data to the specified address. @@ -40,28 +37,28 @@ public class DiscardClient { this.firstMessageSize = firstMessageSize; } - public void run() { - // Configure the client. - ClientBootstrap bootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool())); + public void run() throws Exception { + ChannelBootstrap b = new ChannelBootstrap(); + try { + b.eventLoop(new NioEventLoop()) + .channel(new NioSocketChannel()) + .remoteAddress(host, port) + .initializer(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new DiscardClientHandler(firstMessageSize)); + } + }); - // Set up the pipeline factory. - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new DiscardClientHandler(firstMessageSize)); - } - }); + // Make the connection attempt. + ChannelFuture f = b.connect().sync(); - // Start the connection attempt. - ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); + // Wait until the connection is closed. + f.channel().closeFuture().sync(); - // Wait until the connection is closed or the connection attempt fails. - future.channel().getCloseFuture().awaitUninterruptibly(); - - // Shut down thread pools to exit. - bootstrap.releaseExternalResources(); + } finally { + b.shutdown(); + } } public static void main(String[] args) throws Exception { diff --git a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java index 7ea7e4a319..bdcfe1ccd6 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java @@ -15,31 +15,26 @@ */ package io.netty.example.discard; +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundStreamHandlerAdapter; + import java.util.logging.Level; import java.util.logging.Logger; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelState; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.channel.WriteCompletionEvent; - /** * Handles a client-side channel. */ -public class DiscardClientHandler extends SimpleChannelUpstreamHandler { +public class DiscardClientHandler extends ChannelInboundStreamHandlerAdapter { private static final Logger logger = Logger.getLogger( DiscardClientHandler.class.getName()); - private long transferredBytes; private final byte[] content; + private ChannelInboundHandlerContext ctx; + private ChannelBuffer out; public DiscardClientHandler(int messageSize) { if (messageSize <= 0) { @@ -49,70 +44,55 @@ public class DiscardClientHandler extends SimpleChannelUpstreamHandler { content = new byte[messageSize]; } - public long getTransferredBytes() { - return transferredBytes; - } @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { - if (((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) { - logger.info(e.toString()); - } - } - - // Let SimpleChannelHandler call actual event handler methods below. - super.handleUpstream(ctx, e); - } - - @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { + public void channelActive(ChannelInboundHandlerContext ctx) + throws Exception { + this.ctx = ctx; + out = ctx.out().byteBuffer(); // Send the initial messages. - generateTraffic(e); + generateTraffic(); } - @Override - public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) { - // Keep sending messages whenever the current socket buffer has room. - generateTraffic(e); - } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) + throws Exception { // Server is supposed to send nothing. Therefore, do nothing. } - @Override - public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) { - transferredBytes += e.getWrittenAmount(); - } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + public void exceptionCaught(ChannelInboundHandlerContext ctx, + Throwable cause) throws Exception { // Close the connection when an exception is raised. logger.log( Level.WARNING, "Unexpected exception from downstream.", - e.cause()); - e.channel().close(); + cause); + ctx.close(); } - private void generateTraffic(ChannelStateEvent e) { - // Keep generating traffic until the channel is unwritable. - // A channel becomes unwritable when its internal buffer is full. - // If you keep writing messages ignoring this property, - // you will end up with an OutOfMemoryError. - Channel channel = e.channel(); - while (channel.isWritable()) { - ChannelBuffer m = nextMessage(); - if (m == null) { - break; - } - channel.write(m); + long counter; + + private void generateTraffic() { + // Fill the outbound buffer up to 64KiB + while (out.readableBytes() < 65536) { + out.writeBytes(content); } + + // Flush the outbound buffer to the socket. + // Once flushed, generate the same amount of traffic again. + ctx.flush().addListener(GENERATE_TRAFFIC); } - private ChannelBuffer nextMessage() { - return ChannelBuffers.wrappedBuffer(content); - } + private final ChannelFutureListener GENERATE_TRAFFIC = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + out.clear(); + generateTraffic(); + } + } + }; } diff --git a/example/src/main/java/io/netty/example/discard/DiscardServer.java b/example/src/main/java/io/netty/example/discard/DiscardServer.java index c32f6feb0f..4b5db1133f 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServer.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServer.java @@ -15,14 +15,12 @@ */ package io.netty.example.discard; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.socket.nio.NioServerSocketChannelFactory; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ServerChannelBootstrap; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. @@ -35,21 +33,29 @@ public class DiscardServer { this.port = port; } - public void run() { - // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool())); + public void run() throws Exception { + ServerChannelBootstrap b = new ServerChannelBootstrap(); + try { + b.eventLoop(new NioEventLoop(), new NioEventLoop()) + .channel(new NioServerSocketChannel()) + .localAddress(port) + .childInitializer(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new DiscardServerHandler()); + } + }); - // Set up the pipeline factory. - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new DiscardServerHandler()); - } - }); + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind().sync(); - // Bind and start to accept incoming connections. - bootstrap.bind(new InetSocketAddress(port)); + // Wait until the server socket is closed. + // In this example, this does not happen, but you can do that to gracefully + // shut down your server. + f.channel().closeFuture().sync(); + } finally { + b.shutdown(); + } } public static void main(String[] args) throws Exception { diff --git a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java index 3b184cf1bc..8c3167013f 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServerHandler.java @@ -15,54 +15,37 @@ */ package io.netty.example.discard; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundStreamHandlerAdapter; + import java.util.logging.Level; import java.util.logging.Logger; -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; - /** * Handles a server-side channel. */ -public class DiscardServerHandler extends SimpleChannelUpstreamHandler { +public class DiscardServerHandler extends ChannelInboundStreamHandlerAdapter { private static final Logger logger = Logger.getLogger( DiscardServerHandler.class.getName()); - private long transferredBytes; - - public long getTransferredBytes() { - return transferredBytes; - } @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { - logger.info(e.toString()); - } - - // Let SimpleChannelHandler call actual event handler methods below. - super.handleUpstream(ctx, e); + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) + throws Exception { + // Discard the received data silently. + ctx.in().byteBuffer().clear(); } - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - // Discard received data silently by doing nothing. - transferredBytes += ((ChannelBuffer) e.getMessage()).readableBytes(); - } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + public void exceptionCaught(ChannelInboundHandlerContext ctx, + Throwable cause) throws Exception { // Close the connection when an exception is raised. logger.log( Level.WARNING, "Unexpected exception from downstream.", - e.cause()); - e.channel().close(); + cause); + ctx.close(); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 4efc23a39a..13d6ec0f4a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -82,7 +82,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ClosedChannelException closedChannelException; private final Deque flushCheckpoints = new ArrayDeque(); - protected long writeCounter; + private long writeCounter; + private boolean inFlushNow; + private boolean flushNowPending; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -362,6 +364,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract class AbstractUnsafe implements Unsafe { + private final Runnable flushLaterTask = new FlushLater(); + @Override public ChannelBufferHolder out() { return firstOut(); @@ -554,19 +558,26 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // Attempt/perform outbound I/O if: // - the channel is inactive - flush0() will fail the futures. // - the event loop has no plan to call flushForcibly(). - try { - if (!isActive() || !isFlushPending()) { - doFlush(out()); + if (!inFlushNow) { + try { + if (!isActive() || !isFlushPending()) { + flushNow(); + } + } catch (Throwable t) { + notifyFlushFutures(t); + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + if (!isActive()) { + close(unsafe().voidFuture()); + } } - } catch (Throwable t) { - notifyFlushFutures(t); - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); - } - } finally { - if (!isActive()) { - close(unsafe().voidFuture()); + } else { + if (!flushNowPending) { + flushNowPending = true; + eventLoop().execute(flushLaterTask); } } } else { @@ -581,18 +592,38 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void flushNow() { + if (inFlushNow) { + return; + } + + inFlushNow = true; try { - doFlush(out()); - } catch (Throwable t) { - notifyFlushFutures(t); - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); + Throwable cause = null; + ChannelBufferHolder out = out(); + int oldSize = out.size(); + try { + doFlush(out); + } catch (Throwable t) { + cause = t; + } finally { + writeCounter += oldSize - out.size(); } - } finally { + + if (cause == null) { + notifyFlushFutures(); + } else { + notifyFlushFutures(cause); + pipeline().fireExceptionCaught(cause); + if (cause instanceof IOException) { + close(voidFuture()); + } + } + if (!isActive()) { close(unsafe().voidFuture()); } + } finally { + inFlushNow = false; } } @@ -615,6 +646,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + private class FlushLater implements Runnable { + @Override + public void run() { + flushNowPending = false; + unsafe().flush(voidFuture); + } + } + protected abstract boolean isCompatible(EventLoop loop); protected abstract ChannelBufferHolder firstOut(); @@ -631,38 +670,46 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract boolean isFlushPending(); - protected void notifyFlushFutures() { + private void notifyFlushFutures() { if (flushCheckpoints.isEmpty()) { return; } - final long flushedAmount = AbstractChannel.this.writeCounter; + final long writeCounter = AbstractChannel.this.writeCounter; for (;;) { - FlushCheckpoint cp = flushCheckpoints.poll(); + FlushCheckpoint cp = flushCheckpoints.peek(); if (cp == null) { + // Reset the counter if there's nothing in the notification list. + AbstractChannel.this.writeCounter = 0; break; } - if (cp.flushCheckpoint() > flushedAmount) { + + if (cp.flushCheckpoint() > writeCounter) { + if (writeCounter > 0 && flushCheckpoints.size() == 1) { + AbstractChannel.this.writeCounter = 0; + cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter); + } break; } + + flushCheckpoints.remove(); cp.future().setSuccess(); } // Avoid overflow - if (flushCheckpoints.isEmpty()) { - // Reset the counter if there's nothing in the notification list. - AbstractChannel.this.writeCounter = 0; - } else if (flushedAmount >= 0x1000000000000000L) { - // Otherwise, reset the counter only when the counter grew pretty large + final long newWriteCounter = AbstractChannel.this.writeCounter; + if (newWriteCounter >= 0x1000000000000000L) { + // Reset the counter only when the counter grew pretty large // so that we can reduce the cost of updating all entries in the notification list. AbstractChannel.this.writeCounter = 0; for (FlushCheckpoint cp: flushCheckpoints) { - cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount); + cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter); } } } - protected void notifyFlushFutures(Throwable cause) { + private void notifyFlushFutures(Throwable cause) { + notifyFlushFutures(); for (;;) { FlushCheckpoint cp = flushCheckpoints.poll(); if (cp == null) { diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java index 62f3a3226f..8b5cef2b91 100644 --- a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java @@ -15,9 +15,6 @@ */ package io.netty.channel; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - import java.util.concurrent.TimeUnit; /** @@ -26,9 +23,6 @@ import java.util.concurrent.TimeUnit; */ public abstract class CompleteChannelFuture implements ChannelFuture { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(CompleteChannelFuture.class); - private final Channel channel; /** @@ -45,31 +39,13 @@ public abstract class CompleteChannelFuture implements ChannelFuture { @Override public ChannelFuture addListener(final ChannelFutureListener listener) { - if (channel().eventLoop().inEventLoop()) { - notifyListener(listener); - } else { - channel().eventLoop().execute(new Runnable() { - @Override - public void run() { - notifyListener(listener); - } - }); + if (listener == null) { + throw new NullPointerException("listener"); } + DefaultChannelFuture.notifyListener(this, listener); return this; } - private void notifyListener(ChannelFutureListener listener) { - try { - listener.operationComplete(this); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by " + - ChannelFutureListener.class.getSimpleName() + ".", t); - } - } - } - @Override public ChannelFuture removeListener(ChannelFutureListener listener) { // NOOP diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java b/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java index 9df1d2a780..fd7ef91b4a 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java @@ -38,6 +38,14 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelFuture.class); + private static final int MAX_LISTENER_STACK_DEPTH = 8; + private static final ThreadLocal LISTENER_STACK_DEPTH = new ThreadLocal() { + @Override + protected Integer initialValue() { + return 0; + } + }; + private static final Throwable CANCELLED = new Throwable(); private static volatile boolean useDeadLockChecker = true; @@ -154,17 +162,7 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu } if (notifyNow) { - if (channel().eventLoop().inEventLoop()) { - notifyListener(listener); - } else { - channel().eventLoop().execute(new Runnable() { - @Override - public void run() { - notifyListener(listener); - - } - }); - } + notifyListener(this, listener); } return this; @@ -433,12 +431,12 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu } if (channel().eventLoop().inEventLoop()) { - notifyListener(firstListener); + notifyListener0(this, firstListener); firstListener = null; if (otherListeners != null) { for (ChannelFutureListener l: otherListeners) { - notifyListener(l); + notifyListener0(this, l); } otherListeners = null; } @@ -450,10 +448,10 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu channel().eventLoop().execute(new Runnable() { @Override public void run() { - notifyListener(firstListener); + notifyListener0(DefaultChannelFuture.this, firstListener); if (otherListeners != null) { for (ChannelFutureListener l: otherListeners) { - notifyListener(l); + notifyListener0(DefaultChannelFuture.this, l); } } } @@ -461,9 +459,33 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu } } - private void notifyListener(ChannelFutureListener l) { + static void notifyListener(final ChannelFuture f, final ChannelFutureListener l) { + EventLoop loop = f.channel().eventLoop(); + if (loop.inEventLoop()) { + final Integer stackDepth = LISTENER_STACK_DEPTH.get(); + if (stackDepth < MAX_LISTENER_STACK_DEPTH) { + LISTENER_STACK_DEPTH.set(stackDepth + 1); + try { + notifyListener0(f, l); + } finally { + LISTENER_STACK_DEPTH.set(stackDepth); + } + return; + } + } + + loop.execute(new Runnable() { + @Override + public void run() { + notifyListener(f, l); + } + + }); + } + + private static void notifyListener0(ChannelFuture f, ChannelFutureListener l) { try { - l.operationComplete(this); + l.operationComplete(f); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn( diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index ecf2062958..c90cbb5547 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -109,46 +109,65 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl protected Runnable pollTask() { assert inEventLoop(); + Runnable task = taskQueue.poll(); - if (task == null) { - if (fetchScheduledTasks()) { - task = taskQueue.poll(); - } + if (task != null) { + return task; } - return task; + + if (fetchScheduledTasks()) { + task = taskQueue.poll(); + return task; + } + + return null; } protected Runnable takeTask() throws InterruptedException { assert inEventLoop(); + for (;;) { Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); if (task != null) { return task; } fetchScheduledTasks(); + task = taskQueue.poll(); + if (task != null) { + return task; + } } } protected Runnable peekTask() { assert inEventLoop(); + Runnable task = taskQueue.peek(); - if (task == null) { - if (fetchScheduledTasks()) { - task = taskQueue.peek(); - } + if (task != null) { + return task; } - return task; + + if (fetchScheduledTasks()) { + task = taskQueue.peek(); + return task; + } + + return null; } protected boolean hasTasks() { assert inEventLoop(); + boolean empty = taskQueue.isEmpty(); - if (empty) { - if (fetchScheduledTasks()) { - empty = taskQueue.isEmpty(); - } + if (!empty) { + return true; } - return !empty; + + if (fetchScheduledTasks()) { + return !taskQueue.isEmpty(); + } + + return false; } protected void addTask(Runnable task) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index ce0b70bd46..59c6f70482 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -82,9 +82,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel { for (int i = writeSpinCount; i >= 0; i --) { int localFlushedAmount = doWriteMessages(buf, i == 0); if (localFlushedAmount > 0) { - writeCounter += localFlushedAmount; wrote = true; - notifyFlushFutures(); break; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java index ec822d196c..e30fc59b77 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java @@ -89,8 +89,6 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel { for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf, i == 0); if (localFlushedAmount > 0) { - writeCounter += localFlushedAmount; - notifyFlushFutures(); break; } if (!buf.readable()) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index 0b08b55011..af8acb606d 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -70,11 +70,7 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel { private void flushMessageBuf(Queue buf) throws Exception { while (!buf.isEmpty()) { - int localFlushedAmount = doWriteMessages(buf); - if (localFlushedAmount > 0) { - writeCounter += localFlushedAmount; - notifyFlushFutures(); - } + doWriteMessages(buf); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java index 88fda6393e..f64b99334c 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -72,11 +72,7 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel { private void flushByteBuf(ChannelBuffer buf) throws Exception { while (buf.readable()) { - int localFlushedAmount = doWriteBytes(buf); - if (localFlushedAmount > 0) { - writeCounter += localFlushedAmount; - notifyFlushFutures(); - } + doWriteBytes(buf); } buf.clear(); }