diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 206c8278f7..b0796cd994 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -226,7 +226,7 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { } @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { handleEvent(e); } } diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java index a89a8aa837..e578a2dcfe 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelPipeline; public abstract class AbstractHttpChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { pipeline.sendUpstream(e); } diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java index a1692c2718..5009dc1427 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java @@ -334,7 +334,7 @@ public class RxtxChannelSink extends AbstractChannelSink { * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { pipeline.sendUpstream(event); } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java index 642cd891b1..5daba78839 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java @@ -24,13 +24,13 @@ import io.netty.channel.ChannelPipeline; public abstract class AbstractScptChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { Channel ch = e.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; // check if the current thread is a worker thread, and only fire the event later if thats not the case if (channel.worker.thread != Thread.currentThread()) { - channel.worker.fireEventLater(new Runnable() { + channel.worker.executeInIoThread(new Runnable() { @Override public void run() { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index e546019da3..1cc3a764ac 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -245,7 +245,8 @@ class SctpWorker implements Worker { } } - public void fireEventLater(Runnable eventRunnable) { + @Override + public void executeInIoThread(Runnable eventRunnable) { assert eventQueue.offer(eventRunnable); // wake up the selector to speed things diff --git a/transport/src/main/java/io/netty/channel/ChannelSink.java b/transport/src/main/java/io/netty/channel/ChannelSink.java index 16ebb8c52e..1a086ef982 100644 --- a/transport/src/main/java/io/netty/channel/ChannelSink.java +++ b/transport/src/main/java/io/netty/channel/ChannelSink.java @@ -38,5 +38,8 @@ public interface ChannelSink { */ void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; - void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; + /** + * Schedule the given {@link ChannelEvent} for later execution (in the io-thread). Some implementation may not support his and just fire it directly + */ + void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index b8379b1b98..4e4499e814 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -586,7 +586,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void sendUpstreamLater(ChannelEvent e) { try { - getSink().fireEventLater(this, e); + getSink().fireUpstreamEventLater(this, e); } catch (Throwable t) { notifyHandlerException(e, t); } @@ -843,7 +843,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (logger.isWarnEnabled()) { logger.warn("Not attached yet; discarding: " + e); } diff --git a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java index 5596540da2..51de5d3311 100755 --- a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java +++ b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java @@ -182,7 +182,7 @@ public class IoStreamChannelSink extends AbstractChannelSink { * This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { pipeline.sendUpstream(e); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java index 066bf27cc4..d882725de0 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java @@ -89,7 +89,7 @@ final class LocalClientChannelSink extends AbstractChannelSink { * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { pipeline.sendUpstream(event); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java index 29f5b9c2d0..6ead0ad3a2 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java @@ -46,7 +46,7 @@ final class LocalServerChannelSink extends AbstractChannelSink { * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { pipeline.sendUpstream(event); } diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index 8a2bf10424..eebfc74d40 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -16,7 +16,16 @@ package io.netty.channel.socket; +/** + * A {@link Worker} is responsible to dispatch IO operations + * + */ public interface Worker extends Runnable{ - void fireEventLater(Runnable eventRunnable); + /** + * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. + * + * @param task the {@link Runnable} to execute + */ + void executeInIoThread(Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 6812b59a58..85a8d03d3a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -24,13 +24,13 @@ import io.netty.channel.ChannelPipeline; public abstract class AbstractNioChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { Channel ch = e.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; // check if the current thread is a worker thread if so we can send the event now if (channel.worker.thread != Thread.currentThread()) { - channel.worker.fireEventLater(new Runnable() { + channel.worker.executeInIoThread(new Runnable() { @Override public void run() { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 708b0ee343..43ad21ad9c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -271,7 +271,8 @@ abstract class AbstractNioWorker implements Worker { } } - public void fireEventLater(Runnable eventRunnable) { + @Override + public void executeInIoThread(Runnable eventRunnable) { assert eventQueue.offer(eventRunnable); // wake up the selector to speed things diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 4a7a1a998c..889a08aa26 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -25,13 +25,13 @@ import io.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { Channel ch = e.getChannel(); if (ch instanceof AbstractOioChannel) { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; if (worker != null && channel.workerThread != Thread.currentThread()) { - channel.worker.fireEventLater(new Runnable() { + channel.worker.executeInIoThread(new Runnable() { @Override public void run() { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index b20b24a926..5dd3e8b14d 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -91,8 +91,10 @@ abstract class AbstractOioWorker implements Worker @Override - public void fireEventLater(Runnable eventRunnable) { + public void executeInIoThread(Runnable eventRunnable) { assert eventQueue.offer(eventRunnable); + + // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest } private void processEventQueue() throws IOException {