Fixed issue: NETTY-377 messageReceived could happen before

channelConnected when using local channels

More precise LocalChannel state management to avoid incorrect event
order
This commit is contained in:
Trustin Lee 2011-01-12 17:58:49 +09:00
parent a28702c9e9
commit 8ad04dc6ca
2 changed files with 49 additions and 9 deletions

View File

@ -20,10 +20,11 @@ import static org.jboss.netty.channel.Channels.*;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
@ -41,9 +42,16 @@ import org.jboss.netty.util.internal.ThreadLocalBoolean;
*/
final class DefaultLocalChannel extends AbstractChannel implements LocalChannel {
// TODO Move the state management up to AbstractChannel to remove duplication.
private static final int ST_OPEN = 0;
private static final int ST_BOUND = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_CLOSED = -1;
private final AtomicInteger state = new AtomicInteger(ST_OPEN);
private final ChannelConfig config;
private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
final AtomicBoolean bound = new AtomicBoolean();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
volatile DefaultLocalChannel pairedChannel;
@ -62,14 +70,42 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
return config;
}
@Override
public boolean isOpen() {
return state.get() >= ST_OPEN;
}
@Override
public boolean isBound() {
return bound.get() && isOpen();
return state.get() >= ST_BOUND;
}
@Override
public boolean isConnected() {
return pairedChannel != null && isOpen();
return state.get() == ST_CONNECTED;
}
final void setBound() throws ClosedChannelException {
if (!state.compareAndSet(ST_OPEN, ST_BOUND)) {
switch (state.get()) {
case ST_CLOSED:
throw new ClosedChannelException();
default:
throw new ChannelException("already bound");
}
}
}
final void setConnected() {
if (state.get() != ST_CLOSED) {
state.set(ST_CONNECTED);
}
}
@Override
protected boolean setClosed() {
state.set(ST_CLOSED);
return super.setClosed();
}
@Override

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.net.ConnectException;
import org.jboss.netty.channel.AbstractChannelSink;
@ -96,10 +97,7 @@ final class LocalClientChannelSink extends AbstractChannelSink {
throw new ChannelException("address already in use: " + localAddress);
}
if (!channel.bound.compareAndSet(false, true)) {
throw new ChannelException("already bound");
}
channel.setBound();
channel.localAddress = localAddress;
future.setSuccess();
fireChannelBound(channel, localAddress);
@ -136,12 +134,18 @@ final class LocalClientChannelSink extends AbstractChannelSink {
bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
channel.remoteAddress = serverChannel.getLocalAddress();
channel.setConnected();
fireChannelConnected(channel, serverChannel.getLocalAddress());
acceptedChannel.localAddress = serverChannel.getLocalAddress();
acceptedChannel.bound.set(true);
try {
acceptedChannel.setBound();
} catch (IOException e) {
throw new Error(e);
}
fireChannelBound(acceptedChannel, channel.getRemoteAddress());
acceptedChannel.remoteAddress = channel.getLocalAddress();
acceptedChannel.setConnected();
fireChannelConnected(acceptedChannel, channel.getLocalAddress());
// Flush something that was written in channelBound / channelConnected