From 70baea35dac0c2ac9bc4bdb4da6dff089e16207e Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 16 Jun 2012 20:51:45 +0200 Subject: [PATCH] Make reads work like expected with AOI. See #396 --- .../socket/nio2/AsyncNetworkChannel.java | 52 -------------- .../socket/nio2/AsyncSocketchannel.java | 68 ++++++++++++------- 2 files changed, 42 insertions(+), 78 deletions(-) delete mode 100755 transport/src/main/java/io/netty/channel/socket/nio2/AsyncNetworkChannel.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncNetworkChannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncNetworkChannel.java deleted file mode 100755 index eed1b6f5de..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncNetworkChannel.java +++ /dev/null @@ -1,52 +0,0 @@ -package io.netty.channel.socket.nio2; - -import java.io.IOException; -import java.net.SocketAddress; -import java.net.SocketOption; -import java.nio.channels.NetworkChannel; -import java.util.Map; -import java.util.Set; - -public class AsyncNetworkChannel implements NetworkChannel { - - private Map, Object> options; - - @Override - public void close() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isOpen() { - throw new UnsupportedOperationException(); - - } - - @Override - public NetworkChannel bind(SocketAddress local) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public SocketAddress getLocalAddress() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public synchronized T getOption(SocketOption name) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public synchronized NetworkChannel setOption(SocketOption name, T value) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public Set> supportedOptions() { - throw new UnsupportedOperationException(); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java index cd1e8619cc..98946f9882 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelStateHandlerAdapter; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; @@ -42,8 +43,10 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { public void channelActive(ChannelHandlerContext ctx) throws Exception { try { super.channelActive(ctx); + + // once the channel is active, the first read is scheduled AsyncSocketchannel.read((AsyncSocketchannel)ctx.channel()); - + } finally { ctx.pipeline().remove(this); } @@ -129,10 +132,30 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { return null; } + /** + * Trigger a read from the {@link AsyncSocketchannel} + * + */ private static void read(AsyncSocketchannel channel) { - channel.javaChannel().read(channel.pipeline().inboundByteBuffer().nioBuffer(), channel, READ_HANDLER); + ByteBuf byteBuf = channel.pipeline().inboundByteBuffer(); + expandReadBuffer(byteBuf); + + // Get a ByteBuffer view on the ByteBuf and clear it before try to read + ByteBuffer buffer = byteBuf.nioBuffer(); + buffer.clear(); + channel.javaChannel().read(buffer, channel, READ_HANDLER); } + + private static boolean expandReadBuffer(ByteBuf byteBuf) { + if (!byteBuf.writable()) { + // FIXME: Magic number + byteBuf.ensureWritableBytes(4096); + return true; + } + return false; + } + @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().bind(localAddress); @@ -200,23 +223,26 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { assert channel.eventLoop().inEventLoop(); final ChannelPipeline pipeline = channel.pipeline(); - final ByteBuf byteBuf = pipeline.inboundByteBuffer(); boolean closed = false; boolean read = false; try { - expandReadBuffer(byteBuf); - for (;;) { - int localReadAmount = result.intValue(); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - break; - } - if (!expandReadBuffer(byteBuf)) { - break; - } + + int localReadAmount = result.intValue(); + if (localReadAmount > 0) { + //Set the writerIndex of the buffer correctly to the + // current writerIndex + read amount of bytes. + // + // This is needed as the ByteBuffer and the ByteBuf does not share + // each others index + final ByteBuf byteBuf = pipeline.inboundByteBuffer(); + byteBuf.writerIndex(byteBuf.writerIndex() + result); + + read = true; + + } else if (localReadAmount < 0) { + closed = true; } + } catch (Throwable t) { if (read) { read = false; @@ -238,16 +264,6 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { } } } - - private static boolean expandReadBuffer(ByteBuf byteBuf) { - if (!byteBuf.writable()) { - // FIXME: Magic number - byteBuf.ensureWritableBytes(4096); - return true; - } - return false; - } - @Override public void failed(Throwable t, AsyncSocketchannel channel) { @@ -280,7 +296,7 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { @Override public AsyncSocketChannelConfig config() { if (config == null) { - throw new IllegalStateException("Channel not registered yet"); + throw new IllegalStateException("Channel not open yet"); } return config; }