Reduce the memory footprint of DefaultChannelPromise

This commit is contained in:
Trustin Lee 2013-02-13 16:38:20 -08:00
parent e2c948782b
commit d0a3c2d95e

View File

@ -16,11 +16,11 @@
package io.netty.channel; package io.netty.channel;
import io.netty.channel.ChannelFlushPromiseNotifier.FlushCheckpoint; import io.netty.channel.ChannelFlushPromiseNotifier.FlushCheckpoint;
import io.netty.util.Signal;
import io.netty.util.internal.InternalLogger; import io.netty.util.internal.InternalLogger;
import io.netty.util.internal.InternalLoggerFactory; import io.netty.util.internal.InternalLoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
@ -42,11 +42,11 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
} }
}; };
private static final Signal SUCCESS = new Signal(DefaultChannelPromise.class.getName() + ".SUCCESS");
private final Channel channel; private final Channel channel;
private ChannelFutureListener firstListener; private Object listeners; // Can be ChannelFutureListener or DefaultChannelPromiseListeners
private List<ChannelFutureListener> otherListeners;
private boolean done;
private Throwable cause; private Throwable cause;
private int waiters; private int waiters;
@ -72,17 +72,21 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
@Override @Override
public synchronized boolean isDone() { public synchronized boolean isDone() {
return done; return cause != null;
} }
@Override @Override
public synchronized boolean isSuccess() { public synchronized boolean isSuccess() {
return done && cause == null; return cause == SUCCESS;
} }
@Override @Override
public synchronized Throwable cause() { public Throwable cause() {
return cause; Throwable cause;
synchronized (this) {
cause = this.cause;
}
return cause == SUCCESS? null : cause;
} }
@Override @Override
@ -93,16 +97,17 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
boolean notifyNow = false; boolean notifyNow = false;
synchronized (this) { synchronized (this) {
if (done) { if (cause != null) {
notifyNow = true; notifyNow = true;
} else { } else {
if (firstListener == null) { if (listeners == null) {
firstListener = listener; listeners = listener;
} else { } else {
if (otherListeners == null) { if (listeners instanceof DefaultChannelPromiseListeners) {
otherListeners = new ArrayList<ChannelFutureListener>(1); ((DefaultChannelPromiseListeners) listeners).add(listener);
} else {
listeners = new DefaultChannelPromiseListeners((ChannelFutureListener) listeners, listener);
} }
otherListeners.add(listener);
} }
} }
} }
@ -136,15 +141,11 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
} }
synchronized (this) { synchronized (this) {
if (!done) { if (cause == null) {
if (listener == firstListener) { if (listeners instanceof DefaultChannelPromiseListeners) {
if (otherListeners != null && !otherListeners.isEmpty()) { ((DefaultChannelPromiseListeners) listeners).remove(listener);
firstListener = otherListeners.remove(0); } else if (listeners == listener) {
} else { listeners = null;
firstListener = null;
}
} else if (otherListeners != null) {
otherListeners.remove(listener);
} }
} }
} }
@ -205,7 +206,7 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
} }
synchronized (this) { synchronized (this) {
while (!done) { while (cause == null) {
checkDeadLock(); checkDeadLock();
waiters++; waiters++;
try { try {
@ -233,7 +234,7 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
public ChannelPromise awaitUninterruptibly() { public ChannelPromise awaitUninterruptibly() {
boolean interrupted = false; boolean interrupted = false;
synchronized (this) { synchronized (this) {
while (!done) { while (cause == null) {
checkDeadLock(); checkDeadLock();
waiters++; waiters++;
try { try {
@ -282,8 +283,8 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
try { try {
synchronized (this) { synchronized (this) {
if (done || waitTime <= 0) { if (cause != null || waitTime <= 0) {
return done; return cause != null;
} }
checkDeadLock(); checkDeadLock();
@ -300,12 +301,12 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
} }
} }
if (done) { if (cause != null) {
return true; return true;
} else { } else {
waitTime = timeoutNanos - (System.nanoTime() - startTime); waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) { if (waitTime <= 0) {
return done; return cause != null;
} }
} }
} }
@ -346,11 +347,11 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
private synchronized boolean success0() { private synchronized boolean success0() {
// Allow only once. // Allow only once.
if (done) { if (cause != null) {
return false; return false;
} }
done = true; cause = SUCCESS;
if (waiters > 0) { if (waiters > 0) {
notifyAll(); notifyAll();
} }
@ -377,12 +378,11 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
private synchronized boolean failure0(Throwable cause) { private synchronized boolean failure0(Throwable cause) {
// Allow only once. // Allow only once.
if (done) { if (cause != null) {
return false; return false;
} }
this.cause = cause; this.cause = cause;
done = true;
if (waiters > 0) { if (waiters > 0) {
notifyAll(); notifyAll();
} }
@ -396,33 +396,31 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
// 2) This method is called only when 'done' is true. Once 'done' // 2) This method is called only when 'done' is true. Once 'done'
// becomes true, the listener list is never modified - see add/removeListener() // becomes true, the listener list is never modified - see add/removeListener()
if (firstListener == null) { if (listeners == null) {
return; return;
} }
if (channel().eventLoop().inEventLoop()) { if (channel().eventLoop().inEventLoop()) {
notifyListener0(this, firstListener); if (listeners instanceof DefaultChannelPromiseListeners) {
firstListener = null; for (ChannelFutureListener l : (DefaultChannelPromiseListeners) listeners) {
if (otherListeners != null) {
for (ChannelFutureListener l: otherListeners) {
notifyListener0(this, l); notifyListener0(this, l);
} }
otherListeners = null; } else {
notifyListener0(this, (ChannelFutureListener) listeners);
} }
listeners = null;
} else { } else {
final ChannelFutureListener firstListener = this.firstListener; final Object listeners = this.listeners;
final List<ChannelFutureListener> otherListeners = this.otherListeners; this.listeners = null;
this.firstListener = null;
this.otherListeners = null;
channel().eventLoop().execute(new Runnable() { channel().eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
notifyListener0(DefaultChannelPromise.this, firstListener); if (listeners instanceof DefaultChannelPromiseListeners) {
if (otherListeners != null) { for (ChannelFutureListener l : (DefaultChannelPromiseListeners) listeners) {
for (ChannelFutureListener l: otherListeners) {
notifyListener0(DefaultChannelPromise.this, l); notifyListener0(DefaultChannelPromise.this, l);
} }
} else {
notifyListener0(DefaultChannelPromise.this, (ChannelFutureListener) listeners);
} }
} }
}); });
@ -478,4 +476,14 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
ChannelPromise future() { ChannelPromise future() {
return this; return this;
} }
private static final class DefaultChannelPromiseListeners extends ArrayList<ChannelFutureListener> {
private static final long serialVersionUID = 7414281537694651180L;
DefaultChannelPromiseListeners(ChannelFutureListener firstListener, ChannelFutureListener secondListener) {
super(2);
add(firstListener);
add(secondListener);
}
}
} }