[#995] Replace AtomicReference usage with AtomicReferenceFieldUpdater
This will safe as an example 2gb mem when have 10 DefaultHandlerContext instances per connection and the connection count is 1000000. Also kind of related to [#920]
This commit is contained in:
parent
03de5f479a
commit
1bb003d9ae
@ -32,7 +32,7 @@ import java.util.concurrent.Callable;
|
|||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||||
|
|
||||||
import static io.netty.channel.DefaultChannelPipeline.*;
|
import static io.netty.channel.DefaultChannelPipeline.*;
|
||||||
|
|
||||||
@ -64,10 +64,28 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
// To avoid such situation, we lazily creates an additional thread-safe buffer called
|
// To avoid such situation, we lazily creates an additional thread-safe buffer called
|
||||||
// 'bridge' so that the two handlers access each other's buffer only via the bridges.
|
// 'bridge' so that the two handlers access each other's buffer only via the bridges.
|
||||||
// The content written into a bridge is flushed into the actual buffer by flushBridge().
|
// The content written into a bridge is flushed into the actual buffer by flushBridge().
|
||||||
private final AtomicReference<MessageBridge> inMsgBridge;
|
//
|
||||||
private AtomicReference<MessageBridge> outMsgBridge;
|
// Note we use an AtomicReferenceFieldUpdater for atomic operations on these to safe memory. This will safe us
|
||||||
private final AtomicReference<ByteBridge> inByteBridge;
|
// 64 bytes per Bridge.
|
||||||
private AtomicReference<ByteBridge> outByteBridge;
|
private volatile MessageBridge inMsgBridge;
|
||||||
|
private volatile MessageBridge outMsgBridge;
|
||||||
|
private volatile ByteBridge inByteBridge;
|
||||||
|
private volatile ByteBridge outByteBridge;
|
||||||
|
|
||||||
|
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, MessageBridge> IN_MSG_BRIDGE_UPDATER
|
||||||
|
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
|
||||||
|
MessageBridge.class, "inMsgBridge");
|
||||||
|
|
||||||
|
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, MessageBridge> OUT_MSG_BRIDGE_UPDATER
|
||||||
|
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
|
||||||
|
MessageBridge.class, "outMsgBridge");
|
||||||
|
|
||||||
|
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, ByteBridge> IN_BYTE_BRIDGE_UPDATER
|
||||||
|
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
|
||||||
|
ByteBridge.class, "inByteBridge");
|
||||||
|
private static final AtomicReferenceFieldUpdater<DefaultChannelHandlerContext, ByteBridge> OUT_BYTE_BRIDGE_UPDATER
|
||||||
|
= AtomicReferenceFieldUpdater.newUpdater(DefaultChannelHandlerContext.class,
|
||||||
|
ByteBridge.class, "outByteBridge");
|
||||||
|
|
||||||
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
||||||
private Runnable invokeChannelRegisteredTask;
|
private Runnable invokeChannelRegisteredTask;
|
||||||
@ -148,14 +166,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
if (buf instanceof ByteBuf) {
|
if (buf instanceof ByteBuf) {
|
||||||
inByteBuf = (ByteBuf) buf;
|
inByteBuf = (ByteBuf) buf;
|
||||||
inByteBridge = new AtomicReference<ByteBridge>();
|
inByteBridge = null;
|
||||||
inMsgBuf = null;
|
inMsgBuf = null;
|
||||||
inMsgBridge = null;
|
inMsgBridge = null;
|
||||||
} else if (buf instanceof MessageBuf) {
|
} else if (buf instanceof MessageBuf) {
|
||||||
inByteBuf = null;
|
inByteBuf = null;
|
||||||
inByteBridge = null;
|
inByteBridge = null;
|
||||||
inMsgBuf = (MessageBuf<Object>) buf;
|
inMsgBuf = (MessageBuf<Object>) buf;
|
||||||
inMsgBridge = new AtomicReference<MessageBridge>();
|
inMsgBridge = null;
|
||||||
} else {
|
} else {
|
||||||
throw new Error();
|
throw new Error();
|
||||||
}
|
}
|
||||||
@ -252,7 +270,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
if (buf instanceof ByteBuf) {
|
if (buf instanceof ByteBuf) {
|
||||||
outByteBuf = (ByteBuf) buf;
|
outByteBuf = (ByteBuf) buf;
|
||||||
outByteBridge = new AtomicReference<ByteBridge>();
|
outByteBridge = null;
|
||||||
outMsgBuf = null;
|
outMsgBuf = null;
|
||||||
outMsgBridge = null;
|
outMsgBridge = null;
|
||||||
} else if (buf instanceof MessageBuf) {
|
} else if (buf instanceof MessageBuf) {
|
||||||
@ -261,7 +279,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
MessageBuf<Object> msgBuf = (MessageBuf<Object>) buf;
|
MessageBuf<Object> msgBuf = (MessageBuf<Object>) buf;
|
||||||
outMsgBuf = msgBuf;
|
outMsgBuf = msgBuf;
|
||||||
outMsgBridge = new AtomicReference<MessageBridge>();
|
outMsgBridge = null;
|
||||||
} else {
|
} else {
|
||||||
throw new Error();
|
throw new Error();
|
||||||
}
|
}
|
||||||
@ -269,24 +287,24 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
private void fillBridge() {
|
private void fillBridge() {
|
||||||
if (inMsgBridge != null) {
|
if (inMsgBridge != null) {
|
||||||
MessageBridge bridge = inMsgBridge.get();
|
MessageBridge bridge = inMsgBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.fill();
|
bridge.fill();
|
||||||
}
|
}
|
||||||
} else if (inByteBridge != null) {
|
} else if (inByteBridge != null) {
|
||||||
ByteBridge bridge = inByteBridge.get();
|
ByteBridge bridge = inByteBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.fill();
|
bridge.fill();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (outMsgBridge != null) {
|
if (outMsgBridge != null) {
|
||||||
MessageBridge bridge = outMsgBridge.get();
|
MessageBridge bridge = outMsgBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.fill();
|
bridge.fill();
|
||||||
}
|
}
|
||||||
} else if (outByteBridge != null) {
|
} else if (outByteBridge != null) {
|
||||||
ByteBridge bridge = outByteBridge.get();
|
ByteBridge bridge = outByteBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.fill();
|
bridge.fill();
|
||||||
}
|
}
|
||||||
@ -295,24 +313,24 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
private void flushBridge() {
|
private void flushBridge() {
|
||||||
if (inMsgBridge != null) {
|
if (inMsgBridge != null) {
|
||||||
MessageBridge bridge = inMsgBridge.get();
|
MessageBridge bridge = inMsgBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.flush(inMsgBuf);
|
bridge.flush(inMsgBuf);
|
||||||
}
|
}
|
||||||
} else if (inByteBridge != null) {
|
} else if (inByteBridge != null) {
|
||||||
ByteBridge bridge = inByteBridge.get();
|
ByteBridge bridge = inByteBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.flush(inByteBuf);
|
bridge.flush(inByteBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (outMsgBridge != null) {
|
if (outMsgBridge != null) {
|
||||||
MessageBridge bridge = outMsgBridge.get();
|
MessageBridge bridge = outMsgBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.flush(outMsgBuf);
|
bridge.flush(outMsgBuf);
|
||||||
}
|
}
|
||||||
} else if (outByteBridge != null) {
|
} else if (outByteBridge != null) {
|
||||||
ByteBridge bridge = outByteBridge.get();
|
ByteBridge bridge = outByteBridge;
|
||||||
if (bridge != null) {
|
if (bridge != null) {
|
||||||
bridge.flush(outByteBuf);
|
bridge.flush(outByteBuf);
|
||||||
}
|
}
|
||||||
@ -771,11 +789,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
if (ctx.executor().inEventLoop()) {
|
if (ctx.executor().inEventLoop()) {
|
||||||
return ctx.inboundByteBuffer();
|
return ctx.inboundByteBuffer();
|
||||||
} else {
|
} else {
|
||||||
ByteBridge bridge = ctx.inByteBridge.get();
|
ByteBridge bridge = ctx.inByteBridge;
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new ByteBridge(ctx);
|
bridge = new ByteBridge(ctx);
|
||||||
if (!ctx.inByteBridge.compareAndSet(null, bridge)) {
|
if (!IN_BYTE_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
|
||||||
bridge = ctx.inByteBridge.get();
|
bridge = ctx.inByteBridge;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return bridge.byteBuf;
|
return bridge.byteBuf;
|
||||||
@ -807,11 +825,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
if (ctx.executor().inEventLoop()) {
|
if (ctx.executor().inEventLoop()) {
|
||||||
return ctx.inboundMessageBuffer();
|
return ctx.inboundMessageBuffer();
|
||||||
} else {
|
} else {
|
||||||
MessageBridge bridge = ctx.inMsgBridge.get();
|
MessageBridge bridge = ctx.inMsgBridge;
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new MessageBridge();
|
bridge = new MessageBridge();
|
||||||
if (!ctx.inMsgBridge.compareAndSet(null, bridge)) {
|
if (!IN_MSG_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
|
||||||
bridge = ctx.inMsgBridge.get();
|
bridge = ctx.inMsgBridge;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return bridge.msgBuf;
|
return bridge.msgBuf;
|
||||||
@ -830,11 +848,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
if (ctx.executor().inEventLoop()) {
|
if (ctx.executor().inEventLoop()) {
|
||||||
return ctx.outboundByteBuffer();
|
return ctx.outboundByteBuffer();
|
||||||
} else {
|
} else {
|
||||||
ByteBridge bridge = ctx.outByteBridge.get();
|
ByteBridge bridge = ctx.outByteBridge;
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new ByteBridge(ctx);
|
bridge = new ByteBridge(ctx);
|
||||||
if (!ctx.outByteBridge.compareAndSet(null, bridge)) {
|
if (!OUT_BYTE_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
|
||||||
bridge = ctx.outByteBridge.get();
|
bridge = ctx.outByteBridge;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return bridge.byteBuf;
|
return bridge.byteBuf;
|
||||||
@ -867,11 +885,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
if (ctx.executor().inEventLoop()) {
|
if (ctx.executor().inEventLoop()) {
|
||||||
return ctx.outboundMessageBuffer();
|
return ctx.outboundMessageBuffer();
|
||||||
} else {
|
} else {
|
||||||
MessageBridge bridge = ctx.outMsgBridge.get();
|
MessageBridge bridge = ctx.outMsgBridge;
|
||||||
if (bridge == null) {
|
if (bridge == null) {
|
||||||
bridge = new MessageBridge();
|
bridge = new MessageBridge();
|
||||||
if (!ctx.outMsgBridge.compareAndSet(null, bridge)) {
|
if (!OUT_MSG_BRIDGE_UPDATER.compareAndSet(ctx, null, bridge)) {
|
||||||
bridge = ctx.outMsgBridge.get();
|
bridge = ctx.outMsgBridge;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return bridge.msgBuf;
|
return bridge.msgBuf;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user