Add a EventLoopGroup.register(ChannelPromise)

Motivation:

EventLoopGroup.register doesn't need the Channel paramter when ChannelPromise is provided as we can get the Channel from ChannelPromise. Resolves #2422.

Modifications:

- Add EventLoopGroup.register(ChannelPromise)
- Deprecate EventLoopGroup.register(Channel, ChannelPromise)

Result:

EventLoopGroup.register is more convenient as people only need to set one parameter.
This commit is contained in:
Xiaoyan Lin 2016-05-09 18:50:39 -07:00 committed by Norman Maurer
parent a729e0fcd9
commit a5006c1969
10 changed files with 75 additions and 11 deletions

View File

@ -35,9 +35,18 @@ public interface EventLoopGroup extends EventExecutorGroup {
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
* {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
*/
ChannelFuture register(ChannelPromise promise);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
*
* @deprecated Use {@link #register(ChannelPromise)} instead.
*/
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}

View File

@ -75,6 +75,12 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.ObjectUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
@ -46,9 +47,17 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
@Deprecated
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {

View File

@ -30,6 +30,21 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
this.parent = parent;
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return super.register(promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return super.register(channel, promise).addListener(new ChannelFutureListener() {

View File

@ -77,7 +77,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
@ -90,7 +90,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
@ -106,7 +106,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
@ -274,12 +274,23 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
try {
EventLoop l = nextChild();
return l.register(channel, new DefaultChannelPromise(channel, l));
return l.register(new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}
@Override
public ChannelFuture register(ChannelPromise promise) {
try {
return nextChild().register(promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
if (channel == null) {

View File

@ -23,6 +23,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.ObjectUtil;
import java.util.ArrayDeque;
import java.util.Queue;
@ -120,9 +121,17 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
channel.unsafe().register(this, promise);

View File

@ -46,7 +46,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
public OioEventLoopGroup(int maxChannels) {
@ -59,7 +59,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
@ -74,7 +74,7 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s

View File

@ -306,6 +306,11 @@ public class BootstrapTest {
return promise;
}
@Override
public ChannelFuture register(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture register(Channel channel, final ChannelPromise promise) {
throw new UnsupportedOperationException();

View File

@ -381,7 +381,7 @@ public class SingleThreadEventLoopTest {
}
try {
ChannelFuture f = loopA.register(ch, promise);
ChannelFuture f = loopA.register(promise);
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));

View File

@ -82,7 +82,7 @@ public class ThreadPerChannelEventLoopGroupTest {
ChannelGroup channelGroup = new DefaultChannelGroup(testExecutor);
while (taskCount-- > 0) {
Channel channel = new EmbeddedChannel(NOOP_HANDLER);
loopGroup.register(channel, new DefaultChannelPromise(channel, testExecutor));
loopGroup.register(new DefaultChannelPromise(channel, testExecutor));
channelGroup.add(channel);
}
channelGroup.close().sync();