Correct semantic of LocalChannel.doWrite(...) and remove memory copy

Motivation:

The semantic of LocalChannel.doWrite(...) were a bit off as it notified the ChannelFuture before the data was actual moved to the peer buffer.

Modifications:

- Use our MPSC queue as inbound buffer
- Directly copy to data to the inbound buffer of the peer and either success or fail the promise after each copy.

Result:

Correct semantic and less memory copies.
This commit is contained in:
Norman Maurer 2015-05-06 10:44:04 +02:00
parent 0027d830d4
commit 9d4dd933e5

View File

@ -29,14 +29,14 @@ import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.AlreadyConnectedException; import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException; import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue; import java.util.Queue;
/** /**
@ -49,7 +49,8 @@ public class LocalChannel extends AbstractChannel {
private static final int MAX_READER_STACK_DEPTH = 8; private static final int MAX_READER_STACK_DEPTH = 8;
private final ChannelConfig config = new DefaultChannelConfig(this); private final ChannelConfig config = new DefaultChannelConfig(this);
private final Queue<Object> inboundBuffer = new ArrayDeque<Object>(); // To futher optimize this we could write our own SPSC queue.
private final Queue<Object> inboundBuffer = PlatformDependent.newMpscQueue();
private final Runnable readTask = new Runnable() { private final Runnable readTask = new Runnable() {
@Override @Override
public void run() { public void run() {
@ -64,7 +65,6 @@ public class LocalChannel extends AbstractChannel {
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} }
}; };
private final Runnable shutdownHook = new Runnable() { private final Runnable shutdownHook = new Runnable() {
@Override @Override
public void run() { public void run() {
@ -286,29 +286,25 @@ public class LocalChannel extends AbstractChannel {
final ChannelPipeline peerPipeline = peer.pipeline(); final ChannelPipeline peerPipeline = peer.pipeline();
final EventLoop peerLoop = peer.eventLoop(); final EventLoop peerLoop = peer.eventLoop();
if (peerLoop == eventLoop()) {
for (;;) { for (;;) {
Object msg = in.current(); Object msg = in.current();
if (msg == null) { if (msg == null) {
break; break;
} }
peer.inboundBuffer.add(msg); try {
ReferenceCountUtil.retain(msg); peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
in.remove(); in.remove();
} catch (Throwable cause) {
in.remove(cause);
} }
finishPeerRead(peer, peerPipeline);
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
} }
peerLoop.execute(new Runnable() { if (peerLoop == eventLoop()) {
finishPeerRead(peer, peerPipeline);
} else {
peerLoop.execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
Collections.addAll(peer.inboundBuffer, msgsCopy);
finishPeerRead(peer, peerPipeline); finishPeerRead(peer, peerPipeline);
} }
}); });