Introduced an intermediary write buffer to reduce the contention
This commit is contained in:
parent
0241120ace
commit
39086edae6
@ -25,7 +25,9 @@ package org.jboss.netty.channel.socket.nio;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@ -52,7 +54,10 @@ abstract class NioSocketChannel extends AbstractChannel
|
|||||||
|
|
||||||
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
|
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
|
||||||
final Runnable writeTask = new WriteTask();
|
final Runnable writeTask = new WriteTask();
|
||||||
final Queue<MessageEvent> writeBuffer = new LinkedBlockingQueue<MessageEvent>();
|
final BlockingQueue<MessageEvent> writeBuffer =
|
||||||
|
new LinkedBlockingQueue<MessageEvent>();
|
||||||
|
final Queue<MessageEvent> internalWriteBuffer =
|
||||||
|
new LinkedList<MessageEvent>();
|
||||||
MessageEvent currentWriteEvent;
|
MessageEvent currentWriteEvent;
|
||||||
int currentWriteIndex;
|
int currentWriteIndex;
|
||||||
|
|
||||||
|
@ -315,7 +315,6 @@ class NioWorker implements Runnable {
|
|||||||
close(ch, ch.getSucceededFuture());
|
close(ch, ch.getSucceededFuture());
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME I/O 스레드냐 아니냐에 따라서 task queue 에 안넣거나 넣거나 IoSocketHandler, IoSocketDispatcher
|
|
||||||
static void write(final NioSocketChannel channel, boolean mightNeedWakeup) {
|
static void write(final NioSocketChannel channel, boolean mightNeedWakeup) {
|
||||||
if (mightNeedWakeup) {
|
if (mightNeedWakeup) {
|
||||||
NioWorker worker = channel.getWorker();
|
NioWorker worker = channel.getWorker();
|
||||||
@ -359,15 +358,20 @@ class NioWorker implements Runnable {
|
|||||||
boolean addOpWrite = false;
|
boolean addOpWrite = false;
|
||||||
boolean removeOpWrite = false;
|
boolean removeOpWrite = false;
|
||||||
|
|
||||||
|
Queue<MessageEvent> internalWriteBuffer = channel.internalWriteBuffer;
|
||||||
MessageEvent evt;
|
MessageEvent evt;
|
||||||
ChannelBuffer buf;
|
ChannelBuffer buf;
|
||||||
int bufIdx;
|
int bufIdx;
|
||||||
|
|
||||||
synchronized (channel.writeBuffer) {
|
synchronized (internalWriteBuffer) {
|
||||||
|
if (internalWriteBuffer.isEmpty()) {
|
||||||
|
channel.writeBuffer.drainTo(internalWriteBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
evt = channel.currentWriteEvent;
|
evt = channel.currentWriteEvent;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (evt == null) {
|
if (evt == null) {
|
||||||
evt = channel.writeBuffer.poll();
|
evt = internalWriteBuffer.poll();
|
||||||
if (evt == null) {
|
if (evt == null) {
|
||||||
channel.currentWriteEvent = null;
|
channel.currentWriteEvent = null;
|
||||||
removeOpWrite = true;
|
removeOpWrite = true;
|
||||||
@ -433,16 +437,21 @@ class NioWorker implements Runnable {
|
|||||||
boolean addOpWrite = false;
|
boolean addOpWrite = false;
|
||||||
boolean removeOpWrite = false;
|
boolean removeOpWrite = false;
|
||||||
|
|
||||||
int writtenBytes = 0;
|
Queue<MessageEvent> internalWriteBuffer = channel.internalWriteBuffer;
|
||||||
MessageEvent evt;
|
MessageEvent evt;
|
||||||
ChannelBuffer buf;
|
ChannelBuffer buf;
|
||||||
int bufIdx;
|
int bufIdx;
|
||||||
|
int writtenBytes = 0;
|
||||||
|
|
||||||
|
synchronized (internalWriteBuffer) {
|
||||||
|
if (internalWriteBuffer.isEmpty()) {
|
||||||
|
channel.writeBuffer.drainTo(internalWriteBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (channel.writeBuffer) {
|
|
||||||
evt = channel.currentWriteEvent;
|
evt = channel.currentWriteEvent;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (evt == null) {
|
if (evt == null) {
|
||||||
evt = channel.writeBuffer.poll();
|
evt = internalWriteBuffer.poll();
|
||||||
if (evt == null) {
|
if (evt == null) {
|
||||||
channel.currentWriteEvent = null;
|
channel.currentWriteEvent = null;
|
||||||
removeOpWrite = true;
|
removeOpWrite = true;
|
||||||
@ -677,7 +686,12 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Clean up the stale messages in the write buffer.
|
// Clean up the stale messages in the write buffer.
|
||||||
synchronized (channel.writeBuffer) {
|
Queue<MessageEvent> internalWriteBuffer = channel.internalWriteBuffer;
|
||||||
|
synchronized (internalWriteBuffer) {
|
||||||
|
if (internalWriteBuffer.isEmpty()) {
|
||||||
|
channel.writeBuffer.drainTo(internalWriteBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
MessageEvent evt = channel.currentWriteEvent;
|
MessageEvent evt = channel.currentWriteEvent;
|
||||||
if (evt != null) {
|
if (evt != null) {
|
||||||
channel.currentWriteEvent = null;
|
channel.currentWriteEvent = null;
|
||||||
@ -687,7 +701,7 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
evt = channel.writeBuffer.poll();
|
evt = internalWriteBuffer.poll();
|
||||||
if (evt == null) {
|
if (evt == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user