[#1388] Ensure AbstractNioMessageChannel based Channels will call fireInboundBufferUpdated() soon enough to release resources

This commit is contained in:
Norman Maurer 2013-05-23 16:59:03 +02:00
parent 83dcf829d6
commit 50ac0cdfcb

View File

@ -28,6 +28,9 @@ import java.nio.channels.SelectionKey;
*/
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
// Hard coded for now.
private static final int READ_BATCH_SIZE = 16;
/**
* @see {@link AbstractNioChannel#AbstractNioChannel(Channel, Integer, SelectableChannel, int)}
*/
@ -56,40 +59,47 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
boolean closed = false;
boolean read = false;
boolean firedChannelReadSuspended = false;
try {
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break;
} else if (localReadAmount < 0) {
closed = true;
break;
loop: for (;;) {
int reads = 0;
try {
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break loop;
} else if (localReadAmount < 0) {
closed = true;
break loop;
}
if (reads++ > READ_BATCH_SIZE) {
break;
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
if (t instanceof IOException) {
if (t instanceof IOException) {
closed = true;
} else if (!closed) {
firedChannelReadSuspended = true;
pipeline.fireChannelReadSuspended();
}
} else if (!closed) {
firedChannelReadSuspended = true;
pipeline.fireChannelReadSuspended();
}
pipeline().fireExceptionCaught(t);
} finally {
if (read) {
pipeline().fireExceptionCaught(t);
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidPromise());
} else if (!firedChannelReadSuspended) {
pipeline.fireChannelReadSuspended();
}
if (closed && isOpen()) {
close(voidPromise());
} else if (!firedChannelReadSuspended) {
pipeline.fireChannelReadSuspended();
}
}
}
}