Correct semantic of LocalChannel.doWrite(...) and remove memory copy
Motivation: The semantic of LocalChannel.doWrite(...) were a bit off as it notified the ChannelFuture before the data was actual moved to the peer buffer. Modifications: - Use our MPSC queue as inbound buffer - Directly copy to data to the inbound buffer of the peer and either success or fail the promise after each copy. Result: Correct semantic and less memory copies.
This commit is contained in:
parent
868eb49cd2
commit
e276cb796c
@ -29,14 +29,14 @@ import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.SingleThreadEventExecutor;
|
||||
import io.netty.util.internal.InternalThreadLocalMap;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.AlreadyConnectedException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.ConnectionPendingException;
|
||||
import java.nio.channels.NotYetConnectedException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collections;
|
||||
import java.util.Queue;
|
||||
|
||||
/**
|
||||
@ -51,7 +51,8 @@ public class LocalChannel extends AbstractChannel {
|
||||
private static final int MAX_READER_STACK_DEPTH = 8;
|
||||
|
||||
private final ChannelConfig config = new DefaultChannelConfig(this);
|
||||
private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
|
||||
// To futher optimize this we could write our own SPSC queue.
|
||||
private final Queue<Object> inboundBuffer = PlatformDependent.newMpscQueue();
|
||||
private final Runnable readTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@ -66,7 +67,6 @@ public class LocalChannel extends AbstractChannel {
|
||||
pipeline.fireChannelReadComplete();
|
||||
}
|
||||
};
|
||||
|
||||
private final Runnable shutdownHook = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@ -289,29 +289,25 @@ public class LocalChannel extends AbstractChannel {
|
||||
final ChannelPipeline peerPipeline = peer.pipeline();
|
||||
final EventLoop peerLoop = peer.eventLoop();
|
||||
|
||||
if (peerLoop == eventLoop()) {
|
||||
for (;;) {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
peer.inboundBuffer.add(msg);
|
||||
ReferenceCountUtil.retain(msg);
|
||||
in.remove();
|
||||
for (;;) {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
|
||||
in.remove();
|
||||
} catch (Throwable cause) {
|
||||
in.remove(cause);
|
||||
}
|
||||
}
|
||||
|
||||
if (peerLoop == eventLoop()) {
|
||||
finishPeerRead(peer, peerPipeline);
|
||||
} else {
|
||||
// Use a copy because the original msgs will be recycled by AbstractChannel.
|
||||
final Object[] msgsCopy = new Object[in.size()];
|
||||
for (int i = 0; i < msgsCopy.length; i ++) {
|
||||
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
|
||||
in.remove();
|
||||
}
|
||||
|
||||
peerLoop.execute(new Runnable() {
|
||||
peerLoop.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
Collections.addAll(peer.inboundBuffer, msgsCopy);
|
||||
finishPeerRead(peer, peerPipeline);
|
||||
}
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user