diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index b0796cd994..2521682ca7 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -22,9 +22,11 @@ import java.util.ConcurrentModificationException; import java.util.LinkedList; import java.util.Queue; +import io.netty.channel.Channels; import io.netty.buffer.ChannelBufferFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -226,8 +228,13 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { } @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - handleEvent(e); + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + try { + task.run(); + return Channels.succeededFuture(pipeline.getChannel()); + } catch (Throwable t) { + return Channels.failedFuture(pipeline.getChannel(), t); + } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java index fcc0c9f1aa..f01321c2ce 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -18,30 +18,20 @@ package io.netty.channel.sctp; import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; public abstract class AbstractSctpChannelSink extends AbstractChannelSink { @Override - public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - Channel ch = e.getChannel(); + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; - // check if the current thread is a worker thread, and only fire the event later if thats not the case - if (channel.worker.thread != Thread.currentThread()) { - channel.worker.executeInIoThread(new Runnable() { - - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); - } else { - pipeline.sendUpstream(e); - } + return channel.worker.executeInIoThread(channel, task); + } else { - super.fireUpstreamEventLater(pipeline, e); + return super.execute(pipeline, task); } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 1cc3a764ac..1912f1ae46 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -31,6 +31,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -45,6 +46,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -246,11 +248,31 @@ class SctpWorker implements Worker { } @Override - public void executeInIoThread(Runnable eventRunnable) { - assert eventQueue.offer(eventRunnable); - - // wake up the selector to speed things - selector.wakeup(); + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // wake up the selector to speed things + selector.wakeup(); + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + + } + + static boolean isIoThread(SctpChannelImpl channel) { + return Thread.currentThread() == channel.worker.thread; } private void processRegisterTaskQueue() throws IOException { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java index 69b199aeb7..805995ac72 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java @@ -55,12 +55,17 @@ public abstract class AbstractChannelSink implements ChannelSink { } /** - * This implementation just send the event now via {@link ChannelPipeline#sendUpstream(ChannelEvent)}. Sub-classes should override this if they can handle it + * This implementation just directly call {@link Runnable#run()}. Sub-classes should override this if they can handle it * in a better way */ @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - pipeline.sendUpstream(e); + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + try { + task.run(); + return Channels.succeededFuture(pipeline.getChannel()); + } catch (Throwable t) { + return Channels.failedFuture(pipeline.getChannel(), t); + } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 6a74e5033c..4620e83fbd 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -443,14 +443,7 @@ public interface ChannelPipeline { void sendUpstream(ChannelEvent e); - /** - * Sends the specified {@link ChannelEvent} to the first - * {@link ChannelUpstreamHandler} in this pipeline when the next IO-Worker operation is performed. - * - * @throws NullPointerException - * if the specified event is {@code null} - */ - void sendUpstreamLater(ChannelEvent e); + ChannelFuture execute(Runnable task); /** * Sends the specified {@link ChannelEvent} to the last diff --git a/transport/src/main/java/io/netty/channel/ChannelSink.java b/transport/src/main/java/io/netty/channel/ChannelSink.java index 1a086ef982..e0e2650211 100644 --- a/transport/src/main/java/io/netty/channel/ChannelSink.java +++ b/transport/src/main/java/io/netty/channel/ChannelSink.java @@ -39,7 +39,7 @@ public interface ChannelSink { void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; /** - * Schedule the given {@link ChannelEvent} for later execution (in the io-thread). Some implementation may not support his and just fire it directly + * Execute the given {@link Runnable} later in the io-thread. Some implementation may not support his and just execute it directly */ - void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; + ChannelFuture execute(ChannelPipeline pipeline, Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index c24aa19399..c5aa9ce93e 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -303,13 +303,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} in the next io-thread. */ - public static void fireWriteCompleteLater(Channel channel, long amount) { - if (amount == 0) { - return; - } - - channel.getPipeline().sendUpstreamLater( - new DefaultWriteCompletionEvent(channel, amount)); + public static ChannelFuture fireWriteCompleteLater(final Channel channel, final long amount) { + return channel.getPipeline().execute(new Runnable() { + @Override + public void run() { + fireWriteComplete(channel, amount); + } + }); + } @@ -344,10 +345,15 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelInterestChangedLater(Channel channel) { - channel.getPipeline().sendUpstreamLater( - new UpstreamChannelStateEvent( - channel, ChannelState.INTEREST_OPS, Channel.OP_READ)); + public static ChannelFuture fireChannelInterestChangedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelInterestChanged(channel); + + } + }); } /** @@ -380,10 +386,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelDisconnectedLater(Channel channel) { - channel.getPipeline().sendUpstreamLater( - new UpstreamChannelStateEvent( - channel, ChannelState.CONNECTED, null)); + public static ChannelFuture fireChannelDisconnectedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelDisconnected(channel); + } + }); } /** @@ -415,9 +425,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelUnboundLater(Channel channel) { - channel.getPipeline().sendUpstreamLater(new UpstreamChannelStateEvent( - channel, ChannelState.BOUND, null)); + public static ChannelFuture fireChannelUnboundLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelUnbound(channel); + } + }); } /** @@ -449,15 +464,15 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelClosedLater(Channel channel) { - channel.getPipeline().sendUpstream( - new UpstreamChannelStateEvent( - channel, ChannelState.OPEN, Boolean.FALSE)); - - // Notify the parent handler. - if (channel.getParent() != null) { - fireChildChannelStateChangedLater(channel.getParent(), channel); - } + public static ChannelFuture fireChannelClosedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelClosed(channel); + } + }); + } /** @@ -495,9 +510,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireExceptionCaughtLater(Channel channel, Throwable cause) { - channel.getPipeline().sendUpstreamLater( - new DefaultExceptionEvent(channel, cause)); + public static ChannelFuture fireExceptionCaughtLater(final Channel channel, final Throwable cause) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireExceptionCaught(channel, cause); + } + }); } @@ -527,13 +547,7 @@ public final class Channels { new DefaultChildChannelStateEvent(channel, childChannel)); } - private static void fireChildChannelStateChangedLater( - Channel channel, Channel childChannel) { - channel.getPipeline().sendUpstreamLater( - new DefaultChildChannelStateEvent(channel, childChannel)); - } - /** * Sends a {@code "bind"} request to the last * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 4e4499e814..de75705212 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -21,6 +21,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.RejectedExecutionException; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -584,12 +585,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void sendUpstreamLater(ChannelEvent e) { - try { - getSink().fireUpstreamEventLater(this, e); - } catch (Throwable t) { - notifyHandlerException(e, t); - } + public ChannelFuture execute(Runnable task) { + return getSink().execute(this, task); } @Override @@ -843,10 +840,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { if (logger.isWarnEnabled()) { - logger.warn("Not attached yet; discarding: " + e); + logger.warn("Not attached yet; rejecting: " + task); } + return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet")); } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java new file mode 100644 index 0000000000..a8110e7db8 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket; + +import io.netty.channel.Channel; +import io.netty.channel.DefaultChannelFuture; + +public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { + + private Runnable task; + + public ChannelRunnableWrapper(Channel channel, Runnable task) { + super(channel, true); + this.task = task; + } + + @Override + public void run() { + try { + task.run(); + setSuccess(); + } catch (Throwable t) { + setFailure(t); + } + } + + + +} diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index 64dc433038..271897881a 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -16,6 +16,9 @@ package io.netty.channel.socket; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + /** * A {@link Worker} is responsible to dispatch IO operations * @@ -27,5 +30,5 @@ public interface Worker extends Runnable { * * @param task the {@link Runnable} to execute */ - void executeInIoThread(Runnable task); + ChannelFuture executeInIoThread(Channel channel, Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index bf034eca3a..1a29f5ef19 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -19,30 +19,21 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; public abstract class AbstractNioChannelSink extends AbstractChannelSink { @Override - public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - Channel ch = e.getChannel(); + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; - // check if the current thread is a worker thread if so we can send the event now - if (!AbstractNioWorker.isIoThread(channel)) { - channel.worker.executeInIoThread(new Runnable() { - - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); - } else { - pipeline.sendUpstream(e); - } - } else { - super.fireUpstreamEventLater(pipeline, e); + + return channel.worker.executeInIoThread(ch, task); } + return super.execute(pipeline, task); + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index a5a109d9c1..85bc12c61a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; @@ -41,6 +42,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -272,13 +274,28 @@ abstract class AbstractNioWorker implements Worker { } @Override - public void executeInIoThread(Runnable eventRunnable) { - boolean added = eventQueue.offer(eventRunnable); - - assert added; + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof AbstractNioChannel && isIoThread((AbstractNioChannel) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // wake up the selector to speed things + selector.wakeup(); + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + - // wake up the selector to speed things - selector.wakeup(); } private void processRegisterTaskQueue() throws IOException { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 485e5cb452..d57c198534 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -19,33 +19,25 @@ package io.netty.channel.socket.oio; import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink { @Override - public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - Channel ch = e.getChannel(); + public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); if (ch instanceof AbstractOioChannel) { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; - if (worker != null && !AbstractOioWorker.isIoThead(channel)) { - channel.worker.executeInIoThread(new Runnable() { - - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); - } else { - // no worker thread yet or the current thread is a worker thread so just fire the event now - pipeline.sendUpstream(e); + if (worker != null) { + return channel.worker.executeInIoThread(ch, task); } - - } else { - super.fireUpstreamEventLater(pipeline, e); - } + } + + return super.execute(pipeline, task); + } @@ -54,7 +46,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink { Channel channel = event.getChannel(); boolean fireLater = false; if (channel instanceof AbstractOioChannel) { - fireLater = !AbstractOioWorker.isIoThead((AbstractOioChannel) channel); + fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel); } return fireLater; } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index fb1b403894..a2176f32c6 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -19,11 +19,13 @@ import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.util.Queue; +import java.util.concurrent.RejectedExecutionException; /** * Abstract base class for Oio-Worker implementations @@ -84,16 +86,31 @@ abstract class AbstractOioWorker implements Worker close(channel, succeededFuture(channel), true); } - static boolean isIoThead(AbstractOioChannel channel) { + static boolean isIoThread(AbstractOioChannel channel) { return Thread.currentThread() == channel.workerThread; } @Override - public void executeInIoThread(Runnable eventRunnable) { - boolean added = eventQueue.offer(eventRunnable); - - assert added; - // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest + + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } } private void processEventQueue() throws IOException { @@ -119,7 +136,7 @@ abstract class AbstractOioWorker implements Worker static void setInterestOps( AbstractOioChannel channel, ChannelFuture future, int interestOps) { - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; @@ -165,7 +182,7 @@ abstract class AbstractOioWorker implements Worker } static void close(AbstractOioChannel channel, ChannelFuture future) { - close(channel, future, isIoThead(channel)); + close(channel, future, isIoThread(channel)); } private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java index 8ce169b90a..2581425616 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java @@ -63,7 +63,7 @@ class OioDatagramWorker extends AbstractOioWorker { static void write( OioDatagramChannel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); try { ChannelBuffer buf = (ChannelBuffer) message; @@ -105,7 +105,7 @@ class OioDatagramWorker extends AbstractOioWorker { static void disconnect(OioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); try { channel.socket.disconnect(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java index 180d756d36..5d4eb6aa3b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java @@ -65,7 +65,7 @@ class OioWorker extends AbstractOioWorker { OioSocketChannel channel, ChannelFuture future, Object message) { - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); OutputStream out = channel.getOutputStream(); if (out == null) { Exception e = new ClosedChannelException();