Remove HashMap for lookup name / ctx from DefaultChannelPipeline to reduce memory footprint
Motivation: If you start to have 1M+ concurrent connections memory footprint can be come a big issue. We should try to reduce it as much as possible in the core of netty. Modifications: - Remove HashMap that was used to store name to ctx mapping. This was only used for validation and access a handler by name. As a pipeline is not expected to be very long (like 100+ handlers) we can just walk the linked list structure to find the ctx with a given name. Result: Less memory footprint of the DefaultChannelPipeline.
This commit is contained in:
parent
edb2250d35
commit
2d2e07578a
@ -29,7 +29,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||||||
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
@ -61,8 +60,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
final AbstractChannelHandlerContext head;
|
final AbstractChannelHandlerContext head;
|
||||||
final AbstractChannelHandlerContext tail;
|
final AbstractChannelHandlerContext tail;
|
||||||
|
|
||||||
private final Map<String, AbstractChannelHandlerContext> name2ctx =
|
|
||||||
new HashMap<String, AbstractChannelHandlerContext>(4);
|
|
||||||
private final boolean touch = ResourceLeakDetector.isEnabled();
|
private final boolean touch = ResourceLeakDetector.isEnabled();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -101,7 +98,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
|
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addFirst0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
addFirst0(new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -110,12 +107,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
|
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addFirst0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
addFirst0(new DefaultChannelHandlerContext(this, invoker, name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addFirst0(String name, AbstractChannelHandlerContext newCtx) {
|
private void addFirst0(AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
AbstractChannelHandlerContext nextCtx = head.next;
|
AbstractChannelHandlerContext nextCtx = head.next;
|
||||||
@ -124,8 +121,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
head.next = newCtx;
|
head.next = newCtx;
|
||||||
nextCtx.prev = newCtx;
|
nextCtx.prev = newCtx;
|
||||||
|
|
||||||
name2ctx.put(name, newCtx);
|
|
||||||
|
|
||||||
callHandlerAdded(newCtx);
|
callHandlerAdded(newCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -138,7 +133,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
|
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addLast0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
addLast0(new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -147,12 +142,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
|
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
addLast0(new DefaultChannelHandlerContext(this, invoker, name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
|
private void addLast0(AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
AbstractChannelHandlerContext prev = tail.prev;
|
AbstractChannelHandlerContext prev = tail.prev;
|
||||||
@ -161,8 +156,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
prev.next = newCtx;
|
prev.next = newCtx;
|
||||||
tail.prev = newCtx;
|
tail.prev = newCtx;
|
||||||
|
|
||||||
name2ctx.put(name, newCtx);
|
|
||||||
|
|
||||||
callHandlerAdded(newCtx);
|
callHandlerAdded(newCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,7 +169,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
addBefore0(ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -187,13 +180,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
addBefore0(ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addBefore0(
|
private void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||||
final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
newCtx.prev = ctx.prev;
|
newCtx.prev = ctx.prev;
|
||||||
@ -201,8 +193,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
ctx.prev.next = newCtx;
|
ctx.prev.next = newCtx;
|
||||||
ctx.prev = newCtx;
|
ctx.prev = newCtx;
|
||||||
|
|
||||||
name2ctx.put(name, newCtx);
|
|
||||||
|
|
||||||
callHandlerAdded(newCtx);
|
callHandlerAdded(newCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -216,7 +206,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
addAfter0(ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -228,12 +218,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||||
name = filterName(name, handler);
|
name = filterName(name, handler);
|
||||||
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
addAfter0(ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addAfter0(String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
private void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
newCtx.prev = ctx;
|
newCtx.prev = ctx;
|
||||||
@ -241,8 +231,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
ctx.next.prev = newCtx;
|
ctx.next.prev = newCtx;
|
||||||
ctx.next = newCtx;
|
ctx.next = newCtx;
|
||||||
|
|
||||||
name2ctx.put(name, newCtx);
|
|
||||||
|
|
||||||
callHandlerAdded(newCtx);
|
callHandlerAdded(newCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -376,11 +364,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
|
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
|
||||||
// any name conflicts. Note that we don't cache the names generated here.
|
// any name conflicts. Note that we don't cache the names generated here.
|
||||||
if (name2ctx.containsKey(name)) {
|
if (context0(name) != null) {
|
||||||
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
|
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
|
||||||
for (int i = 1;; i ++) {
|
for (int i = 1;; i ++) {
|
||||||
String newName = baseName + i;
|
String newName = baseName + i;
|
||||||
if (!name2ctx.containsKey(newName)) {
|
if (context0(newName) == null) {
|
||||||
name = newName;
|
name = newName;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -448,7 +436,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
AbstractChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
prev.next = next;
|
prev.next = next;
|
||||||
next.prev = prev;
|
next.prev = prev;
|
||||||
name2ctx.remove(ctx.name());
|
|
||||||
callHandlerRemoved(ctx);
|
callHandlerRemoved(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -503,15 +490,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
|
new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
|
||||||
|
|
||||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||||
replace0(ctx, newName, newCtx);
|
replace0(ctx, newCtx);
|
||||||
return ctx.handler();
|
return ctx.handler();
|
||||||
} else {
|
} else {
|
||||||
final String finalNewName = newName;
|
|
||||||
future = newCtx.executor().submit(new Runnable() {
|
future = newCtx.executor().submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
synchronized (DefaultChannelPipeline.this) {
|
synchronized (DefaultChannelPipeline.this) {
|
||||||
replace0(ctx, finalNewName, newCtx);
|
replace0(ctx, newCtx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -526,8 +512,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return ctx.handler();
|
return ctx.handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void replace0(AbstractChannelHandlerContext oldCtx, String newName,
|
private void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
|
||||||
AbstractChannelHandlerContext newCtx) {
|
|
||||||
checkMultiplicity(newCtx);
|
checkMultiplicity(newCtx);
|
||||||
|
|
||||||
AbstractChannelHandlerContext prev = oldCtx.prev;
|
AbstractChannelHandlerContext prev = oldCtx.prev;
|
||||||
@ -542,11 +527,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
prev.next = newCtx;
|
prev.next = newCtx;
|
||||||
next.prev = newCtx;
|
next.prev = newCtx;
|
||||||
|
|
||||||
if (!oldCtx.name().equals(newName)) {
|
|
||||||
name2ctx.remove(oldCtx.name());
|
|
||||||
}
|
|
||||||
name2ctx.put(newName, newCtx);
|
|
||||||
|
|
||||||
// update the reference to the replacement so forward of buffered content will work correctly
|
// update the reference to the replacement so forward of buffered content will work correctly
|
||||||
oldCtx.prev = newCtx;
|
oldCtx.prev = newCtx;
|
||||||
oldCtx.next = newCtx;
|
oldCtx.next = newCtx;
|
||||||
@ -724,9 +704,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
throw new NullPointerException("name");
|
throw new NullPointerException("name");
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (this) {
|
return context0(name);
|
||||||
return name2ctx.get(name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -851,8 +829,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
* Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
|
* Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
|
||||||
* handlerRemoved().
|
* handlerRemoved().
|
||||||
*
|
*
|
||||||
* Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext)})
|
* Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
|
||||||
* before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext)}) so that
|
* before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
|
||||||
* the handlers are removed after all events are handled.
|
* the handlers are removed after all events are handled.
|
||||||
*
|
*
|
||||||
* See: https://github.com/netty/netty/issues/3156
|
* See: https://github.com/netty/netty/issues/3156
|
||||||
@ -1063,13 +1041,24 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return generateName(handler);
|
return generateName(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!name2ctx.containsKey(name)) {
|
if (context0(name) == null) {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new IllegalArgumentException("Duplicate handler name: " + name);
|
throw new IllegalArgumentException("Duplicate handler name: " + name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private AbstractChannelHandlerContext context0(String name) {
|
||||||
|
AbstractChannelHandlerContext context = head.next;
|
||||||
|
while (context != tail) {
|
||||||
|
if (context.name().equals(name)) {
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
context = context.next;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private AbstractChannelHandlerContext getContextOrDie(String name) {
|
private AbstractChannelHandlerContext getContextOrDie(String name) {
|
||||||
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
|
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user