diff --git a/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java b/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java index 1a05490ec6..f5d8ee2f90 100644 --- a/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java +++ b/src/main/java/org/jboss/netty/channel/local/DefaultLocalChannel.java @@ -20,10 +20,11 @@ import static org.jboss.netty.channel.Channels.*; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.ChannelConfig; +import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; @@ -41,9 +42,16 @@ import org.jboss.netty.util.internal.ThreadLocalBoolean; */ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel { + // TODO Move the state management up to AbstractChannel to remove duplication. + private static final int ST_OPEN = 0; + private static final int ST_BOUND = 1; + private static final int ST_CONNECTED = 2; + private static final int ST_CLOSED = -1; + private final AtomicInteger state = new AtomicInteger(ST_OPEN); + private final ChannelConfig config; private final ThreadLocalBoolean delivering = new ThreadLocalBoolean(); - final AtomicBoolean bound = new AtomicBoolean(); + final Queue writeBuffer = new LinkedTransferQueue(); volatile DefaultLocalChannel pairedChannel; @@ -61,12 +69,40 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel return config; } + @Override + public boolean isOpen() { + return state.get() >= ST_OPEN; + } + public boolean isBound() { - return bound.get() && isOpen(); + return state.get() >= ST_BOUND; } public boolean isConnected() { - return pairedChannel != null && isOpen(); + return state.get() == ST_CONNECTED; + } + + final void setBound() throws ClosedChannelException { + if (!state.compareAndSet(ST_OPEN, ST_BOUND)) { + switch (state.get()) { + case ST_CLOSED: + throw new ClosedChannelException(); + default: + throw new ChannelException("already bound"); + } + } + } + + final void setConnected() { + if (state.get() != ST_CLOSED) { + state.set(ST_CONNECTED); + } + } + + @Override + protected boolean setClosed() { + state.set(ST_CLOSED); + return super.setClosed(); } public LocalAddress getLocalAddress() { diff --git a/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java b/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java index 614275108a..347d564b33 100644 --- a/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java @@ -17,6 +17,7 @@ package org.jboss.netty.channel.local; import static org.jboss.netty.channel.Channels.*; +import java.io.IOException; import java.net.ConnectException; import org.jboss.netty.channel.AbstractChannelSink; @@ -95,10 +96,7 @@ final class LocalClientChannelSink extends AbstractChannelSink { throw new ChannelException("address already in use: " + localAddress); } - if (!channel.bound.compareAndSet(false, true)) { - throw new ChannelException("already bound"); - } - + channel.setBound(); channel.localAddress = localAddress; future.setSuccess(); fireChannelBound(channel, localAddress); @@ -135,12 +133,18 @@ final class LocalClientChannelSink extends AbstractChannelSink { bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL)); channel.remoteAddress = serverChannel.getLocalAddress(); + channel.setConnected(); fireChannelConnected(channel, serverChannel.getLocalAddress()); acceptedChannel.localAddress = serverChannel.getLocalAddress(); - acceptedChannel.bound.set(true); + try { + acceptedChannel.setBound(); + } catch (IOException e) { + throw new Error(e); + } fireChannelBound(acceptedChannel, channel.getRemoteAddress()); acceptedChannel.remoteAddress = channel.getLocalAddress(); + acceptedChannel.setConnected(); fireChannelConnected(acceptedChannel, channel.getLocalAddress()); // Flush something that was written in channelBound / channelConnected