Correct semantic of LocalChannel.doWrite(...) and remove memory copy

Motivation:

The semantic of LocalChannel.doWrite(...) were a bit off as it notified the ChannelFuture before the data was actual moved to the peer buffer.

Modifications:

- Use our MPSC queue as inbound buffer
- Directly copy to data to the inbound buffer of the peer and either success or fail the promise after each copy.

Result:

Correct semantic and less memory copies.
This commit is contained in:
Norman Maurer 2015-05-06 10:44:04 +02:00
parent 623b0145c2
commit 8e6699bb5c

View File

@ -28,14 +28,14 @@ import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
/**
@ -50,7 +50,8 @@ public class LocalChannel extends AbstractChannel {
private static final int MAX_READER_STACK_DEPTH = 8;
private final ChannelConfig config = new DefaultChannelConfig(this);
private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
// To futher optimize this we could write our own SPSC queue.
private final Queue<Object> inboundBuffer = PlatformDependent.newMpscQueue();
private final Runnable readTask = new Runnable() {
@Override
public void run() {
@ -65,7 +66,6 @@ public class LocalChannel extends AbstractChannel {
pipeline.fireChannelReadComplete();
}
};
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
@ -288,29 +288,25 @@ public class LocalChannel extends AbstractChannel {
final ChannelPipeline peerPipeline = peer.pipeline();
final EventLoop peerLoop = peer.eventLoop();
if (peerLoop == eventLoop()) {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
peer.inboundBuffer.add(msg);
ReferenceCountUtil.retain(msg);
try {
peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
in.remove();
} catch (Throwable cause) {
in.remove(cause);
}
finishPeerRead(peer, peerPipeline);
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
}
peerLoop.execute(new Runnable() {
if (peerLoop == eventLoop()) {
finishPeerRead(peer, peerPipeline);
} else {
peerLoop.execute(new OneTimeTask() {
@Override
public void run() {
Collections.addAll(peer.inboundBuffer, msgsCopy);
finishPeerRead(peer, peerPipeline);
}
});