Reduce the memory footprint of DefaultChannelPromise even more

- Merge waiters and fluchCheckpoint into a single field
- This limits the number of waiter threads to 2^24 - 1, which is still very large.  Can you imagine an application with 16 million threads?
This commit is contained in:
Trustin Lee 2013-02-26 13:52:49 -08:00
parent 98192d7c01
commit d8f5521210
2 changed files with 37 additions and 13 deletions

View File

@ -200,7 +200,7 @@ public final class ChannelFlushPromiseNotifier {
// Avoid overflow
final long newWriteCounter = this.writeCounter;
if (newWriteCounter >= 0x1000000000000000L) {
if (newWriteCounter >= 0x8000000000L) {
// Reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
this.writeCounter = 0;

View File

@ -47,11 +47,12 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
private final Channel channel;
private volatile Throwable cause;
private Object listeners; // Can be ChannelFutureListener or DefaultChannelPromiseListeners
private int waiters;
/**
* Opportunistically extending FlushCheckpoint to reduce GC.
* Only used for flush() operation. See AbstractChannel.DefaultUnsafe.flush() */
* The first 24 bits of this field represents the number of waiters waiting for this promise with await*().
* The other 40 bits of this field represents the flushCheckpoint used by ChannelFlushPromiseNotifier and
* AbstractChannel.Unsafe.flush().
*/
private long flushCheckpoint;
/**
@ -212,11 +213,11 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
synchronized (this) {
while (!isDone()) {
checkDeadLock();
waiters++;
incWaiters();
try {
wait();
} finally {
waiters--;
decWaiters();
}
}
}
@ -244,13 +245,13 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
synchronized (this) {
while (!isDone()) {
checkDeadLock();
waiters++;
incWaiters();
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
} finally {
waiters--;
decWaiters();
}
}
}
@ -308,7 +309,7 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
}
checkDeadLock();
waiters++;
incWaiters();
try {
for (;;) {
try {
@ -331,7 +332,7 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
}
}
} finally {
waiters--;
decWaiters();
}
}
} finally {
@ -395,7 +396,7 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
}
this.cause = cause;
if (waiters > 0) {
if (hasWaiters()) {
notifyAll();
}
}
@ -477,12 +478,35 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
@Override
long flushCheckpoint() {
return flushCheckpoint;
return flushCheckpoint & 0x000000FFFFFFFFFFL;
}
@Override
void flushCheckpoint(long checkpoint) {
flushCheckpoint = checkpoint;
if ((checkpoint & 0xFFFFFF0000000000L) != 0) {
throw new IllegalStateException("flushCheckpoint overflow");
}
flushCheckpoint = flushCheckpoint & 0xFFFFFF0000000000L | checkpoint;
}
private boolean hasWaiters() {
return (flushCheckpoint & 0xFFFFFF0000000000L) != 0;
}
private void incWaiters() {
long waiters = waiters() + 1;
if ((waiters & 0xFFFFFFFFFF000000L) != 0) {
throw new IllegalStateException("too many waiters");
}
flushCheckpoint = flushCheckpoint() | waiters << 40L;
}
private void decWaiters() {
flushCheckpoint = flushCheckpoint() | waiters() - 1L << 40L;
}
private long waiters() {
return flushCheckpoint >>> 40;
}
@Override