e743a27e75
- Add ServerChannelFactory for ServerBootstrap which is the counterpart of ChannelFactory for Bootstrap
283 lines
10 KiB
Java
283 lines
10 KiB
Java
/*
|
|
* Copyright 2012 The Netty Project
|
|
*
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at:
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
package io.netty.channel;
|
|
|
|
|
|
import io.netty.util.concurrent.AbstractEventExecutorGroup;
|
|
import io.netty.util.concurrent.DefaultPromise;
|
|
import io.netty.util.concurrent.EventExecutor;
|
|
import io.netty.util.concurrent.Future;
|
|
import io.netty.util.concurrent.FutureListener;
|
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
|
import io.netty.util.concurrent.Promise;
|
|
import io.netty.util.concurrent.ThreadPerTaskExecutor;
|
|
import io.netty.util.internal.EmptyArrays;
|
|
import io.netty.util.internal.PlatformDependent;
|
|
import io.netty.util.internal.ReadOnlyIterator;
|
|
|
|
import java.util.Collections;
|
|
import java.util.Iterator;
|
|
import java.util.Queue;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.RejectedExecutionException;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
/**
|
|
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
|
|
*/
|
|
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
|
|
|
|
private final Object[] childArgs;
|
|
private final int maxChannels;
|
|
final Executor executor;
|
|
final Set<ThreadPerChannelEventLoop> activeChildren =
|
|
Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
|
|
final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
|
|
private final ChannelException tooManyChannels;
|
|
|
|
private volatile boolean shuttingDown;
|
|
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
|
|
private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
|
|
@Override
|
|
public void operationComplete(Future<Object> future) throws Exception {
|
|
// Inefficient, but works.
|
|
if (isTerminated()) {
|
|
terminationFuture.trySuccess(null);
|
|
}
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
|
|
*/
|
|
protected ThreadPerChannelEventLoopGroup() {
|
|
this(0);
|
|
}
|
|
|
|
/**
|
|
* Create a new {@link ThreadPerChannelEventLoopGroup}.
|
|
*
|
|
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
|
|
* a new {@link Channel} and the maximum is exceed it will throw an
|
|
* {@link ChannelException} on the {@link #register(Channel)} and
|
|
* {@link #register(Channel, ChannelPromise)} method.
|
|
* Use {@code 0} to use no limit
|
|
*/
|
|
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
|
|
this(maxChannels, Executors.defaultThreadFactory());
|
|
}
|
|
|
|
/**
|
|
* Create a new {@link ThreadPerChannelEventLoopGroup}.
|
|
*
|
|
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
|
|
* a new {@link Channel} and the maximum is exceed it will throw an
|
|
* {@link ChannelException} on the {@link #register(Channel)} and
|
|
* {@link #register(Channel, ChannelPromise)} method.
|
|
* Use {@code 0} to use no limit
|
|
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
|
|
* registered {@link Channel}s
|
|
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
|
|
*/
|
|
protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
|
|
this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
|
|
}
|
|
|
|
/**
|
|
* Create a new {@link ThreadPerChannelEventLoopGroup}.
|
|
*
|
|
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
|
|
* a new {@link Channel} and the maximum is exceed it will throw an
|
|
* {@link ChannelException} on the {@link #register(Channel)} and
|
|
* {@link #register(Channel, ChannelPromise)} method.
|
|
* Use {@code 0} to use no limit
|
|
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
|
|
* registered {@link Channel}s
|
|
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
|
|
*/
|
|
protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
|
|
if (maxChannels < 0) {
|
|
throw new IllegalArgumentException(String.format(
|
|
"maxChannels: %d (expected: >= 0)", maxChannels));
|
|
}
|
|
if (executor == null) {
|
|
throw new NullPointerException("executor");
|
|
}
|
|
|
|
if (args == null) {
|
|
childArgs = EmptyArrays.EMPTY_OBJECTS;
|
|
} else {
|
|
childArgs = args.clone();
|
|
}
|
|
|
|
this.maxChannels = maxChannels;
|
|
this.executor = executor;
|
|
|
|
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
|
|
tooManyChannels.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
|
}
|
|
|
|
/**
|
|
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
|
|
*/
|
|
protected ThreadPerChannelEventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) {
|
|
return new ThreadPerChannelEventLoop(this);
|
|
}
|
|
|
|
@Override
|
|
public Iterator<EventExecutor> iterator() {
|
|
return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
|
|
}
|
|
|
|
@Override
|
|
public EventLoop next() {
|
|
if (shuttingDown) {
|
|
throw new RejectedExecutionException("shutting down");
|
|
}
|
|
|
|
ThreadPerChannelEventLoop loop = idleChildren.poll();
|
|
if (loop == null) {
|
|
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
|
|
throw tooManyChannels;
|
|
}
|
|
loop = newChild(childArgs);
|
|
loop.terminationFuture().addListener(childTerminationListener);
|
|
}
|
|
activeChildren.add(loop);
|
|
return loop;
|
|
}
|
|
|
|
@Override
|
|
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
|
|
shuttingDown = true;
|
|
|
|
for (EventLoop l: activeChildren) {
|
|
l.shutdownGracefully(quietPeriod, timeout, unit);
|
|
}
|
|
for (EventLoop l: idleChildren) {
|
|
l.shutdownGracefully(quietPeriod, timeout, unit);
|
|
}
|
|
|
|
// Notify the future if there was no children.
|
|
if (isTerminated()) {
|
|
terminationFuture.trySuccess(null);
|
|
}
|
|
|
|
return terminationFuture();
|
|
}
|
|
|
|
@Override
|
|
public Future<?> terminationFuture() {
|
|
return terminationFuture;
|
|
}
|
|
|
|
@Override
|
|
@Deprecated
|
|
public void shutdown() {
|
|
shuttingDown = true;
|
|
|
|
for (EventLoop l: activeChildren) {
|
|
l.shutdown();
|
|
}
|
|
for (EventLoop l: idleChildren) {
|
|
l.shutdown();
|
|
}
|
|
|
|
// Notify the future if there was no children.
|
|
if (isTerminated()) {
|
|
terminationFuture.trySuccess(null);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public boolean isShuttingDown() {
|
|
for (EventLoop l: activeChildren) {
|
|
if (!l.isShuttingDown()) {
|
|
return false;
|
|
}
|
|
}
|
|
for (EventLoop l: idleChildren) {
|
|
if (!l.isShuttingDown()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
@Override
|
|
public boolean isShutdown() {
|
|
for (EventLoop l: activeChildren) {
|
|
if (!l.isShutdown()) {
|
|
return false;
|
|
}
|
|
}
|
|
for (EventLoop l: idleChildren) {
|
|
if (!l.isShutdown()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
@Override
|
|
public boolean isTerminated() {
|
|
for (EventLoop l: activeChildren) {
|
|
if (!l.isTerminated()) {
|
|
return false;
|
|
}
|
|
}
|
|
for (EventLoop l: idleChildren) {
|
|
if (!l.isTerminated()) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
@Override
|
|
public boolean awaitTermination(long timeout, TimeUnit unit)
|
|
throws InterruptedException {
|
|
long deadline = System.nanoTime() + unit.toNanos(timeout);
|
|
for (EventLoop l: activeChildren) {
|
|
for (;;) {
|
|
long timeLeft = deadline - System.nanoTime();
|
|
if (timeLeft <= 0) {
|
|
return isTerminated();
|
|
}
|
|
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
for (EventLoop l: idleChildren) {
|
|
for (;;) {
|
|
long timeLeft = deadline - System.nanoTime();
|
|
if (timeLeft <= 0) {
|
|
return isTerminated();
|
|
}
|
|
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
return isTerminated();
|
|
}
|
|
}
|