diff --git a/src/main/java/org/jboss/netty/channel/local/LocalChannel.java b/src/main/java/org/jboss/netty/channel/local/LocalChannel.java index e2de28b741..c6c4d990d1 100644 --- a/src/main/java/org/jboss/netty/channel/local/LocalChannel.java +++ b/src/main/java/org/jboss/netty/channel/local/LocalChannel.java @@ -53,6 +53,7 @@ class LocalChannel extends AbstractChannel { volatile LocalChannel pairedChannel; volatile LocalAddress localAddress; + volatile LocalAddress remoteAddress; final AtomicBoolean bound = new AtomicBoolean(); private final LocalChannelConfig config; final Queue writeBuffer = new LinkedTransferQueue(); @@ -73,8 +74,7 @@ class LocalChannel extends AbstractChannel { } public boolean isConnected() { - return localAddress != null && - pairedChannel != null && pairedChannel.localAddress != null; + return localAddress != null && remoteAddress != null; } public LocalAddress getLocalAddress() { @@ -82,12 +82,7 @@ class LocalChannel extends AbstractChannel { } public LocalAddress getRemoteAddress() { - LocalChannel pairedChannel = this.pairedChannel; - if (pairedChannel == null) { - return null; - } else { - return pairedChannel.getLocalAddress(); - } + return remoteAddress; } void closeNow(ChannelFuture future) { @@ -130,52 +125,49 @@ class LocalChannel extends AbstractChannel { void flushWriteBuffer() { LocalChannel pairedChannel = this.pairedChannel; - if (isConnected()){ - // Channel is open and connected and channelConnected event has - // been fired. - if (!delivering.get()) { - delivering.set(true); - try { - for (;;) { - MessageEvent e = writeBuffer.poll(); - if(e == null) { - break; - } - - e.getFuture().setSuccess(); - fireMessageReceived(pairedChannel, e.getMessage()); - fireWriteComplete(this, 1); - } - } finally { - delivering.set(false); - } - } - - return; - } - if (pairedChannel != null) { - // Channel is open and connected but channelConnected event has - // not been fired yet. - return; - } + if (pairedChannel.isConnected()){ + // Channel is open and connected and channelConnected event has + // been fired. + if (!delivering.get()) { + delivering.set(true); + try { + for (;;) { + MessageEvent e = writeBuffer.poll(); + if(e == null) { + break; + } - // Channel is closed or not connected yet - notify as failures. - Exception cause; - if (isOpen()) { - cause = new NotYetConnectedException(); + e.getFuture().setSuccess(); + fireMessageReceived(pairedChannel, e.getMessage()); + fireWriteComplete(this, 1); + } + } finally { + delivering.set(false); + } + } + } else { + // Channel is open and connected but channelConnected event has + // not been fired yet. + } } else { - cause = new ClosedChannelException(); - } - - for (;;) { - MessageEvent e = writeBuffer.poll(); - if(e == null) { - break; + // Channel is closed or not connected yet - notify as failures. + Exception cause; + if (isOpen()) { + cause = new NotYetConnectedException(); + } else { + cause = new ClosedChannelException(); } - e.getFuture().setFailure(cause); - fireExceptionCaught(this, cause); + for (;;) { + MessageEvent e = writeBuffer.poll(); + if(e == null) { + break; + } + + e.getFuture().setFailure(cause); + fireExceptionCaught(this, cause); + } } } } diff --git a/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java b/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java index 76ce14d053..e77f4228c3 100644 --- a/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/local/LocalClientChannelSink.java @@ -137,11 +137,17 @@ final class LocalClientChannelSink extends AbstractChannelSink { channel.pairedChannel = acceptedChannel; bind(channel, succeededFuture(channel), LocalAddress.newEphemeralInstance()); + channel.remoteAddress = serverChannel.getLocalAddress(); fireChannelConnected(channel, serverChannel.getLocalAddress()); acceptedChannel.localAddress = serverChannel.getLocalAddress(); acceptedChannel.bound.set(true); fireChannelBound(acceptedChannel, channel.getRemoteAddress()); + acceptedChannel.remoteAddress = channel.getLocalAddress(); fireChannelConnected(acceptedChannel, channel.getLocalAddress()); + + // Flush something that was written in channelBound / channelConnected + channel.flushWriteBuffer(); + acceptedChannel.flushWriteBuffer(); } }