Add javadocs and also handle writes correctly. See #396
This commit is contained in:
parent
70baea35da
commit
2ff22ff4c3
@ -72,7 +72,7 @@ public abstract class AbstractAsyncChannel extends AbstractChannel {
|
||||
|
||||
@Override
|
||||
protected void doDeregister() throws Exception {
|
||||
throw new UnsupportedOperationException("Deregistration is not supported by AbstractAsyncChannel");
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,13 +173,17 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
||||
|
||||
@Override
|
||||
protected boolean isFlushPending() {
|
||||
// TODO: Fix me
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
|
||||
// Only one pending write can be scheduled at one time. Otherwise
|
||||
// a PendingWriteException will be thrown. So use CAS to not run
|
||||
// into this
|
||||
if (flushing.compareAndSet(false, true)) {
|
||||
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
|
||||
ByteBuffer buffer = (ByteBuffer)buf.nioBuffer();
|
||||
javaChannel().write(buffer, this, WRITE_HANDLER);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -190,13 +194,21 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
||||
@Override
|
||||
public void completed(Integer result, AsyncSocketchannel channel) {
|
||||
ByteBuf buf = channel.pipeline().outboundByteBuffer();
|
||||
if (!buf.readable()) {
|
||||
buf.discardReadBytes();
|
||||
}
|
||||
|
||||
if (result > 0) {
|
||||
if (result < buf.readableBytes()) {
|
||||
// Update the readerIndex with the amount of read bytes
|
||||
buf.readerIndex(buf.readerIndex() + result);
|
||||
} else {
|
||||
// not enough space in the buffer anymore so discard everything that
|
||||
// was read already
|
||||
buf.discardReadBytes();
|
||||
|
||||
}
|
||||
channel.notifyFlushFutures();
|
||||
}
|
||||
|
||||
// Allow to have the next write pending
|
||||
channel.flushing.set(false);
|
||||
}
|
||||
|
||||
@ -212,6 +224,7 @@ public class AsyncSocketchannel extends AbstractAsyncChannel {
|
||||
if (cause instanceof IOException) {
|
||||
channel.close(channel.unsafe().voidFuture());
|
||||
}
|
||||
// Allow to have the next write pending
|
||||
channel.flushing.set(false);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user