Fix a race where 2 handlers in different threads access the same buffer
- DefaultChannelPipeline detects such cases and creates an object called 'bridge' that works as a man-in-the-middle to deal with a race condition - Slight performance drop is observed but still faster than v3. Couldn't find much from a profiler yet.
This commit is contained in:
parent
c1afe3d8c3
commit
0aa99606d9
@ -1,9 +1,12 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBuffers;
|
||||
import io.netty.util.DefaultAttributeMap;
|
||||
import io.netty.util.internal.QueueFactory;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -13,7 +16,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
volatile DefaultChannelHandlerContext prev;
|
||||
private final Channel channel;
|
||||
private final DefaultChannelPipeline pipeline;
|
||||
final EventExecutor executor;
|
||||
EventExecutor executor; // not thread-safe but OK because it never changes once set.
|
||||
private final String name;
|
||||
private final ChannelHandler handler;
|
||||
private final boolean canHandleInbound;
|
||||
@ -22,14 +25,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
final ChannelBufferHolder<Object> out;
|
||||
|
||||
// When the two handlers run in a different thread and they are next to each other,
|
||||
// each other's buffers can be accessed at the same time resuslting in a race condition.
|
||||
// each other's buffers can be accessed at the same time resulting in a race condition.
|
||||
// To avoid such situation, we lazily creates an additional thread-safe buffer called
|
||||
// 'bridge' so that the two handlers access each other's buffer only via the bridges.
|
||||
// The content written into a bridge is flushed into the actual buffer by flushBridge().
|
||||
final AtomicReference<BlockingQueue<Object>> inMsgBridge;
|
||||
final AtomicReference<BlockingQueue<Object>> outMsgBridge;
|
||||
final AtomicReference<ChannelBuffer> inByteBridge;
|
||||
final AtomicReference<ChannelBuffer> outByteBridge;
|
||||
final AtomicReference<MessageBridge> inMsgBridge;
|
||||
final AtomicReference<MessageBridge> outMsgBridge;
|
||||
final AtomicReference<StreamBridge> inByteBridge;
|
||||
final AtomicReference<StreamBridge> outByteBridge;
|
||||
|
||||
// Runnables that calls handlers
|
||||
final Runnable fireChannelRegisteredTask = new Runnable() {
|
||||
@ -80,7 +83,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
};
|
||||
final Runnable fireInboundBufferUpdatedTask = new Runnable() {
|
||||
final Runnable curCtxFireInboundBufferUpdatedTask = new Runnable() {
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void run() {
|
||||
@ -98,6 +101,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
};
|
||||
private final Runnable nextCtxFireInboundBufferUpdatedTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
DefaultChannelHandlerContext next =
|
||||
DefaultChannelPipeline.nextInboundContext(DefaultChannelHandlerContext.this.next);
|
||||
if (next != null) {
|
||||
next.fillBridge();
|
||||
DefaultChannelPipeline.fireInboundBufferUpdated(next);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
DefaultChannelHandlerContext(
|
||||
@ -151,11 +165,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
if (!in.isBypass()) {
|
||||
if (in.hasByteBuffer()) {
|
||||
inByteBridge = new AtomicReference<ChannelBuffer>();
|
||||
inByteBridge = new AtomicReference<StreamBridge>();
|
||||
inMsgBridge = null;
|
||||
} else {
|
||||
inByteBridge = null;
|
||||
inMsgBridge = new AtomicReference<BlockingQueue<Object>>();
|
||||
inMsgBridge = new AtomicReference<MessageBridge>();
|
||||
}
|
||||
} else {
|
||||
inByteBridge = null;
|
||||
@ -179,11 +193,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
if (!out.isBypass()) {
|
||||
if (out.hasByteBuffer()) {
|
||||
outByteBridge = new AtomicReference<ChannelBuffer>();
|
||||
outByteBridge = new AtomicReference<StreamBridge>();
|
||||
outMsgBridge = null;
|
||||
} else {
|
||||
outByteBridge = null;
|
||||
outMsgBridge = new AtomicReference<BlockingQueue<Object>>();
|
||||
outMsgBridge = new AtomicReference<MessageBridge>();
|
||||
}
|
||||
} else {
|
||||
outByteBridge = null;
|
||||
@ -196,17 +210,54 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
|
||||
void flushBridge() {
|
||||
void fillBridge() {
|
||||
if (inMsgBridge != null) {
|
||||
BlockingQueue<Object> bridge = inMsgBridge.get();
|
||||
MessageBridge bridge = inMsgBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.drainTo(in.messageBuffer());
|
||||
bridge.fill();
|
||||
}
|
||||
} else if (inByteBridge != null) {
|
||||
StreamBridge bridge = inByteBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.fill();
|
||||
}
|
||||
}
|
||||
|
||||
if (outMsgBridge != null) {
|
||||
BlockingQueue<Object> bridge = outMsgBridge.get();
|
||||
MessageBridge bridge = outMsgBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.drainTo(out.messageBuffer());
|
||||
bridge.fill();
|
||||
}
|
||||
} else if (outByteBridge != null) {
|
||||
StreamBridge bridge = outByteBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.fill();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void flushBridge() {
|
||||
if (inMsgBridge != null) {
|
||||
MessageBridge bridge = inMsgBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.flush(in.messageBuffer());
|
||||
}
|
||||
} else if (inByteBridge != null) {
|
||||
StreamBridge bridge = inByteBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.flush(in.byteBuffer());
|
||||
}
|
||||
}
|
||||
|
||||
if (outMsgBridge != null) {
|
||||
MessageBridge bridge = outMsgBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.flush(out.messageBuffer());
|
||||
}
|
||||
} else if (outByteBridge != null) {
|
||||
StreamBridge bridge = outByteBridge.get();
|
||||
if (bridge != null) {
|
||||
bridge.flush(out.byteBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -224,7 +275,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
@Override
|
||||
public EventExecutor executor() {
|
||||
if (executor == null) {
|
||||
return channel.eventLoop();
|
||||
return executor = channel.eventLoop();
|
||||
} else {
|
||||
return executor;
|
||||
}
|
||||
@ -282,7 +333,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public ChannelBuffer nextInboundByteBuffer() {
|
||||
return DefaultChannelPipeline.nextInboundByteBuffer(next);
|
||||
return DefaultChannelPipeline.nextInboundByteBuffer(executor(), next);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -292,7 +343,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public ChannelBuffer nextOutboundByteBuffer() {
|
||||
return pipeline.nextOutboundByteBuffer(prev);
|
||||
return pipeline.nextOutboundByteBuffer(executor(), prev);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -352,9 +403,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public void fireInboundBufferUpdated() {
|
||||
DefaultChannelHandlerContext next = DefaultChannelPipeline.nextInboundContext(this.next);
|
||||
if (next != null) {
|
||||
DefaultChannelPipeline.fireInboundBufferUpdated(next);
|
||||
EventExecutor executor = executor();
|
||||
if (executor.inEventLoop()) {
|
||||
nextCtxFireInboundBufferUpdatedTask.run();
|
||||
} else {
|
||||
executor.execute(nextCtxFireInboundBufferUpdatedTask);
|
||||
}
|
||||
}
|
||||
|
||||
@ -429,8 +482,22 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture flush(ChannelFuture future) {
|
||||
return pipeline.flush(DefaultChannelPipeline.nextOutboundContext(prev), future);
|
||||
public ChannelFuture flush(final ChannelFuture future) {
|
||||
EventExecutor executor = executor();
|
||||
if (executor.inEventLoop()) {
|
||||
DefaultChannelHandlerContext prev = DefaultChannelPipeline.nextOutboundContext(this.prev);
|
||||
prev.fillBridge();
|
||||
pipeline.flush(prev, future);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
flush(future);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -452,4 +519,56 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||
return channel.newFailedFuture(cause);
|
||||
}
|
||||
|
||||
static final class MessageBridge {
|
||||
final Queue<Object> msgBuf = new ArrayDeque<Object>();
|
||||
final BlockingQueue<Object[]> exchangeBuf = QueueFactory.createQueue();
|
||||
|
||||
void fill() {
|
||||
if (msgBuf.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Object[] data = msgBuf.toArray();
|
||||
msgBuf.clear();
|
||||
exchangeBuf.add(data);
|
||||
}
|
||||
|
||||
void flush(Queue<Object> out) {
|
||||
for (;;) {
|
||||
Object[] data = exchangeBuf.poll();
|
||||
if (data == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
for (Object d: data) {
|
||||
out.add(d);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static final class StreamBridge {
|
||||
final ChannelBuffer byteBuf = ChannelBuffers.dynamicBuffer();
|
||||
final BlockingQueue<ChannelBuffer> exchangeBuf = QueueFactory.createQueue();
|
||||
|
||||
void fill() {
|
||||
if (!byteBuf.readable()) {
|
||||
return;
|
||||
}
|
||||
ChannelBuffer data = byteBuf.readBytes(byteBuf.readableBytes());
|
||||
byteBuf.discardReadBytes();
|
||||
exchangeBuf.add(data);
|
||||
}
|
||||
|
||||
void flush(ChannelBuffer out) {
|
||||
for (;;) {
|
||||
ChannelBuffer data = exchangeBuf.poll();
|
||||
if (data == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
out.writeBytes(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -16,9 +16,10 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.channel.DefaultChannelHandlerContext.MessageBridge;
|
||||
import io.netty.channel.DefaultChannelHandlerContext.StreamBridge;
|
||||
import io.netty.logging.InternalLogger;
|
||||
import io.netty.logging.InternalLoggerFactory;
|
||||
import io.netty.util.internal.QueueFactory;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.ArrayList;
|
||||
@ -29,9 +30,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* The default {@link ChannelPipeline} implementation. It is usually created
|
||||
@ -595,7 +594,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
throw new NoSuchBufferException(
|
||||
"The first inbound buffer of this channel must be a message buffer.");
|
||||
}
|
||||
return nextInboundMessageBuffer(null, head.next);
|
||||
return nextInboundMessageBuffer(SingleThreadEventExecutor.currentEventLoop(), head.next);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -604,17 +603,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
throw new NoSuchBufferException(
|
||||
"The first inbound buffer of this channel must be a byte buffer.");
|
||||
}
|
||||
return nextInboundByteBuffer(head.next);
|
||||
return nextInboundByteBuffer(SingleThreadEventExecutor.currentEventLoop(), head.next);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue<Object> outboundMessageBuffer() {
|
||||
return nextOutboundMessageBuffer(null, tail);
|
||||
return nextOutboundMessageBuffer(SingleThreadEventExecutor.currentEventLoop(), tail);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer outboundByteBuffer() {
|
||||
return nextOutboundByteBuffer(tail);
|
||||
return nextOutboundByteBuffer(SingleThreadEventExecutor.currentEventLoop(), tail);
|
||||
}
|
||||
|
||||
static boolean hasNextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
|
||||
@ -643,11 +642,25 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
|
||||
static ChannelBuffer nextInboundByteBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
|
||||
for (;;) {
|
||||
if (ctx == null) {
|
||||
throw new NoSuchBufferException();
|
||||
}
|
||||
if (ctx.inByteBridge != null) {
|
||||
if (currentExecutor == ctx.executor()) {
|
||||
return ctx.in.byteBuffer();
|
||||
} else {
|
||||
StreamBridge bridge = ctx.inByteBridge.get();
|
||||
if (bridge == null) {
|
||||
bridge = new StreamBridge();
|
||||
if (!ctx.inByteBridge.compareAndSet(null, bridge)) {
|
||||
bridge = ctx.inByteBridge.get();
|
||||
}
|
||||
}
|
||||
return bridge.byteBuf;
|
||||
}
|
||||
}
|
||||
ChannelBufferHolder<Object> in = ctx.in;
|
||||
if (in != null && !in.isBypass() && in.hasByteBuffer()) {
|
||||
return in.byteBuffer();
|
||||
@ -656,26 +669,24 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
static Queue<Object> nextInboundMessageBuffer(
|
||||
EventExecutor currentExecutor, DefaultChannelHandlerContext ctx) {
|
||||
static Queue<Object> nextInboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
|
||||
for (;;) {
|
||||
if (ctx == null) {
|
||||
throw new NoSuchBufferException();
|
||||
}
|
||||
|
||||
final AtomicReference<BlockingQueue<Object>> inMsgBridge = ctx.inMsgBridge;
|
||||
if (inMsgBridge != null) {
|
||||
if (ctx.inMsgBridge != null) {
|
||||
if (currentExecutor == ctx.executor()) {
|
||||
return ctx.in.messageBuffer();
|
||||
} else {
|
||||
BlockingQueue<Object> queue = inMsgBridge.get();
|
||||
if (queue == null) {
|
||||
queue = QueueFactory.createQueue();
|
||||
if (!inMsgBridge.compareAndSet(null, queue)) {
|
||||
queue = inMsgBridge.get();
|
||||
MessageBridge bridge = ctx.inMsgBridge.get();
|
||||
if (bridge == null) {
|
||||
bridge = new MessageBridge();
|
||||
if (!ctx.inMsgBridge.compareAndSet(null, bridge)) {
|
||||
bridge = ctx.inMsgBridge.get();
|
||||
}
|
||||
}
|
||||
return queue;
|
||||
return bridge.msgBuf;
|
||||
}
|
||||
}
|
||||
ctx = ctx.next;
|
||||
@ -710,15 +721,25 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
|
||||
ChannelBuffer nextOutboundByteBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
|
||||
for (;;) {
|
||||
if (ctx == null) {
|
||||
throw new NoSuchBufferException();
|
||||
}
|
||||
|
||||
ChannelBufferHolder<Object> out = ctx.outbound();
|
||||
if (out != null && !out.isBypass() && out.hasByteBuffer()) {
|
||||
return out.byteBuffer();
|
||||
if (ctx.outByteBridge != null) {
|
||||
if (currentExecutor == ctx.executor()) {
|
||||
return ctx.out.byteBuffer();
|
||||
} else {
|
||||
StreamBridge bridge = ctx.outByteBridge.get();
|
||||
if (bridge == null) {
|
||||
bridge = new StreamBridge();
|
||||
if (!ctx.outByteBridge.compareAndSet(null, bridge)) {
|
||||
bridge = ctx.outByteBridge.get();
|
||||
}
|
||||
}
|
||||
return bridge.byteBuf;
|
||||
}
|
||||
}
|
||||
ctx = ctx.prev;
|
||||
}
|
||||
@ -730,22 +751,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
throw new NoSuchBufferException();
|
||||
}
|
||||
|
||||
final AtomicReference<BlockingQueue<Object>> outMsgBridge = ctx.outMsgBridge;
|
||||
if (outMsgBridge != null) {
|
||||
if (ctx.outMsgBridge != null) {
|
||||
if (currentExecutor == ctx.executor()) {
|
||||
return ctx.out.messageBuffer();
|
||||
} else {
|
||||
BlockingQueue<Object> queue = outMsgBridge.get();
|
||||
if (queue == null) {
|
||||
queue = QueueFactory.createQueue();
|
||||
if (!outMsgBridge.compareAndSet(null, queue)) {
|
||||
queue = outMsgBridge.get();
|
||||
MessageBridge bridge = ctx.outMsgBridge.get();
|
||||
if (bridge == null) {
|
||||
bridge = new MessageBridge();
|
||||
if (!ctx.outMsgBridge.compareAndSet(null, bridge)) {
|
||||
bridge = ctx.outMsgBridge.get();
|
||||
}
|
||||
}
|
||||
return queue;
|
||||
return bridge.msgBuf;
|
||||
}
|
||||
}
|
||||
|
||||
ctx = ctx.prev;
|
||||
}
|
||||
}
|
||||
@ -916,9 +935,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
static void fireInboundBufferUpdated(DefaultChannelHandlerContext ctx) {
|
||||
EventExecutor executor = ctx.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
ctx.fireInboundBufferUpdatedTask.run();
|
||||
ctx.curCtxFireInboundBufferUpdatedTask.run();
|
||||
} else {
|
||||
executor.execute(ctx.fireInboundBufferUpdatedTask);
|
||||
executor.execute(ctx.curCtxFireInboundBufferUpdatedTask);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package io.netty.channel.local;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelBufferHolder;
|
||||
import io.netty.channel.ChannelBufferHolders;
|
||||
@ -18,7 +19,7 @@ import io.netty.util.internal.QueueFactory;
|
||||
import java.util.HashSet;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -185,27 +186,58 @@ public class LocalTransportThreadModelTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 50000)
|
||||
@Test(timeout = 10000)
|
||||
public void testConcurrentMessageBufferAccess() throws Throwable {
|
||||
EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l"));
|
||||
EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1"));
|
||||
EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2"));
|
||||
MessageForwarder h1 = new MessageForwarder();
|
||||
MessageForwarder h2 = new MessageForwarder();
|
||||
MessageDiscarder h3 = new MessageDiscarder();
|
||||
EventExecutor e3 = new DefaultEventExecutor(4, new PrefixThreadFactory("e3"));
|
||||
EventExecutor e4 = new DefaultEventExecutor(4, new PrefixThreadFactory("e4"));
|
||||
EventExecutor e5 = new DefaultEventExecutor(4, new PrefixThreadFactory("e5"));
|
||||
|
||||
Channel ch = new LocalChannel();
|
||||
ch.pipeline().addLast(h1).addLast(e1, h2).addLast(e2, h3);
|
||||
try {
|
||||
final MessageForwarder1 h1 = new MessageForwarder1();
|
||||
final MessageForwarder2 h2 = new MessageForwarder2();
|
||||
final MessageForwarder3 h3 = new MessageForwarder3();
|
||||
final MessageForwarder1 h4 = new MessageForwarder1();
|
||||
final MessageForwarder2 h5 = new MessageForwarder2();
|
||||
final MessageDiscarder h6 = new MessageDiscarder();
|
||||
|
||||
l.register(ch).sync().channel().connect(ADDR).sync();
|
||||
final Channel ch = new LocalChannel();
|
||||
|
||||
final int COUNT = 1048576 * 4;
|
||||
for (int i = 0; i < COUNT;) {
|
||||
Queue<Object> buf = ch.pipeline().inboundMessageBuffer();
|
||||
// Thread-safe bridge must be returned.
|
||||
Assert.assertTrue(buf instanceof BlockingQueue);
|
||||
for (int j = 0; i < COUNT && j < COUNT / 8; j ++) {
|
||||
buf.add(Integer.valueOf(i ++));
|
||||
// inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null
|
||||
// outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null
|
||||
ch.pipeline().addLast(h1)
|
||||
.addLast(e1, h2)
|
||||
.addLast(e2, h3)
|
||||
.addLast(e3, h4)
|
||||
.addLast(e4, h5)
|
||||
.addLast(e5, h6);
|
||||
|
||||
l.register(ch).sync().channel().connect(ADDR).sync();
|
||||
|
||||
final int ROUNDS = 1024;
|
||||
final int ELEMS_PER_ROUNDS = 8192;
|
||||
final int TOTAL_CNT = ROUNDS * ELEMS_PER_ROUNDS;
|
||||
for (int i = 0; i < TOTAL_CNT;) {
|
||||
final int start = i;
|
||||
final int end = i + ELEMS_PER_ROUNDS;
|
||||
i = end;
|
||||
|
||||
ch.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Queue<Object> buf = ch.pipeline().inboundMessageBuffer();
|
||||
for (int j = start; j < end; j ++) {
|
||||
buf.add(Integer.valueOf(j));
|
||||
}
|
||||
ch.pipeline().fireInboundBufferUpdated();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while (h1.inCnt < TOTAL_CNT || h2.inCnt < TOTAL_CNT || h3.inCnt < TOTAL_CNT ||
|
||||
h4.inCnt < TOTAL_CNT || h5.inCnt < TOTAL_CNT || h6.inCnt < TOTAL_CNT) {
|
||||
if (h1.exception.get() != null) {
|
||||
throw h1.exception.get();
|
||||
}
|
||||
@ -215,28 +247,37 @@ public class LocalTransportThreadModelTest {
|
||||
if (h3.exception.get() != null) {
|
||||
throw h3.exception.get();
|
||||
}
|
||||
}
|
||||
ch.pipeline().fireInboundBufferUpdated();
|
||||
}
|
||||
|
||||
while (h1.inCnt < COUNT || h2.inCnt < COUNT || h3.inCnt < COUNT) {
|
||||
if (h1.exception.get() != null) {
|
||||
throw h1.exception.get();
|
||||
}
|
||||
if (h2.exception.get() != null) {
|
||||
throw h2.exception.get();
|
||||
}
|
||||
if (h3.exception.get() != null) {
|
||||
throw h3.exception.get();
|
||||
if (h4.exception.get() != null) {
|
||||
throw h4.exception.get();
|
||||
}
|
||||
if (h5.exception.get() != null) {
|
||||
throw h5.exception.get();
|
||||
}
|
||||
if (h6.exception.get() != null) {
|
||||
throw h6.exception.get();
|
||||
}
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
}
|
||||
for (int i = 0; i < TOTAL_CNT;) {
|
||||
final int start = i;
|
||||
final int end = i + ELEMS_PER_ROUNDS;
|
||||
i = end;
|
||||
|
||||
for (int i = 0; i < COUNT;) {
|
||||
Queue<Object> buf = ch.pipeline().outboundMessageBuffer();
|
||||
for (int j = 0; i < COUNT && j < COUNT / 8; j ++) {
|
||||
buf.add(Integer.valueOf(i ++));
|
||||
ch.pipeline().context(h6).executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Queue<Object> buf = ch.pipeline().outboundMessageBuffer();
|
||||
for (int j = start; j < end; j ++) {
|
||||
buf.add(Integer.valueOf(j));
|
||||
}
|
||||
ch.pipeline().flush();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while (h1.outCnt < TOTAL_CNT || h2.outCnt < TOTAL_CNT || h3.outCnt < TOTAL_CNT ||
|
||||
h4.outCnt < TOTAL_CNT || h5.outCnt < TOTAL_CNT || h6.outCnt < TOTAL_CNT) {
|
||||
if (h1.exception.get() != null) {
|
||||
throw h1.exception.get();
|
||||
}
|
||||
@ -246,24 +287,29 @@ public class LocalTransportThreadModelTest {
|
||||
if (h3.exception.get() != null) {
|
||||
throw h3.exception.get();
|
||||
}
|
||||
if (h4.exception.get() != null) {
|
||||
throw h4.exception.get();
|
||||
}
|
||||
if (h5.exception.get() != null) {
|
||||
throw h5.exception.get();
|
||||
}
|
||||
if (h6.exception.get() != null) {
|
||||
throw h6.exception.get();
|
||||
}
|
||||
Thread.sleep(10);
|
||||
}
|
||||
ch.pipeline().flush();
|
||||
|
||||
ch.close().sync();
|
||||
h6.latch.await(); // Wait until channelInactive() is triggered.
|
||||
|
||||
} finally {
|
||||
l.shutdown();
|
||||
e1.shutdown();
|
||||
e2.shutdown();
|
||||
e3.shutdown();
|
||||
e4.shutdown();
|
||||
e5.shutdown();
|
||||
}
|
||||
|
||||
while (h1.outCnt < COUNT || h2.outCnt < COUNT || h3.outCnt < COUNT) {
|
||||
if (h1.exception.get() != null) {
|
||||
throw h1.exception.get();
|
||||
}
|
||||
if (h2.exception.get() != null) {
|
||||
throw h2.exception.get();
|
||||
}
|
||||
if (h3.exception.get() != null) {
|
||||
throw h3.exception.get();
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class ThreadNameAuditor extends ChannelHandlerAdapter<Object, Object> {
|
||||
@ -311,7 +357,170 @@ public class LocalTransportThreadModelTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static class MessageForwarder extends ChannelHandlerAdapter<Object, Object> {
|
||||
/**
|
||||
* Converts integers into a binary stream.
|
||||
*/
|
||||
private static class MessageForwarder1 extends ChannelHandlerAdapter<Integer, Byte> {
|
||||
|
||||
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
private volatile int inCnt;
|
||||
private volatile int outCnt;
|
||||
private volatile Thread t;
|
||||
|
||||
@Override
|
||||
public ChannelBufferHolder<Integer> newInboundBuffer(
|
||||
ChannelInboundHandlerContext<Integer> ctx) throws Exception {
|
||||
return ChannelBufferHolders.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBufferHolder<Byte> newOutboundBuffer(
|
||||
ChannelOutboundHandlerContext<Byte> ctx) throws Exception {
|
||||
return ChannelBufferHolders.byteBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(
|
||||
ChannelInboundHandlerContext<Integer> ctx) throws Exception {
|
||||
Thread t = this.t;
|
||||
if (t == null) {
|
||||
this.t = Thread.currentThread();
|
||||
} else {
|
||||
Assert.assertSame(t, Thread.currentThread());
|
||||
}
|
||||
|
||||
Queue<Integer> in = ctx.inbound().messageBuffer();
|
||||
ChannelBuffer out = ctx.nextInboundByteBuffer();
|
||||
|
||||
for (;;) {
|
||||
Integer msg = in.poll();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
int expected = inCnt ++;
|
||||
Assert.assertEquals(expected, msg.intValue());
|
||||
out.writeInt(msg);
|
||||
}
|
||||
ctx.fireInboundBufferUpdated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelOutboundHandlerContext<Byte> ctx,
|
||||
ChannelFuture future) throws Exception {
|
||||
Assert.assertSame(t, Thread.currentThread());
|
||||
|
||||
// Don't let the write request go to the server-side channel - just swallow.
|
||||
boolean swallow = this == ctx.pipeline().first();
|
||||
|
||||
ChannelBuffer in = ctx.outbound().byteBuffer();
|
||||
Queue<Object> out = ctx.nextOutboundMessageBuffer();
|
||||
while (in.readableBytes() >= 4) {
|
||||
int msg = in.readInt();
|
||||
int expected = outCnt ++;
|
||||
Assert.assertEquals(expected, msg);
|
||||
if (!swallow) {
|
||||
out.add(msg);
|
||||
}
|
||||
}
|
||||
in.discardReadBytes();
|
||||
if (swallow) {
|
||||
future.setSuccess();
|
||||
} else {
|
||||
ctx.flush(future);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelInboundHandlerContext<Integer> ctx,
|
||||
Throwable cause) throws Exception {
|
||||
exception.compareAndSet(null, cause);
|
||||
//System.err.print("[" + Thread.currentThread().getName() + "] ");
|
||||
//cause.printStackTrace();
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a binary stream into integers.
|
||||
*/
|
||||
private static class MessageForwarder2 extends ChannelHandlerAdapter<Byte, Integer> {
|
||||
|
||||
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
private volatile int inCnt;
|
||||
private volatile int outCnt;
|
||||
private volatile Thread t;
|
||||
|
||||
@Override
|
||||
public ChannelBufferHolder<Byte> newInboundBuffer(
|
||||
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
|
||||
return ChannelBufferHolders.byteBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBufferHolder<Integer> newOutboundBuffer(
|
||||
ChannelOutboundHandlerContext<Integer> ctx) throws Exception {
|
||||
return ChannelBufferHolders.messageBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(
|
||||
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
|
||||
Thread t = this.t;
|
||||
if (t == null) {
|
||||
this.t = Thread.currentThread();
|
||||
} else {
|
||||
Assert.assertSame(t, Thread.currentThread());
|
||||
}
|
||||
|
||||
ChannelBuffer in = ctx.inbound().byteBuffer();
|
||||
Queue<Object> out = ctx.nextInboundMessageBuffer();
|
||||
|
||||
while (in.readableBytes() >= 4) {
|
||||
int msg = in.readInt();
|
||||
int expected = inCnt ++;
|
||||
Assert.assertEquals(expected, msg);
|
||||
out.add(msg);
|
||||
}
|
||||
in.discardReadBytes();
|
||||
ctx.fireInboundBufferUpdated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelOutboundHandlerContext<Integer> ctx,
|
||||
ChannelFuture future) throws Exception {
|
||||
Assert.assertSame(t, Thread.currentThread());
|
||||
|
||||
Queue<Integer> in = ctx.outbound().messageBuffer();
|
||||
ChannelBuffer out = ctx.nextOutboundByteBuffer();
|
||||
|
||||
for (;;) {
|
||||
Integer msg = in.poll();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
int expected = outCnt ++;
|
||||
Assert.assertEquals(expected, msg.intValue());
|
||||
out.writeInt(msg);
|
||||
}
|
||||
ctx.flush(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelInboundHandlerContext<Byte> ctx,
|
||||
Throwable cause) throws Exception {
|
||||
exception.compareAndSet(null, cause);
|
||||
//System.err.print("[" + Thread.currentThread().getName() + "] ");
|
||||
//cause.printStackTrace();
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simply forwards the received object to the next handler.
|
||||
*/
|
||||
private static class MessageForwarder3 extends ChannelHandlerAdapter<Object, Object> {
|
||||
|
||||
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
private volatile int inCnt;
|
||||
@ -342,10 +551,6 @@ public class LocalTransportThreadModelTest {
|
||||
|
||||
Queue<Object> in = ctx.inbound().messageBuffer();
|
||||
Queue<Object> out = ctx.nextInboundMessageBuffer();
|
||||
|
||||
// Ensure the bridge buffer is returned.
|
||||
Assert.assertTrue(out instanceof BlockingQueue);
|
||||
|
||||
for (;;) {
|
||||
Object msg = in.poll();
|
||||
if (msg == null) {
|
||||
@ -366,12 +571,6 @@ public class LocalTransportThreadModelTest {
|
||||
|
||||
Queue<Object> in = ctx.outbound().messageBuffer();
|
||||
Queue<Object> out = ctx.nextOutboundMessageBuffer();
|
||||
|
||||
// Ensure the bridge buffer is returned.
|
||||
if (ctx.pipeline().first() != this) {
|
||||
Assert.assertTrue(out instanceof BlockingQueue);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
Object msg = in.poll();
|
||||
if (msg == null) {
|
||||
@ -395,12 +594,16 @@ public class LocalTransportThreadModelTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Discards all received messages.
|
||||
*/
|
||||
private static class MessageDiscarder extends ChannelHandlerAdapter<Object, Object> {
|
||||
|
||||
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
private volatile int inCnt;
|
||||
private volatile int outCnt;
|
||||
private volatile Thread t;
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
public ChannelBufferHolder<Object> newInboundBuffer(
|
||||
@ -442,10 +645,6 @@ public class LocalTransportThreadModelTest {
|
||||
|
||||
Queue<Object> in = ctx.outbound().messageBuffer();
|
||||
Queue<Object> out = ctx.nextOutboundMessageBuffer();
|
||||
|
||||
// Ensure the bridge buffer is returned.
|
||||
Assert.assertTrue(out instanceof BlockingQueue);
|
||||
|
||||
for (;;) {
|
||||
Object msg = in.poll();
|
||||
if (msg == null) {
|
||||
@ -459,12 +658,18 @@ public class LocalTransportThreadModelTest {
|
||||
ctx.flush(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelInboundHandlerContext<Object> ctx)
|
||||
throws Exception {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx,
|
||||
Throwable cause) throws Exception {
|
||||
exception.compareAndSet(null, cause);
|
||||
System.err.print("[" + Thread.currentThread().getName() + "] ");
|
||||
cause.printStackTrace();
|
||||
//System.err.print("[" + Thread.currentThread().getName() + "] ");
|
||||
//cause.printStackTrace();
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user