Change Worker.executeInIoThread() to not need a Channel as paramater

This commit is contained in:
norman 2012-03-07 14:13:48 +01:00
parent e8c64ea593
commit 6375b84c9d
7 changed files with 61 additions and 74 deletions

View File

@ -20,6 +20,7 @@ import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
public abstract class AbstractSctpChannelSink extends AbstractChannelSink { public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
@ -28,7 +29,9 @@ public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
Channel ch = pipeline.getChannel(); Channel ch = pipeline.getChannel();
if (ch instanceof SctpChannelImpl) { if (ch instanceof SctpChannelImpl) {
SctpChannelImpl channel = (SctpChannelImpl) ch; SctpChannelImpl channel = (SctpChannelImpl) ch;
return channel.worker.executeInIoThread(channel, task); ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task);
channel.worker.executeInIoThread(task);
return wrapper;
} else { } else {
return super.execute(pipeline, task); return super.execute(pipeline, task);

View File

@ -15,7 +15,29 @@
*/ */
package io.netty.channel.sctp; package io.netty.channel.sctp;
import static io.netty.channel.Channels.*; import static io.netty.channel.Channels.fireChannelBound;
import static io.netty.channel.Channels.fireChannelClosed;
import static io.netty.channel.Channels.fireChannelConnected;
import static io.netty.channel.Channels.fireChannelDisconnected;
import static io.netty.channel.Channels.fireChannelInterestChanged;
import static io.netty.channel.Channels.fireChannelUnbound;
import static io.netty.channel.Channels.fireExceptionCaught;
import static io.netty.channel.Channels.fireMessageReceived;
import static io.netty.channel.Channels.fireWriteComplete;
import static io.netty.channel.Channels.succeededFuture;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.MessageEvent;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
import io.netty.channel.socket.Worker;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -31,28 +53,12 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.MessageInfo;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.MessageEvent;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory;
/** /**
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -248,25 +254,17 @@ class SctpWorker implements Worker {
} }
@Override @Override
public ChannelFuture executeInIoThread(Channel channel, Runnable task) { public void executeInIoThread(Runnable task) {
if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) { if (Thread.currentThread() == thread) {
try { task.run();
task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
}
} else { } else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); boolean added = eventQueue.offer(task);
boolean added = eventQueue.offer(channelRunnable);
if (added) { if (added) {
// wake up the selector to speed things // wake up the selector to speed things
selector.wakeup(); selector.wakeup();
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
} }
return channelRunnable;
} }
} }

View File

@ -16,9 +16,6 @@
package io.netty.channel.socket; package io.netty.channel.socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
/** /**
* A {@link Worker} is responsible to dispatch IO operations * A {@link Worker} is responsible to dispatch IO operations
* *
@ -30,5 +27,5 @@ public interface Worker extends Runnable {
* *
* @param task the {@link Runnable} to execute * @param task the {@link Runnable} to execute
*/ */
ChannelFuture executeInIoThread(Channel channel, Runnable task); void executeInIoThread(Runnable task);
} }

View File

@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
public abstract class AbstractNioChannelSink extends AbstractChannelSink { public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@ -29,8 +30,9 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
Channel ch = pipeline.getChannel(); Channel ch = pipeline.getChannel();
if (ch instanceof AbstractNioChannel<?>) { if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch; AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
return channel.worker.executeInIoThread(ch, task); channel.worker.executeInIoThread(wrapper);
return wrapper;
} }
return super.execute(pipeline, task); return super.execute(pipeline, task);

View File

@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker; import io.netty.channel.socket.Worker;
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
@ -42,7 +41,6 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -282,28 +280,20 @@ abstract class AbstractNioWorker implements Worker {
} }
@Override @Override
public ChannelFuture executeInIoThread(Channel channel, Runnable task) { public void executeInIoThread(Runnable task) {
if (channel instanceof AbstractNioChannel<?> && isIoThread((AbstractNioChannel<?>) channel)) { if (Thread.currentThread() == thread) {
try { task.run();
task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
}
} else { } else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); boolean added = eventQueue.offer(task);
boolean added = eventQueue.offer(channelRunnable);
assert added;
if (added) { if (added) {
// wake up the selector to speed things // wake up the selector to speed things
Selector selector = this.selector; Selector selector = this.selector;
if (selector != null) { if (selector != null) {
selector.wakeup(); selector.wakeup();
} }
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
} }
return channelRunnable;
} }

View File

@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker; import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink { public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@ -32,7 +33,9 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink {
AbstractOioChannel channel = (AbstractOioChannel) ch; AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker; Worker worker = channel.worker;
if (worker != null) { if (worker != null) {
return channel.worker.executeInIoThread(ch, task); ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
} }
} }

View File

@ -19,13 +19,11 @@ import static io.netty.channel.Channels.*;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels; import io.netty.channel.Channels;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker; import io.netty.channel.socket.Worker;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
/** /**
* Abstract base class for Oio-Worker implementations * Abstract base class for Oio-Worker implementations
@ -34,10 +32,16 @@ import java.util.concurrent.RejectedExecutionException;
*/ */
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker { abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel; protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
public AbstractOioWorker(C channel) { public AbstractOioWorker(C channel) {
this.channel = channel; this.channel = channel;
channel.worker = this; channel.worker = this;
@ -45,7 +49,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
@Override @Override
public void run() { public void run() {
channel.workerThread = Thread.currentThread(); thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) { while (channel.isOpen()) {
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
@ -91,31 +95,21 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
} }
@Override @Override
public ChannelFuture executeInIoThread(Channel channel, Runnable task) { public void executeInIoThread(Runnable task) {
if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { if (Thread.currentThread() == thread) {
try { task.run();
task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
}
} else { } else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); boolean added = eventQueue.offer(task);
boolean added = eventQueue.offer(channelRunnable);
if (added) { if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
}
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
}
return channelRunnable;
} }
} }
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final ChannelRunnableWrapper task = eventQueue.poll(); final Runnable task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }