diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java index 71309a68e8..071bf7f445 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java @@ -155,7 +155,7 @@ public abstract class WebSocketServerHandshaker { HttpHeaders responseHeaders, final ChannelPromise promise) { if (logger.isDebugEnabled()) { - logger.debug(String.format("Channel %s WS Version %s server handshake", version(), channel.id())); + logger.debug(String.format("Channel %s WS Version %s server handshake", version(), channel.hashCode())); } FullHttpResponse response = newHandshakeResponse(req, responseHeaders); channel.write(response).addListener(new ChannelFutureListener() { diff --git a/common/src/main/java/io/netty/util/internal/ConcurrentSet.java b/common/src/main/java/io/netty/util/internal/ConcurrentSet.java new file mode 100644 index 0000000000..52f8c124d4 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/ConcurrentSet.java @@ -0,0 +1,65 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import java.io.Serializable; +import java.util.AbstractSet; +import java.util.Iterator; +import java.util.concurrent.ConcurrentMap; + +public final class ConcurrentSet extends AbstractSet implements Serializable { + + private static final long serialVersionUID = -6761513279741915432L; + + private final ConcurrentMap map; + + /** + * Creates a new instance which wraps the specified {@code map}. + */ + public ConcurrentSet() { + map = PlatformDependent.newConcurrentHashMap(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean contains(Object o) { + return map.containsKey(o); + } + + @Override + public boolean add(E o) { + return map.putIfAbsent(o, Boolean.TRUE) == null; + } + + @Override + public boolean remove(Object o) { + return map.remove(o) != null; + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public Iterator iterator() { + return map.keySet().iterator(); + } +} diff --git a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java index c106da74a3..97aa040352 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServerHandler.java @@ -95,7 +95,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter { private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame, MessageList out) { if (logger.isLoggable(Level.FINE)) { logger.fine(String.format( - "Channel %s received %s", ctx.channel().id(), frame.getClass().getSimpleName())); + "Channel %s received %s", ctx.channel().hashCode(), frame.getClass().getSimpleName())); } if (frame instanceof CloseWebSocketFrame) { diff --git a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java index 2b858af522..952283953d 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServerHandler.java @@ -121,7 +121,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)) { - logger.fine(String.format("Channel %s received %s", ctx.channel().id(), request)); + logger.fine(String.format("Channel %s received %s", ctx.channel().hashCode(), request)); } ctx.channel().write(new TextWebSocketFrame(request.toUpperCase())); } diff --git a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java index 10892d7977..5db91db46f 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java @@ -123,7 +123,7 @@ public class WebSocketSslServerHandler extends SimpleChannelInboundHandler allChannels = PlatformDependent.newConcurrentHashMap(); - /** * Generates a negative unique integer ID. This method generates only * negative integers to avoid conflicts with user-specified IDs where only * non-negative integers are allowed. */ - private static Integer allocateId(Channel channel) { + private static Integer allocateId() { int idVal = ThreadLocalRandom.current().nextInt(); if (idVal > 0) { idVal = -idVal; } else if (idVal == 0) { idVal = -1; } - - Integer id; - for (;;) { - id = Integer.valueOf(idVal); - // Loop until a unique ID is acquired. - // It should be found in one loop practically. - if (allChannels.putIfAbsent(id, channel) == null) { - // Successfully acquired. - return id; - } else { - // Taken by other channel at almost the same moment. - idVal --; - if (idVal >= 0) { - idVal = -1; - } - } - } + return idVal; } private final Channel parent; @@ -108,27 +89,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected AbstractChannel(Channel parent, Integer id) { if (id == null) { - id = allocateId(this); + id = allocateId(); } else { if (id.intValue() < 0) { throw new IllegalArgumentException("id: " + id + " (expected: >= 0)"); } - if (allChannels.putIfAbsent(id, this) != null) { - throw new IllegalArgumentException("duplicate ID: " + id); - } } this.parent = parent; this.id = id; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); - - closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - allChannels.remove(id()); - } - }); } @Override @@ -136,11 +107,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return outboundBuffer.getWritable(); } - @Override - public final Integer id() { - return id; - } - @Override public Channel parent() { return parent; @@ -346,17 +312,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return this == o; } - /** - * Compares the {@linkplain #id() ID} of the two channels. - */ @Override public final int compareTo(Channel o) { - return id().compareTo(o.id()); + if (o instanceof AbstractChannel) { + return id.compareTo(((AbstractChannel) o).id); + } + return id.compareTo(Integer.valueOf(o.hashCode())); } /** * Returns the {@link String} representation of this channel. The returned - * string contains the {@linkplain #id() ID}, {@linkplain #localAddress() local address}, + * string contains the {@linkplain #hashCode()} ID}, {@linkplain #localAddress() local address}, * and {@linkplain #remoteAddress() remote address} of this channel for * easier identification. */ diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index f50a42ee76..b4621eba29 100755 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -68,11 +68,6 @@ import java.net.SocketAddress; */ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPropertyAccess, Comparable { - /** - * Returns the unique integer ID of this channel. The returned value MUST be non {@code null}. - */ - Integer id(); - /** * Return the {@link EventLoop} this {@link Channel} was registered too. */ diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java index b034d94377..707b284e6e 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java @@ -96,13 +96,6 @@ public interface ChannelGroup extends Set, Comparable { */ String name(); - /** - * Returns the {@link Channel} whose ID matches the specified integer. - * - * @return the matching {@link Channel} if found. {@code null} otherwise. - */ - Channel find(Integer id); - /** * Writes the specified {@code message} to all {@link Channel}s in this * group. If the specified {@code message} is an instance of diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroupException.java b/transport/src/main/java/io/netty/channel/group/ChannelGroupException.java index 3a14f46a03..13d4a28213 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroupException.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroupException.java @@ -27,11 +27,11 @@ import java.util.Map; /** * {@link ChannelException} which holds {@link ChannelFuture}s that failed because of an error. */ -public class ChannelGroupException extends ChannelException implements Iterable> { +public class ChannelGroupException extends ChannelException implements Iterable> { private static final long serialVersionUID = -4093064295562629453L; - private final Collection> failed; + private final Collection> failed; - public ChannelGroupException(Collection> causes) { + public ChannelGroupException(Collection> causes) { if (causes == null) { throw new NullPointerException("causes"); } @@ -46,7 +46,7 @@ public class ChannelGroupException extends ChannelException implements Iterable< * related id of the {@link Channel}. */ @Override - public Iterator> iterator() { + public Iterator> iterator() { return failed.iterator(); } } diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java index ac83f7e846..c0b074b889 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java @@ -109,16 +109,6 @@ public interface ChannelGroupFuture extends Future, Iterable implements Channel private static final AtomicInteger nextId = new AtomicInteger(); private final String name; private final EventExecutor executor; - private final ConcurrentMap serverChannels = PlatformDependent.newConcurrentHashMap(); - private final ConcurrentMap nonServerChannels = PlatformDependent.newConcurrentHashMap(); + private final ConcurrentSet serverChannels = new ConcurrentSet(); + private final ConcurrentSet nonServerChannels = new ConcurrentSet(); private final ChannelFutureListener remover = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -88,26 +87,14 @@ public class DefaultChannelGroup extends AbstractSet implements Channel return nonServerChannels.size() + serverChannels.size(); } - @Override - public Channel find(Integer id) { - Channel c = nonServerChannels.get(id); - if (c != null) { - return c; - } else { - return serverChannels.get(id); - } - } - @Override public boolean contains(Object o) { - if (o instanceof Integer) { - return nonServerChannels.containsKey(o) || serverChannels.containsKey(o); - } else if (o instanceof Channel) { + if (o instanceof Channel) { Channel c = (Channel) o; if (o instanceof ServerChannel) { - return serverChannels.containsKey(c.id()); + return serverChannels.contains(c); } else { - return nonServerChannels.containsKey(c.id()); + return nonServerChannels.contains(c); } } else { return false; @@ -116,10 +103,10 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public boolean add(Channel channel) { - ConcurrentMap map = + ConcurrentSet set = channel instanceof ServerChannel? serverChannels : nonServerChannels; - boolean added = map.putIfAbsent(channel.id(), channel) == null; + boolean added = set.add(channel); if (added) { channel.closeFuture().addListener(remover); } @@ -128,22 +115,17 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public boolean remove(Object o) { - Channel c = null; - if (o instanceof Integer) { - c = nonServerChannels.remove(o); - if (c == null) { - c = serverChannels.remove(o); - } - } else if (o instanceof Channel) { - c = (Channel) o; - if (c instanceof ServerChannel) { - c = serverChannels.remove(c.id()); - } else { - c = nonServerChannels.remove(c.id()); - } + if (!(o instanceof Channel)) { + return false; } - - if (c == null) { + boolean removed; + Channel c = (Channel) o; + if (c instanceof ServerChannel) { + removed = serverChannels.remove(c); + } else { + removed = nonServerChannels.remove(c); + } + if (!removed) { return false; } @@ -160,36 +142,36 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public Iterator iterator() { return new CombinedIterator( - serverChannels.values().iterator(), - nonServerChannels.values().iterator()); + serverChannels.iterator(), + nonServerChannels.iterator()); } @Override public Object[] toArray() { Collection channels = new ArrayList(size()); - channels.addAll(serverChannels.values()); - channels.addAll(nonServerChannels.values()); + channels.addAll(serverChannels); + channels.addAll(nonServerChannels); return channels.toArray(); } @Override public T[] toArray(T[] a) { Collection channels = new ArrayList(size()); - channels.addAll(serverChannels.values()); - channels.addAll(nonServerChannels.values()); + channels.addAll(serverChannels); + channels.addAll(nonServerChannels); return channels.toArray(a); } @Override public ChannelGroupFuture close() { - Map futures = - new LinkedHashMap(size()); + Map futures = + new LinkedHashMap(size()); - for (Channel c: serverChannels.values()) { - futures.put(c.id(), c.close()); + for (Channel c: serverChannels) { + futures.put(c, c.close()); } - for (Channel c: nonServerChannels.values()) { - futures.put(c.id(), c.close()); + for (Channel c: nonServerChannels) { + futures.put(c, c.close()); } return new DefaultChannelGroupFuture(this, futures, executor); @@ -197,14 +179,14 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public ChannelGroupFuture disconnect() { - Map futures = - new LinkedHashMap(size()); + Map futures = + new LinkedHashMap(size()); - for (Channel c: serverChannels.values()) { - futures.put(c.id(), c.disconnect()); + for (Channel c: serverChannels) { + futures.put(c, c.disconnect()); } - for (Channel c: nonServerChannels.values()) { - futures.put(c.id(), c.disconnect()); + for (Channel c: nonServerChannels) { + futures.put(c, c.disconnect()); } return new DefaultChannelGroupFuture(this, futures, executor); @@ -216,9 +198,9 @@ public class DefaultChannelGroup extends AbstractSet implements Channel throw new NullPointerException("message"); } - Map futures = new LinkedHashMap(size()); - for (Channel c: nonServerChannels.values()) { - futures.put(c.id(), c.write(safeDuplicate(message))); + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels) { + futures.put(c, c.write(safeDuplicate(message))); } ReferenceCountUtil.release(message); @@ -231,14 +213,14 @@ public class DefaultChannelGroup extends AbstractSet implements Channel throw new NullPointerException("messages"); } - Map futures = new LinkedHashMap(size()); - for (Channel c: nonServerChannels.values()) { + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels) { int size = messages.size(); MessageList messageCopy = MessageList.newInstance(size); for (int i = 0 ; i < size; i++) { messageCopy.add(safeDuplicate(messages.get(i))); } - futures.put(c.id(), c.write(messageCopy)); + futures.put(c, c.write(messageCopy)); } messages.releaseAllAndRecycle(); @@ -259,14 +241,14 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public ChannelGroupFuture deregister() { - Map futures = - new LinkedHashMap(size()); + Map futures = + new LinkedHashMap(size()); - for (Channel c: serverChannels.values()) { - futures.put(c.id(), c.deregister()); + for (Channel c: serverChannels) { + futures.put(c, c.deregister()); } - for (Channel c: nonServerChannels.values()) { - futures.put(c.id(), c.deregister()); + for (Channel c: nonServerChannels) { + futures.put(c, c.deregister()); } return new DefaultChannelGroupFuture(this, futures, executor); diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java index 9400a19c1e..ebfe6d4d40 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java @@ -40,7 +40,7 @@ import java.util.Map; final class DefaultChannelGroupFuture extends DefaultPromise implements ChannelGroupFuture { private final ChannelGroup group; - private final Map futures; + private final Map futures; private int successCount; private int failureCount; @@ -62,11 +62,11 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements Ch if (callSetDone) { if (failureCount > 0) { - List> failed = - new ArrayList>(failureCount); + List> failed = + new ArrayList>(failureCount); for (ChannelFuture f: futures.values()) { if (!f.isSuccess()) { - failed.add(new DefaultEntry(f.channel().id(), f.cause())); + failed.add(new DefaultEntry(f.channel(), f.cause())); } } setFailure0(new ChannelGroupException(failed)); @@ -91,9 +91,9 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements Ch this.group = group; - Map futureMap = new LinkedHashMap(); + Map futureMap = new LinkedHashMap(); for (ChannelFuture f: futures) { - futureMap.put(f.channel().id(), f); + futureMap.put(f.channel(), f); } this.futures = Collections.unmodifiableMap(futureMap); @@ -108,7 +108,7 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements Ch } } - DefaultChannelGroupFuture(ChannelGroup group, Map futures, EventExecutor executor) { + DefaultChannelGroupFuture(ChannelGroup group, Map futures, EventExecutor executor) { super(executor); this.group = group; this.futures = Collections.unmodifiableMap(futures); @@ -127,14 +127,9 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements Ch return group; } - @Override - public ChannelFuture find(Integer channelId) { - return futures.get(channelId); - } - @Override public ChannelFuture find(Channel channel) { - return futures.get(channel.id()); + return futures.get(channel); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalAddress.java b/transport/src/main/java/io/netty/channel/local/LocalAddress.java index 901e16f066..6098cca470 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalAddress.java +++ b/transport/src/main/java/io/netty/channel/local/LocalAddress.java @@ -40,7 +40,7 @@ public final class LocalAddress extends SocketAddress implements Comparable