Remove synchronized blocks to optimize BufferedWriteHandler. See #519
This commit is contained in:
parent
f1ba4f23a6
commit
88124d88ce
@ -22,6 +22,7 @@ import java.util.List;
|
|||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
import org.jboss.netty.buffer.ChannelBuffers;
|
||||||
@ -165,6 +166,7 @@ public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCy
|
|||||||
private final Queue<MessageEvent> queue;
|
private final Queue<MessageEvent> queue;
|
||||||
private final boolean consolidateOnFlush;
|
private final boolean consolidateOnFlush;
|
||||||
private volatile ChannelHandlerContext ctx;
|
private volatile ChannelHandlerContext ctx;
|
||||||
|
private final AtomicBoolean flush = new AtomicBoolean(false);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance with the default unbounded {@link BlockingQueue}
|
* Creates a new instance with the default unbounded {@link BlockingQueue}
|
||||||
@ -244,15 +246,18 @@ public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCy
|
|||||||
// No write request was made.
|
// No write request was made.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Channel channel = ctx.getChannel();
|
||||||
|
boolean acquired;
|
||||||
|
|
||||||
final Queue<MessageEvent> queue = getQueue();
|
// use CAS to see if the have flush already running, if so we don't need to take further actions
|
||||||
if (consolidateOnFlush) {
|
if (acquired = flush.compareAndSet(false, true)) {
|
||||||
if (queue.isEmpty()) {
|
final Queue<MessageEvent> queue = getQueue();
|
||||||
return;
|
if (consolidateOnFlush) {
|
||||||
}
|
if (queue.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
|
List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
|
||||||
synchronized (this) {
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
MessageEvent e = queue.poll();
|
MessageEvent e = queue.poll();
|
||||||
if (e == null) {
|
if (e == null) {
|
||||||
@ -268,9 +273,8 @@ public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCy
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
consolidatedWrite(pendingWrites);
|
consolidatedWrite(pendingWrites);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
synchronized (this) {
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
MessageEvent e = queue.poll();
|
MessageEvent e = queue.poll();
|
||||||
if (e == null) {
|
if (e == null) {
|
||||||
@ -280,6 +284,10 @@ public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCy
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
|
||||||
|
flush(consolidateOnFlush);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
|
private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user