Move ChannelFutureFactory.newVoidFuture() to Channel.Unsafe() / Cleanup

This commit is contained in:
Trustin Lee 2012-05-11 00:57:42 +09:00
parent d02bc1c0d3
commit cb718a07c8
7 changed files with 47 additions and 110 deletions

View File

@ -92,7 +92,7 @@ public class EchoServer {
}); });
loop.register(ssc).awaitUninterruptibly().rethrowIfFailed(); loop.register(ssc).awaitUninterruptibly().rethrowIfFailed();
ssc.bind(new InetSocketAddress(port), ssc.newFuture()).awaitUninterruptibly().rethrowIfFailed(); ssc.bind(new InetSocketAddress(port)).awaitUninterruptibly().rethrowIfFailed();
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -73,7 +73,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final ChannelPipeline pipeline = new DefaultChannelPipeline(this); private final ChannelPipeline pipeline = new DefaultChannelPipeline(this);
private final List<ChannelFutureListener> closureListeners = new ArrayList<ChannelFutureListener>(4); private final List<ChannelFutureListener> closureListeners = new ArrayList<ChannelFutureListener>(4);
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelFuture voidFuture = new VoidChannelFuture(this);
private volatile EventLoop eventLoop; private volatile EventLoop eventLoop;
private volatile boolean registered; private volatile boolean registered;
@ -159,58 +158,42 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public ChannelFuture bind(SocketAddress localAddress) { public ChannelFuture bind(SocketAddress localAddress) {
ChannelFuture f = newFuture(); return pipeline().bind(localAddress, newFuture());
pipeline().bind(localAddress, f);
return f;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress) { public ChannelFuture connect(SocketAddress remoteAddress) {
ChannelFuture f = newFuture(); return pipeline().connect(remoteAddress, newFuture());
pipeline().connect(remoteAddress, f);
return f;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ChannelFuture f = newFuture(); return pipeline().connect(remoteAddress, localAddress, newFuture());
pipeline().connect(remoteAddress, localAddress, f);
return f;
} }
@Override @Override
public ChannelFuture disconnect() { public ChannelFuture disconnect() {
ChannelFuture f = newFuture(); return pipeline().disconnect(newFuture());
pipeline().disconnect(f);
return f;
} }
@Override @Override
public ChannelFuture close() { public ChannelFuture close() {
ChannelFuture f = newFuture(); return pipeline().close(newFuture());
pipeline().close(f);
return f;
} }
@Override @Override
public ChannelFuture deregister() { public ChannelFuture deregister() {
ChannelFuture f = newFuture(); return pipeline().deregister(newFuture());
pipeline().deregister(f);
return f;
} }
@Override @Override
public ChannelFuture flush() { public ChannelFuture flush() {
ChannelFuture f = newFuture(); return pipeline().flush(newFuture());
pipeline().flush(f);
return f;
} }
@Override @Override
public ChannelFuture write(Object message) { public ChannelFuture write(Object message) {
ChannelFuture f = newFuture(); return pipeline().write(message, newFuture());
pipeline().write(message, f);
return f;
} }
@Override @Override
@ -273,11 +256,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return new FailedChannelFuture(this, cause); return new FailedChannelFuture(this, cause);
} }
@Override
public ChannelFuture newVoidFuture() {
return voidFuture;
}
@Override @Override
public void addClosureListener(final ChannelFutureListener listener) { public void addClosureListener(final ChannelFutureListener listener) {
if (listener == null) { if (listener == null) {
@ -445,6 +423,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private class DefaultUnsafe implements Unsafe { private class DefaultUnsafe implements Unsafe {
private final ChannelFuture voidFuture = new VoidChannelFuture(AbstractChannel.this);
@Override @Override
public java.nio.channels.Channel ch() { public java.nio.channels.Channel ch() {
return javaChannel(); return javaChannel();
@ -455,6 +435,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return firstOut(); return firstOut();
} }
@Override
public ChannelFuture voidFuture() {
return voidFuture;
}
@Override @Override
public SocketAddress localAddress() { public SocketAddress localAddress() {
return localAddress0(); return localAddress0();
@ -624,7 +609,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
pipeline().fireChannelInactive(); pipeline().fireChannelInactive();
} }
deregister(newVoidFuture()); deregister(voidFuture());
} else { } else {
// Closed already. // Closed already.
future.setSuccess(); future.setSuccess();
@ -689,12 +674,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
if (closed) { if (closed) {
close(newVoidFuture()); close(voidFuture());
} }
} catch (Throwable t) { } catch (Throwable t) {
pipeline().fireExceptionCaught(t); pipeline().fireExceptionCaught(t);
if (t instanceof IOException) { if (t instanceof IOException) {
close(newVoidFuture()); close(voidFuture());
} }
} }
} }
@ -737,7 +722,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (isOpen()) { if (isOpen()) {
return; return;
} }
close(newVoidFuture()); close(voidFuture());
} }
} }

View File

@ -174,6 +174,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
public interface Unsafe { public interface Unsafe {
java.nio.channels.Channel ch(); java.nio.channels.Channel ch();
ChannelBufferHolder<Object> out(); ChannelBufferHolder<Object> out();
ChannelFuture voidFuture();
SocketAddress localAddress(); SocketAddress localAddress();
SocketAddress remoteAddress(); SocketAddress remoteAddress();

View File

@ -4,5 +4,4 @@ public interface ChannelFutureFactory {
ChannelFuture newFuture(); ChannelFuture newFuture();
ChannelFuture newSucceededFuture(); ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause); ChannelFuture newFailedFuture(Throwable cause);
ChannelFuture newVoidFuture();
} }

View File

@ -658,58 +658,42 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelFuture bind(SocketAddress localAddress) { public ChannelFuture bind(SocketAddress localAddress) {
ChannelFuture f = channel().newFuture(); return bind(localAddress, channel().newFuture());
bind(localAddress, f);
return f;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress) { public ChannelFuture connect(SocketAddress remoteAddress) {
ChannelFuture f = channel().newFuture(); return connect(remoteAddress, channel().newFuture());
connect(remoteAddress, f);
return f;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ChannelFuture f = channel().newFuture(); return connect(remoteAddress, localAddress, channel().newFuture());
connect(remoteAddress, localAddress, f);
return f;
} }
@Override @Override
public ChannelFuture disconnect() { public ChannelFuture disconnect() {
ChannelFuture f = channel().newFuture(); return disconnect(channel().newFuture());
disconnect(f);
return f;
} }
@Override @Override
public ChannelFuture close() { public ChannelFuture close() {
ChannelFuture f = channel().newFuture(); return close(channel().newFuture());
close(f);
return f;
} }
@Override @Override
public ChannelFuture deregister() { public ChannelFuture deregister() {
ChannelFuture f = channel().newFuture(); return deregister(channel().newFuture());
deregister(f);
return f;
} }
@Override @Override
public ChannelFuture flush() { public ChannelFuture flush() {
ChannelFuture f = channel().newFuture(); return flush(channel().newFuture());
flush(f);
return f;
} }
@Override @Override
public ChannelFuture write(Object message) { public ChannelFuture write(Object message) {
ChannelFuture f = channel().newFuture(); return write(message, channel().newFuture());
write(message, f);
return f;
} }
@Override @Override
@ -1148,58 +1132,42 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelFuture bind(SocketAddress localAddress) { public ChannelFuture bind(SocketAddress localAddress) {
ChannelFuture f = newFuture(); return bind(localAddress, newFuture());
bind(localAddress, f);
return f;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress) { public ChannelFuture connect(SocketAddress remoteAddress) {
ChannelFuture f = newFuture(); return connect(remoteAddress, newFuture());
connect(remoteAddress, f);
return f;
} }
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ChannelFuture f = newFuture(); return connect(remoteAddress, localAddress, newFuture());
connect(remoteAddress, localAddress, f);
return f;
} }
@Override @Override
public ChannelFuture disconnect() { public ChannelFuture disconnect() {
ChannelFuture f = newFuture(); return disconnect(newFuture());
disconnect(f);
return f;
} }
@Override @Override
public ChannelFuture close() { public ChannelFuture close() {
ChannelFuture f = newFuture(); return close(newFuture());
close(f);
return f;
} }
@Override @Override
public ChannelFuture deregister() { public ChannelFuture deregister() {
ChannelFuture f = newFuture(); return deregister(newFuture());
deregister(f);
return f;
} }
@Override @Override
public ChannelFuture flush() { public ChannelFuture flush() {
ChannelFuture f = newFuture(); return flush(newFuture());
flush(f);
return f;
} }
@Override @Override
public ChannelFuture write(Object message) { public ChannelFuture write(Object message) {
ChannelFuture f = newFuture(); return write(message, newFuture());
write(message, f);
return f;
} }
@Override @Override
@ -1262,10 +1230,5 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public ChannelFuture newFailedFuture(Throwable cause) { public ChannelFuture newFailedFuture(Throwable cause) {
return channel().newFailedFuture(cause); return channel().newFailedFuture(cause);
} }
@Override
public ChannelFuture newVoidFuture() {
return channel().newVoidFuture();
}
} }
} }

View File

@ -51,9 +51,10 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
@Override @Override
public ChannelFuture register(Channel channel) { public ChannelFuture register(Channel channel) {
ChannelFuture future = channel.newFuture(); if (channel == null) {
register(channel, future); throw new NullPointerException("channel");
return future; }
return register(channel, channel.newFuture());
} }
@Override @Override

View File

@ -181,14 +181,10 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
new LinkedHashMap<Integer, ChannelFuture>(size()); new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: serverChannels.values()) { for (Channel c: serverChannels.values()) {
ChannelFuture f = c.newFuture(); futures.put(c.id(), c.close().awaitUninterruptibly());
c.close(f);
futures.put(c.id(), f.awaitUninterruptibly());
} }
for (Channel c: nonServerChannels.values()) { for (Channel c: nonServerChannels.values()) {
ChannelFuture f = c.newFuture(); futures.put(c.id(), c.close());
c.close(f);
futures.put(c.id(), f);
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures);
@ -200,14 +196,10 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
new LinkedHashMap<Integer, ChannelFuture>(size()); new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: serverChannels.values()) { for (Channel c: serverChannels.values()) {
ChannelFuture f = c.newFuture(); futures.put(c.id(), c.disconnect());
c.disconnect(f);
futures.put(c.id(), f.awaitUninterruptibly());
} }
for (Channel c: nonServerChannels.values()) { for (Channel c: nonServerChannels.values()) {
ChannelFuture f = c.newFuture(); futures.put(c.id(), c.disconnect());
c.disconnect(f);
futures.put(c.id(), f);
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures);
@ -220,15 +212,11 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
if (message instanceof ChannelBuffer) { if (message instanceof ChannelBuffer) {
ChannelBuffer buf = (ChannelBuffer) message; ChannelBuffer buf = (ChannelBuffer) message;
for (Channel c: nonServerChannels.values()) { for (Channel c: nonServerChannels.values()) {
ChannelFuture f = c.newFuture(); futures.put(c.id(), c.write(buf.duplicate()));
c.write(buf.duplicate(), f);
futures.put(c.id(), f);
} }
} else { } else {
for (Channel c: nonServerChannels.values()) { for (Channel c: nonServerChannels.values()) {
ChannelFuture f = c.newFuture(); futures.put(c.id(), c.write(message));
c.write(message, f);
futures.put(c.id(), f);
} }
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures);