Allow to submit a Runnable that get execute in the io-thread. This is
also used to workout flaws in the thread-model. See #209 #140 #187
This commit is contained in:
parent
b6700fbe58
commit
4df3c61233
|
@ -22,9 +22,11 @@ import java.util.ConcurrentModificationException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
|
||||||
|
import io.netty.channel.Channels;
|
||||||
import io.netty.buffer.ChannelBufferFactory;
|
import io.netty.buffer.ChannelBufferFactory;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
@ -226,8 +228,13 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
|
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
|
||||||
handleEvent(e);
|
try {
|
||||||
|
task.run();
|
||||||
|
return Channels.succeededFuture(pipeline.getChannel());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return Channels.failedFuture(pipeline.getChannel(), t);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,30 +18,20 @@ package io.netty.channel.sctp;
|
||||||
|
|
||||||
import io.netty.channel.AbstractChannelSink;
|
import io.netty.channel.AbstractChannelSink;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
|
||||||
public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
|
public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
|
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
|
||||||
Channel ch = e.getChannel();
|
Channel ch = pipeline.getChannel();
|
||||||
if (ch instanceof SctpChannelImpl) {
|
if (ch instanceof SctpChannelImpl) {
|
||||||
SctpChannelImpl channel = (SctpChannelImpl) ch;
|
SctpChannelImpl channel = (SctpChannelImpl) ch;
|
||||||
// check if the current thread is a worker thread, and only fire the event later if thats not the case
|
return channel.worker.executeInIoThread(channel, task);
|
||||||
if (channel.worker.thread != Thread.currentThread()) {
|
|
||||||
channel.worker.executeInIoThread(new Runnable() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
pipeline.sendUpstream(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
pipeline.sendUpstream(e);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
super.fireUpstreamEventLater(pipeline, e);
|
return super.execute(pipeline, task);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -45,6 +46,7 @@ import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.MessageEvent;
|
import io.netty.channel.MessageEvent;
|
||||||
import io.netty.channel.ReceiveBufferSizePredictor;
|
import io.netty.channel.ReceiveBufferSizePredictor;
|
||||||
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
|
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
|
||||||
|
import io.netty.channel.socket.ChannelRunnableWrapper;
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
import io.netty.logging.InternalLogger;
|
import io.netty.logging.InternalLogger;
|
||||||
import io.netty.logging.InternalLoggerFactory;
|
import io.netty.logging.InternalLoggerFactory;
|
||||||
|
@ -246,11 +248,31 @@ class SctpWorker implements Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void executeInIoThread(Runnable eventRunnable) {
|
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
|
||||||
assert eventQueue.offer(eventRunnable);
|
if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) {
|
||||||
|
try {
|
||||||
// wake up the selector to speed things
|
task.run();
|
||||||
selector.wakeup();
|
return succeededFuture(channel);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return failedFuture(channel, t);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
|
||||||
|
boolean added = eventQueue.offer(channelRunnable);
|
||||||
|
|
||||||
|
if (added) {
|
||||||
|
// wake up the selector to speed things
|
||||||
|
selector.wakeup();
|
||||||
|
} else {
|
||||||
|
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
|
||||||
|
}
|
||||||
|
return channelRunnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean isIoThread(SctpChannelImpl channel) {
|
||||||
|
return Thread.currentThread() == channel.worker.thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRegisterTaskQueue() throws IOException {
|
private void processRegisterTaskQueue() throws IOException {
|
||||||
|
|
|
@ -55,12 +55,17 @@ public abstract class AbstractChannelSink implements ChannelSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This implementation just send the event now via {@link ChannelPipeline#sendUpstream(ChannelEvent)}. Sub-classes should override this if they can handle it
|
* This implementation just directly call {@link Runnable#run()}. Sub-classes should override this if they can handle it
|
||||||
* in a better way
|
* in a better way
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
|
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
|
||||||
pipeline.sendUpstream(e);
|
try {
|
||||||
|
task.run();
|
||||||
|
return Channels.succeededFuture(pipeline.getChannel());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return Channels.failedFuture(pipeline.getChannel(), t);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -443,14 +443,7 @@ public interface ChannelPipeline {
|
||||||
void sendUpstream(ChannelEvent e);
|
void sendUpstream(ChannelEvent e);
|
||||||
|
|
||||||
|
|
||||||
/**
|
ChannelFuture execute(Runnable task);
|
||||||
* Sends the specified {@link ChannelEvent} to the first
|
|
||||||
* {@link ChannelUpstreamHandler} in this pipeline when the next IO-Worker operation is performed.
|
|
||||||
*
|
|
||||||
* @throws NullPointerException
|
|
||||||
* if the specified event is {@code null}
|
|
||||||
*/
|
|
||||||
void sendUpstreamLater(ChannelEvent e);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the specified {@link ChannelEvent} to the last
|
* Sends the specified {@link ChannelEvent} to the last
|
||||||
|
|
|
@ -39,7 +39,7 @@ public interface ChannelSink {
|
||||||
void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;
|
void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule the given {@link ChannelEvent} for later execution (in the io-thread). Some implementation may not support his and just fire it directly
|
* Execute the given {@link Runnable} later in the io-thread. Some implementation may not support his and just execute it directly
|
||||||
*/
|
*/
|
||||||
void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception;
|
ChannelFuture execute(ChannelPipeline pipeline, Runnable task);
|
||||||
}
|
}
|
||||||
|
|
|
@ -303,13 +303,14 @@ public final class Channels {
|
||||||
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
||||||
* the specified {@link Channel} in the next io-thread.
|
* the specified {@link Channel} in the next io-thread.
|
||||||
*/
|
*/
|
||||||
public static void fireWriteCompleteLater(Channel channel, long amount) {
|
public static ChannelFuture fireWriteCompleteLater(final Channel channel, final long amount) {
|
||||||
if (amount == 0) {
|
return channel.getPipeline().execute(new Runnable() {
|
||||||
return;
|
@Override
|
||||||
}
|
public void run() {
|
||||||
|
fireWriteComplete(channel, amount);
|
||||||
channel.getPipeline().sendUpstreamLater(
|
}
|
||||||
new DefaultWriteCompletionEvent(channel, amount));
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -344,10 +345,15 @@ public final class Channels {
|
||||||
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
||||||
* the specified {@link Channel} once the io-thread runs again.
|
* the specified {@link Channel} once the io-thread runs again.
|
||||||
*/
|
*/
|
||||||
public static void fireChannelInterestChangedLater(Channel channel) {
|
public static ChannelFuture fireChannelInterestChangedLater(final Channel channel) {
|
||||||
channel.getPipeline().sendUpstreamLater(
|
return channel.getPipeline().execute(new Runnable() {
|
||||||
new UpstreamChannelStateEvent(
|
|
||||||
channel, ChannelState.INTEREST_OPS, Channel.OP_READ));
|
@Override
|
||||||
|
public void run() {
|
||||||
|
fireChannelInterestChanged(channel);
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -380,10 +386,14 @@ public final class Channels {
|
||||||
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
||||||
* the specified {@link Channel} once the io-thread runs again.
|
* the specified {@link Channel} once the io-thread runs again.
|
||||||
*/
|
*/
|
||||||
public static void fireChannelDisconnectedLater(Channel channel) {
|
public static ChannelFuture fireChannelDisconnectedLater(final Channel channel) {
|
||||||
channel.getPipeline().sendUpstreamLater(
|
return channel.getPipeline().execute(new Runnable() {
|
||||||
new UpstreamChannelStateEvent(
|
|
||||||
channel, ChannelState.CONNECTED, null));
|
@Override
|
||||||
|
public void run() {
|
||||||
|
fireChannelDisconnected(channel);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -415,9 +425,14 @@ public final class Channels {
|
||||||
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
||||||
* the specified {@link Channel} once the io-thread runs again.
|
* the specified {@link Channel} once the io-thread runs again.
|
||||||
*/
|
*/
|
||||||
public static void fireChannelUnboundLater(Channel channel) {
|
public static ChannelFuture fireChannelUnboundLater(final Channel channel) {
|
||||||
channel.getPipeline().sendUpstreamLater(new UpstreamChannelStateEvent(
|
return channel.getPipeline().execute(new Runnable() {
|
||||||
channel, ChannelState.BOUND, null));
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
fireChannelUnbound(channel);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -449,15 +464,15 @@ public final class Channels {
|
||||||
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
||||||
* the specified {@link Channel} once the io-thread runs again.
|
* the specified {@link Channel} once the io-thread runs again.
|
||||||
*/
|
*/
|
||||||
public static void fireChannelClosedLater(Channel channel) {
|
public static ChannelFuture fireChannelClosedLater(final Channel channel) {
|
||||||
channel.getPipeline().sendUpstream(
|
return channel.getPipeline().execute(new Runnable() {
|
||||||
new UpstreamChannelStateEvent(
|
|
||||||
channel, ChannelState.OPEN, Boolean.FALSE));
|
@Override
|
||||||
|
public void run() {
|
||||||
// Notify the parent handler.
|
fireChannelClosed(channel);
|
||||||
if (channel.getParent() != null) {
|
}
|
||||||
fireChildChannelStateChangedLater(channel.getParent(), channel);
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -495,9 +510,14 @@ public final class Channels {
|
||||||
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
|
||||||
* the specified {@link Channel} once the io-thread runs again.
|
* the specified {@link Channel} once the io-thread runs again.
|
||||||
*/
|
*/
|
||||||
public static void fireExceptionCaughtLater(Channel channel, Throwable cause) {
|
public static ChannelFuture fireExceptionCaughtLater(final Channel channel, final Throwable cause) {
|
||||||
channel.getPipeline().sendUpstreamLater(
|
return channel.getPipeline().execute(new Runnable() {
|
||||||
new DefaultExceptionEvent(channel, cause));
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
fireExceptionCaught(channel, cause);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -527,13 +547,7 @@ public final class Channels {
|
||||||
new DefaultChildChannelStateEvent(channel, childChannel));
|
new DefaultChildChannelStateEvent(channel, childChannel));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void fireChildChannelStateChangedLater(
|
|
||||||
Channel channel, Channel childChannel) {
|
|
||||||
channel.getPipeline().sendUpstreamLater(
|
|
||||||
new DefaultChildChannelStateEvent(channel, childChannel));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a {@code "bind"} request to the last
|
* Sends a {@code "bind"} request to the last
|
||||||
* {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
|
* {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
import io.netty.logging.InternalLogger;
|
import io.netty.logging.InternalLogger;
|
||||||
import io.netty.logging.InternalLoggerFactory;
|
import io.netty.logging.InternalLoggerFactory;
|
||||||
|
@ -584,12 +585,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendUpstreamLater(ChannelEvent e) {
|
public ChannelFuture execute(Runnable task) {
|
||||||
try {
|
return getSink().execute(this, task);
|
||||||
getSink().fireUpstreamEventLater(this, e);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
notifyHandlerException(e, t);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -843,10 +840,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
|
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
|
||||||
if (logger.isWarnEnabled()) {
|
if (logger.isWarnEnabled()) {
|
||||||
logger.warn("Not attached yet; discarding: " + e);
|
logger.warn("Not attached yet; rejecting: " + task);
|
||||||
}
|
}
|
||||||
|
return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.socket;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.DefaultChannelFuture;
|
||||||
|
|
||||||
|
public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable {
|
||||||
|
|
||||||
|
private Runnable task;
|
||||||
|
|
||||||
|
public ChannelRunnableWrapper(Channel channel, Runnable task) {
|
||||||
|
super(channel, true);
|
||||||
|
this.task = task;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
setSuccess();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
setFailure(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -16,6 +16,9 @@
|
||||||
|
|
||||||
package io.netty.channel.socket;
|
package io.netty.channel.socket;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link Worker} is responsible to dispatch IO operations
|
* A {@link Worker} is responsible to dispatch IO operations
|
||||||
*
|
*
|
||||||
|
@ -27,5 +30,5 @@ public interface Worker extends Runnable {
|
||||||
*
|
*
|
||||||
* @param task the {@link Runnable} to execute
|
* @param task the {@link Runnable} to execute
|
||||||
*/
|
*/
|
||||||
void executeInIoThread(Runnable task);
|
ChannelFuture executeInIoThread(Channel channel, Runnable task);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,30 +19,21 @@ package io.netty.channel.socket.nio;
|
||||||
import io.netty.channel.AbstractChannelSink;
|
import io.netty.channel.AbstractChannelSink;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
|
||||||
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
|
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
|
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
|
||||||
Channel ch = e.getChannel();
|
Channel ch = pipeline.getChannel();
|
||||||
if (ch instanceof AbstractNioChannel<?>) {
|
if (ch instanceof AbstractNioChannel<?>) {
|
||||||
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
|
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
|
||||||
// check if the current thread is a worker thread if so we can send the event now
|
|
||||||
if (!AbstractNioWorker.isIoThread(channel)) {
|
return channel.worker.executeInIoThread(ch, task);
|
||||||
channel.worker.executeInIoThread(new Runnable() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
pipeline.sendUpstream(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
pipeline.sendUpstream(e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
super.fireUpstreamEventLater(pipeline, e);
|
|
||||||
}
|
}
|
||||||
|
return super.execute(pipeline, task);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.MessageEvent;
|
import io.netty.channel.MessageEvent;
|
||||||
|
import io.netty.channel.socket.ChannelRunnableWrapper;
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
||||||
import io.netty.logging.InternalLogger;
|
import io.netty.logging.InternalLogger;
|
||||||
|
@ -41,6 +42,7 @@ import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -272,13 +274,28 @@ abstract class AbstractNioWorker implements Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void executeInIoThread(Runnable eventRunnable) {
|
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
|
||||||
boolean added = eventQueue.offer(eventRunnable);
|
if (channel instanceof AbstractNioChannel<?> && isIoThread((AbstractNioChannel<?>) channel)) {
|
||||||
|
try {
|
||||||
assert added;
|
task.run();
|
||||||
|
return succeededFuture(channel);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return failedFuture(channel, t);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
|
||||||
|
boolean added = eventQueue.offer(channelRunnable);
|
||||||
|
|
||||||
|
if (added) {
|
||||||
|
// wake up the selector to speed things
|
||||||
|
selector.wakeup();
|
||||||
|
} else {
|
||||||
|
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
|
||||||
|
}
|
||||||
|
return channelRunnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// wake up the selector to speed things
|
|
||||||
selector.wakeup();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRegisterTaskQueue() throws IOException {
|
private void processRegisterTaskQueue() throws IOException {
|
||||||
|
|
|
@ -19,33 +19,25 @@ package io.netty.channel.socket.oio;
|
||||||
import io.netty.channel.AbstractChannelSink;
|
import io.netty.channel.AbstractChannelSink;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
|
|
||||||
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
|
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
|
public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) {
|
||||||
Channel ch = e.getChannel();
|
Channel ch = pipeline.getChannel();
|
||||||
if (ch instanceof AbstractOioChannel) {
|
if (ch instanceof AbstractOioChannel) {
|
||||||
AbstractOioChannel channel = (AbstractOioChannel) ch;
|
AbstractOioChannel channel = (AbstractOioChannel) ch;
|
||||||
Worker worker = channel.worker;
|
Worker worker = channel.worker;
|
||||||
if (worker != null && !AbstractOioWorker.isIoThead(channel)) {
|
if (worker != null) {
|
||||||
channel.worker.executeInIoThread(new Runnable() {
|
return channel.worker.executeInIoThread(ch, task);
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
pipeline.sendUpstream(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// no worker thread yet or the current thread is a worker thread so just fire the event now
|
|
||||||
pipeline.sendUpstream(e);
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
|
||||||
super.fireUpstreamEventLater(pipeline, e);
|
return super.execute(pipeline, task);
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +46,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink {
|
||||||
Channel channel = event.getChannel();
|
Channel channel = event.getChannel();
|
||||||
boolean fireLater = false;
|
boolean fireLater = false;
|
||||||
if (channel instanceof AbstractOioChannel) {
|
if (channel instanceof AbstractOioChannel) {
|
||||||
fireLater = !AbstractOioWorker.isIoThead((AbstractOioChannel) channel);
|
fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel);
|
||||||
}
|
}
|
||||||
return fireLater;
|
return fireLater;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,11 +19,13 @@ import static io.netty.channel.Channels.*;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.Channels;
|
import io.netty.channel.Channels;
|
||||||
|
import io.netty.channel.socket.ChannelRunnableWrapper;
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
import io.netty.util.internal.QueueFactory;
|
import io.netty.util.internal.QueueFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract base class for Oio-Worker implementations
|
* Abstract base class for Oio-Worker implementations
|
||||||
|
@ -84,16 +86,31 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
|
||||||
close(channel, succeededFuture(channel), true);
|
close(channel, succeededFuture(channel), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isIoThead(AbstractOioChannel channel) {
|
static boolean isIoThread(AbstractOioChannel channel) {
|
||||||
return Thread.currentThread() == channel.workerThread;
|
return Thread.currentThread() == channel.workerThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void executeInIoThread(Runnable eventRunnable) {
|
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
|
||||||
boolean added = eventQueue.offer(eventRunnable);
|
if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) {
|
||||||
|
try {
|
||||||
assert added;
|
task.run();
|
||||||
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
|
return succeededFuture(channel);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return failedFuture(channel, t);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
|
||||||
|
boolean added = eventQueue.offer(channelRunnable);
|
||||||
|
|
||||||
|
if (added) {
|
||||||
|
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
|
||||||
|
|
||||||
|
} else {
|
||||||
|
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
|
||||||
|
}
|
||||||
|
return channelRunnable;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processEventQueue() throws IOException {
|
private void processEventQueue() throws IOException {
|
||||||
|
@ -119,7 +136,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
|
||||||
|
|
||||||
static void setInterestOps(
|
static void setInterestOps(
|
||||||
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
|
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
|
||||||
boolean iothread = isIoThead(channel);
|
boolean iothread = isIoThread(channel);
|
||||||
|
|
||||||
// Override OP_WRITE flag - a user cannot change this flag.
|
// Override OP_WRITE flag - a user cannot change this flag.
|
||||||
interestOps &= ~Channel.OP_WRITE;
|
interestOps &= ~Channel.OP_WRITE;
|
||||||
|
@ -165,7 +182,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
static void close(AbstractOioChannel channel, ChannelFuture future) {
|
static void close(AbstractOioChannel channel, ChannelFuture future) {
|
||||||
close(channel, future, isIoThead(channel));
|
close(channel, future, isIoThread(channel));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
|
private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) {
|
||||||
|
|
|
@ -63,7 +63,7 @@ class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
|
||||||
static void write(
|
static void write(
|
||||||
OioDatagramChannel channel, ChannelFuture future,
|
OioDatagramChannel channel, ChannelFuture future,
|
||||||
Object message, SocketAddress remoteAddress) {
|
Object message, SocketAddress remoteAddress) {
|
||||||
boolean iothread = isIoThead(channel);
|
boolean iothread = isIoThread(channel);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ChannelBuffer buf = (ChannelBuffer) message;
|
ChannelBuffer buf = (ChannelBuffer) message;
|
||||||
|
@ -105,7 +105,7 @@ class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
|
||||||
|
|
||||||
static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
|
static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
|
||||||
boolean connected = channel.isConnected();
|
boolean connected = channel.isConnected();
|
||||||
boolean iothread = isIoThead(channel);
|
boolean iothread = isIoThread(channel);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
channel.socket.disconnect();
|
channel.socket.disconnect();
|
||||||
|
|
|
@ -65,7 +65,7 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
|
||||||
OioSocketChannel channel, ChannelFuture future,
|
OioSocketChannel channel, ChannelFuture future,
|
||||||
Object message) {
|
Object message) {
|
||||||
|
|
||||||
boolean iothread = isIoThead(channel);
|
boolean iothread = isIoThread(channel);
|
||||||
OutputStream out = channel.getOutputStream();
|
OutputStream out = channel.getOutputStream();
|
||||||
if (out == null) {
|
if (out == null) {
|
||||||
Exception e = new ClosedChannelException();
|
Exception e = new ClosedChannelException();
|
||||||
|
|
Loading…
Reference in New Issue
Block a user