Fix a regression caused by 73dfd7c01b

Motivation:

73dfd7c01b introduced various test
failures because:

- EpollSocketChannel.doWrite() raised a NullPointerException when
  notifying the write progress.
- ChannelOutboundBuffer.nioBuffers() did not expand the internal array
  when the pending entries contained more than 1024 buffers, dropping
  the remainder.

Modifications:

- Fix the NPE in EpollSocketChannel by removing an unnecessary progress
  update
- Expand the thread-local buffer array if there is not enough room,
  which was the original behavior dropped by the offending commit

Result:

Regression is gone.
This commit is contained in:
Trustin Lee 2014-07-30 13:49:17 -07:00
parent 07801d7b38
commit 997d8c32d2
2 changed files with 29 additions and 8 deletions

View File

@ -192,7 +192,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.progress(buf.readableBytes());
in.remove();
}
in.progress(writtenBytes);
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.

View File

@ -30,6 +30,7 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -325,13 +326,13 @@ public final class ChannelOutboundBuffer {
}
private void removeEntry(Entry e) {
if (e == tailEntry) {
if (-- flushed == 0) {
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
if (-- flushed == 0) {
flushedEntry = null;
} else {
flushedEntry = e.next;
}
@ -352,7 +353,8 @@ public final class ChannelOutboundBuffer {
long nioBufferSize = 0;
int nioBufferCount = 0;
final ByteBufAllocator alloc = channel.alloc();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get();
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
while (entry != null && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
@ -369,7 +371,8 @@ public final class ChannelOutboundBuffer {
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
break;
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (count == 1) {
@ -431,6 +434,25 @@ public final class ChannelOutboundBuffer {
return nioBufferCount;
}
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
public int nioBufferCount() {
return nioBufferCount;
}