Made sure MessageEvents are fired after both channels are fully connected

This commit is contained in:
Trustin Lee 2009-02-09 09:28:12 +00:00
parent 03de679b17
commit 2b222b4994
2 changed files with 47 additions and 49 deletions

View File

@ -53,6 +53,7 @@ class LocalChannel extends AbstractChannel {
volatile LocalChannel pairedChannel;
volatile LocalAddress localAddress;
volatile LocalAddress remoteAddress;
final AtomicBoolean bound = new AtomicBoolean();
private final LocalChannelConfig config;
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
@ -73,8 +74,7 @@ class LocalChannel extends AbstractChannel {
}
public boolean isConnected() {
return localAddress != null &&
pairedChannel != null && pairedChannel.localAddress != null;
return localAddress != null && remoteAddress != null;
}
public LocalAddress getLocalAddress() {
@ -82,12 +82,7 @@ class LocalChannel extends AbstractChannel {
}
public LocalAddress getRemoteAddress() {
LocalChannel pairedChannel = this.pairedChannel;
if (pairedChannel == null) {
return null;
} else {
return pairedChannel.getLocalAddress();
}
return remoteAddress;
}
void closeNow(ChannelFuture future) {
@ -130,52 +125,49 @@ class LocalChannel extends AbstractChannel {
void flushWriteBuffer() {
LocalChannel pairedChannel = this.pairedChannel;
if (isConnected()){
// Channel is open and connected and channelConnected event has
// been fired.
if (!delivering.get()) {
delivering.set(true);
try {
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
e.getFuture().setSuccess();
fireMessageReceived(pairedChannel, e.getMessage());
fireWriteComplete(this, 1);
}
} finally {
delivering.set(false);
}
}
return;
}
if (pairedChannel != null) {
// Channel is open and connected but channelConnected event has
// not been fired yet.
return;
}
if (pairedChannel.isConnected()){
// Channel is open and connected and channelConnected event has
// been fired.
if (!delivering.get()) {
delivering.set(true);
try {
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
// Channel is closed or not connected yet - notify as failures.
Exception cause;
if (isOpen()) {
cause = new NotYetConnectedException();
e.getFuture().setSuccess();
fireMessageReceived(pairedChannel, e.getMessage());
fireWriteComplete(this, 1);
}
} finally {
delivering.set(false);
}
}
} else {
// Channel is open and connected but channelConnected event has
// not been fired yet.
}
} else {
cause = new ClosedChannelException();
}
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
// Channel is closed or not connected yet - notify as failures.
Exception cause;
if (isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
e.getFuture().setFailure(cause);
fireExceptionCaught(this, cause);
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
e.getFuture().setFailure(cause);
fireExceptionCaught(this, cause);
}
}
}
}

View File

@ -137,11 +137,17 @@ final class LocalClientChannelSink extends AbstractChannelSink {
channel.pairedChannel = acceptedChannel;
bind(channel, succeededFuture(channel), LocalAddress.newEphemeralInstance());
channel.remoteAddress = serverChannel.getLocalAddress();
fireChannelConnected(channel, serverChannel.getLocalAddress());
acceptedChannel.localAddress = serverChannel.getLocalAddress();
acceptedChannel.bound.set(true);
fireChannelBound(acceptedChannel, channel.getRemoteAddress());
acceptedChannel.remoteAddress = channel.getLocalAddress();
fireChannelConnected(acceptedChannel, channel.getLocalAddress());
// Flush something that was written in channelBound / channelConnected
channel.flushWriteBuffer();
acceptedChannel.flushWriteBuffer();
}
}