Imlemented progress notification for NIO writes

This commit is contained in:
Trustin Lee 2010-02-23 01:12:36 +00:00
parent 43a603cfde
commit b010cd15a5
3 changed files with 13 additions and 3 deletions

View File

@ -394,7 +394,9 @@ public class DefaultChannelFuture implements ChannelFuture {
return false; return false;
} }
if (progressListeners.isEmpty()) { Collection<ChannelFutureProgressListener> progressListeners =
this.progressListeners;
if (progressListeners == null || progressListeners.isEmpty()) {
// Nothing to notify - no need to create an empty array. // Nothing to notify - no need to create an empty array.
return true; return true;
} }

View File

@ -466,7 +466,9 @@ class NioWorker implements Runnable {
bb = buf.buffer; bb = buf.buffer;
} }
ChannelFuture future = evt.getFuture();
try { try {
int oldWrittenBytes = writtenBytes;
for (int i = writeSpinCount; i > 0; i --) { for (int i = writeSpinCount; i > 0; i --) {
int localWrittenBytes = ch.write(bb); int localWrittenBytes = ch.write(bb);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
@ -478,7 +480,6 @@ class NioWorker implements Runnable {
if (!bb.hasRemaining()) { if (!bb.hasRemaining()) {
// Successful write - proceed to the next message. // Successful write - proceed to the next message.
buf.release(); buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
evt = null; evt = null;
@ -489,13 +490,18 @@ class NioWorker implements Runnable {
// Not written fully - perhaps the kernel buffer is full. // Not written fully - perhaps the kernel buffer is full.
addOpWrite = true; addOpWrite = true;
channel.writeSuspended = true; channel.writeSuspended = true;
// Notify progress listeners if necessary.
future.setProgress(
writtenBytes - oldWrittenBytes,
bb.position() - buf.initialPos,
bb.limit() - buf.initialPos);
break; break;
} }
} catch (AsynchronousCloseException e) { } catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore. // Doesn't need a user attention - ignore.
} catch (Throwable t) { } catch (Throwable t) {
buf.release(); buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
buf = null; buf = null;

View File

@ -145,10 +145,12 @@ final class SocketSendBufferPool {
final class SendBuffer { final class SendBuffer {
private final Preallocation parent; private final Preallocation parent;
final ByteBuffer buffer; final ByteBuffer buffer;
final int initialPos;
SendBuffer(Preallocation parent, ByteBuffer buffer) { SendBuffer(Preallocation parent, ByteBuffer buffer) {
this.parent = parent; this.parent = parent;
this.buffer = buffer; this.buffer = buffer;
initialPos = buffer.position();
} }
void release() { void release() {