diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index c70978987d..c24aa19399 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -298,6 +298,21 @@ public final class Channels { ctx.getChannel(), message, remoteAddress)); } + /** + * Sends a {@code "writeComplete"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} in the next io-thread. + */ + public static void fireWriteCompleteLater(Channel channel, long amount) { + if (amount == 0) { + return; + } + + channel.getPipeline().sendUpstreamLater( + new DefaultWriteCompletionEvent(channel, amount)); + } + + /** * Sends a {@code "writeComplete"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 43ad21ad9c..14ffdc6d99 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -398,6 +398,7 @@ abstract class AbstractNioWorker implements Worker { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; + boolean iothread = isIoThread(channel); long writtenBytes = 0; @@ -468,7 +469,11 @@ abstract class AbstractNioWorker implements Worker { buf = null; evt = null; future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } if (t instanceof IOException) { open = false; close(channel, succeededFuture(channel)); @@ -491,10 +496,17 @@ abstract class AbstractNioWorker implements Worker { } } } - - fireWriteComplete(channel, writtenBytes); + if (iothread) { + fireWriteComplete(channel, writtenBytes); + } else { + fireWriteCompleteLater(channel, writtenBytes); + } } + static boolean isIoThread(AbstractNioChannel channel) { + return Thread.currentThread() == channel.worker.thread; + } + private void setOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; SelectionKey key = channel.channel.keyFor(selector); @@ -545,6 +557,8 @@ abstract class AbstractNioWorker implements Worker { void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); + boolean iothread = isIoThread(channel); + try { channel.channel.close(); cancelledKeys ++; @@ -552,20 +566,36 @@ abstract class AbstractNioWorker implements Worker { if (channel.setClosed()) { future.setSuccess(); if (connected) { - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } if (bound) { - fireChannelUnbound(channel); + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } } cleanUpWriteBuffer(channel); - fireChannelClosed(channel); + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); + } } else { future.setSuccess(); } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } @@ -618,12 +648,17 @@ abstract class AbstractNioWorker implements Worker { } if (fireExceptionCaught) { - fireExceptionCaught(channel, cause); + if (isIoThread(channel)) { + fireExceptionCaught(channel, cause); + } else { + fireExceptionCaughtLater(channel, cause); + } } } void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; + boolean iothread = isIoThread(channel); try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. @@ -684,16 +719,28 @@ abstract class AbstractNioWorker implements Worker { future.setSuccess(); if (changed) { - fireChannelInterestChanged(channel); + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } } } catch (CancelledKeyException e) { // setInterestOps() was called on a closed channel. ClosedChannelException cce = new ClosedChannelException(); future.setFailure(cce); - fireExceptionCaught(channel, cce); + if (iothread) { + fireExceptionCaught(channel, cce); + } else { + fireExceptionCaughtLater(channel, cce); + } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index fb7ddde380..6191df2e08 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -16,7 +16,9 @@ package io.netty.channel.socket.nio; import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireChannelDisconnectedLater; import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireExceptionCaughtLater; import static io.netty.channel.Channels.fireMessageReceived; import static io.netty.channel.Channels.succeededFuture; import io.netty.buffer.ChannelBufferFactory; @@ -126,15 +128,24 @@ class NioDatagramWorker extends AbstractNioWorker { static void disconnect(NioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); + boolean iothread = isIoThread(channel); try { channel.getDatagramChannel().disconnect(); future.setSuccess(); if (connected) { - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 5dd3e8b14d..97b20149a9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -15,12 +15,7 @@ */ package io.netty.channel.socket.oio; -import static io.netty.channel.Channels.fireChannelClosed; -import static io.netty.channel.Channels.fireChannelDisconnected; -import static io.netty.channel.Channels.fireChannelInterestChanged; -import static io.netty.channel.Channels.fireChannelUnbound; -import static io.netty.channel.Channels.fireExceptionCaught; -import static io.netty.channel.Channels.succeededFuture; +import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; @@ -89,6 +84,9 @@ abstract class AbstractOioWorker implements Worker close(channel, succeededFuture(channel)); } + static boolean isIoThead(AbstractOioChannel channel) { + return Thread.currentThread() == channel.workerThread; + } @Override public void executeInIoThread(Runnable eventRunnable) { @@ -120,7 +118,8 @@ abstract class AbstractOioWorker implements Worker static void setInterestOps( AbstractOioChannel channel, ChannelFuture future, int interestOps) { - + boolean iothread = isIoThead(channel); + // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; interestOps |= channel.getInterestOps() & Channel.OP_WRITE; @@ -148,18 +147,27 @@ abstract class AbstractOioWorker implements Worker workerThread.interrupt(); } } - - fireChannelInterestChanged(channel); + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } static void close(AbstractOioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); + boolean iothread = isIoThead(channel); + try { channel.closeSocket(); if (channel.setClosed()) { @@ -171,18 +179,34 @@ abstract class AbstractOioWorker implements Worker if (workerThread != null && currentThread != workerThread) { workerThread.interrupt(); } - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } if (bound) { - fireChannelUnbound(channel); + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } + } + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); } - fireChannelClosed(channel); } else { future.setSuccess(); } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java index f1b42b42f9..8ce169b90a 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java @@ -63,6 +63,8 @@ class OioDatagramWorker extends AbstractOioWorker { static void write( OioDatagramChannel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { + boolean iothread = isIoThead(channel); + try { ChannelBuffer buf = (ChannelBuffer) message; int offset = buf.readerIndex(); @@ -84,27 +86,45 @@ class OioDatagramWorker extends AbstractOioWorker { packet.setSocketAddress(remoteAddress); } channel.socket.send(packet); - fireWriteComplete(channel, length); + if (iothread) { + fireWriteComplete(channel, length); + } else { + fireWriteCompleteLater(channel, length); + } future.setSuccess(); } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } static void disconnect(OioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); + boolean iothread = isIoThead(channel); + try { channel.socket.disconnect(); future.setSuccess(); if (connected) { // Notify. - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java index bb0f7148d9..180d756d36 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java @@ -65,11 +65,16 @@ class OioWorker extends AbstractOioWorker { OioSocketChannel channel, ChannelFuture future, Object message) { + boolean iothread = isIoThead(channel); OutputStream out = channel.getOutputStream(); if (out == null) { Exception e = new ClosedChannelException(); future.setFailure(e); - fireExceptionCaught(channel, e); + if (iothread) { + fireExceptionCaught(channel, e); + } else { + fireExceptionCaughtLater(channel, e); + } return; } @@ -106,7 +111,11 @@ class OioWorker extends AbstractOioWorker { } } - fireWriteComplete(channel, length); + if (iothread) { + fireWriteComplete(channel, length); + } else { + fireWriteCompleteLater(channel, length); + } future.setSuccess(); } catch (Throwable t) { @@ -118,7 +127,11 @@ class OioWorker extends AbstractOioWorker { t = new ClosedChannelException(); } future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } }