diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index dfb6efffca..a97a06648f 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -24,8 +24,6 @@ import java.io.IOException; import java.net.ConnectException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -74,13 +72,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final Integer id; private final Unsafe unsafe; private final ChannelPipeline pipeline = new DefaultChannelPipeline(this); - private final List closureListeners = new ArrayList(4); private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); - private final ChannelFuture voidFuture = new VoidChannelFuture(AbstractChannel.this); + private final ChannelFuture voidFuture = new VoidChannelFuture(this); + private final CloseFuture closeFuture = new CloseFuture(this); private volatile EventLoop eventLoop; private volatile boolean registered; - private volatile boolean notifiedClosureListeners; /** * The future of the current connection attempt. If not null, subsequent @@ -125,7 +122,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.id = id; unsafe = new DefaultUnsafe(); - closureListeners.add(new ChannelFutureListener() { + closeFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { allChannels.remove(id()); @@ -267,63 +264,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void addClosureListener(final ChannelFutureListener listener) { - if (listener == null) { - throw new NullPointerException("listener"); - } - synchronized (closureListeners) { - if (notifiedClosureListeners) { - eventLoop().execute(new Runnable() { - @Override - public void run() { - notifyClosureListener(listener); - } - }); - } else { - closureListeners.add(listener); - } - } - } - - @Override - public void removeClosureListener(ChannelFutureListener listener) { - if (listener == null) { - throw new NullPointerException("listener"); - } - synchronized (closureListeners) { - if (!notifiedClosureListeners) { - closureListeners.remove(listener); - } - } - } - - private void notifyClosureListeners() { - final ChannelFutureListener[] array; - synchronized (closureListeners) { - if (notifiedClosureListeners) { - return; - } - notifiedClosureListeners = true; - array = closureListeners.toArray(new ChannelFutureListener[closureListeners.size()]); - closureListeners.clear(); - } - - eventLoop().execute(new Runnable() { - @Override - public void run() { - for (ChannelFutureListener l: array) { - notifyClosureListener(l); - } - } - }); - } - - private void notifyClosureListener(final ChannelFutureListener listener) { - try { - listener.operationComplete(newSucceededFuture()); - } catch (Exception e) { - logger.warn("Failed to notify a closure listener.", e); - } + public ChannelFuture closeFuture() { + return closeFuture; } @Override @@ -610,12 +552,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } notifyFlushFutures(closedChannelException); - notifyClosureListeners(); if (wasActive && !isActive()) { pipeline().fireChannelInactive(); } + closeFuture.setClosed(); deregister(voidFuture()); } else { // Closed already. @@ -807,6 +749,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + private static final class CloseFuture extends DefaultChannelFuture implements ChannelFuture.Unsafe { + + CloseFuture(AbstractChannel ch) { + super(ch, false); + } + + @Override + public boolean setSuccess() { + throw new IllegalStateException(); + } + + @Override + public boolean setFailure(Throwable cause) { + throw new IllegalStateException(); + } + + void setClosed() { + boolean set = super.setSuccess(); + assert set; + } + } + protected abstract boolean isCompatible(EventLoop loop); protected abstract java.nio.channels.Channel javaChannel(); diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 59d125e68c..72bd726033 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -163,10 +163,11 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu */ SocketAddress remoteAddress(); - // FIXME: Introduce more flexible channel state notification mechanism - // - notify me when channel becomes (un)registered, (in)active - void addClosureListener(ChannelFutureListener listener); - void removeClosureListener(ChannelFutureListener remover); + /** + * Returns the {@link ChannelFuture} which will be notified when this + * channel is closed. This method always returns the same future instance. + */ + ChannelFuture closeFuture(); Unsafe unsafe();