[#1535] Remove Channel.id() and so fix the possible leakage of the previous used Channel id map

The user can still use Channel.hashCode() for logging. It's just not 100% unique but should be good enough for most cases
This commit is contained in:
Norman Maurer 2013-07-08 13:31:59 +02:00
parent 39b57b889c
commit 9c1b31d20a
14 changed files with 139 additions and 153 deletions

View File

@ -155,7 +155,7 @@ public abstract class WebSocketServerHandshaker {
HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version %s server handshake", version(), channel.id()));
logger.debug(String.format("Channel %s WS Version %s server handshake", version(), channel.hashCode()));
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
channel.write(response).addListener(new ChannelFutureListener() {

View File

@ -0,0 +1,65 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.io.Serializable;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
public final class ConcurrentSet<E> extends AbstractSet<E> implements Serializable {
private static final long serialVersionUID = -6761513279741915432L;
private final ConcurrentMap<E, Boolean> map;
/**
* Creates a new instance which wraps the specified {@code map}.
*/
public ConcurrentSet() {
map = PlatformDependent.newConcurrentHashMap();
}
@Override
public int size() {
return map.size();
}
@Override
public boolean contains(Object o) {
return map.containsKey(o);
}
@Override
public boolean add(E o) {
return map.putIfAbsent(o, Boolean.TRUE) == null;
}
@Override
public boolean remove(Object o) {
return map.remove(o) != null;
}
@Override
public void clear() {
map.clear();
}
@Override
public Iterator<E> iterator() {
return map.keySet().iterator();
}
}

View File

@ -95,7 +95,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame, MessageList<Object> out) {
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format(
"Channel %s received %s", ctx.channel().id(), frame.getClass().getSimpleName()));
"Channel %s received %s", ctx.channel().hashCode(), frame.getClass().getSimpleName()));
}
if (frame instanceof CloseWebSocketFrame) {

View File

@ -121,7 +121,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("Channel %s received %s", ctx.channel().id(), request));
logger.fine(String.format("Channel %s received %s", ctx.channel().hashCode(), request));
}
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}

View File

@ -123,7 +123,7 @@ public class WebSocketSslServerHandler extends SimpleChannelInboundHandler<Objec
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("Channel %s received %s", ctx.channel().id(), request));
logger.fine(String.format("Channel %s received %s", ctx.channel().hashCode(), request));
}
ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
}

View File

@ -87,7 +87,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
ctx.channel().id(), checkInterval);
ctx.channel().hashCode(), checkInterval);
setTrafficCounter(trafficCounter);
trafficCounter.start();
}

View File

@ -31,7 +31,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.ConcurrentMap;
/**
* A skeletal {@link Channel} implementation.
@ -40,37 +39,19 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
static final ConcurrentMap<Integer, Channel> allChannels = PlatformDependent.newConcurrentHashMap();
/**
* Generates a negative unique integer ID. This method generates only
* negative integers to avoid conflicts with user-specified IDs where only
* non-negative integers are allowed.
*/
private static Integer allocateId(Channel channel) {
private static Integer allocateId() {
int idVal = ThreadLocalRandom.current().nextInt();
if (idVal > 0) {
idVal = -idVal;
} else if (idVal == 0) {
idVal = -1;
}
Integer id;
for (;;) {
id = Integer.valueOf(idVal);
// Loop until a unique ID is acquired.
// It should be found in one loop practically.
if (allChannels.putIfAbsent(id, channel) == null) {
// Successfully acquired.
return id;
} else {
// Taken by other channel at almost the same moment.
idVal --;
if (idVal >= 0) {
idVal = -1;
}
}
}
return idVal;
}
private final Channel parent;
@ -108,27 +89,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected AbstractChannel(Channel parent, Integer id) {
if (id == null) {
id = allocateId(this);
id = allocateId();
} else {
if (id.intValue() < 0) {
throw new IllegalArgumentException("id: " + id + " (expected: >= 0)");
}
if (allChannels.putIfAbsent(id, this) != null) {
throw new IllegalArgumentException("duplicate ID: " + id);
}
}
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
allChannels.remove(id());
}
});
}
@Override
@ -136,11 +107,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return outboundBuffer.getWritable();
}
@Override
public final Integer id() {
return id;
}
@Override
public Channel parent() {
return parent;
@ -346,17 +312,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return this == o;
}
/**
* Compares the {@linkplain #id() ID} of the two channels.
*/
@Override
public final int compareTo(Channel o) {
return id().compareTo(o.id());
if (o instanceof AbstractChannel) {
return id.compareTo(((AbstractChannel) o).id);
}
return id.compareTo(Integer.valueOf(o.hashCode()));
}
/**
* Returns the {@link String} representation of this channel. The returned
* string contains the {@linkplain #id() ID}, {@linkplain #localAddress() local address},
* string contains the {@linkplain #hashCode()} ID}, {@linkplain #localAddress() local address},
* and {@linkplain #remoteAddress() remote address} of this channel for
* easier identification.
*/

View File

@ -68,11 +68,6 @@ import java.net.SocketAddress;
*/
public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPropertyAccess, Comparable<Channel> {
/**
* Returns the unique integer ID of this channel. The returned value MUST be non {@code null}.
*/
Integer id();
/**
* Return the {@link EventLoop} this {@link Channel} was registered too.
*/

View File

@ -96,13 +96,6 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
String name();
/**
* Returns the {@link Channel} whose ID matches the specified integer.
*
* @return the matching {@link Channel} if found. {@code null} otherwise.
*/
Channel find(Integer id);
/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group. If the specified {@code message} is an instance of

View File

@ -27,11 +27,11 @@ import java.util.Map;
/**
* {@link ChannelException} which holds {@link ChannelFuture}s that failed because of an error.
*/
public class ChannelGroupException extends ChannelException implements Iterable<Map.Entry<Integer, Throwable>> {
public class ChannelGroupException extends ChannelException implements Iterable<Map.Entry<Channel, Throwable>> {
private static final long serialVersionUID = -4093064295562629453L;
private final Collection<Map.Entry<Integer, Throwable>> failed;
private final Collection<Map.Entry<Channel, Throwable>> failed;
public ChannelGroupException(Collection<Map.Entry<Integer, Throwable>> causes) {
public ChannelGroupException(Collection<Map.Entry<Channel, Throwable>> causes) {
if (causes == null) {
throw new NullPointerException("causes");
}
@ -46,7 +46,7 @@ public class ChannelGroupException extends ChannelException implements Iterable<
* related id of the {@link Channel}.
*/
@Override
public Iterator<Map.Entry<Integer, Throwable>> iterator() {
public Iterator<Map.Entry<Channel, Throwable>> iterator() {
return failed.iterator();
}
}

View File

@ -109,16 +109,6 @@ public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture
*/
ChannelGroup group();
/**
* Returns the {@link ChannelFuture} of the individual I/O operation which
* is associated with the {@link Channel} whose ID matches the specified
* integer.
*
* @return the matching {@link ChannelFuture} if found.
* {@code null} otherwise.
*/
ChannelFuture find(Integer channelId);
/**
* Returns the {@link ChannelFuture} of the individual I/O operation which
* is associated with the specified {@link Channel}.

View File

@ -24,7 +24,7 @@ import io.netty.channel.MessageList;
import io.netty.channel.ServerChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ConcurrentSet;
import java.util.AbstractSet;
import java.util.ArrayList;
@ -32,7 +32,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -43,8 +42,8 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
private static final AtomicInteger nextId = new AtomicInteger();
private final String name;
private final EventExecutor executor;
private final ConcurrentMap<Integer, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Integer, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
private final ConcurrentSet<Channel> serverChannels = new ConcurrentSet<Channel>();
private final ConcurrentSet<Channel> nonServerChannels = new ConcurrentSet<Channel>();
private final ChannelFutureListener remover = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -88,26 +87,14 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return nonServerChannels.size() + serverChannels.size();
}
@Override
public Channel find(Integer id) {
Channel c = nonServerChannels.get(id);
if (c != null) {
return c;
} else {
return serverChannels.get(id);
}
}
@Override
public boolean contains(Object o) {
if (o instanceof Integer) {
return nonServerChannels.containsKey(o) || serverChannels.containsKey(o);
} else if (o instanceof Channel) {
if (o instanceof Channel) {
Channel c = (Channel) o;
if (o instanceof ServerChannel) {
return serverChannels.containsKey(c.id());
return serverChannels.contains(c);
} else {
return nonServerChannels.containsKey(c.id());
return nonServerChannels.contains(c);
}
} else {
return false;
@ -116,10 +103,10 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public boolean add(Channel channel) {
ConcurrentMap<Integer, Channel> map =
ConcurrentSet<Channel> set =
channel instanceof ServerChannel? serverChannels : nonServerChannels;
boolean added = map.putIfAbsent(channel.id(), channel) == null;
boolean added = set.add(channel);
if (added) {
channel.closeFuture().addListener(remover);
}
@ -128,22 +115,17 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public boolean remove(Object o) {
Channel c = null;
if (o instanceof Integer) {
c = nonServerChannels.remove(o);
if (c == null) {
c = serverChannels.remove(o);
}
} else if (o instanceof Channel) {
c = (Channel) o;
if (c instanceof ServerChannel) {
c = serverChannels.remove(c.id());
} else {
c = nonServerChannels.remove(c.id());
}
if (!(o instanceof Channel)) {
return false;
}
if (c == null) {
boolean removed;
Channel c = (Channel) o;
if (c instanceof ServerChannel) {
removed = serverChannels.remove(c);
} else {
removed = nonServerChannels.remove(c);
}
if (!removed) {
return false;
}
@ -160,36 +142,36 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public Iterator<Channel> iterator() {
return new CombinedIterator<Channel>(
serverChannels.values().iterator(),
nonServerChannels.values().iterator());
serverChannels.iterator(),
nonServerChannels.iterator());
}
@Override
public Object[] toArray() {
Collection<Channel> channels = new ArrayList<Channel>(size());
channels.addAll(serverChannels.values());
channels.addAll(nonServerChannels.values());
channels.addAll(serverChannels);
channels.addAll(nonServerChannels);
return channels.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
Collection<Channel> channels = new ArrayList<Channel>(size());
channels.addAll(serverChannels.values());
channels.addAll(nonServerChannels.values());
channels.addAll(serverChannels);
channels.addAll(nonServerChannels);
return channels.toArray(a);
}
@Override
public ChannelGroupFuture close() {
Map<Integer, ChannelFuture> futures =
new LinkedHashMap<Integer, ChannelFuture>(size());
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels.values()) {
futures.put(c.id(), c.close());
for (Channel c: serverChannels) {
futures.put(c, c.close());
}
for (Channel c: nonServerChannels.values()) {
futures.put(c.id(), c.close());
for (Channel c: nonServerChannels) {
futures.put(c, c.close());
}
return new DefaultChannelGroupFuture(this, futures, executor);
@ -197,14 +179,14 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public ChannelGroupFuture disconnect() {
Map<Integer, ChannelFuture> futures =
new LinkedHashMap<Integer, ChannelFuture>(size());
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels.values()) {
futures.put(c.id(), c.disconnect());
for (Channel c: serverChannels) {
futures.put(c, c.disconnect());
}
for (Channel c: nonServerChannels.values()) {
futures.put(c.id(), c.disconnect());
for (Channel c: nonServerChannels) {
futures.put(c, c.disconnect());
}
return new DefaultChannelGroupFuture(this, futures, executor);
@ -216,9 +198,9 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
throw new NullPointerException("message");
}
Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
futures.put(c.id(), c.write(safeDuplicate(message)));
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
futures.put(c, c.write(safeDuplicate(message)));
}
ReferenceCountUtil.release(message);
@ -231,14 +213,14 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
throw new NullPointerException("messages");
}
Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
int size = messages.size();
MessageList<Object> messageCopy = MessageList.newInstance(size);
for (int i = 0 ; i < size; i++) {
messageCopy.add(safeDuplicate(messages.get(i)));
}
futures.put(c.id(), c.write(messageCopy));
futures.put(c, c.write(messageCopy));
}
messages.releaseAllAndRecycle();
@ -259,14 +241,14 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public ChannelGroupFuture deregister() {
Map<Integer, ChannelFuture> futures =
new LinkedHashMap<Integer, ChannelFuture>(size());
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels.values()) {
futures.put(c.id(), c.deregister());
for (Channel c: serverChannels) {
futures.put(c, c.deregister());
}
for (Channel c: nonServerChannels.values()) {
futures.put(c.id(), c.deregister());
for (Channel c: nonServerChannels) {
futures.put(c, c.deregister());
}
return new DefaultChannelGroupFuture(this, futures, executor);

View File

@ -40,7 +40,7 @@ import java.util.Map;
final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
private final ChannelGroup group;
private final Map<Integer, ChannelFuture> futures;
private final Map<Channel, ChannelFuture> futures;
private int successCount;
private int failureCount;
@ -62,11 +62,11 @@ final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements Ch
if (callSetDone) {
if (failureCount > 0) {
List<Map.Entry<Integer, Throwable>> failed =
new ArrayList<Map.Entry<Integer, Throwable>>(failureCount);
List<Map.Entry<Channel, Throwable>> failed =
new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
for (ChannelFuture f: futures.values()) {
if (!f.isSuccess()) {
failed.add(new DefaultEntry<Integer, Throwable>(f.channel().id(), f.cause()));
failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
}
}
setFailure0(new ChannelGroupException(failed));
@ -91,9 +91,9 @@ final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements Ch
this.group = group;
Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
for (ChannelFuture f: futures) {
futureMap.put(f.channel().id(), f);
futureMap.put(f.channel(), f);
}
this.futures = Collections.unmodifiableMap(futureMap);
@ -108,7 +108,7 @@ final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements Ch
}
}
DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures, EventExecutor executor) {
DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
super(executor);
this.group = group;
this.futures = Collections.unmodifiableMap(futures);
@ -127,14 +127,9 @@ final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements Ch
return group;
}
@Override
public ChannelFuture find(Integer channelId) {
return futures.get(channelId);
}
@Override
public ChannelFuture find(Channel channel) {
return futures.get(channel.id());
return futures.get(channel);
}
@Override

View File

@ -40,7 +40,7 @@ public final class LocalAddress extends SocketAddress implements Comparable<Loca
LocalAddress(Channel channel) {
StringBuilder buf = new StringBuilder(16);
buf.append("local:E");
buf.append(Long.toHexString(channel.id().intValue() & 0xFFFFFFFFL | 0x100000000L));
buf.append(Long.toHexString(channel.hashCode() & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(7, ':');
id = buf.substring(6);
strVal = buf.toString();