Add ThreadPerChannelEventLoopGroup and ThreadPerChannelEventLoop to enable a user to write a new thread-per-channel transport easily
- Fixes #1124
This commit is contained in:
parent
6e0e38f09f
commit
83cdbeca1d
@ -13,26 +13,19 @@
|
|||||||
* License for the specific language governing permissions and limitations
|
* License for the specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package io.netty.channel.oio;
|
package io.netty.channel;
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
|
||||||
import io.netty.channel.ChannelFutureListener;
|
|
||||||
import io.netty.channel.ChannelPromise;
|
|
||||||
import io.netty.channel.SingleThreadEventLoop;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link SingleThreadEventLoop} which is used to handle OIO {@link Channel}'s. So in general there will be
|
* {@link SingleThreadEventLoop} which is used to handle OIO {@link Channel}'s. So in general there will be
|
||||||
* one {@link OioEventLoop} per {@link Channel}.
|
* one {@link ThreadPerChannelEventLoop} per {@link Channel}.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class OioEventLoop extends SingleThreadEventLoop {
|
public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
|
||||||
|
|
||||||
private final OioEventLoopGroup parent;
|
private final ThreadPerChannelEventLoopGroup parent;
|
||||||
private AbstractOioChannel ch;
|
private Channel ch;
|
||||||
|
|
||||||
OioEventLoop(OioEventLoopGroup parent) {
|
public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) {
|
||||||
super(parent, parent.threadFactory, parent.scheduler);
|
super(parent, parent.threadFactory, parent.scheduler);
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
}
|
}
|
||||||
@ -41,9 +34,10 @@ class OioEventLoop extends SingleThreadEventLoop {
|
|||||||
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
||||||
return super.register(channel, promise).addListener(new ChannelFutureListener() {
|
return super.register(channel, promise).addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
ch = (AbstractOioChannel) future.channel();
|
ch = future.channel();
|
||||||
} else {
|
} else {
|
||||||
deregister();
|
deregister();
|
||||||
}
|
}
|
||||||
@ -54,8 +48,6 @@ class OioEventLoop extends SingleThreadEventLoop {
|
|||||||
@Override
|
@Override
|
||||||
protected void run() {
|
protected void run() {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
AbstractOioChannel ch = this.ch;
|
|
||||||
if (ch == null || !ch.isActive()) {
|
|
||||||
Runnable task;
|
Runnable task;
|
||||||
try {
|
try {
|
||||||
task = takeTask();
|
task = takeTask();
|
||||||
@ -63,29 +55,8 @@ class OioEventLoop extends SingleThreadEventLoop {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// Waken up by interruptThread()
|
// Waken up by interruptThread()
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
long startTime = System.nanoTime();
|
|
||||||
for (;;) {
|
|
||||||
final Runnable task = pollTask();
|
|
||||||
if (task == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
task.run();
|
|
||||||
|
|
||||||
// Ensure running tasks doesn't take too much time.
|
|
||||||
if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle deregistration
|
|
||||||
if (!ch.isRegistered()) {
|
|
||||||
runAllTasks();
|
|
||||||
deregister();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
Channel ch = this.ch;
|
||||||
if (isShutdown()) {
|
if (isShutdown()) {
|
||||||
if (ch != null) {
|
if (ch != null) {
|
||||||
ch.unsafe().close(ch.unsafe().voidFuture());
|
ch.unsafe().close(ch.unsafe().voidFuture());
|
||||||
@ -93,6 +64,14 @@ class OioEventLoop extends SingleThreadEventLoop {
|
|||||||
if (confirmShutdown()) {
|
if (confirmShutdown()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if (ch != null) {
|
||||||
|
// Handle deregistration
|
||||||
|
if (!ch.isRegistered()) {
|
||||||
|
runAllTasks();
|
||||||
|
deregister();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,237 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel;
|
||||||
|
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.TaskScheduler;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
|
||||||
|
*/
|
||||||
|
public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
|
||||||
|
|
||||||
|
private static final Object[] NO_ARGS = new Object[0];
|
||||||
|
private static final StackTraceElement[] STACK_ELEMENTS = new StackTraceElement[0];
|
||||||
|
|
||||||
|
private final Object[] childArgs;
|
||||||
|
private final int maxChannels;
|
||||||
|
final TaskScheduler scheduler;
|
||||||
|
final ThreadFactory threadFactory;
|
||||||
|
final Set<ThreadPerChannelEventLoop> activeChildren =
|
||||||
|
Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
|
||||||
|
final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
|
||||||
|
private final ChannelException tooManyChannels;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
|
||||||
|
*/
|
||||||
|
protected ThreadPerChannelEventLoopGroup() {
|
||||||
|
this(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link ThreadPerChannelEventLoopGroup}.
|
||||||
|
*
|
||||||
|
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
|
||||||
|
* a new {@link Channel} and the maximum is exceed it will throw an
|
||||||
|
* {@link ChannelException} on the {@link #register(Channel)} and
|
||||||
|
* {@link #register(Channel, ChannelPromise)} method.
|
||||||
|
* Use {@code 0} to use no limit
|
||||||
|
*/
|
||||||
|
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
|
||||||
|
this(maxChannels, Executors.defaultThreadFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link ThreadPerChannelEventLoopGroup}.
|
||||||
|
*
|
||||||
|
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
|
||||||
|
* a new {@link Channel} and the maximum is exceed it will throw an
|
||||||
|
* {@link ChannelException} on the {@link #register(Channel)} and
|
||||||
|
* {@link #register(Channel, ChannelPromise)} method.
|
||||||
|
* Use {@code 0} to use no limit
|
||||||
|
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
|
||||||
|
* registered {@link Channel}s
|
||||||
|
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
|
||||||
|
*/
|
||||||
|
protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
|
||||||
|
if (maxChannels < 0) {
|
||||||
|
throw new IllegalArgumentException(String.format(
|
||||||
|
"maxChannels: %d (expected: >= 0)", maxChannels));
|
||||||
|
}
|
||||||
|
if (threadFactory == null) {
|
||||||
|
throw new NullPointerException("threadFactory");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (args == null) {
|
||||||
|
childArgs = NO_ARGS;
|
||||||
|
} else {
|
||||||
|
childArgs = args.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.maxChannels = maxChannels;
|
||||||
|
this.threadFactory = threadFactory;
|
||||||
|
|
||||||
|
scheduler = new TaskScheduler(threadFactory);
|
||||||
|
|
||||||
|
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
|
||||||
|
tooManyChannels.setStackTrace(STACK_ELEMENTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
|
||||||
|
*/
|
||||||
|
protected ThreadPerChannelEventLoop newChild(
|
||||||
|
@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
|
||||||
|
return new ThreadPerChannelEventLoop(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventLoop next() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
scheduler.shutdown();
|
||||||
|
for (EventLoop l: activeChildren) {
|
||||||
|
l.shutdown();
|
||||||
|
}
|
||||||
|
for (EventLoop l: idleChildren) {
|
||||||
|
l.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isShutdown() {
|
||||||
|
if (!scheduler.isShutdown()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (EventLoop l: activeChildren) {
|
||||||
|
if (!l.isShutdown()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (EventLoop l: idleChildren) {
|
||||||
|
if (!l.isShutdown()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isTerminated() {
|
||||||
|
if (!scheduler.isTerminated()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (EventLoop l: activeChildren) {
|
||||||
|
if (!l.isTerminated()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (EventLoop l: idleChildren) {
|
||||||
|
if (!l.isTerminated()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean awaitTermination(long timeout, TimeUnit unit)
|
||||||
|
throws InterruptedException {
|
||||||
|
long deadline = System.nanoTime() + unit.toNanos(timeout);
|
||||||
|
for (;;) {
|
||||||
|
long timeLeft = deadline - System.nanoTime();
|
||||||
|
if (timeLeft <= 0) {
|
||||||
|
return isTerminated();
|
||||||
|
}
|
||||||
|
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (EventLoop l: activeChildren) {
|
||||||
|
for (;;) {
|
||||||
|
long timeLeft = deadline - System.nanoTime();
|
||||||
|
if (timeLeft <= 0) {
|
||||||
|
return isTerminated();
|
||||||
|
}
|
||||||
|
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (EventLoop l: idleChildren) {
|
||||||
|
for (;;) {
|
||||||
|
long timeLeft = deadline - System.nanoTime();
|
||||||
|
if (timeLeft <= 0) {
|
||||||
|
return isTerminated();
|
||||||
|
}
|
||||||
|
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return isTerminated();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture register(Channel channel) {
|
||||||
|
if (channel == null) {
|
||||||
|
throw new NullPointerException("channel");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return nextChild().register(channel);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
return channel.newFailedFuture(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
||||||
|
if (channel == null) {
|
||||||
|
throw new NullPointerException("channel");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return nextChild().register(channel, promise);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
promise.setFailure(t);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private EventLoop nextChild() throws Exception {
|
||||||
|
ThreadPerChannelEventLoop loop = idleChildren.poll();
|
||||||
|
if (loop == null) {
|
||||||
|
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
|
||||||
|
throw tooManyChannels;
|
||||||
|
}
|
||||||
|
loop = newChild(childArgs);
|
||||||
|
}
|
||||||
|
activeChildren.add(loop);
|
||||||
|
return loop;
|
||||||
|
}
|
||||||
|
}
|
@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel;
|
|||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.channel.ThreadPerChannelEventLoop;
|
||||||
|
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
@ -91,7 +92,7 @@ public abstract class AbstractOioChannel extends AbstractChannel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isCompatible(EventLoop loop) {
|
protected boolean isCompatible(EventLoop loop) {
|
||||||
return loop instanceof OioEventLoop;
|
return loop instanceof ThreadPerChannelEventLoop;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -18,35 +18,19 @@ package io.netty.channel.oio;
|
|||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFuture;
|
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.util.concurrent.TaskScheduler;
|
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.channel.ThreadPerChannelEventLoopGroup;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link EventLoopGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its
|
* {@link EventLoopGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its
|
||||||
* own {@link EventLoop} to not block others.
|
* own {@link EventLoop} to not block others.
|
||||||
*/
|
*/
|
||||||
public class OioEventLoopGroup implements EventLoopGroup {
|
public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
|
||||||
|
|
||||||
private static final StackTraceElement[] STACK_ELEMENTS = new StackTraceElement[0];
|
|
||||||
private final int maxChannels;
|
|
||||||
final TaskScheduler scheduler;
|
|
||||||
final ThreadFactory threadFactory;
|
|
||||||
final Set<OioEventLoop> activeChildren = Collections.newSetFromMap(
|
|
||||||
PlatformDependent.<OioEventLoop, Boolean>newConcurrentHashMap());
|
|
||||||
final Queue<OioEventLoop> idleChildren = new ConcurrentLinkedQueue<OioEventLoop>();
|
|
||||||
private final ChannelException tooManyChannels;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new {@link OioEventLoopGroup} with no limit in place.
|
* Create a new {@link OioEventLoopGroup} with no limit in place.
|
||||||
@ -80,148 +64,6 @@ public class OioEventLoopGroup implements EventLoopGroup {
|
|||||||
* registered {@link Channel}s
|
* registered {@link Channel}s
|
||||||
*/
|
*/
|
||||||
public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) {
|
public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) {
|
||||||
if (maxChannels < 0) {
|
super(maxChannels, threadFactory);
|
||||||
throw new IllegalArgumentException(String.format(
|
|
||||||
"maxChannels: %d (expected: >= 0)", maxChannels));
|
|
||||||
}
|
|
||||||
if (threadFactory == null) {
|
|
||||||
throw new NullPointerException("threadFactory");
|
|
||||||
}
|
|
||||||
|
|
||||||
this.maxChannels = maxChannels;
|
|
||||||
this.threadFactory = threadFactory;
|
|
||||||
|
|
||||||
scheduler = new TaskScheduler(threadFactory);
|
|
||||||
|
|
||||||
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
|
|
||||||
tooManyChannels.setStackTrace(STACK_ELEMENTS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventLoop next() {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
scheduler.shutdown();
|
|
||||||
for (EventLoop l: activeChildren) {
|
|
||||||
l.shutdown();
|
|
||||||
}
|
|
||||||
for (EventLoop l: idleChildren) {
|
|
||||||
l.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isShutdown() {
|
|
||||||
if (!scheduler.isShutdown()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (EventLoop l: activeChildren) {
|
|
||||||
if (!l.isShutdown()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (EventLoop l: idleChildren) {
|
|
||||||
if (!l.isShutdown()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTerminated() {
|
|
||||||
if (!scheduler.isTerminated()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (EventLoop l: activeChildren) {
|
|
||||||
if (!l.isTerminated()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (EventLoop l: idleChildren) {
|
|
||||||
if (!l.isTerminated()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean awaitTermination(long timeout, TimeUnit unit)
|
|
||||||
throws InterruptedException {
|
|
||||||
long deadline = System.nanoTime() + unit.toNanos(timeout);
|
|
||||||
for (;;) {
|
|
||||||
long timeLeft = deadline - System.nanoTime();
|
|
||||||
if (timeLeft <= 0) {
|
|
||||||
return isTerminated();
|
|
||||||
}
|
|
||||||
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (EventLoop l: activeChildren) {
|
|
||||||
for (;;) {
|
|
||||||
long timeLeft = deadline - System.nanoTime();
|
|
||||||
if (timeLeft <= 0) {
|
|
||||||
return isTerminated();
|
|
||||||
}
|
|
||||||
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (EventLoop l: idleChildren) {
|
|
||||||
for (;;) {
|
|
||||||
long timeLeft = deadline - System.nanoTime();
|
|
||||||
if (timeLeft <= 0) {
|
|
||||||
return isTerminated();
|
|
||||||
}
|
|
||||||
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return isTerminated();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture register(Channel channel) {
|
|
||||||
if (channel == null) {
|
|
||||||
throw new NullPointerException("channel");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return nextChild().register(channel);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
return channel.newFailedFuture(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
|
||||||
if (channel == null) {
|
|
||||||
throw new NullPointerException("channel");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return nextChild().register(channel, promise);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
promise.setFailure(t);
|
|
||||||
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private EventLoop nextChild() {
|
|
||||||
OioEventLoop loop = idleChildren.poll();
|
|
||||||
if (loop == null) {
|
|
||||||
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
|
|
||||||
throw tooManyChannels;
|
|
||||||
}
|
|
||||||
loop = new OioEventLoop(this);
|
|
||||||
}
|
|
||||||
activeChildren.add(loop);
|
|
||||||
return loop;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user