Support of sharing a WorkerPool between Factories. See #225

This commit is contained in:
norman 2012-03-20 10:28:23 +01:00
parent d1f2226799
commit 16e373cabd
18 changed files with 421 additions and 164 deletions

View File

@ -16,9 +16,6 @@
package org.jboss.netty.channel.socket; package org.jboss.netty.channel.socket;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
/** /**
* A {@link Worker} is responsible to dispatch IO operations * A {@link Worker} is responsible to dispatch IO operations
* *
@ -32,5 +29,5 @@ public interface Worker extends Runnable {
* @param task * @param task
* the {@link Runnable} to execute * the {@link Runnable} to execute
*/ */
ChannelFuture executeInIoThread(Channel channel, Runnable task); void executeInIoThread(Runnable task);
} }

View File

@ -21,6 +21,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
public abstract class AbstractNioChannelSink extends AbstractChannelSink { public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@ -29,8 +30,9 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
Channel ch = pipeline.getChannel(); Channel ch = pipeline.getChannel();
if (ch instanceof AbstractNioChannel<?>) { if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch; AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
return channel.worker.executeInIoThread(ch, task); channel.worker.executeInIoThread(wrapper);
return wrapper;
} }
return super.execute(pipeline, task); return super.execute(pipeline, task);

View File

@ -21,7 +21,6 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
@ -42,7 +41,6 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -116,20 +114,42 @@ abstract class AbstractNioWorker implements Worker {
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
private final boolean allowShutdownOnIdle;
AbstractNioWorker(Executor executor) { AbstractNioWorker(Executor executor) {
this(executor, true);
}
public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) {
this.executor = executor; this.executor = executor;
this.allowShutdownOnIdle = allowShutdownOnIdle;
} }
void register(AbstractNioChannel<?> channel, ChannelFuture future) { void register(AbstractNioChannel<?> channel, ChannelFuture future) {
Runnable registerTask = createRegisterTask(channel, future); Runnable registerTask = createRegisterTask(channel, future);
Selector selector; Selector selector = start();
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
/**
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered
*
* @return selector
*/
private Selector start() {
synchronized (startStopLock) { synchronized (startStopLock) {
if (!started) { if (!started) {
// Open a selector if this worker didn't start yet. // Open a selector if this worker didn't start yet.
try { try {
this.selector = selector = Selector.open(); this.selector = Selector.open();
} catch (Throwable t) { } catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t); throw new ChannelException("Failed to create a selector.", t);
} }
@ -147,25 +167,17 @@ abstract class AbstractNioWorker implements Worker {
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Failed to close a selector.", t); logger.warn("Failed to close a selector.", t);
} }
this.selector = selector = null; this.selector = null;
// The method will return to the caller at this point. // The method will return to the caller at this point.
} }
} }
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
} }
assert selector != null && selector.isOpen(); assert selector != null && selector.isOpen();
started = true; started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
} }
return selector;
} }
@ -251,9 +263,11 @@ abstract class AbstractNioWorker implements Worker {
} }
} }
} else { } else {
if (allowShutdownOnIdle) {
// Give one more second. // Give one more second.
shutdown = true; shutdown = true;
} }
}
} else { } else {
shutdown = false; shutdown = false;
} }
@ -272,32 +286,39 @@ abstract class AbstractNioWorker implements Worker {
} }
} }
public ChannelFuture executeInIoThread(Channel channel, Runnable task) { public void executeInIoThread(Runnable task) {
if (channel instanceof AbstractNioChannel<?> && isIoThread((AbstractNioChannel<?>) channel)) { executeInIoThread(task, false);
try {
task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
} }
} else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
boolean added = eventQueue.offer(channelRunnable);
/**
* Execute the {@link Runnable} in a IO-Thread
*
* @param task
* the {@link Runnable} to execute
* @param alwaysAsync
* <code>true</code> if the {@link Runnable} should be executed
* in an async fashion even if the current Thread == IO Thread
*/
public void executeInIoThread(Runnable task, boolean alwaysAsync) {
if (!alwaysAsync && Thread.currentThread() == thread) {
task.run();
} else {
start();
boolean added = eventQueue.offer(task);
assert added;
if (added) { if (added) {
// wake up the selector to speed things
// wake up the selector to speed things // wake up the selector to speed things
Selector selector = this.selector; Selector selector = this.selector;
if (selector != null) { if (selector != null) {
selector.wakeup(); selector.wakeup();
} }
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
}
return channelRunnable;
} }
} }
}
private void processRegisterTaskQueue() throws IOException { private void processRegisterTaskQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = registerTaskQueue.poll(); final Runnable task = registerTaskQueue.poll();

View File

@ -0,0 +1,83 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling
* {@link #nextWorker()}
*
*/
public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker> implements WorkerPool<E> , ExternalResourceReleasable {
private final AbstractNioWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
private final Executor workerExecutor;
/**
* Create a new instance
*
* @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
* @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it
* @param workerCount the count of {@link Worker}'s to create
*/
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
workers = new AbstractNioWorker[workerCount];
for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor, allowShutDownOnIdle);
}
this.workerExecutor = workerExecutor;
}
/**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO
*
*
* @param executor the {@link Executor} to use
* @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it
* @return worker the new {@link Worker}
*/
protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle);
@SuppressWarnings("unchecked")
public E nextWorker() {
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -25,6 +25,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
/** /**
@ -83,7 +84,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private static final int DEFAULT_BOSS_COUNT = 1; private static final int DEFAULT_BOSS_COUNT = 1;
private final Executor bossExecutor; private final Executor bossExecutor;
private final Executor workerExecutor; private final WorkerPool<NioWorker> workerPool;
private final NioClientSocketPipelineSink sink; private final NioClientSocketPipelineSink sink;
/** /**
@ -135,27 +136,30 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
public NioClientSocketChannelFactory( public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor, Executor bossExecutor, Executor workerExecutor,
int bossCount, int workerCount) { int bossCount, int workerCount) {
this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true));
}
public NioClientSocketChannelFactory(
Executor bossExecutor, int bossCount,
WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) { if (bossExecutor == null) {
throw new NullPointerException("bossExecutor"); throw new NullPointerException("bossExecutor");
} }
if (workerExecutor == null) { if (workerPool == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerPool");
} }
if (bossCount <= 0) { if (bossCount <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"bossCount (" + bossCount + ") " + "bossCount (" + bossCount + ") " +
"must be a positive integer."); "must be a positive integer.");
} }
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor; this.workerPool = workerPool;
sink = new NioClientSocketPipelineSink( sink = new NioClientSocketPipelineSink(
bossExecutor, workerExecutor, bossCount, workerCount); bossExecutor, bossCount, workerPool);
} }
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
@ -163,6 +167,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
} }
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor, workerExecutor); ExecutorUtil.terminate(bossExecutor);
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
} }
} }

View File

@ -41,7 +41,6 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.QueueFactory;
@ -49,33 +48,27 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
private static final AtomicInteger nextId = new AtomicInteger();
final int id = nextId.incrementAndGet();
final Executor bossExecutor; final Executor bossExecutor;
private final Boss[] bosses; private final Boss[] bosses;
private final NioWorker[] workers;
private final AtomicInteger bossIndex = new AtomicInteger(); private final AtomicInteger bossIndex = new AtomicInteger();
private final AtomicInteger workerIndex = new AtomicInteger();
private final WorkerPool<NioWorker> workerPool;
NioClientSocketPipelineSink( NioClientSocketPipelineSink(
Executor bossExecutor, Executor workerExecutor, Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
int bossCount, int workerCount) {
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
bosses = new Boss[bossCount]; bosses = new Boss[bossCount];
for (int i = 0; i < bosses.length; i ++) { for (int i = 0; i < bosses.length; i ++) {
bosses[i] = new Boss(i + 1); bosses[i] = new Boss();
} }
workers = new NioWorker[workerCount]; this.workerPool = workerPool;
for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioWorker(workerExecutor);
} }
}
public void eventSunk( public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception { ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
@ -167,21 +160,18 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
} }
NioWorker nextWorker() { NioWorker nextWorker() {
return workers[Math.abs( return workerPool.nextWorker();
workerIndex.getAndIncrement() % workers.length)];
} }
private final class Boss implements Runnable { private final class Boss implements Runnable {
volatile Selector selector; volatile Selector selector;
private boolean started; private boolean started;
private final int subId;
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);; private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);;
Boss(int subId) { Boss() {
this.subId = subId;
} }
void register(NioClientSocketChannel channel) { void register(NioClientSocketChannel channel) {
@ -202,9 +192,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
boolean success = false; boolean success = false;
try { try {
DeadLockProofWorker.start( DeadLockProofWorker.start(
bossExecutor, bossExecutor, this);
new ThreadRenamingRunnable(
this, "New I/O client boss #" + id + '-' + subId));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -25,7 +25,7 @@ import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.ExternalResourceReleasable;
/** /**
* A {@link DatagramChannelFactory} that creates a NIO-based connectionless * A {@link DatagramChannelFactory} that creates a NIO-based connectionless
@ -76,8 +76,8 @@ import org.jboss.netty.util.internal.ExecutorUtil;
*/ */
public class NioDatagramChannelFactory implements DatagramChannelFactory { public class NioDatagramChannelFactory implements DatagramChannelFactory {
private final Executor workerExecutor;
private final NioDatagramPipelineSink sink; private final NioDatagramPipelineSink sink;
private final WorkerPool<NioDatagramWorker> workerPool;
/** /**
* Creates a new instance. Calling this constructor is same with calling * Creates a new instance. Calling this constructor is same with calling
@ -100,21 +100,19 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
* @param workerCount * @param workerCount
* the maximum number of I/O worker threads * the maximum number of I/O worker threads
*/ */
public NioDatagramChannelFactory(final Executor workerExecutor, public NioDatagramChannelFactory(final Executor workerExecutor, final int workerCount) {
final int workerCount) { this(new NioDatagramWorkerPool(workerExecutor, workerCount, true));
if (workerCount <= 0) {
throw new IllegalArgumentException(String
.format("workerCount (%s) must be a positive integer.",
workerCount));
} }
if (workerExecutor == null) { /**
throw new NullPointerException( * Creates a new instance.
"workerExecutor argument must not be null"); *
} * @param workerPool
this.workerExecutor = workerExecutor; * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/
sink = new NioDatagramPipelineSink(workerExecutor, workerCount); public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool) {
this.workerPool = workerPool;
sink = new NioDatagramPipelineSink(workerPool);
} }
public DatagramChannel newChannel(final ChannelPipeline pipeline) { public DatagramChannel newChannel(final ChannelPipeline pipeline) {
@ -122,6 +120,8 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
} }
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor); if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
} }
} }

View File

@ -20,7 +20,6 @@ import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
@ -36,8 +35,7 @@ import org.jboss.netty.channel.MessageEvent;
*/ */
class NioDatagramPipelineSink extends AbstractNioChannelSink { class NioDatagramPipelineSink extends AbstractNioChannelSink {
private final NioDatagramWorker[] workers; private final WorkerPool<NioDatagramWorker> workerPool;
private final AtomicInteger workerIndex = new AtomicInteger();
/** /**
* Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount. * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount.
@ -49,11 +47,8 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
* @param workerCount * @param workerCount
* the number of {@link NioDatagramWorker}s for this sink * the number of {@link NioDatagramWorker}s for this sink
*/ */
NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) { NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
workers = new NioDatagramWorker[workerCount]; this.workerPool = workerPool;
for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioDatagramWorker(workerExecutor);
}
} }
/** /**
@ -190,7 +185,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
} }
NioDatagramWorker nextWorker() { NioDatagramWorker nextWorker() {
return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; return workerPool.nextWorker();
} }
} }

View File

@ -38,7 +38,7 @@ import java.util.concurrent.Executor;
* A class responsible for registering channels with {@link Selector}. * A class responsible for registering channels with {@link Selector}.
* It also implements the {@link Selector} loop. * It also implements the {@link Selector} loop.
*/ */
class NioDatagramWorker extends AbstractNioWorker { public class NioDatagramWorker extends AbstractNioWorker {
/** /**
* Sole constructor. * Sole constructor.
@ -50,6 +50,10 @@ class NioDatagramWorker extends AbstractNioWorker {
super(executor); super(executor);
} }
NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) {
super(executor, allowShutdownOnIdle);
}
@Override @Override
protected boolean read(final SelectionKey key) { protected boolean read(final SelectionKey key) {
final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioDatagramWorker}'s
*
*
*/
public class NioDatagramWorkerPool extends AbstractNioWorkerPool<NioDatagramWorker> {
public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioDatagramWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -26,6 +26,7 @@ import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.ServerSocketChannel; import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory; import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
/** /**
@ -85,7 +86,7 @@ import org.jboss.netty.util.internal.ExecutorUtil;
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor; final Executor bossExecutor;
private final Executor workerExecutor; private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink; private final ChannelSink sink;
/** /**
@ -117,20 +118,29 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
public NioServerSocketChannelFactory( public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor, Executor bossExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true));
}
/**
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/
public NioServerSocketChannelFactory(
Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) { if (bossExecutor == null) {
throw new NullPointerException("bossExecutor"); throw new NullPointerException("bossExecutor");
} }
if (workerExecutor == null) { if (workerPool == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerPool");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
} }
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor; this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(workerExecutor, workerCount); sink = new NioServerSocketPipelineSink(workerPool);
} }
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
@ -138,6 +148,9 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
} }
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor, workerExecutor); ExecutorUtil.terminate(bossExecutor);
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}
} }
} }

View File

@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
@ -38,25 +37,18 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
class NioServerSocketPipelineSink extends AbstractNioChannelSink { class NioServerSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
private static final AtomicInteger nextId = new AtomicInteger(); private final WorkerPool<NioWorker> workerPool;
private final int id = nextId.incrementAndGet(); NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
private final NioWorker[] workers; this.workerPool = workerPool;
private final AtomicInteger workerIndex = new AtomicInteger(); }
NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
workers = new NioWorker[workerCount];
for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioWorker(workerExecutor);
}
}
public void eventSunk( public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception { ChannelPipeline pipeline, ChannelEvent e) throws Exception {
@ -145,10 +137,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
Executor bossExecutor = Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start( DeadLockProofWorker.start(
bossExecutor, bossExecutor, new Boss(channel));
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -195,8 +184,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
} }
NioWorker nextWorker() { NioWorker nextWorker() {
return workers[Math.abs( return workerPool.nextWorker();
workerIndex.getAndIncrement() % workers.length)];
} }
private final class Boss implements Runnable { private final class Boss implements Runnable {

View File

@ -35,14 +35,19 @@ import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
class NioWorker extends AbstractNioWorker { public class NioWorker extends AbstractNioWorker {
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
NioWorker(Executor executor) { public NioWorker(Executor executor) {
super(executor); super(executor);
} }
public NioWorker(Executor executor, boolean allowShutdownOnIdle) {
super(executor, allowShutdownOnIdle);
}
@Override @Override
protected boolean read(SelectionKey k) { protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel(); final SocketChannel ch = (SocketChannel) k.channel();

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
/**
* Default implementation which hands of {@link NioWorker}'s
*
*
*/
public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> {
public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
super(executor, workerCount, allowShutdownOnIdle);
}
@Override
protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
return new NioWorker(executor, allowShutdownOnIdle);
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable;
/**
* This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once
* you want to release any resources of it.
*
*
*/
public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E> {
private final WorkerPool<E> wrapped;
public ShareableWorkerPool(WorkerPool<E> wrapped) {
this.wrapped = wrapped;
}
public E nextWorker() {
return wrapped.nextWorker();
}
/**
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
*/
public void destroy() {
if (wrapped instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) wrapped).releaseExternalResources();
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
/**
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
*
*/
public interface WorkerPool<E extends Worker> {
/**
* Return the next {@link Worker} to use
*
* @return worker
*/
E nextWorker();
}

View File

@ -21,6 +21,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink { public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@ -32,7 +33,9 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink {
AbstractOioChannel channel = (AbstractOioChannel) ch; AbstractOioChannel channel = (AbstractOioChannel) ch;
Worker worker = channel.worker; Worker worker = channel.worker;
if (worker != null) { if (worker != null) {
return channel.worker.executeInIoThread(ch, task); ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper);
return wrapper;
} }
} }

View File

@ -19,13 +19,11 @@ import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.ChannelRunnableWrapper;
import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.QueueFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
/** /**
* Abstract base class for Oio-Worker implementations * Abstract base class for Oio-Worker implementations
@ -34,10 +32,16 @@ import java.util.concurrent.RejectedExecutionException;
*/ */
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker { abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
protected final C channel; protected final C channel;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
public AbstractOioWorker(C channel) { public AbstractOioWorker(C channel) {
this.channel = channel; this.channel = channel;
channel.worker = this; channel.worker = this;
@ -45,7 +49,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
public void run() { public void run() {
channel.workerThread = Thread.currentThread(); thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) { while (channel.isOpen()) {
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
@ -91,31 +95,21 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
} }
public ChannelFuture executeInIoThread(Channel channel, Runnable task) { public void executeInIoThread(Runnable task) {
if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { if (Thread.currentThread() == thread) {
try {
task.run(); task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
}
} else { } else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); boolean added = eventQueue.offer(task);
boolean added = eventQueue.offer(channelRunnable);
if (added) { if (added) {
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
} }
return channelRunnable;
} }
} }
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final ChannelRunnableWrapper task = eventQueue.poll(); final Runnable task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }