Rewrite bridge implementation in DefaultChannelHandlerContext

This commit splits bridge into two parts.  One is NextBridgeFeeder,
which provides ByteBuf and MessageBuf that are local to the context
whose next*Buffer() has been invoked on.  The other is a thread-safe
queue that stores the data fed by NextBridgeFeeder.feed().

By splitting the bridge into the two parts, the data pushed by a handler
is not lost anymore when the next handler who provided the next buffer
is removed from the pipeline.

- Fixes #1272
This commit is contained in:
Trustin Lee 2013-04-12 12:57:37 +09:00
parent 5bfb408b7d
commit 4a792151b0
2 changed files with 277 additions and 250 deletions

View File

@ -67,28 +67,26 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
// Note we use an AtomicReferenceFieldUpdater for atomic operations on these to save memory. This will save us
// 64 bytes per Bridge.
@SuppressWarnings("UnusedDeclaration")
private volatile MessageBridge inMsgBridge;
private volatile Queue<Object> inBridge;
@SuppressWarnings("UnusedDeclaration")
private volatile MessageBridge outMsgBridge;
private volatile Queue<Object> outBridge;
@SuppressWarnings("UnusedDeclaration")
private volatile ByteBridge inByteBridge;
private volatile NextBridgeFeeder nextInBridgeFeeder;
@SuppressWarnings("UnusedDeclaration")
private volatile ByteBridge outByteBridge;
private volatile NextBridgeFeeder nextOutBridgeFeeder;
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, MessageBridge> IN_MSG_BRIDGE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
MessageBridge.class, "inMsgBridge");
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, MessageBridge> OUT_MSG_BRIDGE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
MessageBridge.class, "outMsgBridge");
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, ByteBridge> IN_BYTE_BRIDGE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
ByteBridge.class, "inByteBridge");
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, ByteBridge> OUT_BYTE_BRIDGE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
ByteBridge.class, "outByteBridge");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, Queue> IN_BRIDGE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class, Queue.class, "inBridge");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, Queue> OUT_BRIDGE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class, Queue.class, "outBridge");
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, NextBridgeFeeder>
NEXT_IN_BRIDGE_FEEDER = AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelHandlerContext.class, NextBridgeFeeder.class, "nextInBridgeFeeder");
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, NextBridgeFeeder>
NEXT_OUT_BRIDGE_FEEDER = AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelHandlerContext.class, NextBridgeFeeder.class, "nextOutBridgeFeeder");
// Lazily instantiated tasks used to trigger events to a handler with different executor.
private Runnable invokeInboundBufferUpdatedTask;
@ -315,72 +313,82 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
outMsgBuf = h.msgSink;
}
private void fillInboundBridge() {
if (!(handler instanceof ChannelInboundHandler)) {
return;
}
if (inMsgBridge != null) {
MessageBridge bridge = inMsgBridge;
if (bridge != null) {
bridge.fill();
}
} else if (inByteBridge != null) {
ByteBridge bridge = inByteBridge;
if (bridge != null) {
bridge.fill();
}
}
}
private void fillOutboundBridge() {
if (!(handler instanceof ChannelOutboundHandler)) {
return;
}
if (outMsgBridge != null) {
MessageBridge bridge = outMsgBridge;
if (bridge != null) {
bridge.fill();
}
} else if (outByteBridge != null) {
ByteBridge bridge = outByteBridge;
if (bridge != null) {
bridge.fill();
}
}
}
private boolean flushInboundBridge() {
if (inMsgBridge != null) {
MessageBridge bridge = inMsgBridge;
if (bridge != null) {
return bridge.flush(inMsgBuf);
}
} else if (inByteBridge != null) {
ByteBridge bridge = inByteBridge;
if (bridge != null) {
return bridge.flush(inByteBuf);
}
Queue<Object> inBridge = this.inBridge;
if (inBridge == null) {
return true;
}
return true;
return flushBridge(inBridge, inMsgBuf, inByteBuf);
}
private boolean flushOutboundBridge() {
if (outMsgBridge != null) {
MessageBridge bridge = outMsgBridge;
if (bridge != null) {
return bridge.flush(outMsgBuf);
Queue<Object> outBridge = this.outBridge;
if (outBridge == null) {
return true;
}
return flushBridge(outBridge, outMsgBuf, outByteBuf);
}
private static boolean flushBridge(Queue<Object> bridge, MessageBuf<Object> msgBuf, ByteBuf byteBuf) {
if (bridge == null) {
return true;
}
boolean nextBufferHadEnoughRoom = true;
for (;;) {
Object o = bridge.peek();
if (o == null) {
break;
}
} else if (outByteBridge != null) {
ByteBridge bridge = outByteBridge;
if (bridge != null) {
return bridge.flush(outByteBuf);
try {
if (o instanceof Object[]) {
Object[] data = (Object[]) o;
int i;
for (i = 0; i < data.length; i ++) {
Object m = data[i];
if (m == null) {
break;
}
if (msgBuf.offer(m)) {
data[i] = null;
} else {
System.arraycopy(data, i, data, 0, data.length - i);
for (int j = i + 1; j < data.length; j ++) {
data[j] = null;
}
nextBufferHadEnoughRoom = false;
break;
}
}
} else if (o instanceof ByteBuf) {
ByteBuf data = (ByteBuf) o;
if (byteBuf.writerIndex() > byteBuf.maxCapacity() - data.readableBytes()) {
// The target buffer is not going to be able to accept all data in the bridge.
byteBuf.capacity(byteBuf.maxCapacity());
byteBuf.writeBytes(data, byteBuf.writableBytes());
nextBufferHadEnoughRoom = false;
break;
} else {
try {
byteBuf.writeBytes(data);
} finally {
data.release();
}
}
} else {
throw new Error();
}
} finally {
if (nextBufferHadEnoughRoom) {
Object removed = bridge.remove();
assert removed == o;
}
}
}
return true;
return nextBufferHadEnoughRoom;
}
void setRemoved() {
@ -398,48 +406,65 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
final ChannelHandler handler = handler();
if (handler instanceof ChannelInboundHandler) {
try {
((ChannelInboundHandler) handler).freeInboundBuffer(this);
} catch (Exception e) {
notifyHandlerException(e);
} finally {
freeInboundBridge();
try {
if (handler instanceof ChannelInboundHandler) {
try {
((ChannelInboundHandler) handler).freeInboundBuffer(this);
} catch (Exception e) {
notifyHandlerException(e);
}
}
}
if (handler instanceof ChannelOutboundHandler) {
try {
((ChannelOutboundHandler) handler).freeOutboundBuffer(this);
} catch (Exception e) {
notifyHandlerException(e);
} finally {
freeOutboundBridge();
if (handler instanceof ChannelOutboundHandler) {
try {
((ChannelOutboundHandler) handler).freeOutboundBuffer(this);
} catch (Exception e) {
notifyHandlerException(e);
}
}
} finally {
free();
}
}
}
private void freeInboundBridge() {
ByteBridge inByteBridge = this.inByteBridge;
if (inByteBridge != null) {
inByteBridge.release();
private void free() {
freeInbound();
freeOutbound();
}
private void freeInbound() {
// Release the bridge feeder
NextBridgeFeeder feeder;
feeder = nextInBridgeFeeder;
if (feeder != null) {
feeder.release();
nextInBridgeFeeder = null;
}
MessageBridge inMsgBridge = this.inMsgBridge;
if (inMsgBridge != null) {
inMsgBridge.release();
// Warn if the bridge has unflushed elements.
if (logger.isWarnEnabled()) {
Queue<Object> bridge;
bridge = inBridge;
if (bridge != null && !bridge.isEmpty()) {
logger.warn("inbound bridge not empty - bug?: {}", bridge.size());
}
}
}
private void freeOutboundBridge() {
ByteBridge outByteBridge = this.outByteBridge;
if (outByteBridge != null) {
outByteBridge.release();
private void freeOutbound() {
// Release the bridge feeder
NextBridgeFeeder feeder = nextOutBridgeFeeder;
if (feeder != null) {
feeder.release();
nextOutBridgeFeeder = null;
}
MessageBridge outMsgBridge = this.outMsgBridge;
if (outMsgBridge != null) {
outMsgBridge.release();
// Warn if the bridge has unflushed elements.
if (logger.isWarnEnabled()) {
Queue<Object> bridge = outBridge;
if (bridge != null && !bridge.isEmpty()) {
logger.warn("outbound bridge not empty - bug?: {}", bridge.size());
}
}
}
@ -549,17 +574,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return ctx.inByteBuf;
}
if (executor().inEventLoop(currentThread)) {
ByteBridge bridge = ctx.inByteBridge;
if (bridge == null) {
bridge = new ByteBridge(ctx, true);
if (!IN_BYTE_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
// release it as it was set before
bridge.release();
bridge = ctx.inByteBridge;
}
}
return bridge.byteBuf;
return nextInBridgeFeeder().byteBuf;
}
throw new IllegalStateException("nextInboundByteBuffer() called from outside the eventLoop");
}
@ -577,17 +592,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return ctx.inMsgBuf;
}
if (executor().inEventLoop(currentThread)) {
MessageBridge bridge = ctx.inMsgBridge;
if (bridge == null) {
bridge = new MessageBridge();
if (!IN_MSG_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
// release it as it was set before
bridge.release();
bridge = ctx.inMsgBridge;
}
}
return bridge.msgBuf;
return nextInBridgeFeeder().msgBuf;
}
throw new IllegalStateException("nextInboundMessageBuffer() called from outside the eventLoop");
}
@ -595,6 +600,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
private NextBridgeFeeder nextInBridgeFeeder() {
NextBridgeFeeder feeder = nextInBridgeFeeder;
if (feeder == null) {
feeder = new NextInboundBridgeFeeder();
if (!NEXT_IN_BRIDGE_FEEDER.compareAndSet(this, null, feeder)) {
feeder.release();
feeder = nextInBridgeFeeder;
}
}
return feeder;
}
@Override
public ByteBuf nextOutboundByteBuffer() {
DefaultChannelHandlerContext ctx = prev;
@ -605,17 +622,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return ctx.outboundByteBuffer();
}
if (executor().inEventLoop(currentThread)) {
ByteBridge bridge = ctx.outByteBridge;
if (bridge == null) {
bridge = new ByteBridge(ctx, false);
if (!OUT_BYTE_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
// release it as it was set before
bridge.release();
bridge = ctx.outByteBridge;
}
}
return bridge.byteBuf;
return nextOutBridgeFeeder().byteBuf;
}
throw new IllegalStateException("nextOutboundByteBuffer() called from outside the eventLoop");
}
@ -633,17 +640,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return ctx.outboundMessageBuffer();
}
if (executor().inEventLoop(currentThread)) {
MessageBridge bridge = ctx.outMsgBridge;
if (bridge == null) {
bridge = new MessageBridge();
if (!OUT_MSG_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
// release it as it was set before
bridge.release();
bridge = ctx.outMsgBridge;
}
}
return bridge.msgBuf;
return nextOutBridgeFeeder().msgBuf;
}
throw new IllegalStateException("nextOutboundMessageBuffer() called from outside the eventLoop");
}
@ -651,6 +648,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
private NextBridgeFeeder nextOutBridgeFeeder() {
NextBridgeFeeder feeder = nextOutBridgeFeeder;
if (feeder == null) {
feeder = new NextOutboundBridgeFeeder();
if (!NEXT_OUT_BRIDGE_FEEDER.compareAndSet(this, null, feeder)) {
feeder.release();
feeder = nextOutBridgeFeeder;
}
}
return feeder;
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
final DefaultChannelHandlerContext next = findContextInbound();
@ -858,7 +867,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void fireInboundBufferUpdated0(final DefaultChannelHandlerContext next) {
if (!pipeline.isInboundShutdown()) {
next.fillInboundBridge();
feedNextInBridge();
// This comparison is safe because this method is always executed from the executor.
if (next.executor == executor) {
next.invokeInboundBufferUpdated();
@ -886,6 +895,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
private void feedNextInBridge() {
NextBridgeFeeder feeder = nextInBridgeFeeder;
if (feeder != null) {
feeder.feed();
}
}
private void invokeInboundBufferUpdated() {
ChannelStateHandler handler = (ChannelStateHandler) handler();
@ -1234,10 +1250,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
"Unable to flush as outbound buffer of next handler was freed already"));
return;
}
prev.fillOutboundBridge();
feedNextOutBridge();
prev.invokeFlush(promise, currentThread);
}
private void feedNextOutBridge() {
NextBridgeFeeder feeder = nextOutBridgeFeeder;
if (feeder != null) {
feeder.feed();
}
}
private ChannelFuture invokeFlush(final ChannelPromise promise, Thread currentThread) {
EventExecutor executor = executor();
if (executor.inEventLoop(currentThread)) {
@ -1431,7 +1454,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeInboundBridge();
freeInbound();
}
}
@ -1483,7 +1506,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeOutboundBridge();
freeOutbound();
}
}
@ -1608,120 +1631,124 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return ctx;
}
private static final class MessageBridge {
private final MessageBuf<Object> msgBuf = Unpooled.messageBuffer();
private abstract class NextBridgeFeeder {
final MessageBuf<Object> msgBuf;
final ByteBuf byteBuf;
private final Queue<Object[]> exchangeBuf = new ConcurrentLinkedQueue<Object[]>();
private void fill() {
if (msgBuf.isEmpty()) {
return;
}
Object[] data = msgBuf.toArray();
msgBuf.clear();
exchangeBuf.add(data);
protected NextBridgeFeeder() {
msgBuf = Unpooled.messageBuffer();
byteBuf = ChannelHandlerUtil.allocate(DefaultChannelHandlerContext.this);
}
private boolean flush(MessageBuf<Object> out) {
for (;;) {
Object[] data = exchangeBuf.peek();
if (data == null) {
return true;
final void feed() {
int dataLen = byteBuf.readableBytes();
if (dataLen != 0) {
ByteBuf data;
if (byteBuf.isDirect()) {
data = alloc().directBuffer(dataLen, dataLen);
} else {
data = alloc().heapBuffer(dataLen, dataLen);
}
int i;
for (i = 0; i < data.length; i ++) {
Object m = data[i];
if (m == null) {
break;
}
byteBuf.readBytes(data).discardSomeReadBytes();
nextByteBridge().add(data);
}
if (out.offer(m)) {
data[i] = null;
} else {
System.arraycopy(data, i, data, 0, data.length - i);
for (int j = i + 1; j < data.length; j ++) {
data[j] = null;
}
return false;
}
}
exchangeBuf.remove();
if (!msgBuf.isEmpty()) {
Object[] data = msgBuf.toArray();
msgBuf.clear();
nextMessageBridge().add(data);
}
}
private void release() {
final void release() {
byteBuf.release();
msgBuf.release();
}
protected abstract Queue<Object> nextByteBridge();
protected abstract Queue<Object> nextMessageBridge();
}
private final class NextInboundBridgeFeeder extends NextBridgeFeeder {
@Override
protected Queue<Object> nextByteBridge() {
DefaultChannelHandlerContext ctx = next;
for (;;) {
if (ctx.hasInboundByteBuffer()) {
break;
}
ctx = ctx.next;
}
return bridge(ctx);
}
@Override
protected Queue<Object> nextMessageBridge() {
DefaultChannelHandlerContext ctx = next;
for (;;) {
if (ctx.hasInboundMessageBuffer()) {
break;
}
ctx = ctx.next;
}
return bridge(ctx);
}
private Queue<Object> bridge(DefaultChannelHandlerContext ctx) {
Queue<Object> bridge = ctx.inBridge;
if (bridge == null) {
Queue<Object> newBridge = new ConcurrentLinkedQueue<Object>();
if (IN_BRIDGE_UPDATER.compareAndSet(ctx, null, newBridge)) {
bridge = newBridge;
} else {
bridge = ctx.inBridge;
}
}
return bridge;
}
}
private static final class ByteBridge {
private final ByteBuf byteBuf;
private final Queue<ByteBuf> exchangeBuf = new ConcurrentLinkedQueue<ByteBuf>();
private final ChannelHandlerContext ctx;
ByteBridge(ChannelHandlerContext ctx, boolean inbound) {
this.ctx = ctx;
if (inbound) {
if (ctx.inboundByteBuffer().isDirect()) {
byteBuf = ctx.alloc().directBuffer();
} else {
byteBuf = ctx.alloc().heapBuffer();
}
} else {
if (ctx.outboundByteBuffer().isDirect()) {
byteBuf = ctx.alloc().directBuffer();
} else {
byteBuf = ctx.alloc().heapBuffer();
}
}
}
private void fill() {
if (!byteBuf.isReadable()) {
return;
}
int dataLen = byteBuf.readableBytes();
ByteBuf data;
if (byteBuf.isDirect()) {
data = ctx.alloc().directBuffer(dataLen, dataLen);
} else {
data = ctx.alloc().buffer(dataLen, dataLen);
}
byteBuf.readBytes(data).discardSomeReadBytes();
exchangeBuf.add(data);
}
private boolean flush(ByteBuf out) {
private final class NextOutboundBridgeFeeder extends NextBridgeFeeder {
@Override
protected Queue<Object> nextByteBridge() {
DefaultChannelHandlerContext ctx = prev;
for (;;) {
ByteBuf data = exchangeBuf.peek();
if (data == null) {
return true;
}
if (out.writerIndex() > out.maxCapacity() - data.readableBytes()) {
// The target buffer is not going to be able to accept all data in the bridge.
out.capacity(out.maxCapacity());
out.writeBytes(data, out.writableBytes());
return false;
} else {
exchangeBuf.remove();
try {
out.writeBytes(data);
} finally {
data.release();
}
if (ctx.hasOutboundByteBuffer()) {
break;
}
ctx = ctx.prev;
}
return bridge(ctx);
}
private void release() {
byteBuf.release();
@Override
protected Queue<Object> nextMessageBridge() {
DefaultChannelHandlerContext ctx = prev;
for (;;) {
if (ctx.hasOutboundMessageBuffer()) {
break;
}
ctx = ctx.prev;
}
return bridge(ctx);
}
private Queue<Object> bridge(DefaultChannelHandlerContext ctx) {
Queue<Object> bridge = ctx.outBridge;
if (bridge == null) {
Queue<Object> newBridge = new ConcurrentLinkedQueue<Object>();
if (OUT_BRIDGE_UPDATER.compareAndSet(ctx, null, newBridge)) {
bridge = newBridge;
} else {
bridge = ctx.outBridge;
}
}
return bridge;
}
}
}

View File

@ -81,7 +81,7 @@ public class LocalTransportThreadModelTest {
}
@Test(timeout = 30000)
@Ignore
@Ignore("regression test")
public void testStagedExecutionMultiple() throws Throwable {
for (int i = 0; i < 10; i ++) {
testStagedExecution();
@ -206,7 +206,7 @@ public class LocalTransportThreadModelTest {
}
@Test(timeout = 30000)
@Ignore
@Ignore("regression test")
public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));