Revive Channel.closeFuture

- ChannelPipeline now rejects an unsafe ChannelFuture, so there's no
  need to hide/remove closeFuture.
This commit is contained in:
Trustin Lee 2012-05-13 01:37:16 +09:00
parent 175acb7899
commit 6d14fac99c
2 changed files with 33 additions and 68 deletions

View File

@ -24,8 +24,6 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -74,13 +72,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final Integer id;
private final Unsafe unsafe;
private final ChannelPipeline pipeline = new DefaultChannelPipeline(this);
private final List<ChannelFutureListener> closureListeners = new ArrayList<ChannelFutureListener>(4);
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelFuture voidFuture = new VoidChannelFuture(AbstractChannel.this);
private final ChannelFuture voidFuture = new VoidChannelFuture(this);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile EventLoop eventLoop;
private volatile boolean registered;
private volatile boolean notifiedClosureListeners;
/**
* The future of the current connection attempt. If not null, subsequent
@ -125,7 +122,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.id = id;
unsafe = new DefaultUnsafe();
closureListeners.add(new ChannelFutureListener() {
closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
allChannels.remove(id());
@ -267,63 +264,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void addClosureListener(final ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
synchronized (closureListeners) {
if (notifiedClosureListeners) {
eventLoop().execute(new Runnable() {
@Override
public void run() {
notifyClosureListener(listener);
}
});
} else {
closureListeners.add(listener);
}
}
}
@Override
public void removeClosureListener(ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
synchronized (closureListeners) {
if (!notifiedClosureListeners) {
closureListeners.remove(listener);
}
}
}
private void notifyClosureListeners() {
final ChannelFutureListener[] array;
synchronized (closureListeners) {
if (notifiedClosureListeners) {
return;
}
notifiedClosureListeners = true;
array = closureListeners.toArray(new ChannelFutureListener[closureListeners.size()]);
closureListeners.clear();
}
eventLoop().execute(new Runnable() {
@Override
public void run() {
for (ChannelFutureListener l: array) {
notifyClosureListener(l);
}
}
});
}
private void notifyClosureListener(final ChannelFutureListener listener) {
try {
listener.operationComplete(newSucceededFuture());
} catch (Exception e) {
logger.warn("Failed to notify a closure listener.", e);
}
public ChannelFuture closeFuture() {
return closeFuture;
}
@Override
@ -610,12 +552,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
notifyFlushFutures(closedChannelException);
notifyClosureListeners();
if (wasActive && !isActive()) {
pipeline().fireChannelInactive();
}
closeFuture.setClosed();
deregister(voidFuture());
} else {
// Closed already.
@ -807,6 +749,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private static final class CloseFuture extends DefaultChannelFuture implements ChannelFuture.Unsafe {
CloseFuture(AbstractChannel ch) {
super(ch, false);
}
@Override
public boolean setSuccess() {
throw new IllegalStateException();
}
@Override
public boolean setFailure(Throwable cause) {
throw new IllegalStateException();
}
void setClosed() {
boolean set = super.setSuccess();
assert set;
}
}
protected abstract boolean isCompatible(EventLoop loop);
protected abstract java.nio.channels.Channel javaChannel();

View File

@ -163,10 +163,11 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
*/
SocketAddress remoteAddress();
// FIXME: Introduce more flexible channel state notification mechanism
// - notify me when channel becomes (un)registered, (in)active
void addClosureListener(ChannelFutureListener listener);
void removeClosureListener(ChannelFutureListener remover);
/**
* Returns the {@link ChannelFuture} which will be notified when this
* channel is closed. This method always returns the same future instance.
*/
ChannelFuture closeFuture();
Unsafe unsafe();