diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 4e85a1d01c..2521682ca7 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -22,9 +22,11 @@ import java.util.ConcurrentModificationException; import java.util.LinkedList; import java.util.Queue; +import io.netty.channel.Channels; import io.netty.buffer.ChannelBufferFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -224,6 +226,16 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { throw new CodecEmbedderException(actualCause); } + + @Override + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + try { + task.run(); + return Channels.succeededFuture(pipeline.getChannel()); + } catch (Throwable t) { + return Channels.failedFuture(pipeline.getChannel(), t); + } + } } private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline { diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index c5387a33e9..6fc29eaebd 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -90,7 +90,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } try { - flush(ctx); + flush(ctx, false); } catch (Exception e) { if (logger.isWarnEnabled()) { logger.warn("Unexpected exception while sending chunks.", e); @@ -112,10 +112,10 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns final Channel channel = ctx.getChannel(); if (channel.isWritable()) { this.ctx = ctx; - flush(ctx); + flush(ctx, false); } else if (!channel.isConnected()) { this.ctx = ctx; - discard(ctx); + discard(ctx, false); } } @@ -127,12 +127,12 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns switch (cse.getState()) { case INTEREST_OPS: // Continue writing when the channel becomes writable. - flush(ctx); + flush(ctx, true); break; case OPEN: if (!Boolean.TRUE.equals(cse.getValue())) { // Fail all pending writes - discard(ctx); + discard(ctx, true); } break; } @@ -140,7 +140,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns ctx.sendUpstream(e); } - private void discard(ChannelHandlerContext ctx) { + private void discard(ChannelHandlerContext ctx, boolean fireNow) { ClosedChannelException cause = null; boolean fireExceptionCaught = false; @@ -175,14 +175,18 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns if (fireExceptionCaught) { - Channels.fireExceptionCaught(ctx.getChannel(), cause); + if (fireNow) { + fireExceptionCaught(ctx.getChannel(), cause); + } else { + fireExceptionCaughtLater(ctx.getChannel(), cause); + } } } - private synchronized void flush(ChannelHandlerContext ctx) throws Exception { + private synchronized void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception { final Channel channel = ctx.getChannel(); if (!channel.isConnected()) { - discard(ctx); + discard(ctx, fireNow); } while (channel.isWritable()) { @@ -220,7 +224,11 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns this.currentEvent = null; currentEvent.getFuture().setFailure(t); - fireExceptionCaught(ctx, t); + if (fireNow) { + fireExceptionCaught(ctx, t); + } else { + fireExceptionCaughtLater(ctx.getChannel(), t); + } closeInput(chunks); break; @@ -262,7 +270,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } if (!channel.isConnected()) { - discard(ctx); + discard(ctx, fireNow); break; } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index b5af55d670..5dd2b4e19c 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -154,7 +154,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler } protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { - Channels.fireExceptionCaught(ctx, EXCEPTION); + Channels.fireExceptionCaughtLater(ctx.getChannel(), EXCEPTION); } private final class WriteTimeoutTask implements TimerTask { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java new file mode 100644 index 0000000000..f01321c2ce --- /dev/null +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -0,0 +1,38 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.sctp; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; + +public abstract class AbstractSctpChannelSink extends AbstractChannelSink { + + @Override + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); + if (ch instanceof SctpChannelImpl) { + SctpChannelImpl channel = (SctpChannelImpl) ch; + return channel.worker.executeInIoThread(channel, task); + + } else { + return super.execute(pipeline, task); + } + + } +} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java index b72c4f5cb6..1e664a0120 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java @@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -48,7 +47,7 @@ import io.netty.util.internal.QueueFactory; /** */ -class SctpClientPipelineSink extends AbstractChannelSink { +class SctpClientPipelineSink extends AbstractSctpChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java index 3a0f86bb16..c4001af990 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import com.sun.nio.sctp.SctpChannel; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -45,7 +44,7 @@ import io.netty.util.internal.DeadLockProofWorker; /** */ -class SctpServerPipelineSink extends AbstractChannelSink { +class SctpServerPipelineSink extends AbstractSctpChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index a7878d77ba..1912f1ae46 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -31,6 +31,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -45,6 +46,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; +import io.netty.channel.socket.ChannelRunnableWrapper; +import io.netty.channel.socket.Worker; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; @@ -53,7 +56,7 @@ import io.netty.util.internal.QueueFactory; /** */ @SuppressWarnings("unchecked") -class SctpWorker implements Runnable { +class SctpWorker implements Worker { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpWorker.class); @@ -64,13 +67,15 @@ class SctpWorker implements Runnable { private final Executor executor; private boolean started; - private volatile Thread thread; + volatile Thread thread; volatile Selector selector; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); private final Object startStopLock = new Object(); private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); @@ -188,6 +193,7 @@ class SctpWorker implements Runnable { cancelledKeys = 0; processRegisterTaskQueue(); + processEventQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); @@ -240,7 +246,35 @@ class SctpWorker implements Runnable { } } } + + @Override + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // wake up the selector to speed things + selector.wakeup(); + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + } + + static boolean isIoThread(SctpChannelImpl channel) { + return Thread.currentThread() == channel.worker.thread; + } + private void processRegisterTaskQueue() throws IOException { for (; ;) { final Runnable task = registerTaskQueue.poll(); @@ -264,7 +298,19 @@ class SctpWorker implements Runnable { cleanUpCancelledKeys(); } } + + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + task.run(); + cleanUpCancelledKeys(); + } + } + private void processSelectedKeys(final Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java index a1c27839ff..805995ac72 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java @@ -43,7 +43,29 @@ public abstract class AbstractChannelSink implements ChannelSink { if (actualCause == null) { actualCause = cause; } - - fireExceptionCaught(event.getChannel(), actualCause); + if (isFireExceptionCaughtLater(event, actualCause)) { + fireExceptionCaughtLater(event.getChannel(), actualCause); + } else { + fireExceptionCaught(event.getChannel(), actualCause); + } } + + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + return false; + } + + /** + * This implementation just directly call {@link Runnable#run()}. Sub-classes should override this if they can handle it + * in a better way + */ + @Override + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + try { + task.run(); + return Channels.succeededFuture(pipeline.getChannel()); + } catch (Throwable t) { + return Channels.failedFuture(pipeline.getChannel(), t); + } + } + } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index cbdeeab097..4620e83fbd 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -442,6 +442,9 @@ public interface ChannelPipeline { */ void sendUpstream(ChannelEvent e); + + ChannelFuture execute(Runnable task); + /** * Sends the specified {@link ChannelEvent} to the last * {@link ChannelDownstreamHandler} in this pipeline. diff --git a/transport/src/main/java/io/netty/channel/ChannelSink.java b/transport/src/main/java/io/netty/channel/ChannelSink.java index 86d042e56f..e0e2650211 100644 --- a/transport/src/main/java/io/netty/channel/ChannelSink.java +++ b/transport/src/main/java/io/netty/channel/ChannelSink.java @@ -37,4 +37,9 @@ public interface ChannelSink { * one of its {@link ChannelHandler}s process a {@link ChannelEvent}. */ void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; + + /** + * Execute the given {@link Runnable} later in the io-thread. Some implementation may not support his and just execute it directly + */ + ChannelFuture execute(ChannelPipeline pipeline, Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index e5852d2ced..c5aa9ce93e 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -298,6 +298,22 @@ public final class Channels { ctx.getChannel(), message, remoteAddress)); } + /** + * Sends a {@code "writeComplete"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} in the next io-thread. + */ + public static ChannelFuture fireWriteCompleteLater(final Channel channel, final long amount) { + return channel.getPipeline().execute(new Runnable() { + @Override + public void run() { + fireWriteComplete(channel, amount); + } + }); + + } + + /** * Sends a {@code "writeComplete"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -321,6 +337,25 @@ public final class Channels { public static void fireWriteComplete(ChannelHandlerContext ctx, long amount) { ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), amount)); } + + + + /** + * Sends a {@code "channelInterestChanged"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static ChannelFuture fireChannelInterestChangedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelInterestChanged(channel); + + } + }); + } + /** * Sends a {@code "channelInterestChanged"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -346,6 +381,21 @@ public final class Channels { ctx.getChannel(), ChannelState.INTEREST_OPS, Channel.OP_READ)); } + /** + * Sends a {@code "channelDisconnected"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static ChannelFuture fireChannelDisconnectedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelDisconnected(channel); + } + }); + } + /** * Sends a {@code "channelDisconnected"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -368,6 +418,23 @@ public final class Channels { ctx.getChannel(), ChannelState.CONNECTED, null)); } + + + /** + * Sends a {@code "channelUnbound"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static ChannelFuture fireChannelUnboundLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelUnbound(channel); + } + }); + } + /** * Sends a {@code "channelUnbound"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -390,6 +457,24 @@ public final class Channels { ctx.getChannel(), ChannelState.BOUND, null)); } + + + /** + * Sends a {@code "channelClosed"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static ChannelFuture fireChannelClosedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelClosed(channel); + } + }); + + } + /** * Sends a {@code "channelClosed"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -418,6 +503,24 @@ public final class Channels { ctx.getChannel(), ChannelState.OPEN, Boolean.FALSE)); } + + + /** + * Sends a {@code "exceptionCaught"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static ChannelFuture fireExceptionCaughtLater(final Channel channel, final Throwable cause) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireExceptionCaught(channel, cause); + } + }); + } + + /** * Sends a {@code "exceptionCaught"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -444,6 +547,7 @@ public final class Channels { new DefaultChildChannelStateEvent(channel, childChannel)); } + /** * Sends a {@code "bind"} request to the last * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 50487ae36e..de75705212 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -21,6 +21,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.RejectedExecutionException; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -583,6 +584,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } + @Override + public ChannelFuture execute(Runnable task) { + return getSink().execute(this, task); + } + @Override public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); @@ -832,5 +838,14 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelEvent e, ChannelPipelineException cause) throws Exception { throw cause; } + + @Override + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + if (logger.isWarnEnabled()) { + logger.warn("Not attached yet; rejecting: " + task); + } + return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet")); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java new file mode 100644 index 0000000000..aece47526a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket; + +import io.netty.channel.Channel; +import io.netty.channel.DefaultChannelFuture; + +public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { + + private final Runnable task; + + public ChannelRunnableWrapper(Channel channel, Runnable task) { + super(channel, true); + this.task = task; + } + + @Override + public void run() { + try { + task.run(); + setSuccess(); + } catch (Throwable t) { + setFailure(t); + } + } + + + +} diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java new file mode 100644 index 0000000000..271897881a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -0,0 +1,34 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + +/** + * A {@link Worker} is responsible to dispatch IO operations + * + */ +public interface Worker extends Runnable { + + /** + * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. + * + * @param task the {@link Runnable} to execute + */ + ChannelFuture executeInIoThread(Channel channel, Runnable task); +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java new file mode 100644 index 0000000000..1a29f5ef19 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -0,0 +1,50 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.Channel; +import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; + +public abstract class AbstractNioChannelSink extends AbstractChannelSink { + + @Override + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); + if (ch instanceof AbstractNioChannel) { + AbstractNioChannel channel = (AbstractNioChannel) ch; + + return channel.worker.executeInIoThread(ch, task); + } + return super.execute(pipeline, task); + + + } + + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractNioChannel) { + fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel) channel); + } + return fireLater; + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 2b96e04f40..85bc12c61a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,6 +21,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; +import io.netty.channel.socket.ChannelRunnableWrapper; +import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -40,11 +42,12 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -abstract class AbstractNioWorker implements Runnable { +abstract class AbstractNioWorker implements Worker { /** * Internal Netty logger. */ @@ -106,6 +109,9 @@ abstract class AbstractNioWorker implements Runnable { */ protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); @@ -216,6 +222,7 @@ abstract class AbstractNioWorker implements Runnable { cancelledKeys = 0; processRegisterTaskQueue(); + processEventQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); @@ -266,7 +273,31 @@ abstract class AbstractNioWorker implements Runnable { } } - + @Override + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof AbstractNioChannel && isIoThread((AbstractNioChannel) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // wake up the selector to speed things + selector.wakeup(); + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + + + } + private void processRegisterTaskQueue() throws IOException { for (;;) { final Runnable task = registerTaskQueue.poll(); @@ -291,6 +322,18 @@ abstract class AbstractNioWorker implements Runnable { } } + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + private void processSelectedKeys(Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); @@ -374,6 +417,7 @@ abstract class AbstractNioWorker implements Runnable { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; + boolean iothread = isIoThread(channel); long writtenBytes = 0; @@ -444,7 +488,11 @@ abstract class AbstractNioWorker implements Runnable { buf = null; evt = null; future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } if (t instanceof IOException) { open = false; close(channel, succeededFuture(channel)); @@ -467,10 +515,17 @@ abstract class AbstractNioWorker implements Runnable { } } } - - fireWriteComplete(channel, writtenBytes); + if (iothread) { + fireWriteComplete(channel, writtenBytes); + } else { + fireWriteCompleteLater(channel, writtenBytes); + } } + static boolean isIoThread(AbstractNioChannel channel) { + return Thread.currentThread() == channel.worker.thread; + } + private void setOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; SelectionKey key = channel.channel.keyFor(selector); @@ -521,6 +576,8 @@ abstract class AbstractNioWorker implements Runnable { void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); + boolean iothread = isIoThread(channel); + try { channel.channel.close(); cancelledKeys ++; @@ -528,20 +585,36 @@ abstract class AbstractNioWorker implements Runnable { if (channel.setClosed()) { future.setSuccess(); if (connected) { - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } if (bound) { - fireChannelUnbound(channel); + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } } cleanUpWriteBuffer(channel); - fireChannelClosed(channel); + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); + } } else { future.setSuccess(); } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } @@ -594,12 +667,17 @@ abstract class AbstractNioWorker implements Runnable { } if (fireExceptionCaught) { - fireExceptionCaught(channel, cause); + if (isIoThread(channel)) { + fireExceptionCaught(channel, cause); + } else { + fireExceptionCaughtLater(channel, cause); + } } } void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; + boolean iothread = isIoThread(channel); try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. @@ -660,16 +738,28 @@ abstract class AbstractNioWorker implements Runnable { future.setSuccess(); if (changed) { - fireChannelInterestChanged(channel); + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } } } catch (CancelledKeyException e) { // setInterestOps() was called on a closed channel. ClosedChannelException cce = new ClosedChannelException(); future.setFailure(cce); - fireExceptionCaught(channel, cce); + if (iothread) { + fireExceptionCaught(channel, cce); + } else { + fireExceptionCaughtLater(channel, cce); + } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 39b2a73ed4..512d17da00 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -45,7 +44,7 @@ import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.QueueFactory; -class NioClientSocketPipelineSink extends AbstractChannelSink { +class NioClientSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java index 60b6943725..401d6dbf8b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -22,7 +22,6 @@ import java.net.SocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -35,7 +34,7 @@ import io.netty.channel.MessageEvent; * Receives downstream events from a {@link ChannelPipeline}. It contains * an array of I/O workers. */ -class NioDatagramPipelineSink extends AbstractChannelSink { +class NioDatagramPipelineSink extends AbstractNioChannelSink { private final NioDatagramWorker[] workers; private final AtomicInteger workerIndex = new AtomicInteger(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index fb7ddde380..6191df2e08 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -16,7 +16,9 @@ package io.netty.channel.socket.nio; import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireChannelDisconnectedLater; import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireExceptionCaughtLater; import static io.netty.channel.Channels.fireMessageReceived; import static io.netty.channel.Channels.succeededFuture; import io.netty.buffer.ChannelBufferFactory; @@ -126,15 +128,24 @@ class NioDatagramWorker extends AbstractNioWorker { static void disconnect(NioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); + boolean iothread = isIoThread(channel); try { channel.getDatagramChannel().disconnect(); future.setSuccess(); if (connected) { - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 5de57a6f01..965c585827 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -29,7 +29,6 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -41,7 +40,7 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; -class NioServerSocketPipelineSink extends AbstractChannelSink { +class NioServerSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index 7447254cde..2c7009050a 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -25,11 +25,14 @@ import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; +import io.netty.channel.socket.Worker; abstract class AbstractOioChannel extends AbstractChannel { private volatile InetSocketAddress localAddress; volatile InetSocketAddress remoteAddress; volatile Thread workerThread; + volatile Worker worker; + final Object interestOpsLock = new Object(); AbstractOioChannel( diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java new file mode 100644 index 0000000000..d57c198534 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -0,0 +1,54 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.oio; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.Channel; +import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.Worker; + +public abstract class AbstractOioChannelSink extends AbstractChannelSink { + + @Override + public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); + if (ch instanceof AbstractOioChannel) { + AbstractOioChannel channel = (AbstractOioChannel) ch; + Worker worker = channel.worker; + if (worker != null) { + return channel.worker.executeInIoThread(ch, task); + } + } + + return super.execute(pipeline, task); + + + } + + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractOioChannel) { + fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel); + } + return fireLater; + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 898ef994c3..a2176f32c6 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -15,29 +15,32 @@ */ package io.netty.channel.socket.oio; -import static io.netty.channel.Channels.fireChannelClosed; -import static io.netty.channel.Channels.fireChannelDisconnected; -import static io.netty.channel.Channels.fireChannelInterestChanged; -import static io.netty.channel.Channels.fireChannelUnbound; -import static io.netty.channel.Channels.fireExceptionCaught; -import static io.netty.channel.Channels.succeededFuture; +import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; +import io.netty.channel.socket.ChannelRunnableWrapper; +import io.netty.channel.socket.Worker; +import io.netty.util.internal.QueueFactory; import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.RejectedExecutionException; /** * Abstract base class for Oio-Worker implementations * * @param {@link AbstractOioChannel} */ -abstract class AbstractOioWorker implements Runnable { +abstract class AbstractOioWorker implements Worker { + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + protected final C channel; public AbstractOioWorker(C channel) { this.channel = channel; + channel.worker = this; } @Override @@ -60,9 +63,13 @@ abstract class AbstractOioWorker implements Runnab } try { - if (!process()) { - break; - } + boolean cont = process(); + + processEventQueue(); + + if (!cont) { + break; + } } catch (Throwable t) { if (!channel.isSocketClosed()) { fireExceptionCaught(channel, t); @@ -76,9 +83,48 @@ abstract class AbstractOioWorker implements Runnab channel.workerThread = null; // Clean up. - close(channel, succeededFuture(channel)); + close(channel, succeededFuture(channel), true); } + static boolean isIoThread(AbstractOioChannel channel) { + return Thread.currentThread() == channel.workerThread; + } + + @Override + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest + + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + } + + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + + /** * Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message * was processed without errors. @@ -90,7 +136,8 @@ abstract class AbstractOioWorker implements Runnab static void setInterestOps( AbstractOioChannel channel, ChannelFuture future, int interestOps) { - + boolean iothread = isIoThread(channel); + // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; interestOps |= channel.getInterestOps() & Channel.OP_WRITE; @@ -118,18 +165,30 @@ abstract class AbstractOioWorker implements Runnab workerThread.interrupt(); } } - - fireChannelInterestChanged(channel); + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } static void close(AbstractOioChannel channel, ChannelFuture future) { + close(channel, future, isIoThread(channel)); + } + + private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); + try { channel.closeSocket(); if (channel.setClosed()) { @@ -141,18 +200,34 @@ abstract class AbstractOioWorker implements Runnab if (workerThread != null && currentThread != workerThread) { workerThread.interrupt(); } - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } if (bound) { - fireChannelUnbound(channel); + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } + } + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); } - fireChannelClosed(channel); } else { future.setSuccess(); } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java index e5cf415a02..e607d3282e 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java @@ -21,7 +21,6 @@ import java.io.PushbackInputStream; import java.net.SocketAddress; import java.util.concurrent.Executor; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -31,7 +30,7 @@ import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; import io.netty.util.internal.DeadLockProofWorker; -class OioClientSocketPipelineSink extends AbstractChannelSink { +class OioClientSocketPipelineSink extends AbstractOioChannelSink { private final Executor workerExecutor; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java index 3cf3e6baf6..2b198080b5 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java @@ -20,7 +20,6 @@ import static io.netty.channel.Channels.*; import java.net.SocketAddress; import java.util.concurrent.Executor; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -30,7 +29,7 @@ import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; import io.netty.util.internal.DeadLockProofWorker; -class OioDatagramPipelineSink extends AbstractChannelSink { +class OioDatagramPipelineSink extends AbstractOioChannelSink { private final Executor workerExecutor; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java index f1b42b42f9..2581425616 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java @@ -63,6 +63,8 @@ class OioDatagramWorker extends AbstractOioWorker { static void write( OioDatagramChannel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { + boolean iothread = isIoThread(channel); + try { ChannelBuffer buf = (ChannelBuffer) message; int offset = buf.readerIndex(); @@ -84,27 +86,45 @@ class OioDatagramWorker extends AbstractOioWorker { packet.setSocketAddress(remoteAddress); } channel.socket.send(packet); - fireWriteComplete(channel, length); + if (iothread) { + fireWriteComplete(channel, length); + } else { + fireWriteCompleteLater(channel, length); + } future.setSuccess(); } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } static void disconnect(OioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); + boolean iothread = isIoThread(channel); + try { channel.socket.disconnect(); future.setSuccess(); if (connected) { // Notify. - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java index e8c4c3278f..5daad24afc 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java @@ -24,7 +24,6 @@ import java.net.SocketTimeoutException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -36,7 +35,7 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; -class OioServerSocketPipelineSink extends AbstractChannelSink { +class OioServerSocketPipelineSink extends AbstractOioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java index bb0f7148d9..5d4eb6aa3b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java @@ -65,11 +65,16 @@ class OioWorker extends AbstractOioWorker { OioSocketChannel channel, ChannelFuture future, Object message) { + boolean iothread = isIoThread(channel); OutputStream out = channel.getOutputStream(); if (out == null) { Exception e = new ClosedChannelException(); future.setFailure(e); - fireExceptionCaught(channel, e); + if (iothread) { + fireExceptionCaught(channel, e); + } else { + fireExceptionCaughtLater(channel, e); + } return; } @@ -106,7 +111,11 @@ class OioWorker extends AbstractOioWorker { } } - fireWriteComplete(channel, length); + if (iothread) { + fireWriteComplete(channel, length); + } else { + fireWriteCompleteLater(channel, length); + } future.setSuccess(); } catch (Throwable t) { @@ -118,7 +127,11 @@ class OioWorker extends AbstractOioWorker { t = new ClosedChannelException(); } future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } }