Make reads work like expected with AOI. See #396

This commit is contained in:
Norman Maurer 2012-06-16 20:51:45 +02:00
parent f8ef5d5d78
commit 70baea35da
2 changed files with 42 additions and 78 deletions

View File

@ -1,52 +0,0 @@
package io.netty.channel.socket.nio2;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.NetworkChannel;
import java.util.Map;
import java.util.Set;
public class AsyncNetworkChannel implements NetworkChannel {
private Map<SocketOption<?>, Object> options;
@Override
public void close() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isOpen() {
throw new UnsupportedOperationException();
}
@Override
public NetworkChannel bind(SocketAddress local) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public SocketAddress getLocalAddress() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public synchronized <T> T getOption(SocketOption<T> name) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public synchronized <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public Set<SocketOption<?>> supportedOptions() {
throw new UnsupportedOperationException();
}
}

View File

@ -26,6 +26,7 @@ import io.netty.channel.ChannelStateHandlerAdapter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
@ -42,8 +43,10 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelActive(ctx);
// once the channel is active, the first read is scheduled
AsyncSocketchannel.read((AsyncSocketchannel)ctx.channel());
} finally {
ctx.pipeline().remove(this);
}
@ -129,10 +132,30 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
return null;
}
/**
* Trigger a read from the {@link AsyncSocketchannel}
*
*/
private static void read(AsyncSocketchannel channel) {
channel.javaChannel().read(channel.pipeline().inboundByteBuffer().nioBuffer(), channel, READ_HANDLER);
ByteBuf byteBuf = channel.pipeline().inboundByteBuffer();
expandReadBuffer(byteBuf);
// Get a ByteBuffer view on the ByteBuf and clear it before try to read
ByteBuffer buffer = byteBuf.nioBuffer();
buffer.clear();
channel.javaChannel().read(buffer, channel, READ_HANDLER);
}
private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
@ -200,23 +223,26 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
assert channel.eventLoop().inEventLoop();
final ChannelPipeline pipeline = channel.pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
boolean closed = false;
boolean read = false;
try {
expandReadBuffer(byteBuf);
for (;;) {
int localReadAmount = result.intValue();
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
int localReadAmount = result.intValue();
if (localReadAmount > 0) {
//Set the writerIndex of the buffer correctly to the
// current writerIndex + read amount of bytes.
//
// This is needed as the ByteBuffer and the ByteBuf does not share
// each others index
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
byteBuf.writerIndex(byteBuf.writerIndex() + result);
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
@ -238,16 +264,6 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
}
}
}
private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
@Override
public void failed(Throwable t, AsyncSocketchannel channel) {
@ -280,7 +296,7 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
@Override
public AsyncSocketChannelConfig config() {
if (config == null) {
throw new IllegalStateException("Channel not registered yet");
throw new IllegalStateException("Channel not open yet");
}
return config;
}