Merge pull request #225 from netty/workerpool
Support of sharing a WorkerPool between Factories
This commit is contained in:
commit
6e68577d54
|
@ -20,6 +20,7 @@ import io.netty.channel.AbstractChannelSink;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.ChannelRunnableWrapper;
|
||||||
|
|
||||||
public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
|
public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
|
||||||
|
|
||||||
|
@ -28,7 +29,9 @@ public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
|
||||||
Channel ch = pipeline.getChannel();
|
Channel ch = pipeline.getChannel();
|
||||||
if (ch instanceof SctpChannelImpl) {
|
if (ch instanceof SctpChannelImpl) {
|
||||||
SctpChannelImpl channel = (SctpChannelImpl) ch;
|
SctpChannelImpl channel = (SctpChannelImpl) ch;
|
||||||
return channel.worker.executeInIoThread(channel, task);
|
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task);
|
||||||
|
channel.worker.executeInIoThread(wrapper);
|
||||||
|
return wrapper;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return super.execute(pipeline, task);
|
return super.execute(pipeline, task);
|
||||||
|
|
|
@ -15,7 +15,29 @@
|
||||||
*/
|
*/
|
||||||
package io.netty.channel.sctp;
|
package io.netty.channel.sctp;
|
||||||
|
|
||||||
import static io.netty.channel.Channels.*;
|
import static io.netty.channel.Channels.fireChannelBound;
|
||||||
|
import static io.netty.channel.Channels.fireChannelClosed;
|
||||||
|
import static io.netty.channel.Channels.fireChannelConnected;
|
||||||
|
import static io.netty.channel.Channels.fireChannelDisconnected;
|
||||||
|
import static io.netty.channel.Channels.fireChannelInterestChanged;
|
||||||
|
import static io.netty.channel.Channels.fireChannelUnbound;
|
||||||
|
import static io.netty.channel.Channels.fireExceptionCaught;
|
||||||
|
import static io.netty.channel.Channels.fireMessageReceived;
|
||||||
|
import static io.netty.channel.Channels.fireWriteComplete;
|
||||||
|
import static io.netty.channel.Channels.succeededFuture;
|
||||||
|
import io.netty.buffer.ChannelBuffer;
|
||||||
|
import io.netty.buffer.ChannelBufferFactory;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelException;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.MessageEvent;
|
||||||
|
import io.netty.channel.ReceiveBufferSizePredictor;
|
||||||
|
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
|
||||||
|
import io.netty.channel.socket.Worker;
|
||||||
|
import io.netty.logging.InternalLogger;
|
||||||
|
import io.netty.logging.InternalLoggerFactory;
|
||||||
|
import io.netty.util.internal.DeadLockProofWorker;
|
||||||
|
import io.netty.util.internal.QueueFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
@ -31,28 +53,12 @@ import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import com.sun.nio.sctp.MessageInfo;
|
import com.sun.nio.sctp.MessageInfo;
|
||||||
|
|
||||||
import io.netty.buffer.ChannelBuffer;
|
|
||||||
import io.netty.buffer.ChannelBufferFactory;
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelException;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
|
||||||
import io.netty.channel.MessageEvent;
|
|
||||||
import io.netty.channel.ReceiveBufferSizePredictor;
|
|
||||||
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
|
|
||||||
import io.netty.channel.socket.ChannelRunnableWrapper;
|
|
||||||
import io.netty.channel.socket.Worker;
|
|
||||||
import io.netty.logging.InternalLogger;
|
|
||||||
import io.netty.logging.InternalLoggerFactory;
|
|
||||||
import io.netty.util.internal.DeadLockProofWorker;
|
|
||||||
import io.netty.util.internal.QueueFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -248,25 +254,17 @@ class SctpWorker implements Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
|
public void executeInIoThread(Runnable task) {
|
||||||
if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) {
|
if (Thread.currentThread() == thread) {
|
||||||
try {
|
task.run();
|
||||||
task.run();
|
|
||||||
return succeededFuture(channel);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
return failedFuture(channel, t);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
|
boolean added = eventQueue.offer(task);
|
||||||
boolean added = eventQueue.offer(channelRunnable);
|
|
||||||
|
|
||||||
if (added) {
|
if (added) {
|
||||||
// wake up the selector to speed things
|
// wake up the selector to speed things
|
||||||
selector.wakeup();
|
selector.wakeup();
|
||||||
} else {
|
|
||||||
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
|
|
||||||
}
|
}
|
||||||
return channelRunnable;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,9 +16,6 @@
|
||||||
|
|
||||||
package io.netty.channel.socket;
|
package io.netty.channel.socket;
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link Worker} is responsible to dispatch IO operations
|
* A {@link Worker} is responsible to dispatch IO operations
|
||||||
*
|
*
|
||||||
|
@ -30,5 +27,5 @@ public interface Worker extends Runnable {
|
||||||
*
|
*
|
||||||
* @param task the {@link Runnable} to execute
|
* @param task the {@link Runnable} to execute
|
||||||
*/
|
*/
|
||||||
ChannelFuture executeInIoThread(Channel channel, Runnable task);
|
void executeInIoThread(Runnable task);
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,6 +115,16 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
|
||||||
this.channel = ch;
|
this.channel = ch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@link AbstractNioWorker} that handle the IO of the {@link AbstractNioChannel}
|
||||||
|
*
|
||||||
|
* @return worker
|
||||||
|
*/
|
||||||
|
public AbstractNioWorker getWorker() {
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InetSocketAddress getLocalAddress() {
|
public InetSocketAddress getLocalAddress() {
|
||||||
InetSocketAddress localAddress = this.localAddress;
|
InetSocketAddress localAddress = this.localAddress;
|
||||||
|
|
|
@ -21,6 +21,7 @@ import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.ChannelRunnableWrapper;
|
||||||
|
|
||||||
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
|
public abstract class AbstractNioChannelSink extends AbstractChannelSink {
|
||||||
|
|
||||||
|
@ -29,8 +30,9 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
|
||||||
Channel ch = pipeline.getChannel();
|
Channel ch = pipeline.getChannel();
|
||||||
if (ch instanceof AbstractNioChannel<?>) {
|
if (ch instanceof AbstractNioChannel<?>) {
|
||||||
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
|
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
|
||||||
|
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
|
||||||
return channel.worker.executeInIoThread(ch, task);
|
channel.worker.executeInIoThread(wrapper);
|
||||||
|
return wrapper;
|
||||||
}
|
}
|
||||||
return super.execute(pipeline, task);
|
return super.execute(pipeline, task);
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.MessageEvent;
|
import io.netty.channel.MessageEvent;
|
||||||
import io.netty.channel.socket.ChannelRunnableWrapper;
|
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
|
||||||
import io.netty.logging.InternalLogger;
|
import io.netty.logging.InternalLogger;
|
||||||
|
@ -42,7 +41,6 @@ import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -115,20 +113,42 @@ abstract class AbstractNioWorker implements Worker {
|
||||||
|
|
||||||
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
|
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
|
||||||
|
|
||||||
|
private final boolean allowShutdownOnIdle;
|
||||||
|
|
||||||
AbstractNioWorker(Executor executor) {
|
AbstractNioWorker(Executor executor) {
|
||||||
|
this(executor, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
|
this.allowShutdownOnIdle = allowShutdownOnIdle;
|
||||||
}
|
}
|
||||||
|
|
||||||
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
|
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
|
||||||
|
|
||||||
Runnable registerTask = createRegisterTask(channel, future);
|
Runnable registerTask = createRegisterTask(channel, future);
|
||||||
Selector selector;
|
Selector selector = start();
|
||||||
|
|
||||||
|
|
||||||
|
boolean offered = registerTaskQueue.offer(registerTask);
|
||||||
|
assert offered;
|
||||||
|
|
||||||
|
if (wakenUp.compareAndSet(false, true)) {
|
||||||
|
selector.wakeup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered
|
||||||
|
*
|
||||||
|
* @return selector
|
||||||
|
*/
|
||||||
|
private Selector start() {
|
||||||
synchronized (startStopLock) {
|
synchronized (startStopLock) {
|
||||||
if (!started) {
|
if (!started) {
|
||||||
// Open a selector if this worker didn't start yet.
|
// Open a selector if this worker didn't start yet.
|
||||||
try {
|
try {
|
||||||
this.selector = selector = Selector.open();
|
this.selector = Selector.open();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw new ChannelException("Failed to create a selector.", t);
|
throw new ChannelException("Failed to create a selector.", t);
|
||||||
}
|
}
|
||||||
|
@ -146,28 +166,19 @@ abstract class AbstractNioWorker implements Worker {
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
logger.warn("Failed to close a selector.", t);
|
logger.warn("Failed to close a selector.", t);
|
||||||
}
|
}
|
||||||
this.selector = selector = null;
|
this.selector = null;
|
||||||
// The method will return to the caller at this point.
|
// The method will return to the caller at this point.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// Use the existing selector if this worker has been started.
|
|
||||||
selector = this.selector;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
assert selector != null && selector.isOpen();
|
assert selector != null && selector.isOpen();
|
||||||
|
|
||||||
started = true;
|
started = true;
|
||||||
boolean offered = registerTaskQueue.offer(registerTask);
|
|
||||||
assert offered;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (wakenUp.compareAndSet(false, true)) {
|
|
||||||
selector.wakeup();
|
|
||||||
}
|
}
|
||||||
|
return selector;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
thread = Thread.currentThread();
|
thread = Thread.currentThread();
|
||||||
|
@ -251,8 +262,10 @@ abstract class AbstractNioWorker implements Worker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Give one more second.
|
if (allowShutdownOnIdle) {
|
||||||
shutdown = true;
|
// Give one more second.
|
||||||
|
shutdown = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
shutdown = false;
|
shutdown = false;
|
||||||
|
@ -273,31 +286,34 @@ abstract class AbstractNioWorker implements Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
|
public void executeInIoThread(Runnable task) {
|
||||||
if (channel instanceof AbstractNioChannel<?> && isIoThread((AbstractNioChannel<?>) channel)) {
|
executeInIoThread(task, false);
|
||||||
try {
|
}
|
||||||
task.run();
|
|
||||||
return succeededFuture(channel);
|
/**
|
||||||
} catch (Throwable t) {
|
* Execute the {@link Runnable} in a IO-Thread
|
||||||
return failedFuture(channel, t);
|
*
|
||||||
}
|
* @param task the {@link Runnable} to execute
|
||||||
} else {
|
* @param alwaysAsync <code>true</code> if the {@link Runnable} should be executed in an async
|
||||||
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
|
* fashion even if the current Thread == IO Thread
|
||||||
boolean added = eventQueue.offer(channelRunnable);
|
*/
|
||||||
|
public void executeInIoThread(Runnable task, boolean alwaysAsync) {
|
||||||
if (added) {
|
if (!alwaysAsync && Thread.currentThread() == thread) {
|
||||||
// wake up the selector to speed things
|
task.run();
|
||||||
Selector selector = this.selector;
|
} else {
|
||||||
if (selector != null) {
|
start();
|
||||||
selector.wakeup();
|
boolean added = eventQueue.offer(task);
|
||||||
}
|
|
||||||
} else {
|
assert added;
|
||||||
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
|
if (added) {
|
||||||
}
|
// wake up the selector to speed things
|
||||||
return channelRunnable;
|
Selector selector = this.selector;
|
||||||
}
|
if (selector != null) {
|
||||||
|
selector.wakeup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRegisterTaskQueue() throws IOException {
|
private void processRegisterTaskQueue() throws IOException {
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.channel.socket.nio;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.socket.Worker;
|
||||||
|
import io.netty.util.ExternalResourceReleasable;
|
||||||
|
import io.netty.util.internal.ExecutorUtil;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling
|
||||||
|
* {@link #nextWorker()}
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker> implements WorkerPool<E> , ExternalResourceReleasable {
|
||||||
|
|
||||||
|
private final AbstractNioWorker[] workers;
|
||||||
|
private final AtomicInteger workerIndex = new AtomicInteger();
|
||||||
|
private final Executor workerExecutor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance
|
||||||
|
*
|
||||||
|
* @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
|
||||||
|
* @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it
|
||||||
|
* @param workerCount the count of {@link Worker}'s to create
|
||||||
|
*/
|
||||||
|
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) {
|
||||||
|
if (workerExecutor == null) {
|
||||||
|
throw new NullPointerException("workerExecutor");
|
||||||
|
}
|
||||||
|
if (workerCount <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"workerCount (" + workerCount + ") " +
|
||||||
|
"must be a positive integer.");
|
||||||
|
}
|
||||||
|
workers = new AbstractNioWorker[workerCount];
|
||||||
|
|
||||||
|
for (int i = 0; i < workers.length; i++) {
|
||||||
|
workers[i] = createWorker(workerExecutor, allowShutDownOnIdle);
|
||||||
|
}
|
||||||
|
this.workerExecutor = workerExecutor;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link Worker} which uses the given {@link Executor} to service IO
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @param executor the {@link Executor} to use
|
||||||
|
* @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it
|
||||||
|
* @return worker the new {@link Worker}
|
||||||
|
*/
|
||||||
|
protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle);
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public E nextWorker() {
|
||||||
|
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void releaseExternalResources() {
|
||||||
|
ExecutorUtil.terminate(workerExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.group.ChannelGroup;
|
import io.netty.channel.group.ChannelGroup;
|
||||||
import io.netty.channel.socket.ClientSocketChannelFactory;
|
import io.netty.channel.socket.ClientSocketChannelFactory;
|
||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.util.ExternalResourceReleasable;
|
||||||
import io.netty.util.internal.ExecutorUtil;
|
import io.netty.util.internal.ExecutorUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -82,7 +83,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
|
||||||
private static final int DEFAULT_BOSS_COUNT = 1;
|
private static final int DEFAULT_BOSS_COUNT = 1;
|
||||||
|
|
||||||
private final Executor bossExecutor;
|
private final Executor bossExecutor;
|
||||||
private final Executor workerExecutor;
|
private final WorkerPool<NioWorker> workerPool;
|
||||||
private final NioClientSocketPipelineSink sink;
|
private final NioClientSocketPipelineSink sink;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -136,29 +137,32 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
|
||||||
Executor bossExecutor, Executor workerExecutor,
|
Executor bossExecutor, Executor workerExecutor,
|
||||||
int bossCount, int workerCount) {
|
int bossCount, int workerCount) {
|
||||||
|
|
||||||
|
this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
public NioClientSocketChannelFactory(
|
||||||
|
Executor bossExecutor, int bossCount,
|
||||||
|
WorkerPool<NioWorker> workerPool) {
|
||||||
|
|
||||||
if (bossExecutor == null) {
|
if (bossExecutor == null) {
|
||||||
throw new NullPointerException("bossExecutor");
|
throw new NullPointerException("bossExecutor");
|
||||||
}
|
}
|
||||||
if (workerExecutor == null) {
|
if (workerPool == null) {
|
||||||
throw new NullPointerException("workerExecutor");
|
throw new NullPointerException("workerPool");
|
||||||
}
|
}
|
||||||
if (bossCount <= 0) {
|
if (bossCount <= 0) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"bossCount (" + bossCount + ") " +
|
"bossCount (" + bossCount + ") " +
|
||||||
"must be a positive integer.");
|
"must be a positive integer.");
|
||||||
}
|
}
|
||||||
if (workerCount <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"workerCount (" + workerCount + ") " +
|
|
||||||
"must be a positive integer.");
|
|
||||||
}
|
|
||||||
|
|
||||||
this.bossExecutor = bossExecutor;
|
this.bossExecutor = bossExecutor;
|
||||||
this.workerExecutor = workerExecutor;
|
this.workerPool = workerPool;
|
||||||
sink = new NioClientSocketPipelineSink(
|
sink = new NioClientSocketPipelineSink(
|
||||||
bossExecutor, workerExecutor, bossCount, workerCount);
|
bossExecutor, bossCount, workerPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SocketChannel newChannel(ChannelPipeline pipeline) {
|
public SocketChannel newChannel(ChannelPipeline pipeline) {
|
||||||
|
@ -167,6 +171,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void releaseExternalResources() {
|
public void releaseExternalResources() {
|
||||||
ExecutorUtil.terminate(bossExecutor, workerExecutor);
|
ExecutorUtil.terminate(bossExecutor);
|
||||||
|
if (workerPool instanceof ExternalResourceReleasable) {
|
||||||
|
((ExternalResourceReleasable) workerPool).releaseExternalResources();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,15 +51,13 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||||
|
|
||||||
final Executor bossExecutor;
|
final Executor bossExecutor;
|
||||||
|
|
||||||
private final Boss[] bosses;
|
private final Boss[] bosses;
|
||||||
private final NioWorker[] workers;
|
|
||||||
|
|
||||||
private final AtomicInteger bossIndex = new AtomicInteger();
|
private final AtomicInteger bossIndex = new AtomicInteger();
|
||||||
private final AtomicInteger workerIndex = new AtomicInteger();
|
|
||||||
|
private final WorkerPool<NioWorker> workerPool;
|
||||||
|
|
||||||
NioClientSocketPipelineSink(
|
NioClientSocketPipelineSink(
|
||||||
Executor bossExecutor, Executor workerExecutor,
|
Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
|
||||||
int bossCount, int workerCount) {
|
|
||||||
|
|
||||||
this.bossExecutor = bossExecutor;
|
this.bossExecutor = bossExecutor;
|
||||||
|
|
||||||
|
@ -68,10 +66,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||||
bosses[i] = new Boss();
|
bosses[i] = new Boss();
|
||||||
}
|
}
|
||||||
|
|
||||||
workers = new NioWorker[workerCount];
|
this.workerPool = workerPool;
|
||||||
for (int i = 0; i < workers.length; i ++) {
|
|
||||||
workers[i] = new NioWorker(workerExecutor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -162,8 +157,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
NioWorker nextWorker() {
|
NioWorker nextWorker() {
|
||||||
return workers[Math.abs(
|
return workerPool.nextWorker();
|
||||||
workerIndex.getAndIncrement() % workers.length)];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Boss nextBoss() {
|
Boss nextBoss() {
|
||||||
|
|
|
@ -33,7 +33,7 @@ import java.nio.channels.DatagramChannel;
|
||||||
/**
|
/**
|
||||||
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}.
|
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}.
|
||||||
*/
|
*/
|
||||||
final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
|
public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
|
||||||
implements io.netty.channel.socket.DatagramChannel {
|
implements io.netty.channel.socket.DatagramChannel {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -68,6 +68,11 @@ final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NioDatagramWorker getWorker() {
|
||||||
|
return (NioDatagramWorker) super.getWorker();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isBound() {
|
public boolean isBound() {
|
||||||
return isOpen() && channel.socket().isBound();
|
return isOpen() && channel.socket().isBound();
|
||||||
|
|
|
@ -24,8 +24,9 @@ import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.group.ChannelGroup;
|
import io.netty.channel.group.ChannelGroup;
|
||||||
import io.netty.channel.socket.DatagramChannel;
|
import io.netty.channel.socket.DatagramChannel;
|
||||||
import io.netty.channel.socket.DatagramChannelFactory;
|
import io.netty.channel.socket.DatagramChannelFactory;
|
||||||
|
import io.netty.channel.socket.Worker;
|
||||||
import io.netty.channel.socket.oio.OioDatagramChannelFactory;
|
import io.netty.channel.socket.oio.OioDatagramChannelFactory;
|
||||||
import io.netty.util.internal.ExecutorUtil;
|
import io.netty.util.ExternalResourceReleasable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link DatagramChannelFactory} that creates a NIO-based connectionless
|
* A {@link DatagramChannelFactory} that creates a NIO-based connectionless
|
||||||
|
@ -75,8 +76,8 @@ import io.netty.util.internal.ExecutorUtil;
|
||||||
*/
|
*/
|
||||||
public class NioDatagramChannelFactory implements DatagramChannelFactory {
|
public class NioDatagramChannelFactory implements DatagramChannelFactory {
|
||||||
|
|
||||||
private final Executor workerExecutor;
|
|
||||||
private final NioDatagramPipelineSink sink;
|
private final NioDatagramPipelineSink sink;
|
||||||
|
private final WorkerPool<NioDatagramWorker> workerPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance. Calling this constructor is same with calling
|
* Creates a new instance. Calling this constructor is same with calling
|
||||||
|
@ -101,21 +102,20 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
|
||||||
*/
|
*/
|
||||||
public NioDatagramChannelFactory(final Executor workerExecutor,
|
public NioDatagramChannelFactory(final Executor workerExecutor,
|
||||||
final int workerCount) {
|
final int workerCount) {
|
||||||
if (workerCount <= 0) {
|
this(new NioDatagramWorkerPool(workerExecutor, workerCount, true));
|
||||||
throw new IllegalArgumentException(String
|
|
||||||
.format("workerCount (%s) must be a positive integer.",
|
|
||||||
workerCount));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workerExecutor == null) {
|
|
||||||
throw new NullPointerException(
|
|
||||||
"workerExecutor argument must not be null");
|
|
||||||
}
|
|
||||||
this.workerExecutor = workerExecutor;
|
|
||||||
|
|
||||||
sink = new NioDatagramPipelineSink(workerExecutor, workerCount);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*
|
||||||
|
* @param workerPool
|
||||||
|
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
|
||||||
|
*/
|
||||||
|
public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool) {
|
||||||
|
this.workerPool = workerPool;
|
||||||
|
sink = new NioDatagramPipelineSink(workerPool);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
|
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
|
||||||
return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker());
|
return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker());
|
||||||
|
@ -123,6 +123,9 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void releaseExternalResources() {
|
public void releaseExternalResources() {
|
||||||
ExecutorUtil.terminate(workerExecutor);
|
if (workerPool instanceof ExternalResourceReleasable) {
|
||||||
|
((ExternalResourceReleasable) workerPool).releaseExternalResources();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ import static io.netty.channel.Channels.*;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
@ -36,8 +35,7 @@ import io.netty.channel.MessageEvent;
|
||||||
*/
|
*/
|
||||||
class NioDatagramPipelineSink extends AbstractNioChannelSink {
|
class NioDatagramPipelineSink extends AbstractNioChannelSink {
|
||||||
|
|
||||||
private final NioDatagramWorker[] workers;
|
private final WorkerPool<NioDatagramWorker> workerPool;
|
||||||
private final AtomicInteger workerIndex = new AtomicInteger();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount.
|
* Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount.
|
||||||
|
@ -49,11 +47,8 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
|
||||||
* @param workerCount
|
* @param workerCount
|
||||||
* the number of {@link NioDatagramWorker}s for this sink
|
* the number of {@link NioDatagramWorker}s for this sink
|
||||||
*/
|
*/
|
||||||
NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) {
|
NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
|
||||||
workers = new NioDatagramWorker[workerCount];
|
this.workerPool = workerPool;
|
||||||
for (int i = 0; i < workers.length; i ++) {
|
|
||||||
workers[i] = new NioDatagramWorker(workerExecutor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -191,7 +186,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
NioDatagramWorker nextWorker() {
|
NioDatagramWorker nextWorker() {
|
||||||
return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
|
return workerPool.nextWorker();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ import java.util.concurrent.Executor;
|
||||||
* A class responsible for registering channels with {@link Selector}.
|
* A class responsible for registering channels with {@link Selector}.
|
||||||
* It also implements the {@link Selector} loop.
|
* It also implements the {@link Selector} loop.
|
||||||
*/
|
*/
|
||||||
class NioDatagramWorker extends AbstractNioWorker {
|
public class NioDatagramWorker extends AbstractNioWorker {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sole constructor.
|
* Sole constructor.
|
||||||
|
@ -50,6 +50,10 @@ class NioDatagramWorker extends AbstractNioWorker {
|
||||||
super(executor);
|
super(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) {
|
||||||
|
super(executor, allowShutdownOnIdle);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean read(final SelectionKey key) {
|
protected boolean read(final SelectionKey key) {
|
||||||
final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
|
final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.socket.nio;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default implementation which hands of {@link NioDatagramWorker}'s
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class NioDatagramWorkerPool extends AbstractNioWorkerPool<NioDatagramWorker> {
|
||||||
|
|
||||||
|
public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
|
||||||
|
super(executor, workerCount, allowShutdownOnIdle);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
|
||||||
|
return new NioDatagramWorker(executor, allowShutdownOnIdle);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -26,6 +26,8 @@ import io.netty.channel.ChannelSink;
|
||||||
import io.netty.channel.group.ChannelGroup;
|
import io.netty.channel.group.ChannelGroup;
|
||||||
import io.netty.channel.socket.ServerSocketChannel;
|
import io.netty.channel.socket.ServerSocketChannel;
|
||||||
import io.netty.channel.socket.ServerSocketChannelFactory;
|
import io.netty.channel.socket.ServerSocketChannelFactory;
|
||||||
|
import io.netty.channel.socket.Worker;
|
||||||
|
import io.netty.util.ExternalResourceReleasable;
|
||||||
import io.netty.util.internal.ExecutorUtil;
|
import io.netty.util.internal.ExecutorUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -84,7 +86,7 @@ import io.netty.util.internal.ExecutorUtil;
|
||||||
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
|
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
|
||||||
|
|
||||||
final Executor bossExecutor;
|
final Executor bossExecutor;
|
||||||
private final Executor workerExecutor;
|
private final WorkerPool<NioWorker> workerPool;
|
||||||
private final ChannelSink sink;
|
private final ChannelSink sink;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -116,22 +118,32 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
|
||||||
public NioServerSocketChannelFactory(
|
public NioServerSocketChannelFactory(
|
||||||
Executor bossExecutor, Executor workerExecutor,
|
Executor bossExecutor, Executor workerExecutor,
|
||||||
int workerCount) {
|
int workerCount) {
|
||||||
|
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*
|
||||||
|
* @param bossExecutor
|
||||||
|
* the {@link Executor} which will execute the boss threads
|
||||||
|
* @param workerPool
|
||||||
|
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
|
||||||
|
*/
|
||||||
|
public NioServerSocketChannelFactory(
|
||||||
|
Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
|
||||||
if (bossExecutor == null) {
|
if (bossExecutor == null) {
|
||||||
throw new NullPointerException("bossExecutor");
|
throw new NullPointerException("bossExecutor");
|
||||||
}
|
}
|
||||||
if (workerExecutor == null) {
|
if (workerPool == null) {
|
||||||
throw new NullPointerException("workerExecutor");
|
throw new NullPointerException("workerPool");
|
||||||
}
|
|
||||||
if (workerCount <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"workerCount (" + workerCount + ") " +
|
|
||||||
"must be a positive integer.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.bossExecutor = bossExecutor;
|
this.bossExecutor = bossExecutor;
|
||||||
this.workerExecutor = workerExecutor;
|
this.workerPool = workerPool;
|
||||||
sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);
|
sink = new NioServerSocketPipelineSink(workerPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
|
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
|
||||||
return NioServerSocketChannel.create(this, pipeline, sink);
|
return NioServerSocketChannel.create(this, pipeline, sink);
|
||||||
|
@ -139,6 +151,10 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void releaseExternalResources() {
|
public void releaseExternalResources() {
|
||||||
ExecutorUtil.terminate(bossExecutor, workerExecutor);
|
ExecutorUtil.terminate(bossExecutor);
|
||||||
|
if (workerPool instanceof ExternalResourceReleasable) {
|
||||||
|
((ExternalResourceReleasable) workerPool).releaseExternalResources();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
|
@ -45,14 +44,10 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
|
||||||
static final InternalLogger logger =
|
static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
|
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
|
||||||
|
|
||||||
private final NioWorker[] workers;
|
private final WorkerPool<NioWorker> workerPool;
|
||||||
private final AtomicInteger workerIndex = new AtomicInteger();
|
|
||||||
|
|
||||||
NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
|
NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
|
||||||
workers = new NioWorker[workerCount];
|
this.workerPool = workerPool;
|
||||||
for (int i = 0; i < workers.length; i ++) {
|
|
||||||
workers[i] = new NioWorker(workerExecutor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -189,8 +184,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
NioWorker nextWorker() {
|
NioWorker nextWorker() {
|
||||||
return workers[Math.abs(
|
return workerPool.nextWorker();
|
||||||
workerIndex.getAndIncrement() % workers.length)];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class Boss implements Runnable {
|
private final class Boss implements Runnable {
|
||||||
|
|
|
@ -23,7 +23,7 @@ import io.netty.channel.ChannelSink;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
|
|
||||||
class NioSocketChannel extends AbstractNioChannel<SocketChannel>
|
public class NioSocketChannel extends AbstractNioChannel<SocketChannel>
|
||||||
implements io.netty.channel.socket.SocketChannel {
|
implements io.netty.channel.socket.SocketChannel {
|
||||||
|
|
||||||
private static final int ST_OPEN = 0;
|
private static final int ST_OPEN = 0;
|
||||||
|
@ -43,6 +43,11 @@ class NioSocketChannel extends AbstractNioChannel<SocketChannel>
|
||||||
config = new DefaultNioSocketChannelConfig(socket.socket());
|
config = new DefaultNioSocketChannelConfig(socket.socket());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NioWorker getWorker() {
|
||||||
|
return (NioWorker) super.getWorker();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NioSocketChannelConfig getConfig() {
|
public NioSocketChannelConfig getConfig() {
|
||||||
return config;
|
return config;
|
||||||
|
|
|
@ -35,13 +35,18 @@ import java.nio.channels.Selector;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
class NioWorker extends AbstractNioWorker {
|
public class NioWorker extends AbstractNioWorker {
|
||||||
|
|
||||||
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
|
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
|
||||||
|
|
||||||
NioWorker(Executor executor) {
|
public NioWorker(Executor executor) {
|
||||||
super(executor);
|
super(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public NioWorker(Executor executor, boolean allowShutdownOnIdle) {
|
||||||
|
super(executor, allowShutdownOnIdle);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean read(SelectionKey k) {
|
protected boolean read(SelectionKey k) {
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.socket.nio;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default implementation which hands of {@link NioWorker}'s
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> {
|
||||||
|
|
||||||
|
public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) {
|
||||||
|
super(executor, workerCount, allowShutdownOnIdle);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) {
|
||||||
|
return new NioWorker(executor, allowShutdownOnIdle);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.socket.nio;
|
||||||
|
|
||||||
|
import io.netty.channel.socket.Worker;
|
||||||
|
import io.netty.util.ExternalResourceReleasable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once
|
||||||
|
* you want to release any resources of it.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E> {
|
||||||
|
|
||||||
|
private final WorkerPool<E> wrapped;
|
||||||
|
|
||||||
|
public ShareableWorkerPool(WorkerPool<E> wrapped) {
|
||||||
|
this.wrapped = wrapped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public E nextWorker() {
|
||||||
|
return wrapped.nextWorker();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
|
||||||
|
*/
|
||||||
|
public void destroy() {
|
||||||
|
if (wrapped instanceof ExternalResourceReleasable) {
|
||||||
|
((ExternalResourceReleasable) wrapped).releaseExternalResources();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2011 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.channel.socket.nio;
|
||||||
|
|
||||||
|
import io.netty.channel.socket.Worker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface WorkerPool<E extends Worker> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the next {@link Worker} to use
|
||||||
|
*
|
||||||
|
* @return worker
|
||||||
|
*/
|
||||||
|
E nextWorker();
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelEvent;
|
import io.netty.channel.ChannelEvent;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.socket.ChannelRunnableWrapper;
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
|
|
||||||
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
|
public abstract class AbstractOioChannelSink extends AbstractChannelSink {
|
||||||
|
@ -32,7 +33,9 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink {
|
||||||
AbstractOioChannel channel = (AbstractOioChannel) ch;
|
AbstractOioChannel channel = (AbstractOioChannel) ch;
|
||||||
Worker worker = channel.worker;
|
Worker worker = channel.worker;
|
||||||
if (worker != null) {
|
if (worker != null) {
|
||||||
return channel.worker.executeInIoThread(ch, task);
|
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
|
||||||
|
channel.worker.executeInIoThread(wrapper);
|
||||||
|
return wrapper;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,13 +19,11 @@ import static io.netty.channel.Channels.*;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.Channels;
|
import io.netty.channel.Channels;
|
||||||
import io.netty.channel.socket.ChannelRunnableWrapper;
|
|
||||||
import io.netty.channel.socket.Worker;
|
import io.netty.channel.socket.Worker;
|
||||||
import io.netty.util.internal.QueueFactory;
|
import io.netty.util.internal.QueueFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract base class for Oio-Worker implementations
|
* Abstract base class for Oio-Worker implementations
|
||||||
|
@ -34,10 +32,16 @@ import java.util.concurrent.RejectedExecutionException;
|
||||||
*/
|
*/
|
||||||
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
|
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
|
||||||
|
|
||||||
private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class);
|
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
|
||||||
|
|
||||||
protected final C channel;
|
protected final C channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this worker has been started thread will be a reference to the thread
|
||||||
|
* used when starting. i.e. the current thread when the run method is executed.
|
||||||
|
*/
|
||||||
|
protected volatile Thread thread;
|
||||||
|
|
||||||
public AbstractOioWorker(C channel) {
|
public AbstractOioWorker(C channel) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
channel.worker = this;
|
channel.worker = this;
|
||||||
|
@ -45,7 +49,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
channel.workerThread = Thread.currentThread();
|
thread = channel.workerThread = Thread.currentThread();
|
||||||
|
|
||||||
while (channel.isOpen()) {
|
while (channel.isOpen()) {
|
||||||
synchronized (channel.interestOpsLock) {
|
synchronized (channel.interestOpsLock) {
|
||||||
|
@ -91,31 +95,21 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
|
public void executeInIoThread(Runnable task) {
|
||||||
if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) {
|
if (Thread.currentThread() == thread) {
|
||||||
try {
|
task.run();
|
||||||
task.run();
|
|
||||||
return succeededFuture(channel);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
return failedFuture(channel, t);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
|
boolean added = eventQueue.offer(task);
|
||||||
boolean added = eventQueue.offer(channelRunnable);
|
|
||||||
|
|
||||||
if (added) {
|
if (added) {
|
||||||
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
|
// as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest
|
||||||
|
}
|
||||||
} else {
|
|
||||||
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
|
|
||||||
}
|
|
||||||
return channelRunnable;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processEventQueue() throws IOException {
|
private void processEventQueue() throws IOException {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
final ChannelRunnableWrapper task = eventQueue.poll();
|
final Runnable task = eventQueue.poll();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user