* Added Channel.getCloseFuture()

* Changed ChannelFuture.setSuccess() and setFailure() to return a boolean value
This commit is contained in:
Trustin Lee 2008-11-26 09:21:00 +00:00
parent 6947ba0863
commit 6e40f62574
7 changed files with 89 additions and 21 deletions

View File

@ -24,7 +24,6 @@ package org.jboss.netty.channel;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.util.TimeBasedUuidGenerator; import org.jboss.netty.util.TimeBasedUuidGenerator;
@ -44,8 +43,7 @@ public abstract class AbstractChannel implements Channel, Comparable<Channel> {
private final ChannelFactory factory; private final ChannelFactory factory;
private final ChannelPipeline pipeline; private final ChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelFuture closeFuture = new UnfailingChannelFuture(this, false);
private final AtomicBoolean closed = new AtomicBoolean();
private volatile int interestOps = OP_READ; private volatile int interestOps = OP_READ;
/** Cache for the string representation of this channel */ /** Cache for the string representation of this channel */
@ -131,7 +129,7 @@ public abstract class AbstractChannel implements Channel, Comparable<Channel> {
} }
public boolean isOpen() { public boolean isOpen() {
return !closed.get(); return !closeFuture.isDone();
} }
/** /**
@ -143,7 +141,7 @@ public abstract class AbstractChannel implements Channel, Comparable<Channel> {
* closed yet * closed yet
*/ */
protected boolean setClosed() { protected boolean setClosed() {
return closed.compareAndSet(false, true); return closeFuture.setSuccess();
} }
public ChannelFuture bind(SocketAddress localAddress) { public ChannelFuture bind(SocketAddress localAddress) {
@ -155,7 +153,13 @@ public abstract class AbstractChannel implements Channel, Comparable<Channel> {
} }
public ChannelFuture close() { public ChannelFuture close() {
return Channels.close(this); ChannelFuture returnedCloseFuture = Channels.close(this);
assert closeFuture == returnedCloseFuture;
return closeFuture;
}
public ChannelFuture getCloseFuture() {
return closeFuture;
} }
public ChannelFuture connect(SocketAddress remoteAddress) { public ChannelFuture connect(SocketAddress remoteAddress) {

View File

@ -230,13 +230,22 @@ public interface Channel {
/** /**
* Closes this channel asynchronously. If this channel is bound or * Closes this channel asynchronously. If this channel is bound or
* connected, it will be disconnected and unbound first. * connected, it will be disconnected and unbound first. Once a channel
* is closed, it can not be open again. Calling this method on a closed
* channel has no effect. Please note that this method always returns the
* same future instance.
* *
* @return the {@link ChannelFuture} which will be notified when the * @return the {@link ChannelFuture} which will be notified when the
* close request succeeds or fails * close request succeeds or fails
*/ */
ChannelFuture close(); ChannelFuture close();
/**
* Returns the {@link ChannelFuture} which will be notified when this
* channel is closed. This method always returns the same future instance.
*/
ChannelFuture getCloseFuture();
/** /**
* Returns the current {@code interestOps} of this channel. * Returns the current {@code interestOps} of this channel.
* *

View File

@ -97,13 +97,13 @@ public interface ChannelFuture {
* Marks this future as a success and notifies all * Marks this future as a success and notifies all
* listeners. * listeners.
*/ */
void setSuccess(); boolean setSuccess();
/** /**
* Marks this future as a failure and notifies all * Marks this future as a failure and notifies all
* listeners. * listeners.
*/ */
void setFailure(Throwable cause); boolean setFailure(Throwable cause);
/** /**
* Adds the specified listener to this future. The * Adds the specified listener to this future. The

View File

@ -732,7 +732,7 @@ public class Channels {
* @return the {@link ChannelFuture} which will be notified on closure * @return the {@link ChannelFuture} which will be notified on closure
*/ */
public static ChannelFuture close(Channel channel) { public static ChannelFuture close(Channel channel) {
ChannelFuture future = future(channel); ChannelFuture future = channel.getCloseFuture();
channel.getPipeline().sendDownstream(new DefaultChannelStateEvent( channel.getPipeline().sendDownstream(new DefaultChannelStateEvent(
channel, future, ChannelState.OPEN, Boolean.FALSE)); channel, future, ChannelState.OPEN, Boolean.FALSE));
return future; return future;

View File

@ -101,12 +101,12 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
return true; return true;
} }
public void setFailure(Throwable cause) { public boolean setFailure(Throwable cause) {
// Unused return false;
} }
public void setSuccess() { public boolean setSuccess() {
// Unused return false;
} }
public boolean cancel() { public boolean cancel() {

View File

@ -234,37 +234,39 @@ public class DefaultChannelFuture implements ChannelFuture {
} }
} }
public void setSuccess() { public boolean setSuccess() {
synchronized (this) { synchronized (this) {
// Allow only once. // Allow only once.
if (done) { if (done) {
return; return false;
} }
done = true; done = true;
if (waiters > 0) { if (waiters > 0) {
this.notifyAll(); notifyAll();
} }
} }
notifyListeners(); notifyListeners();
return true;
} }
public void setFailure(Throwable cause) { public boolean setFailure(Throwable cause) {
synchronized (this) { synchronized (this) {
// Allow only once. // Allow only once.
if (done) { if (done) {
return; return false;
} }
this.cause = cause; this.cause = cause;
done = true; done = true;
if (waiters > 0) { if (waiters > 0) {
this.notifyAll(); notifyAll();
} }
} }
notifyListeners(); notifyListeners();
return true;
} }
public boolean cancel() { public boolean cancel() {
@ -281,7 +283,7 @@ public class DefaultChannelFuture implements ChannelFuture {
cause = CANCELLED; cause = CANCELLED;
done = true; done = true;
if (waiters > 0) { if (waiters > 0) {
this.notifyAll(); notifyAll();
} }
} }

View File

@ -0,0 +1,53 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
/**
* The {@link ChannelFuture} which can not fail at all. Any attempt to mark
* this future as failure, by calling {@link #setFailure(Throwable)} will raise
* an {@link IllegalStateException}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*/
public class UnfailingChannelFuture extends DefaultChannelFuture {
/**
* Creates a new instance.
*
* @param channel
* the {@link Channel} associated with this future
* @param cancellable
* {@code true} if and only if this future can be canceled
*/
public UnfailingChannelFuture(Channel channel, boolean cancellable) {
super(channel, cancellable);
}
@Override
public boolean setFailure(Throwable cause) {
throw new IllegalStateException("Can not fail");
}
}