Fix a bug where channelActive is not fired for an accepted channel

This commit is contained in:
Trustin Lee 2012-06-18 17:22:06 +09:00
parent 3e2953cf92
commit 5caf78acc0

View File

@ -18,10 +18,7 @@ package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType; import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateHandler;
import io.netty.channel.ChannelStateHandlerAdapter;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import java.io.IOException; import java.io.IOException;
@ -38,24 +35,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler(); private static final CompletionHandler<Void, AioSocketChannel> CONNECT_HANDLER = new ConnectHandler();
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler(); private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler(); private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
private static final ChannelStateHandler READ_START_HANDLER = new ChannelStateHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelActive(ctx);
// once the channel is active, the first read is scheduled
((AioSocketChannel)ctx.channel()).read();
} finally {
ctx.pipeline().remove(this);
}
}
};
private final AtomicBoolean flushing = new AtomicBoolean(false); private final AtomicBoolean flushing = new AtomicBoolean(false);
private volatile AioSocketChannelConfig config; private volatile AioSocketChannelConfig config;
@ -65,10 +45,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
public AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) { public AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) {
super(parent, id); super(parent, id);
this.ch = channel; ch = channel;
if (ch != null) { if (ch != null) {
config = new AioSocketChannelConfig(javaChannel()); config = new AioSocketChannelConfig(javaChannel());
pipeline().addLast(READ_START_HANDLER);
} }
} }
@ -129,18 +108,17 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (ch == null) { if (ch == null) {
ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
config = new AioSocketChannelConfig(javaChannel()); config = new AioSocketChannelConfig(javaChannel());
pipeline().addLast(READ_START_HANDLER); } else if (remoteAddress() != null) {
read();
} }
return null; return null;
} }
/** /**
* Trigger a read from the {@link AioSocketChannel} * Trigger a read from the {@link AioSocketChannel}
* *
*/ */
void read() { void read() {
ByteBuf byteBuf = pipeline().inboundByteBuffer(); ByteBuf byteBuf = pipeline().inboundByteBuffer();
expandReadBuffer(byteBuf); expandReadBuffer(byteBuf);
// Get a ByteBuffer view on the ByteBuf // Get a ByteBuffer view on the ByteBuf
@ -148,7 +126,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
javaChannel().read(buffer, this, READ_HANDLER); javaChannel().read(buffer, this, READ_HANDLER);
} }
private static boolean expandReadBuffer(ByteBuf byteBuf) { private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) { if (!byteBuf.writable()) {
// FIXME: Magic number // FIXME: Magic number
@ -157,7 +135,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
return false; return false;
} }
@Override @Override
protected void doBind(SocketAddress localAddress) throws Exception { protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress); javaChannel().bind(localAddress);
@ -185,12 +163,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
buf.clear(); buf.clear();
return true; return true;
} }
// Only one pending write can be scheduled at one time. Otherwise // Only one pending write can be scheduled at one time. Otherwise
// a PendingWriteException will be thrown. So use CAS to not run // a PendingWriteException will be thrown. So use CAS to not run
// into this // into this
if (flushing.compareAndSet(false, true)) { if (flushing.compareAndSet(false, true)) {
ByteBuffer buffer = (ByteBuffer) buf.nioBuffer(); ByteBuffer buffer = buf.nioBuffer();
System.err.println("WRITE: " + buffer);
javaChannel().write(buffer, this, WRITE_HANDLER); javaChannel().write(buffer, this, WRITE_HANDLER);
} }
return false; return false;
@ -206,13 +185,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (result > 0) { if (result > 0) {
// Update the readerIndex with the amount of read bytes // Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result); buf.readerIndex(buf.readerIndex() + result);
channel.notifyFlushFutures(); channel.notifyFlushFutures();
if (!buf.readable()) { if (!buf.readable()) {
buf.discardReadBytes(); buf.discardReadBytes();
} }
} }
// Allow to have the next write pending // Allow to have the next write pending
channel.flushing.set(false); channel.flushing.set(false);
} }
@ -245,10 +224,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
try { try {
int localReadAmount = result.intValue(); int localReadAmount = result.intValue();
if (localReadAmount > 0) { if (localReadAmount > 0) {
//Set the writerIndex of the buffer correctly to the // Set the writerIndex of the buffer correctly to the
// current writerIndex + read amount of bytes. // current writerIndex + read amount of bytes.
// //
// This is needed as the ByteBuffer and the ByteBuf does not share // This is needed as the ByteBuffer and the ByteBuf does not share
@ -261,7 +239,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} else if (localReadAmount < 0) { } else if (localReadAmount < 0) {
closed = true; closed = true;
} }
} catch (Throwable t) { } catch (Throwable t) {
if (read) { if (read) {
read = false; read = false;
@ -301,7 +279,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
public void completed(Void result, AioSocketChannel channel) { public void completed(Void result, AioSocketChannel channel) {
((AsyncUnsafe) channel.unsafe()).connectSuccess(); ((AsyncUnsafe) channel.unsafe()).connectSuccess();
channel.pipeline().fireChannelActive();
// start reading from channel // start reading from channel
channel.read(); channel.read();
} }