From 40003ed250a41286dd73683e62737e318ab003ce Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 15 Nov 2013 22:50:53 +0900 Subject: [PATCH] Resurrect Channel.id() with global uniqueness - Fixes #1810 - Add a new interface ChannelId and its default implementation which generates globally unique channel ID. - Replace AbstractChannel.hashCode with ChannelId.hashCode() and ChannelId.shortValue() - Add variants of ByteBuf.hexDump() which accept byte[] instead of ByteBuf. --- .../java/io/netty/buffer/ByteBufUtil.java | 32 ++ .../io/netty/channel/AbstractChannel.java | 51 ++- .../main/java/io/netty/channel/Channel.java | 5 + .../main/java/io/netty/channel/ChannelId.java | 56 +++ .../io/netty/channel/DefaultChannelId.java | 375 ++++++++++++++++++ .../netty/channel/DefaultChannelIdTest.java | 76 ++++ 6 files changed, 574 insertions(+), 21 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/ChannelId.java create mode 100644 transport/src/main/java/io/netty/channel/DefaultChannelId.java create mode 100644 transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java index 8860cbae74..f501ed13ca 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufUtil.java @@ -75,6 +75,38 @@ public final class ByteBufUtil { return new String(buf); } + /** + * Returns a hex dump + * of the specified byte array. + */ + public static String hexDump(byte[] array) { + return hexDump(array, 0, array.length); + } + + /** + * Returns a hex dump + * of the specified byte array's sub-region. + */ + public static String hexDump(byte[] array, int fromIndex, int length) { + if (length < 0) { + throw new IllegalArgumentException("length: " + length); + } + if (length == 0) { + return ""; + } + + int endIndex = fromIndex + length; + char[] buf = new char[length << 1]; + + int srcIdx = fromIndex; + int dstIdx = 0; + for (; srcIdx < endIndex; srcIdx ++, dstIdx += 2) { + System.arraycopy(HEXDUMP_TABLE, (array[srcIdx] & 0xFF) << 1, buf, dstIdx, 2); + } + + return new String(buf); + } + /** * Calculates the hash code of the specified buffer. This method is * useful when implementing a new buffer type. diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index deaf67a2a0..b185c88612 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -20,7 +20,6 @@ import io.netty.util.DefaultAttributeMap; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -50,7 +49,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private MessageSizeEstimator.Handle estimatorHandle; private final Channel parent; - private final long hashCode = ThreadLocalRandom.current().nextLong(); + private final ChannelId id = DefaultChannelId.newInstance(); private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); @@ -79,6 +78,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline = new DefaultChannelPipeline(this); } + @Override + public final ChannelId id() { + return id; + } + @Override public boolean isWritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); @@ -285,7 +289,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ @Override public final int hashCode() { - return (int) hashCode; + return id.hashCode(); } /** @@ -303,21 +307,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return 0; } - long ret = hashCode - o.hashCode(); - if (ret > 0) { - return 1; - } - if (ret < 0) { - return -1; - } - - ret = System.identityHashCode(this) - System.identityHashCode(o); - if (ret != 0) { - return (int) ret; - } - - // Jackpot! - different objects with same hashes - throw new Error(); + return id().compareTo(o.id()); } /** @@ -345,11 +335,30 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha srcAddr = remoteAddr; dstAddr = localAddr; } - strVal = String.format("[id: 0x%08x, %s %s %s]", (int) hashCode, srcAddr, active? "=>" : ":>", dstAddr); + + StringBuilder buf = new StringBuilder(96); + buf.append("[id: 0x"); + buf.append(id.asShortText()); + buf.append(", "); + buf.append(srcAddr); + buf.append(active? " => " : " :> "); + buf.append(dstAddr); + buf.append(']'); + strVal = buf.toString(); } else if (localAddr != null) { - strVal = String.format("[id: 0x%08x, %s]", (int) hashCode, localAddr); + StringBuilder buf = new StringBuilder(64); + buf.append("[id: 0x"); + buf.append(id.asShortText()); + buf.append(", "); + buf.append(localAddr); + buf.append(']'); + strVal = buf.toString(); } else { - strVal = String.format("[id: 0x%08x]", (int) hashCode); + StringBuilder buf = new StringBuilder(16); + buf.append("[id: 0x"); + buf.append(id.asShortText()); + buf.append(']'); + strVal = buf.toString(); } strValActive = active; diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 1e397be570..93f1883301 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -73,6 +73,11 @@ import java.net.SocketAddress; */ public interface Channel extends AttributeMap, Comparable { + /** + * Returns the globally unique identifier of this {@link Channel}. + */ + ChannelId id(); + /** * Return the {@link EventLoop} this {@link Channel} was registered too. */ diff --git a/transport/src/main/java/io/netty/channel/ChannelId.java b/transport/src/main/java/io/netty/channel/ChannelId.java new file mode 100644 index 0000000000..b62fff8881 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelId.java @@ -0,0 +1,56 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import java.io.Serializable; + +/** + * Represents the globally unique identifier of a {@link Channel}. + *

+ * The identifier is generated from various sources listed in the following: + *

+ *

+ *

+ * The global uniqueness of the generated identifier mostly depends on the MAC address and the current process ID, + * which are auto-detected at the class-loading time in best-effort manner. If all attempts to acquire them fail, + * a warning message is logged, and random values will be used instead. Alternatively, you can specify them manually + * via system properties: + *

+ *

+ */ +public interface ChannelId extends Serializable, Comparable { + /** + * Returns the short but globally non-unique string representation of the {@link ChannelId}. + */ + String asShortText(); + + /** + * Returns the long yet globally unique string representation of the {@link ChannelId}. + */ + String asLongText(); +} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelId.java b/transport/src/main/java/io/netty/channel/DefaultChannelId.java new file mode 100644 index 0000000000..2f3ae26a00 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultChannelId.java @@ -0,0 +1,375 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.buffer.ByteBufUtil; +import io.netty.util.internal.SystemPropertyUtil; +import io.netty.util.internal.ThreadLocalRandom; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.lang.management.ManagementFactory; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +/** + * The default {@link ChannelId} implementation. + */ +final class DefaultChannelId implements ChannelId { + + private static final long serialVersionUID = 3884076183504074063L; + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelId.class); + + private static final Pattern MACHINE_ID_PATTERN = Pattern.compile("^(?:[0-9a-fA-F][:-]?){6,8}$"); + private static final int MACHINE_ID_LEN = 8; + private static final byte[] MACHINE_ID; + private static final int PROCESS_ID_LEN = 2; + private static final int MAX_PROCESS_ID = 65535; + private static final int PROCESS_ID; + private static final int SEQUENCE_LEN = 4; + private static final int TIMESTAMP_LEN = 8; + private static final int RANDOM_LEN = 4; + + private static final AtomicInteger nextSequence = new AtomicInteger(); + + static ChannelId newInstance() { + DefaultChannelId id = new DefaultChannelId(); + id.init(); + return id; + } + + static { + byte[] machineId = null; + String customMachineId = SystemPropertyUtil.get("io.netty.machineId"); + if (customMachineId != null) { + if (MACHINE_ID_PATTERN.matcher(customMachineId).matches()) { + machineId = parseMachineId(customMachineId); + logger.debug("-Dio.netty.machineId: {} (user-set)", customMachineId); + } else { + logger.warn("-Dio.netty.machineId: {} (malformed)", customMachineId); + } + } + + if (machineId == null) { + machineId = defaultMachineId(); + if (logger.isDebugEnabled()) { + logger.debug("-Dio.netty.machineId: {} (auto-detected)", formatAddress(machineId)); + } + } + + MACHINE_ID = machineId; + + int processId = -1; + String customProcessId = SystemPropertyUtil.get("io.netty.processId"); + if (customProcessId != null) { + try { + processId = Integer.parseInt(customProcessId); + } catch (NumberFormatException e) { + // Malformed input. + } + + if (processId < 0 || processId > MAX_PROCESS_ID) { + processId = -1; + logger.warn("-Dio.netty.processId: {} (malformed)", customProcessId); + } else if (logger.isDebugEnabled()) { + logger.debug("-Dio.netty.processId: {} (user-set)", processId); + } + } + + if (processId < 0) { + processId = defaultProcessId(); + if (logger.isDebugEnabled()) { + logger.debug("-Dio.netty.processId: {} (auto-detected)", processId); + } + } + + PROCESS_ID = processId; + } + + @SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") + private static byte[] parseMachineId(String value) { + // Strip separators. + value = value.replaceAll("[:-]", ""); + + byte[] machineId = new byte[MACHINE_ID_LEN]; + for (int i = 0; i < value.length(); i += 2) { + machineId[i] = (byte) Integer.parseInt(value.substring(i, i + 2), 16); + } + + return machineId; + } + + private static byte[] defaultMachineId() { + // Find the best MAC address available. + final byte[] NOT_FOUND = { -1 }; + byte[] bestMacAddr = NOT_FOUND; + + Enumeration ifaces = null; + try { + ifaces = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e) { + logger.warn("Failed to find the loopback interface", e); + } + + if (ifaces != null) { + while (ifaces.hasMoreElements()) { + NetworkInterface iface = ifaces.nextElement(); + try { + if (iface.isLoopback() || iface.isPointToPoint() || iface.isVirtual()) { + continue; + } + } catch (SocketException e) { + logger.debug("Failed to determine the type of a network interface: {}", iface, e); + continue; + } + + byte[] macAddr; + try { + macAddr = iface.getHardwareAddress(); + } catch (SocketException e) { + logger.debug("Failed to get the hardware address of a network interface: {}", iface, e); + continue; + } + + if (isBetterAddress(bestMacAddr, macAddr)) { + bestMacAddr = macAddr; + } + } + } + + if (bestMacAddr == NOT_FOUND) { + bestMacAddr = new byte[MACHINE_ID_LEN]; + ThreadLocalRandom.current().nextBytes(bestMacAddr); + logger.warn( + "Failed to find a usable hardware address from the network interfaces; using random bytes: {}", + formatAddress(bestMacAddr)); + } + + if (bestMacAddr.length != MACHINE_ID_LEN) { + bestMacAddr = Arrays.copyOf(bestMacAddr, MACHINE_ID_LEN); + } + + return bestMacAddr; + } + + private static boolean isBetterAddress(byte[] current, byte[] candidate) { + if (candidate == null) { + return false; + } + + // Must be EUI-48 or longer. + if (candidate.length < 6) { + return false; + } + + // Must not be filled with only 0 or 1. + boolean onlyZero = true; + boolean onlyOne = true; + for (byte b: candidate) { + if (b != 0) { + onlyZero = false; + } + + if (b != -1) { + onlyOne = false; + } + } + + if (onlyZero || onlyOne) { + return false; + } + + // Must not be a multicast address + if ((candidate[0] & 1) != 0) { + return false; + } + + // Prefer longer globally unique addresses. + if ((current[0] & 2) == 0) { + if ((candidate[0] & 2) == 0) { + return candidate.length > current.length; + } else { + return false; + } + } else { + if ((candidate[0] & 2) == 0) { + return true; + } else { + return candidate.length > current.length; + } + } + } + + private static String formatAddress(byte[] addr) { + StringBuilder buf = new StringBuilder(24); + for (byte b: addr) { + buf.append(String.format("%02x:", b & 0xff)); + } + return buf.substring(0, buf.length() - 1); + } + + private static int defaultProcessId() { + String value = ManagementFactory.getRuntimeMXBean().getName(); + int atIndex = value.indexOf('@'); + if (atIndex >= 0) { + value = value.substring(0, atIndex); + } + + int pid; + try { + pid = Integer.parseInt(value); + } catch (NumberFormatException e) { + pid = -1; + } + + if (pid < 0 || pid > MAX_PROCESS_ID) { + pid = ThreadLocalRandom.current().nextInt(MAX_PROCESS_ID + 1); + logger.warn("Failed to find the current process ID; using a random value: {}", pid); + } + + return pid; + } + + private final byte[] data = new byte[MACHINE_ID_LEN + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN]; + private int hashCode; + + private transient String shortValue; + private transient String longValue; + + public DefaultChannelId() { } + + private void init() { + int i = 0; + + // machineId + System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID_LEN); + i += MACHINE_ID_LEN; + + // processId + i = writeShort(i, PROCESS_ID); + + // sequence + i = writeInt(i, nextSequence.getAndIncrement()); + + // timestamp (kind of) + i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis()); + + // random + int random = ThreadLocalRandom.current().nextInt(); + hashCode = random; + i = writeInt(i, random); + + assert i == data.length; + } + + private int writeShort(int i, int value) { + data[i ++] = (byte) (value >>> 8); + data[i ++] = (byte) value; + return i; + } + + private int writeInt(int i, int value) { + data[i ++] = (byte) (value >>> 24); + data[i ++] = (byte) (value >>> 16); + data[i ++] = (byte) (value >>> 8); + data[i ++] = (byte) value; + return i; + } + + private int writeLong(int i, long value) { + data[i ++] = (byte) (value >>> 56); + data[i ++] = (byte) (value >>> 48); + data[i ++] = (byte) (value >>> 40); + data[i ++] = (byte) (value >>> 32); + data[i ++] = (byte) (value >>> 24); + data[i ++] = (byte) (value >>> 16); + data[i ++] = (byte) (value >>> 8); + data[i ++] = (byte) value; + return i; + } + + @Override + public String asShortText() { + String shortValue = this.shortValue; + if (shortValue == null) { + this.shortValue = shortValue = ByteBufUtil.hexDump( + data, MACHINE_ID_LEN + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN, RANDOM_LEN); + } + return shortValue; + } + + @Override + public String asLongText() { + String longValue = this.longValue; + if (longValue == null) { + this.longValue = longValue = newLongValue(); + } + return longValue; + } + + private String newLongValue() { + StringBuilder buf = new StringBuilder(data.length + 4); + int i = 0; + i = appendHexDumpField(buf, i, MACHINE_ID_LEN); + i = appendHexDumpField(buf, i, PROCESS_ID_LEN); + i = appendHexDumpField(buf, i, SEQUENCE_LEN); + i = appendHexDumpField(buf, i, TIMESTAMP_LEN); + i = appendHexDumpField(buf, i, RANDOM_LEN); + assert i == data.length; + return buf.substring(0, buf.length() - 1); + } + + private int appendHexDumpField(StringBuilder buf, int i, int length) { + buf.append(ByteBufUtil.hexDump(data, i, length)); + buf.append('-'); + i += length; + return i; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public int compareTo(ChannelId o) { + return 0; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof DefaultChannelId)) { + return false; + } + + return Arrays.equals(data, ((DefaultChannelId) obj).data); + } + + @Override + public String toString() { + return asShortText(); + } +} diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java new file mode 100644 index 0000000000..3a2cee35c7 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import org.junit.Test; + +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +@SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") +public class DefaultChannelIdTest { + @Test + public void testShortText() { + String text = DefaultChannelId.newInstance().asShortText(); + assertTrue(text.matches("^[0-9a-f]{8}$")); + } + + @Test + public void testLongText() { + String text = DefaultChannelId.newInstance().asLongText(); + assertTrue(text.matches("^[0-9a-f]{16}-[0-9a-f]{4}-[0-9a-f]{8}-[0-9a-f]{16}-[0-9a-f]{8}$")); + } + + @Test + public void testIdempotentMachineId() { + String a = DefaultChannelId.newInstance().asLongText().substring(0, 16); + String b = DefaultChannelId.newInstance().asLongText().substring(0, 16); + assertThat(a, is(b)); + } + + @Test + public void testIdempotentProcessId() { + String a = DefaultChannelId.newInstance().asLongText().substring(17, 21); + String b = DefaultChannelId.newInstance().asLongText().substring(17, 21); + assertThat(a, is(b)); + } + + @Test + public void testSerialization() throws Exception { + ChannelId a = DefaultChannelId.newInstance(); + ChannelId b; + + ByteBuf buf = Unpooled.buffer(); + ObjectOutputStream out = new ObjectOutputStream(new ByteBufOutputStream(buf)); + out.writeObject(a); + out.flush(); + + ObjectInputStream in = new ObjectInputStream(new ByteBufInputStream(buf)); + b = (ChannelId) in.readObject(); + + assertThat(a, is(b)); + assertThat(a, is(not(sameInstance(b)))); + assertThat(a.asLongText(), is(b.asLongText())); + } +}