add struct to replace parallel arrays consolidate flushed & unflushed buffers

This commit is contained in:
bgallagher 2013-08-13 15:39:28 -04:00 committed by Norman Maurer
parent 06e250e493
commit a383988cdb

View File

@ -41,7 +41,7 @@ public final class ChannelOutboundBuffer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
private static final int INITIAL_CAPACITY = 16;
private static final int INITIAL_CAPACITY = 32;
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override
@ -59,28 +59,21 @@ public final class ChannelOutboundBuffer {
}
private final Handle handle;
private AbstractChannel channel;
// Flushed messages are stored in a circular buffer.
private Object[] flushed;
private ChannelPromise[] flushedPromises;
private int[] flushedPendingSizes;
private long[] flushedProgresses;
private long[] flushedTotals;
private int head;
// A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The
// flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range
// [unflushed, tail).
private Entry[] buffer;
private int flushed;
private int unflushed;
private int tail;
private ByteBuffer[] nioBuffers;
private int nioBufferCount;
private long nioBufferSize;
// Unflushed messages are stored in an array list.
private Object[] unflushed;
private ChannelPromise[] unflushedPromises;
private int[] unflushedPendingSizes;
private long[] unflushedTotals;
private int unflushedCount;
private boolean inFail;
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
@ -98,164 +91,65 @@ public final class ChannelOutboundBuffer {
private ChannelOutboundBuffer(Handle handle) {
this.handle = handle;
flushed = new Object[INITIAL_CAPACITY];
flushedPromises = new ChannelPromise[INITIAL_CAPACITY];
flushedPendingSizes = new int[INITIAL_CAPACITY];
flushedProgresses = new long[INITIAL_CAPACITY];
flushedTotals = new long[INITIAL_CAPACITY];
buffer = new Entry[INITIAL_CAPACITY];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = new Entry();
}
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
unflushed = new Object[INITIAL_CAPACITY];
unflushedPromises = new ChannelPromise[INITIAL_CAPACITY];
unflushedPendingSizes = new int[INITIAL_CAPACITY];
unflushedTotals = new long[INITIAL_CAPACITY];
}
void addMessage(Object msg, ChannelPromise promise) {
Object[] unflushed = this.unflushed;
int unflushedCount = this.unflushedCount;
if (unflushedCount == unflushed.length - 1) {
doubleUnflushedCapacity();
unflushed = this.unflushed;
}
int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
unflushed[unflushedCount] = msg;
unflushedPendingSizes[unflushedCount] = size;
unflushedPromises[unflushedCount] = promise;
unflushedTotals[unflushedCount] = total(msg);
this.unflushedCount = unflushedCount + 1;
Entry e = buffer[tail++];
e.msg = msg;
e.pendingSize = size;
e.promise = promise;
e.total = total(msg);
tail &= buffer.length - 1;
if (tail == flushed) {
addCapacity();
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size);
}
private static long total(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}
private void doubleUnflushedCapacity() {
int newCapacity = unflushed.length << 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
int unflushedCount = this.unflushedCount;
Object[] a1 = new Object[newCapacity];
System.arraycopy(unflushed, 0, a1, 0, unflushedCount);
unflushed = a1;
ChannelPromise[] a2 = new ChannelPromise[newCapacity];
System.arraycopy(unflushedPromises, 0, a2, 0, unflushedCount);
unflushedPromises = a2;
int[] a3 = new int[newCapacity];
System.arraycopy(unflushedPendingSizes, 0, a3, 0, unflushedCount);
unflushedPendingSizes = a3;
long[] a4 = new long[newCapacity];
System.arraycopy(unflushedTotals, 0, a4, 0, unflushedCount);
unflushedTotals = a4;
}
void addFlush() {
final int unflushedCount = this.unflushedCount;
if (unflushedCount == 0) {
return;
}
Object[] unflushed = this.unflushed;
ChannelPromise[] unflushedPromises = this.unflushedPromises;
int[] unflushedPendingSizes = this.unflushedPendingSizes;
long[] unflushedTotals = this.unflushedTotals;
Object[] flushed = this.flushed;
ChannelPromise[] flushedPromises = this.flushedPromises;
int[] flushedPendingSizes = this.flushedPendingSizes;
long[] flushedProgresses = this.flushedProgresses;
long[] flushedTotals = this.flushedTotals;
int head = this.head;
int tail = this.tail;
for (int i = 0; i < unflushedCount; i ++) {
flushed[tail] = unflushed[i];
unflushed[i] = null;
flushedPromises[tail] = unflushedPromises[i];
unflushedPromises[i] = null;
flushedPendingSizes[tail] = unflushedPendingSizes[i];
flushedProgresses[tail] = 0;
flushedTotals[tail] = unflushedTotals[i];
if ((tail = (tail + 1) & (flushed.length - 1)) == head) {
this.tail = tail;
doubleFlushedCapacity();
head = this.head;
tail = this.tail;
flushed = this.flushed;
flushedPromises = this.flushedPromises;
flushedPendingSizes = this.flushedPendingSizes;
flushedProgresses = this.flushedProgresses;
flushedTotals = this.flushedTotals;
}
}
this.unflushedCount = 0;
this.tail = tail;
}
private void doubleFlushedCapacity() {
int p = head;
int n = flushed.length;
private void addCapacity() {
int p = flushed;
int n = buffer.length;
int r = n - p; // number of elements to the right of p
int s = size();
int newCapacity = n << 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
Object[] a1 = new Object[newCapacity];
System.arraycopy(flushed, p, a1, 0, r);
System.arraycopy(flushed, 0, a1, r, p);
flushed = a1;
Entry[] e = new Entry[newCapacity];
System.arraycopy(buffer, p, e, 0, r);
System.arraycopy(buffer, 0, e, r, p);
for (int i = n; i < e.length; i++) {
e[i] = new Entry();
}
ChannelPromise[] a2 = new ChannelPromise[newCapacity];
System.arraycopy(flushedPromises, p, a2, 0, r);
System.arraycopy(flushedPromises, 0, a2, r, p);
flushedPromises = a2;
int[] a3 = new int[newCapacity];
System.arraycopy(flushedPendingSizes, p, a3, 0, r);
System.arraycopy(flushedPendingSizes, 0, a3, r, p);
flushedPendingSizes = a3;
long[] a4 = new long[newCapacity];
System.arraycopy(flushedProgresses, p, a4, 0, r);
System.arraycopy(flushedProgresses, 0, a4, r, p);
flushedProgresses = a4;
long[] a5 = new long[newCapacity];
System.arraycopy(flushedTotals, p, a5, 0, r);
System.arraycopy(flushedTotals, 0, a5, r, p);
flushedTotals = a5;
head = 0;
buffer = e;
flushed = 0;
unflushed = s;
tail = n;
}
void addFlush() {
this.unflushed = this.tail;
}
/**
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
@ -312,38 +206,56 @@ public final class ChannelOutboundBuffer {
}
}
private static long total(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}
public Object current() {
return flushed[head];
if (isEmpty()) {
return null;
} else {
return buffer[flushed].msg;
}
}
public void progress(long amount) {
int head = this.head;
ChannelPromise p = flushedPromises[head];
Entry e = buffer[flushed];
ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) {
long progress = flushedProgresses[head] + amount;
flushedProgresses[head] = progress;
((ChannelProgressivePromise) p).tryProgress(progress, flushedTotals[head]);
long progress = e.progress + amount;
e.progress = progress;
((ChannelProgressivePromise) p).tryProgress(progress, e.total);
}
}
public boolean remove() {
int head = this.head;
if (isEmpty()) {
return false;
}
Object msg = flushed[head];
Entry e = buffer[this.flushed];
Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise;
int size = e.pendingSize;
e.clear();
this.flushed = flushed + 1 & buffer.length - 1;
safeRelease(msg);
flushed[head] = null;
ChannelPromise promise = flushedPromises[head];
flushedPromises[head] = null;
int size = flushedPendingSizes[head];
flushedPendingSizes[head] = 0;
this.head = head + 1 & flushed.length - 1;
promise.trySuccess();
decrementPendingOutboundBytes(size);
@ -352,23 +264,24 @@ public final class ChannelOutboundBuffer {
}
public boolean remove(Throwable cause) {
int head = this.head;
if (isEmpty()) {
return false;
}
Object msg = flushed[head];
Entry e = buffer[this.flushed];
Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise;
int size = e.pendingSize;
e.clear();
this.flushed = flushed + 1 & buffer.length - 1;
safeRelease(msg);
flushed[head] = null;
ChannelPromise promise = flushedPromises[head];
flushedPromises[head] = null;
int size = flushedPendingSizes[head];
flushedPendingSizes[head] = 0;
this.head = head + 1 & flushed.length - 1;
safeFail(promise, cause);
decrementPendingOutboundBytes(size);
@ -392,11 +305,11 @@ public final class ChannelOutboundBuffer {
long nioBufferSize = 0;
int nioBufferCount = 0;
final int mask = flushed.length - 1;
final int mask = buffer.length - 1;
Object m;
int i = head;
while ((m = flushed[i]) != null) {
int i = flushed;
while (i != unflushed && (m = buffer[i].msg) != null) {
if (!(m instanceof ByteBuf)) {
this.nioBufferCount = 0;
this.nioBufferSize = 0;
@ -434,7 +347,7 @@ public final class ChannelOutboundBuffer {
ByteBuf directBuf = channel.alloc().directBuffer(readableBytes);
directBuf.writeBytes(buf, readerIndex, readableBytes);
buf.release();
flushed[i] = directBuf;
buffer[i].msg = directBuf;
if (nioBufferCount == nioBuffers.length) {
nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount);
}
@ -476,11 +389,11 @@ public final class ChannelOutboundBuffer {
}
public int size() {
return tail - head & flushed.length - 1;
return unflushed - flushed & buffer.length - 1;
}
public boolean isEmpty() {
return head == tail;
return unflushed == flushed;
}
void failFlushed(Throwable cause) {
@ -522,24 +435,22 @@ public final class ChannelOutboundBuffer {
throw new IllegalStateException("close() must be invoked after the channel is closed.");
}
if (head != tail) {
if (!isEmpty()) {
throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
}
// Release all unflushed messages.
Object[] unflushed = this.unflushed;
ChannelPromise[] unflushedPromises = this.unflushedPromises;
int[] unflushedPendingSizes = this.unflushedPendingSizes;
final int unflushedCount = this.unflushedCount;
final int unflushedCount = this.tail - this.unflushed & buffer.length - 1;
try {
for (int i = 0; i < unflushedCount; i++) {
safeRelease(unflushed[i]);
unflushed[i] = null;
safeFail(unflushedPromises[i], cause);
unflushedPromises[i] = null;
Entry e = buffer[unflushed + i & buffer.length - 1];
safeRelease(e.msg);
e.msg = null;
safeFail(e.promise, cause);
e.promise = null;
// Just decrease; do not trigger any events via decrementPendingOutboundBytes()
int size = unflushedPendingSizes[i];
int size = e.pendingSize;
long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue - size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
@ -547,12 +458,13 @@ public final class ChannelOutboundBuffer {
newWriteBufferSize = oldValue - size;
}
unflushedPendingSizes[i] = 0;
e.pendingSize = 0;
}
} finally {
this.unflushedCount = 0;
tail = unflushed;
inFail = false;
}
RECYCLER.recycle(this, handle);
// Set the channel to null so it can be GC'ed ASAP
@ -572,4 +484,21 @@ public final class ChannelOutboundBuffer {
logger.warn("Promise done already: {} - new exception is:", promise, cause);
}
}
private static final class Entry {
Object msg;
ChannelPromise promise;
long progress;
long total;
int pendingSize;
public void clear() {
msg = null;
promise = null;
progress = 0;
total = 0;
pendingSize = 0;
}
}
}