Reduce garbage on MQTT (#10509)

Reduce garbage on MQTT encoding

Motivation:

MQTT encoding and decoding is doing unnecessary object allocation in a number of places:
- MqttEncoder create many byte[] to encode Strings into UTF-8 bytes
- MqttProperties uses Integer keys instead of int
- Some enums valueOf create unnecessary arrays on the hot paths
- MqttDecoder was using unecessary Result<T>

Modification:

- ByteBufUtil::utf8Bytes and ByteBufUtil::reserveAndWriteUtf8 allows to perform the same operation GC-free
- MqttProperties uses a primitive key map
- Implemented GC free const table lookup/switch valueOf
- Use some bit-tricks to pack 2 ints into a single primitive long to store both result and numberOfBytesConsumed and use byte[].length to compute numberOfByteConsumed on fly. These changes allowed to save creating Result<T>.

Result:
Significantly less garbage produced in MQTT encoding/decoding
This commit is contained in:
Francesco Nigro 2020-09-04 18:27:22 +02:00 committed by Chris Vest
parent ed6ad17caa
commit 319a4bc3ba
10 changed files with 768 additions and 165 deletions

View File

@ -16,10 +16,6 @@
package io.netty.handler.codec.mqtt; package io.netty.handler.codec.mqtt;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/** /**
* Return Code of {@link MqttConnAckMessage} * Return Code of {@link MqttConnAckMessage}
*/ */
@ -54,14 +50,15 @@ public enum MqttConnectReturnCode {
CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D), CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D),
CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F); CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F);
private static final Map<Byte, MqttConnectReturnCode> VALUE_TO_CODE_MAP; private static final MqttConnectReturnCode[] VALUES;
static { static {
final Map<Byte, MqttConnectReturnCode> valueMap = new HashMap<>(); MqttConnectReturnCode[] values = values();
for (MqttConnectReturnCode code: values()) { VALUES = new MqttConnectReturnCode[160];
valueMap.put(code.byteValue, code); for (MqttConnectReturnCode code : values) {
final int unsignedByte = code.byteValue & 0xFF;
VALUES[unsignedByte] = code;
} }
VALUE_TO_CODE_MAP = Collections.unmodifiableMap(valueMap);
} }
private final byte byteValue; private final byte byteValue;
@ -75,9 +72,16 @@ public enum MqttConnectReturnCode {
} }
public static MqttConnectReturnCode valueOf(byte b) { public static MqttConnectReturnCode valueOf(byte b) {
if (VALUE_TO_CODE_MAP.containsKey(b)) { final int unsignedByte = b & 0xFF;
return VALUE_TO_CODE_MAP.get(b); MqttConnectReturnCode mqttConnectReturnCode = null;
try {
mqttConnectReturnCode = VALUES[unsignedByte];
} catch (ArrayIndexOutOfBoundsException ignored) {
// no op
} }
throw new IllegalArgumentException("unknown connect return code: " + (b & 0xFF)); if (mqttConnectReturnCode == null) {
throw new IllegalArgumentException("unknown connect return code: " + unsignedByte);
}
return mqttConnectReturnCode;
} }
} }

View File

@ -22,6 +22,7 @@ import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState; import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import java.util.ArrayList; import java.util.ArrayList;
@ -234,8 +235,8 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
final int b1 = buffer.readUnsignedByte(); final int b1 = buffer.readUnsignedByte();
numberOfBytesConsumed += 1; numberOfBytesConsumed += 1;
final Result<Integer> keepAlive = decodeMsbLsb(buffer); final int keepAlive = decodeMsbLsb(buffer);
numberOfBytesConsumed += keepAlive.numberOfBytesConsumed; numberOfBytesConsumed += 2;
final boolean hasUserName = (b1 & 0x80) == 0x80; final boolean hasUserName = (b1 & 0x80) == 0x80;
final boolean hasPassword = (b1 & 0x40) == 0x40; final boolean hasPassword = (b1 & 0x40) == 0x40;
@ -271,7 +272,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
willQos, willQos,
willFlag, willFlag,
cleanSession, cleanSession,
keepAlive.value, keepAlive,
properties); properties);
return new Result<>(mqttConnectVariableHeader, numberOfBytesConsumed); return new Result<>(mqttConnectVariableHeader, numberOfBytesConsumed);
} }
@ -298,59 +299,53 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
return new Result<>(mqttConnAckVariableHeader, numberOfBytesConsumed); return new Result<>(mqttConnAckVariableHeader, numberOfBytesConsumed);
} }
private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuf buffer) {
final Result<Integer> messageId = decodeMessageId(buffer);
return new Result<>(
MqttMessageIdVariableHeader.from(messageId.value),
messageId.numberOfBytesConsumed);
}
private static Result<MqttMessageIdAndPropertiesVariableHeader> decodeMessageIdAndPropertiesVariableHeader( private static Result<MqttMessageIdAndPropertiesVariableHeader> decodeMessageIdAndPropertiesVariableHeader(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
ByteBuf buffer) { ByteBuf buffer) {
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
final Result<Integer> packetId = decodeMessageId(buffer); final int packetId = decodeMessageId(buffer);
final MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader; final MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader;
final int mqtt5Consumed; final int mqtt5Consumed;
if (mqttVersion == MqttVersion.MQTT_5) { if (mqttVersion == MqttVersion.MQTT_5) {
final Result<MqttProperties> properties = decodeProperties(buffer); final Result<MqttProperties> properties = decodeProperties(buffer);
mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId.value, properties.value); mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
mqtt5Consumed = properties.numberOfBytesConsumed; mqtt5Consumed = properties.numberOfBytesConsumed;
} else { } else {
mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId.value, mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId,
MqttProperties.NO_PROPERTIES); MqttProperties.NO_PROPERTIES);
mqtt5Consumed = 0; mqtt5Consumed = 0;
} }
return new Result<MqttMessageIdAndPropertiesVariableHeader>(mqttVariableHeader, return new Result<MqttMessageIdAndPropertiesVariableHeader>(mqttVariableHeader,
packetId.numberOfBytesConsumed + mqtt5Consumed); 2 + mqtt5Consumed);
} }
private Result<MqttPubReplyMessageVariableHeader> decodePubReplyMessage(ByteBuf buffer) { private Result<MqttPubReplyMessageVariableHeader> decodePubReplyMessage(ByteBuf buffer) {
final Result<Integer> packetId = decodeMessageId(buffer); final int packetId = decodeMessageId(buffer);
final MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader; final MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader;
final int consumed; final int consumed;
final int packetIdNumberOfBytesConsumed = 2;
if (bytesRemainingInVariablePart > 3) { if (bytesRemainingInVariablePart > 3) {
final byte reasonCode = buffer.readByte(); final byte reasonCode = buffer.readByte();
final Result<MqttProperties> properties = decodeProperties(buffer); final Result<MqttProperties> properties = decodeProperties(buffer);
mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value, mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
reasonCode, reasonCode,
properties.value); properties.value);
consumed = packetId.numberOfBytesConsumed + 1 + properties.numberOfBytesConsumed; consumed = packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
} else if (bytesRemainingInVariablePart > 2) { } else if (bytesRemainingInVariablePart > 2) {
final byte reasonCode = buffer.readByte(); final byte reasonCode = buffer.readByte();
mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value, mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
reasonCode, reasonCode,
MqttProperties.NO_PROPERTIES); MqttProperties.NO_PROPERTIES);
consumed = packetId.numberOfBytesConsumed + 1; consumed = packetIdNumberOfBytesConsumed + 1;
} else { } else {
mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value, mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
(byte) 0, (byte) 0,
MqttProperties.NO_PROPERTIES); MqttProperties.NO_PROPERTIES);
consumed = packetId.numberOfBytesConsumed; consumed = packetIdNumberOfBytesConsumed;
} }
return new Result<MqttPubReplyMessageVariableHeader>(mqttPubAckVariableHeader, consumed); return new Result<MqttPubReplyMessageVariableHeader>(mqttPubAckVariableHeader, consumed);
@ -396,9 +391,8 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
int messageId = -1; int messageId = -1;
if (mqttFixedHeader.qosLevel().value() > 0) { if (mqttFixedHeader.qosLevel().value() > 0) {
final Result<Integer> decodedMessageId = decodeMessageId(buffer); messageId = decodeMessageId(buffer);
messageId = decodedMessageId.value; numberOfBytesConsumed += 2;
numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
} }
final MqttProperties properties; final MqttProperties properties;
@ -415,10 +409,13 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
return new Result<>(mqttPublishVariableHeader, numberOfBytesConsumed); return new Result<>(mqttPublishVariableHeader, numberOfBytesConsumed);
} }
private static Result<Integer> decodeMessageId(ByteBuf buffer) { /**
final Result<Integer> messageId = decodeMsbLsb(buffer); * @return messageId with numberOfBytesConsumed is 2
if (!isValidMessageId(messageId.value)) { */
throw new DecoderException("invalid messageId: " + messageId.value); private static int decodeMessageId(ByteBuf buffer) {
final int messageId = decodeMsbLsb(buffer);
if (!isValidMessageId(messageId)) {
throw new DecoderException("invalid messageId: " + messageId);
} }
return messageId; return messageId;
} }
@ -476,7 +473,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed; int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
Result<String> decodedWillTopic = null; Result<String> decodedWillTopic = null;
Result<byte[]> decodedWillMessage = null; byte[] decodedWillMessage = null;
final MqttProperties willProperties; final MqttProperties willProperties;
if (mqttConnectVariableHeader.isWillFlag()) { if (mqttConnectVariableHeader.isWillFlag()) {
@ -490,19 +487,19 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
decodedWillTopic = decodeString(buffer, 0, 32767); decodedWillTopic = decodeString(buffer, 0, 32767);
numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed; numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
decodedWillMessage = decodeByteArray(buffer); decodedWillMessage = decodeByteArray(buffer);
numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed; numberOfBytesConsumed += decodedWillMessage.length + 2;
} else { } else {
willProperties = MqttProperties.NO_PROPERTIES; willProperties = MqttProperties.NO_PROPERTIES;
} }
Result<String> decodedUserName = null; Result<String> decodedUserName = null;
Result<byte[]> decodedPassword = null; byte[] decodedPassword = null;
if (mqttConnectVariableHeader.hasUserName()) { if (mqttConnectVariableHeader.hasUserName()) {
decodedUserName = decodeString(buffer); decodedUserName = decodeString(buffer);
numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed; numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
} }
if (mqttConnectVariableHeader.hasPassword()) { if (mqttConnectVariableHeader.hasPassword()) {
decodedPassword = decodeByteArray(buffer); decodedPassword = decodeByteArray(buffer);
numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed; numberOfBytesConsumed += decodedPassword.length + 2;
} }
final MqttConnectPayload mqttConnectPayload = final MqttConnectPayload mqttConnectPayload =
@ -510,9 +507,9 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
decodedClientId.value, decodedClientId.value,
willProperties, willProperties,
decodedWillTopic != null ? decodedWillTopic.value : null, decodedWillTopic != null ? decodedWillTopic.value : null,
decodedWillMessage != null ? decodedWillMessage.value : null, decodedWillMessage != null ? decodedWillMessage : null,
decodedUserName != null ? decodedUserName.value : null, decodedUserName != null ? decodedUserName.value : null,
decodedPassword != null ? decodedPassword.value : null); decodedPassword != null ? decodedPassword : null);
return new Result<>(mqttConnectPayload, numberOfBytesConsumed); return new Result<>(mqttConnectPayload, numberOfBytesConsumed);
} }
@ -598,9 +595,8 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
} }
private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) { private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
final Result<Integer> decodedSize = decodeMsbLsb(buffer); int size = decodeMsbLsb(buffer);
int size = decodedSize.value; int numberOfBytesConsumed = 2;
int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
if (size < minBytes || size > maxBytes) { if (size < minBytes || size > maxBytes) {
buffer.skipBytes(size); buffer.skipBytes(size);
numberOfBytesConsumed += size; numberOfBytesConsumed += size;
@ -612,36 +608,53 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
return new Result<>(s, numberOfBytesConsumed); return new Result<>(s, numberOfBytesConsumed);
} }
private static Result<byte[]> decodeByteArray(ByteBuf buffer) { /**
final Result<Integer> decodedSize = decodeMsbLsb(buffer); *
int size = decodedSize.value; * @return the decoded byte[], numberOfBytesConsumed = byte[].length + 2
*/
private static byte[] decodeByteArray(ByteBuf buffer) {
int size = decodeMsbLsb(buffer);
byte[] bytes = new byte[size]; byte[] bytes = new byte[size];
buffer.readBytes(bytes); buffer.readBytes(bytes);
return new Result<>(bytes, decodedSize.numberOfBytesConsumed + size); return bytes;
} }
private static Result<Integer> decodeMsbLsb(ByteBuf buffer) { // packing utils to reduce the amount of garbage while decoding ints
return decodeMsbLsb(buffer, 0, 65535); private static long packInts(int a, int b) {
return (((long) a) << 32) | (b & 0xFFFFFFFFL);
} }
private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) { private static int unpackA(long ints) {
return (int) (ints >> 32);
}
private static int unpackB(long ints) {
return (int) ints;
}
/**
* numberOfBytesConsumed = 2. return decoded result.
*/
private static int decodeMsbLsb(ByteBuf buffer) {
int min = 0;
int max = 65535;
short msbSize = buffer.readUnsignedByte(); short msbSize = buffer.readUnsignedByte();
short lsbSize = buffer.readUnsignedByte(); short lsbSize = buffer.readUnsignedByte();
final int numberOfBytesConsumed = 2;
int result = msbSize << 8 | lsbSize; int result = msbSize << 8 | lsbSize;
if (result < min || result > max) { if (result < min || result > max) {
result = -1; result = -1;
} }
return new Result<>(result, numberOfBytesConsumed); return result;
} }
/** /**
* See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
* *
* @param buffer the buffer to decode from * @param buffer the buffer to decode from
* @return decoded integer * @return result pack with a = decoded integer, b = numberOfBytesConsumed. Need to unpack to read them.
* @throws DecoderException if bad MQTT protocol limits Remaining Length
*/ */
private static Result<Integer> decodeVariableByteInteger(ByteBuf buffer) { private static long decodeVariableByteInteger(ByteBuf buffer) {
int remainingLength = 0; int remainingLength = 0;
int multiplier = 1; int multiplier = 1;
short digit; short digit;
@ -653,11 +666,10 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
loops++; loops++;
} while ((digit & 128) != 0 && loops < 4); } while ((digit & 128) != 0 && loops < 4);
// MQTT protocol limits Remaining Length to 4 bytes
if (loops == 4 && (digit & 128) != 0) { if (loops == 4 && (digit & 128) != 0) {
return null; throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
} }
return new Result<Integer>(remainingLength, loops); return packInts(remainingLength, loops);
} }
private static final class Result<T> { private static final class Result<T> {
@ -672,16 +684,16 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
} }
private static Result<MqttProperties> decodeProperties(ByteBuf buffer) { private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
final Result<Integer> propertiesLength = decodeVariableByteInteger(buffer); final long propertiesLength = decodeVariableByteInteger(buffer);
int totalPropertiesLength = propertiesLength.value; int totalPropertiesLength = unpackA(propertiesLength);
int numberOfBytesConsumed = propertiesLength.numberOfBytesConsumed; int numberOfBytesConsumed = unpackB(propertiesLength);
MqttProperties decodedProperties = new MqttProperties(); MqttProperties decodedProperties = new MqttProperties();
while (numberOfBytesConsumed < totalPropertiesLength) { while (numberOfBytesConsumed < totalPropertiesLength) {
Result<Integer> propertyId = decodeVariableByteInteger(buffer); long propertyId = decodeVariableByteInteger(buffer);
numberOfBytesConsumed += propertyId.numberOfBytesConsumed; final int propertyIdValue = unpackA(propertyId);
numberOfBytesConsumed += unpackB(propertyId);
MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyId.value); MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
switch (propertyType) { switch (propertyType) {
case PAYLOAD_FORMAT_INDICATOR: case PAYLOAD_FORMAT_INDICATOR:
case REQUEST_PROBLEM_INFORMATION: case REQUEST_PROBLEM_INFORMATION:
@ -693,15 +705,15 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
case SHARED_SUBSCRIPTION_AVAILABLE: case SHARED_SUBSCRIPTION_AVAILABLE:
final int b1 = buffer.readUnsignedByte(); final int b1 = buffer.readUnsignedByte();
numberOfBytesConsumed++; numberOfBytesConsumed++;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, b1)); decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
break; break;
case SERVER_KEEP_ALIVE: case SERVER_KEEP_ALIVE:
case RECEIVE_MAXIMUM: case RECEIVE_MAXIMUM:
case TOPIC_ALIAS_MAXIMUM: case TOPIC_ALIAS_MAXIMUM:
case TOPIC_ALIAS: case TOPIC_ALIAS:
final Result<Integer> int2BytesResult = decodeMsbLsb(buffer); final int int2BytesResult = decodeMsbLsb(buffer);
numberOfBytesConsumed += int2BytesResult.numberOfBytesConsumed; numberOfBytesConsumed += 2;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, int2BytesResult.value)); decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
break; break;
case PUBLICATION_EXPIRY_INTERVAL: case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL: case SESSION_EXPIRY_INTERVAL:
@ -709,12 +721,12 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
case MAXIMUM_PACKET_SIZE: case MAXIMUM_PACKET_SIZE:
final int maxPacketSize = buffer.readInt(); final int maxPacketSize = buffer.readInt();
numberOfBytesConsumed += 4; numberOfBytesConsumed += 4;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, maxPacketSize)); decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
break; break;
case SUBSCRIPTION_IDENTIFIER: case SUBSCRIPTION_IDENTIFIER:
Result<Integer> vbIntegerResult = decodeVariableByteInteger(buffer); long vbIntegerResult = decodeVariableByteInteger(buffer);
numberOfBytesConsumed += vbIntegerResult.numberOfBytesConsumed; numberOfBytesConsumed += unpackB(vbIntegerResult);
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, vbIntegerResult.value)); decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
break; break;
case CONTENT_TYPE: case CONTENT_TYPE:
case RESPONSE_TOPIC: case RESPONSE_TOPIC:
@ -725,7 +737,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
case REASON_STRING: case REASON_STRING:
final Result<String> stringResult = decodeString(buffer); final Result<String> stringResult = decodeString(buffer);
numberOfBytesConsumed += stringResult.numberOfBytesConsumed; numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
decodedProperties.add(new MqttProperties.StringProperty(propertyId.value, stringResult.value)); decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
break; break;
case USER_PROPERTY: case USER_PROPERTY:
final Result<String> keyResult = decodeString(buffer); final Result<String> keyResult = decodeString(buffer);
@ -736,9 +748,9 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
break; break;
case CORRELATION_DATA: case CORRELATION_DATA:
case AUTHENTICATION_DATA: case AUTHENTICATION_DATA:
final Result<byte[]> binaryDataResult = decodeByteArray(buffer); final byte[] binaryDataResult = decodeByteArray(buffer);
numberOfBytesConsumed += binaryDataResult.numberOfBytesConsumed; numberOfBytesConsumed += binaryDataResult.length + 2;
decodedProperties.add(new MqttProperties.BinaryProperty(propertyId.value, binaryDataResult.value)); decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
break; break;
default: default:
//shouldn't reach here //shouldn't reach here

View File

@ -23,12 +23,14 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import java.util.List; import java.util.List;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.*; import static io.netty.buffer.ByteBufUtil.*;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
/** /**
* Encodes Mqtt messages into bytes following the protocol specification v3.1 * Encodes Mqtt messages into bytes following the protocol specification v3.1
@ -113,7 +115,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
MqttConnectPayload payload = message.payload(); MqttConnectPayload payload = message.payload();
MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(), MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
(byte) variableHeader.version()); (byte) variableHeader.version());
MqttCodecUtil.setMqttVersion(ctx, mqttVersion); setMqttVersion(ctx, mqttVersion);
// as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0 // as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
if (!variableHeader.hasUserName() && variableHeader.hasPassword()) { if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
@ -125,23 +127,23 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
if (!isValidClientId(mqttVersion, clientIdentifier)) { if (!isValidClientId(mqttVersion, clientIdentifier)) {
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier); throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
} }
byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier); int clientIdentifierBytes = utf8Bytes(clientIdentifier);
payloadBufferSize += 2 + clientIdentifierBytes.length; payloadBufferSize += 2 + clientIdentifierBytes;
// Will topic and message // Will topic and message
String willTopic = payload.willTopic(); String willTopic = payload.willTopic();
byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES; int willTopicBytes = nullableUtf8Bytes(willTopic);
byte[] willMessage = payload.willMessageInBytes(); byte[] willMessage = payload.willMessageInBytes();
byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES; byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
if (variableHeader.isWillFlag()) { if (variableHeader.isWillFlag()) {
payloadBufferSize += 2 + willTopicBytes.length; payloadBufferSize += 2 + willTopicBytes;
payloadBufferSize += 2 + willMessageBytes.length; payloadBufferSize += 2 + willMessageBytes.length;
} }
String userName = payload.userName(); String userName = payload.userName();
byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES; int userNameBytes = nullableUtf8Bytes(userName);
if (variableHeader.hasUserName()) { if (variableHeader.hasUserName()) {
payloadBufferSize += 2 + userNameBytes.length; payloadBufferSize += 2 + userNameBytes;
} }
byte[] password = payload.passwordInBytes(); byte[] password = payload.passwordInBytes();
@ -182,18 +184,15 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
buf.writeBytes(propertiesBuf); buf.writeBytes(propertiesBuf);
// Payload // Payload
buf.writeShort(clientIdentifierBytes.length); writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
if (variableHeader.isWillFlag()) { if (variableHeader.isWillFlag()) {
buf.writeBytes(willPropertiesBuf); buf.writeBytes(willPropertiesBuf);
buf.writeShort(willTopicBytes.length); writeExactUTF8String(buf, willTopic, willTopicBytes);
buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
buf.writeShort(willMessageBytes.length); buf.writeShort(willMessageBytes.length);
buf.writeBytes(willMessageBytes, 0, willMessageBytes.length); buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
} }
if (variableHeader.hasUserName()) { if (variableHeader.hasUserName()) {
buf.writeShort(userNameBytes.length); writeExactUTF8String(buf, userName, userNameBytes);
buf.writeBytes(userNameBytes, 0, userNameBytes.length);
} }
if (variableHeader.hasPassword()) { if (variableHeader.hasPassword()) {
buf.writeShort(passwordBytes.length); buf.writeShort(passwordBytes.length);
@ -232,7 +231,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
private static ByteBuf encodeConnAckMessage( private static ByteBuf encodeConnAckMessage(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttConnAckMessage message) { MqttConnAckMessage message) {
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); final MqttVersion mqttVersion = getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(), ctx.alloc(),
message.variableHeader().properties()); message.variableHeader().properties());
@ -253,7 +252,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
private static ByteBuf encodeSubscribeMessage( private static ByteBuf encodeSubscribeMessage(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttSubscribeMessage message) { MqttSubscribeMessage message) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttVersion mqttVersion = getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(), ctx.alloc(),
message.idAndPropertiesVariableHeader().properties()); message.idAndPropertiesVariableHeader().properties());
@ -268,8 +267,8 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
for (MqttTopicSubscription topic : payload.topicSubscriptions()) { for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName(); String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName); int topicNameBytes = utf8Bytes(topicName);
payloadBufferSize += 2 + topicNameBytes.length; payloadBufferSize += 2 + topicNameBytes;
payloadBufferSize += 1; payloadBufferSize += 1;
} }
@ -287,7 +286,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
// Payload // Payload
for (MqttTopicSubscription topic : payload.topicSubscriptions()) { for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
writeUTF8String(buf, topic.topicName()); writeUnsafeUTF8String(buf, topic.topicName());
final MqttSubscriptionOption option = topic.option(); final MqttSubscriptionOption option = topic.option();
int optionEncoded = option.retainHandling().value() << 4; int optionEncoded = option.retainHandling().value() << 4;
@ -311,7 +310,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
private static ByteBuf encodeUnsubscribeMessage( private static ByteBuf encodeUnsubscribeMessage(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttUnsubscribeMessage message) { MqttUnsubscribeMessage message) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttVersion mqttVersion = getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(), ctx.alloc(),
message.idAndPropertiesVariableHeader().properties()); message.idAndPropertiesVariableHeader().properties());
@ -325,8 +324,8 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
MqttUnsubscribePayload payload = message.payload(); MqttUnsubscribePayload payload = message.payload();
for (String topicName : payload.topics()) { for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName); int topicNameBytes = utf8Bytes(topicName);
payloadBufferSize += 2 + topicNameBytes.length; payloadBufferSize += 2 + topicNameBytes;
} }
int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
@ -343,9 +342,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
// Payload // Payload
for (String topicName : payload.topics()) { for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName); writeUnsafeUTF8String(buf, topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
} }
return buf; return buf;
@ -357,7 +354,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
private static ByteBuf encodeSubAckMessage( private static ByteBuf encodeSubAckMessage(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttSubAckMessage message) { MqttSubAckMessage message) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttVersion mqttVersion = getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(), ctx.alloc(),
message.idAndPropertiesVariableHeader().properties()); message.idAndPropertiesVariableHeader().properties());
@ -385,7 +382,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttUnsubAckMessage message) { MqttUnsubAckMessage message) {
if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) { if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttVersion mqttVersion = getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(), ctx.alloc(),
message.idAndPropertiesVariableHeader().properties()); message.idAndPropertiesVariableHeader().properties());
@ -416,20 +413,20 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
private static ByteBuf encodePublishMessage( private static ByteBuf encodePublishMessage(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttPublishMessage message) { MqttPublishMessage message) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttVersion mqttVersion = getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader(); MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuf payload = message.payload().duplicate(); ByteBuf payload = message.payload().duplicate();
String topicName = variableHeader.topicName(); String topicName = variableHeader.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName); int topicNameBytes = utf8Bytes(topicName);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(), ctx.alloc(),
message.variableHeader().properties()); message.variableHeader().properties());
try { try {
int variableHeaderBufferSize = 2 + topicNameBytes.length + int variableHeaderBufferSize = 2 + topicNameBytes +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes(); (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes();
int payloadBufferSize = payload.readableBytes(); int payloadBufferSize = payload.readableBytes();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
@ -438,8 +435,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize); writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(topicNameBytes.length); writeExactUTF8String(buf, topicName, topicNameBytes);
buf.writeBytes(topicNameBytes);
if (mqttFixedHeader.qosLevel().value() > 0) { if (mqttFixedHeader.qosLevel().value() > 0) {
buf.writeShort(variableHeader.packetId()); buf.writeShort(variableHeader.packetId());
} }
@ -463,7 +459,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
final ByteBuf propertiesBuf; final ByteBuf propertiesBuf;
final boolean includeReasonCode; final boolean includeReasonCode;
final int variableHeaderBufferSize; final int variableHeaderBufferSize;
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); final MqttVersion mqttVersion = getMqttVersion(ctx);
if (mqttVersion == MqttVersion.MQTT_5 && if (mqttVersion == MqttVersion.MQTT_5 &&
(variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK || (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
!variableHeader.properties().isEmpty())) { !variableHeader.properties().isEmpty())) {
@ -517,7 +513,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
MqttMessage message) { MqttMessage message) {
if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) { if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttVersion mqttVersion = getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttReasonCodeAndPropertiesVariableHeader variableHeader = MqttReasonCodeAndPropertiesVariableHeader variableHeader =
(MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(); (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
@ -629,15 +625,15 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
case SERVER_REFERENCE: case SERVER_REFERENCE:
case REASON_STRING: case REASON_STRING:
writeVariableLengthInt(propertiesBuf, property.propertyId); writeVariableLengthInt(propertiesBuf, property.propertyId);
writeUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value); writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
break; break;
case USER_PROPERTY: case USER_PROPERTY:
final List<MqttProperties.StringPair> pairs = final List<MqttProperties.StringPair> pairs =
((MqttProperties.UserProperties) property).value; ((MqttProperties.UserProperties) property).value;
for (MqttProperties.StringPair pair : pairs) { for (MqttProperties.StringPair pair : pairs) {
writeVariableLengthInt(propertiesBuf, property.propertyId); writeVariableLengthInt(propertiesBuf, property.propertyId);
writeUTF8String(propertiesBuf, pair.key); writeEagerUTF8String(propertiesBuf, pair.key);
writeUTF8String(propertiesBuf, pair.value); writeEagerUTF8String(propertiesBuf, pair.value);
} }
break; break;
case CORRELATION_DATA: case CORRELATION_DATA:
@ -689,10 +685,40 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
} while (num > 0); } while (num > 0);
} }
static void writeUTF8String(ByteBuf buf, String s) { private static int nullableUtf8Bytes(String s) {
byte[] sBytes = encodeStringUtf8(s); return s == null? 0 : utf8Bytes(s);
buf.writeShort(sBytes.length); }
buf.writeBytes(sBytes, 0, sBytes.length);
private static int nullableMaxUtf8Bytes(String s) {
return s == null? 0 : utf8MaxBytes(s);
}
private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
buf.ensureWritable(utf8Length + 2);
buf.writeShort(utf8Length);
if (utf8Length > 0) {
final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
assert writtenUtf8Length == utf8Length;
}
}
private static void writeEagerUTF8String(ByteBuf buf, String s) {
final int maxUtf8Length = nullableMaxUtf8Bytes(s);
buf.ensureWritable(maxUtf8Length + 2);
final int writerIndex = buf.writerIndex();
final int startUtf8String = writerIndex + 2;
buf.writerIndex(startUtf8String);
final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
buf.setShort(writerIndex, utf8Length);
}
private static void writeUnsafeUTF8String(ByteBuf buf, String s) {
final int writerIndex = buf.writerIndex();
final int startUtf8String = writerIndex + 2;
// no need to reserve any capacity here, already done earlier: that's why is Unsafe
buf.writerIndex(startUtf8String);
final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, 0) : 0;
buf.setShort(writerIndex, utf8Length);
} }
private static int getVariableLengthInt(int num) { private static int getVariableLengthInt(int num) {
@ -704,7 +730,4 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
return count; return count;
} }
private static byte[] encodeStringUtf8(String s) {
return s.getBytes(CharsetUtil.UTF_8);
}
} }

View File

@ -15,10 +15,10 @@
*/ */
package io.netty.handler.codec.mqtt; package io.netty.handler.codec.mqtt;
import io.netty.util.collection.IntObjectHashMap;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
@ -67,6 +67,15 @@ public final class MqttProperties {
CORRELATION_DATA(0x09), CORRELATION_DATA(0x09),
AUTHENTICATION_DATA(0x16); AUTHENTICATION_DATA(0x16);
private static final MqttPropertyType[] VALUES;
static {
VALUES = new MqttPropertyType[43];
for (MqttPropertyType v : values()) {
VALUES[v.value] = v;
}
}
private final int value; private final int value;
MqttPropertyType(int value) { MqttPropertyType(int value) {
@ -78,18 +87,20 @@ public final class MqttProperties {
} }
public static MqttPropertyType valueOf(int type) { public static MqttPropertyType valueOf(int type) {
for (MqttPropertyType t : values()) { MqttPropertyType t = null;
if (t.value == type) { try {
return t; t = VALUES[type];
} } catch (ArrayIndexOutOfBoundsException ignored) {
// nop
} }
throw new IllegalArgumentException("unknown property type: " + type); if (t == null) {
throw new IllegalArgumentException("unknown property type: " + type);
}
return t;
} }
} }
public static final MqttProperties NO_PROPERTIES = new MqttProperties( public static final MqttProperties NO_PROPERTIES = new MqttProperties(false);
Collections.unmodifiableMap(new HashMap<Integer, MqttProperty>())
);
static MqttProperties withEmptyDefaults(MqttProperties properties) { static MqttProperties withEmptyDefaults(MqttProperties properties) {
if (properties == null) { if (properties == null) {
@ -190,20 +201,29 @@ public final class MqttProperties {
} }
public MqttProperties() { public MqttProperties() {
this(new HashMap<Integer, MqttProperty>()); this(true);
} }
private MqttProperties(Map<Integer, MqttProperty> props) { private MqttProperties(boolean canModify) {
this.props = props; this.canModify = canModify;
} }
private final Map<Integer, MqttProperty> props; private IntObjectHashMap<MqttProperty> props;
private final boolean canModify;
public void add(MqttProperty property) { public void add(MqttProperty property) {
if (!canModify) {
throw new UnsupportedOperationException("adding property isn't allowed");
}
IntObjectHashMap<MqttProperty> props = this.props;
if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) {
UserProperties userProps = (UserProperties) props.get(property.propertyId); UserProperties userProps = (UserProperties) (props != null? props.get(property.propertyId) : null);
if (userProps == null) { if (userProps == null) {
userProps = new UserProperties(); userProps = new UserProperties();
if (props == null) {
props = new IntObjectHashMap<MqttProperty>();
this.props = props;
}
props.put(property.propertyId, userProps); props.put(property.propertyId, userProps);
} }
if (property instanceof UserProperty) { if (property instanceof UserProperty) {
@ -214,19 +234,26 @@ public final class MqttProperties {
} }
} }
} else { } else {
if (props == null) {
props = new IntObjectHashMap<MqttProperty>();
this.props = props;
}
props.put(property.propertyId, property); props.put(property.propertyId, property);
} }
} }
public Collection<? extends MqttProperty> listAll() { public Collection<? extends MqttProperty> listAll() {
return props.values(); IntObjectHashMap<MqttProperty> props = this.props;
return props == null? Collections.<MqttProperty>emptyList() : props.values();
} }
public boolean isEmpty() { public boolean isEmpty() {
return props.isEmpty(); IntObjectHashMap<MqttProperty> props = this.props;
return props == null || props.isEmpty();
} }
public MqttProperty getProperty(int propertyId) { public MqttProperty getProperty(int propertyId) {
return props.get(propertyId); IntObjectHashMap<MqttProperty> props = this.props;
return props == null? null : props.get(propertyId);
} }
} }

View File

@ -32,11 +32,17 @@ public enum MqttQoS {
} }
public static MqttQoS valueOf(int value) { public static MqttQoS valueOf(int value) {
for (MqttQoS q: values()) { switch (value) {
if (q.value == value) { case 0:
return q; return AT_MOST_ONCE;
} case 1:
return AT_LEAST_ONCE;
case 2:
return EXACTLY_ONCE;
case 0x80:
return FAILURE;
default:
throw new IllegalArgumentException("invalid QoS: " + value);
} }
throw new IllegalArgumentException("invalid QoS: " + value);
} }
} }

View File

@ -36,12 +36,16 @@ public final class MqttSubscriptionOption {
} }
public static RetainedHandlingPolicy valueOf(int value) { public static RetainedHandlingPolicy valueOf(int value) {
for (RetainedHandlingPolicy q: values()) { switch (value) {
if (q.value == value) { case 0:
return q; return SEND_AT_SUBSCRIBE;
} case 1:
return SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS;
case 2:
return DONT_SEND_AT_SUBSCRIBE;
default:
throw new IllegalArgumentException("invalid RetainedHandlingPolicy: " + value);
} }
throw new IllegalArgumentException("invalid RetainedHandlingPolicy: " + value);
} }
} }

View File

@ -49,16 +49,24 @@ public enum MqttVersion {
} }
public static MqttVersion fromProtocolNameAndLevel(String protocolName, byte protocolLevel) { public static MqttVersion fromProtocolNameAndLevel(String protocolName, byte protocolLevel) {
for (MqttVersion mv : values()) { MqttVersion mv = null;
if (mv.level == protocolLevel) { switch (protocolLevel) {
if (mv.name.equals(protocolName)) { case 3:
return mv; mv = MQTT_3_1;
} else { break;
throw new MqttUnacceptableProtocolVersionException(protocolName + " and " + case 4:
protocolLevel + " are not match"); mv = MQTT_3_1_1;
} break;
} case 5:
mv = MQTT_5;
break;
} }
throw new MqttUnacceptableProtocolVersionException(protocolName + "is unknown protocol name"); if (mv == null) {
throw new MqttUnacceptableProtocolVersionException(protocolName + "is unknown protocol name");
}
if (mv.name.equals(protocolName)) {
return mv;
}
throw new MqttUnacceptableProtocolVersionException(protocolName + " and " + protocolLevel + " are not match");
} }
} }

View File

@ -0,0 +1,214 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.Warmup;
import java.util.concurrent.TimeUnit;
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 5, time = 5)
@Measurement(iterations = 5, time = 5)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MqttConnectReturnCodeBench extends AbstractMicrobenchmark {
private static final byte[] DATASET = new byte[] {
-103, -105, -100, -116, 0, 2, -100, 5, -125, 4, -118, -103, -123, 3, 0, -128, -119, 3, -99, 5, 0, -118,
-100, 5, -102, -127, -116,
-103, 4, -128, 0, -122, -101, -116, -128, -112, -124, -127, -118, -116, -107, -126, -119, -100, -121, -103,
-103, -103, 3,
-105, -126, 3, -105, -120, -127, -116, -100, -99, -122, -123, -101, -112, -101, 1, -100, 2, -116, 2, -101,
-97, -100, -124, -123,
-97, -112, -105, -118, -122, -120, 3, -102, -120, -103, 1, -119, -126, 3, -102, -125, -128, -125, -100,
-123, 0, -99, -118, -101,
-118, -112, -126, -128, 4, -121, -101, -124, 1, -118, -127, 1, -119, -120, -127, -100, -112, -124, 2, 5,
-103, -126, -101, -119,
-128, -122, -101, -119, -121, 4, -105, -103, 3, -122, 4, -119, -107, 4, -118, -121, -112, -118, 0, -100,
-116, -124, -107, 3,
-101, -125, -112, -102, -127, -105, -101, 5, -116, 4, -102, 0, -116, -101, -103, -99, -122, 2, -112, -118,
-103, -118, 3, -105,
-120, -99, -103, -116, -124, -124, -123, -128, -116, -101, -124, -122, -101, -122, -116, -118, 5, -97, 1,
-122, -127, -124, 1, 5,
4, -116, -112, -123, -123, -99, 4, -124, -126, -124, -103, 2, -119, -97, -116, -99, -101, -124, -97, -99, 0,
-126, -105, -120,
-123, -121, -100, -102, 3, -103, 0, -124, -123, -120, 0, -112, -118, -103, 1, -101, 0, -102, -99, 5, -125,
-100, -116, -105,
-122, -122, -99, 0, -121, -102, -102, 3, -121, -124, -116, 1, -122, -121, 2, 0, -101, -101, 5, -112, -101,
-128, 4, -99,
-105, -121, -118, 4, -122, -99, -103, 1, -102, -120, -99, -101, -103, 5, -120, -112, -118, -123, -128, -103,
-120, 4, -100, -101,
-107, -120, -99, -125, -119, -101, 5, 5, -107, -101, 3, -112, -122, -119, -122, -119, 0, -107, -127, 0,
-122, 4, 0, -119,
-97, 5, -119, -124, -102, -126, -107, -120, -100, -125, 4, -121, 1, -107, -100, -103, -105, -121, -123,
-120, -100, 0, 5, -127,
-128, -103, 3, -99, -121, -105, -101, -116, -127, -116, 1, 2, -105, -116, -126, 4, -123, -112, -105, -127,
-128, -123, -119, -121,
0, -119, 4, -119, -99, -97, 4, -112, -127, -103, -107, -124, -119, -102, -122, -122, -116, 1, -102, -107,
-119, -102, -128, 3,
-107, -99, -101, -126, -128, 4, -128, -103, 2, -127, -101, 4, -102, 1, -128, -97, -99, -112, -128, -107, 4,
-118, -100, -125,
-120, -116, 4, 3, 4, 2, -99, 1, -123, -120, -122, 5, -116, 2, -128, -101, -125, 3, 3, 3, -127, 5, 2, 4,
-122, -118, 0, -128, -121, -125, -97, -127, -124, 2, -127, -97, -97, -112, 0, -128, -105, -116, -118, -126,
-116, -119, -126, -127,
-116, -97, -105, -102, -107, -101, -126, -123, -107, -125, -107, -125, -120, -116, -123, -128, -119, -103,
-121, -126, -127,
-112, -121, 5, -127, 4, -124, -99, 4, -127, -118, -107, -99, 4, -119, -116, -127, 4, 4, -112, -124, -102,
-112, -99, -112, -121,
-128, 5, -101, -121, -116, -125, -112, -121, -107, -116, -123, 1, -105, -97, -101, -107, -100, -107, -125,
-121, -118, -123,
-99, -105, 5, 0, -123, -116, -127, -97, -124, -105, -124, -124, -118, -112, -103, -116, -125, -127, 1, -124,
5, -103, -128, 3, 2,
-103, -127, -103, -112, 0, 5, -120, -101, 4, 3, 1, 4, 4, -99, 0, -121, -124, -103, 3, -118, -127, 2, 2, 2,
-128, -119, 0,
-122, -112, -124, -101, 3, -102, 3, 3, -118, -99, -120, -125, 2, -100, -120, 0, 3, -120, -127, -122, -122,
-126, -127, -97,
-120, -99, -122, -102, -123, -116, -118, -119, -112, 2, 5, -100, -101, 4, -120, -107, 5, 1, -119, 0, -125,
-105, -101, -100,
-125, 0, -119, -123, 3, -118, 2, -127, -122, -100, 4, -126, 4, -125, -121, -100, 0, -112, -123, -125, 0,
-122, -101, -112,
-119, -118, -127, -124, -128, -127, -102, -125, -103, -103, -126, -116, -107, -125, 0, 3, -123, -100, 3,
-97, 2, -123, -121, -101,
-100, -119, -101, -123, -118, 4, 5, -125, -107, -107, 1, -105, 3, -99, -126, -99, 0, -118, 5, -122, -116,
-105, 2, -124,
-116, -126, 2, -121, -118, -100, 3, 0, -124, 2, -128, -126, -100, -99, -101, 1, -120, 2, -112, -102, -101,
5, -107, 1,
4, 2, -124, -100, -123, 4, -122, -118, -107, -103, -121, -101, -128, -112, -127, -105, -118, 2, -125, -121,
-101, 4, -126, -123,
0, -102, -128, -119, -99, 3, 4, -97, -128, -119, -99, -107, -116, -99, -127, -100, -119, -127, -122, -102,
-119, -118, -119, -103,
-123, -100, 5, -127, -112, 1, -125, -103, 4, 4, -99, 2, -116, -118, -105, 5, -123, -101, -123, -97, 4, -116,
2, -124,
-116, -125, 1, -118, -118, -124, -120, -118, -120, 4, 5, -118, -97, 0, -127, -100, -121, -97, -97, -125,
-120, -122, -126, -125,
-100, -97, 1, -97, -116, -126, -97, 2, 4, 2, -101, -103, -124, 2, 4, -123, -124, -107, -120, -122, 1, -123,
-97, -112,
-126, 0, -97, 0, -102, -99, -125, -101, -103, 1, -118, 5, -120, -102, -101, -116, -125, -125, -116, -102,
-120, -121, 3, -118,
-118, -123, -128, -126, 4, -101, -100, -103, -100, -105, -126, -121, -118, 4, -126, -123, -120, -99, -105,
-116, -127, -128,
-119, -124, -100, -120, -101, -100, 5, -116, -119, -105, -99, -119, -103, -103, -101, -107, -102, 5, -107,
-99, -102, -122, 2,
-125, -126, 0, -97, -123, 1, 3, 1, 5, -127, 2, -112, -103, -125, -112, -124, -118, -99, 0, 4, 2, -118, 5,
-128, -122, -120, 5,
-121, -112, 2, 5, -102, -125, -116, -127, -128, -102, 0, 2, 5, -122, -126, -120, -127, -101, -102, 5, -100,
-120, -107, -107,
-126, -101, 5, 4, -125, 4, -124, -125, -119, -123, -103, 2, -123, -105, 0, 1, 3, -121, -101, 3, -107, -105,
1, -105,
-122, -124, 0, -103, -116, 0, -101, -127, -122, -118, -103, 1, -107, -123, 1, -121, -107, 4, -102, -101, 4,
-127, -101, 3,
-121, -103, -125, -124, -127, 5, -128, 1, 3, -119, -126, -119, -125, -112, -124
};
byte[] types;
long next;
long mask;
@Setup
public void initDataSet() {
types = DATASET;
next = 0;
mask = types.length - 1;
if (Integer.bitCount(types.length) != 1) {
throw new AssertionError("The data set should contains power of 2 items");
}
}
@Benchmark
public MqttConnectReturnCode getViaArray() {
long next = this.next;
int nextIndex = (int) (next & mask);
MqttConnectReturnCode code = MqttConnectReturnCode.valueOf(types[nextIndex]);
this.next = next + 1;
return code;
}
@Benchmark
public MqttConnectReturnCode getViaSwitch() {
long next = this.next;
int nextIndex = (int) (next & mask);
MqttConnectReturnCode code = switchValueOf(types[nextIndex]);
this.next = next + 1;
return code;
}
public static MqttConnectReturnCode switchValueOf(byte b) {
switch (b) {
case 0:
return MqttConnectReturnCode.CONNECTION_ACCEPTED;
case 1:
return MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
case 2:
return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
case 3:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
case 4:
return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
case 5:
return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
case -128:
return MqttConnectReturnCode.CONNECTION_REFUSED_UNSPECIFIED_ERROR;
case -127:
return MqttConnectReturnCode.CONNECTION_REFUSED_MALFORMED_PACKET;
case -126:
return MqttConnectReturnCode.CONNECTION_REFUSED_PROTOCOL_ERROR;
case -125:
return MqttConnectReturnCode.CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC;
case -124:
return MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
case -123:
return MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
case -122:
return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
case -121:
return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
case -120:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
case -119:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_BUSY;
case -118:
return MqttConnectReturnCode.CONNECTION_REFUSED_BANNED;
case -116:
return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD;
case -112:
return MqttConnectReturnCode.CONNECTION_REFUSED_TOPIC_NAME_INVALID;
case -107:
return MqttConnectReturnCode.CONNECTION_REFUSED_PACKET_TOO_LARGE;
case -105:
return MqttConnectReturnCode.CONNECTION_REFUSED_QUOTA_EXCEEDED;
case -103:
return MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID;
case -102:
return MqttConnectReturnCode.CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED;
case -101:
return MqttConnectReturnCode.CONNECTION_REFUSED_QOS_NOT_SUPPORTED;
case -100:
return MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER;
case -99:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_MOVED;
case -97:
return MqttConnectReturnCode.CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED;
default:
throw new IllegalArgumentException("unknown connect return code: " + (b & 0xFF));
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.Warmup;
import java.util.concurrent.TimeUnit;
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 5, time = 5)
@Measurement(iterations = 5, time = 5)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MqttPropertyTypeValueOfBench extends AbstractMicrobenchmark {
private static final int[] DATASET = new int[] {
28, 28, 40, 28, 36, 22, 23, 22, 11, 40, 18, 33, 8, 37, 23, 23, 33, 2, 2, 33, 18,
31, 39, 41, 39, 25, 2, 28, 26, 38, 42, 9, 40, 3, 2, 19, 1, 8, 18, 38, 28, 9, 18,
22, 42, 22, 37, 42, 23, 38, 22, 40, 2, 8, 40, 26, 1, 8, 17, 2, 37, 21, 41, 8, 37, 26, 1, 17,
39, 40, 41, 41, 9, 28, 38, 38, 36, 23, 19, 9, 9, 11, 25, 11, 9, 35, 31, 25, 3, 39,
17, 2, 35, 33, 21, 26, 34, 1, 34, 22, 33, 19, 17, 24, 9, 17, 40, 40, 11, 3, 19, 34,
42, 42, 11, 22, 39, 1, 25, 37, 41, 21, 42, 39, 1, 22, 21, 8, 28, 21, 17, 3, 39, 24,
33, 25, 9, 19, 42, 11, 42, 19, 2, 19, 35, 1, 11, 35, 42, 39, 9, 36, 36, 1, 23, 26,
41, 2, 41, 38, 9, 37, 9, 33, 2, 26, 18, 39, 21, 21, 8, 24, 17, 40, 1, 35, 36, 37,
21, 25, 1, 2, 24, 37, 3, 9, 42, 24, 28, 37, 24, 3, 35, 31, 11, 17, 42, 21, 23, 11,
41, 22, 38, 23, 11, 42, 38, 39, 34, 28, 21, 24, 9, 2, 35, 36, 21, 9, 21, 1, 8, 18,
3, 17, 24, 28, 40, 36, 21, 21, 3, 41, 41, 41, 22, 24, 19, 38, 39, 26, 3, 2, 41, 11,
25, 34, 42, 38, 31, 21, 23, 34, 8, 36, 19, 19, 42, 39, 3, 42, 35, 11, 33, 9, 17, 21,
18, 9, 42, 38, 24, 17, 34, 24, 3, 21, 23, 40, 25, 33, 11, 19, 31, 40, 24, 25, 23, 36,
23, 17, 39, 17, 22, 34, 28, 18, 25, 42, 31, 24, 19, 40, 21, 38, 22, 42, 35, 37, 41, 1,
17, 33, 3, 21, 42, 33, 2, 36, 36, 35, 42, 24, 37, 41, 8, 22, 36, 26, 42, 11, 41, 26,
24, 34, 11, 9, 34, 19, 23, 41, 22, 3, 35, 24, 1, 36, 24, 3, 18, 33, 2, 42, 42, 18,
19, 41, 38, 21, 34, 19, 40, 38, 19, 39, 21, 39, 42, 3, 36, 18, 22, 3, 25, 22, 28, 31,
31, 23, 24, 19, 34, 26, 33, 34, 42, 18, 3, 42, 19, 24, 21, 31, 8, 42, 25, 24, 39, 35,
3, 42, 31, 3, 18, 19, 24, 28, 3, 25, 39, 40, 40, 34, 33, 1, 41, 21, 17, 34, 31, 34,
34, 8, 17, 17, 19, 21, 21, 9, 21, 39, 24, 1, 23, 8, 37, 37, 23, 21, 34, 42, 23, 18,
42, 9, 34, 23, 24, 22, 11, 18, 18, 35, 24, 42, 23, 1, 31, 2, 9, 11, 24, 22, 34, 28,
11, 23, 26, 25, 31, 19, 39, 11, 40, 24, 41, 2, 11, 23, 33, 42, 34, 9, 17, 28, 33, 28,
2, 2, 21, 41, 42, 33, 33, 2, 8, 28, 19, 24, 36, 21, 36, 1, 19, 8, 1, 23, 21, 3,
40, 28, 38, 22, 21, 19, 37, 2, 23, 8, 33, 8, 31, 25, 17, 40, 36, 22, 3, 41, 21, 22,
41, 23, 3, 33, 26, 11, 33, 1, 9, 33, 40, 24, 11, 34, 8, 34, 19, 21, 34, 41, 19, 34,
42, 26, 41, 37, 28, 24, 42, 11, 38, 35, 33, 2, 26, 21, 9, 25, 9, 18, 33, 24, 19, 2,
11, 40, 37, 36, 19, 28, 40, 26, 41, 35, 21, 23, 28, 22, 19, 34, 3, 31, 36, 38, 25, 34,
31, 40, 38, 3, 22, 9, 8, 40, 26, 9, 17, 11, 11, 31, 19, 3, 24, 23, 3, 2, 19, 9,
28, 19, 28, 37, 18, 42, 38, 26, 37, 26, 39, 3, 33, 28, 17, 11, 25, 38, 34, 22, 34, 17,
3, 1, 34, 38, 8, 2, 37, 25, 9, 11, 36, 23, 19, 8, 35, 24, 11, 11, 11, 25, 11, 11,
11, 28, 17, 42, 19, 41, 40, 34, 38, 24, 28, 23, 39, 28, 41, 40, 3, 39, 34, 11, 25, 33,
2, 1, 3, 26, 28, 2, 17, 18, 2, 41, 42, 37, 36, 33, 38, 33, 18, 3, 34, 37, 21, 37,
23, 35, 21, 3, 9, 21, 34, 38, 22, 37, 28, 38, 8, 2, 31, 1, 38, 25, 40, 35, 37, 41,
36, 31, 23, 21, 37, 3, 24, 17, 17, 8, 22, 8, 2, 23, 1, 17, 31, 38, 9, 23, 42, 41,
2, 33, 11, 23, 33, 38, 17, 25, 1, 33, 37, 19, 8, 23, 41, 26, 39, 18, 18, 31, 17, 18,
34, 3, 41, 34, 40, 9, 23, 33, 11, 40, 39, 34, 19, 40, 3, 2, 19, 17, 23, 33, 2, 19,
26, 25, 36, 37, 34, 17, 39, 42, 22, 22, 19, 35, 22, 18, 18, 41, 40, 40, 26, 3, 19, 40,
9, 1, 19, 41, 24, 9, 18, 1, 28, 31, 18, 3, 21, 11, 24, 3, 22, 11, 11, 37, 22, 8,
2, 38, 3, 2, 37, 28, 11, 35, 18, 36, 9, 35, 21, 19, 42, 35, 24, 2, 2, 17, 18, 33,
33, 34, 8, 37, 24, 42, 17, 37, 21, 1, 36, 38, 25, 40, 1, 22, 26, 28, 22, 33, 28, 1,
33, 33, 33, 19, 40, 2, 36, 38, 33, 41, 2, 3, 31, 22, 1, 24, 18, 36, 28, 39, 28, 3,
8, 35, 17, 18, 18, 8, 18, 22, 2, 25, 18, 41, 37, 21, 17, 28, 34, 1, 35, 25, 22, 38,
17, 28, 19, 25, 35, 36, 39, 9, 21, 36, 39, 41, 22, 38, 39, 19, 34, 22, 40, 8, 1, 11,
1, 31, 1, 17, 24, 23, 28, 21, 8, 37, 42, 33, 17, 24, 19, 18, 2, 42, 39, 36, 19, 2,
34, 35, 36, 11, 9, 35, 2, 21, 3, 42, 28, 37, 24, 1, 38, 2, 11, 41, 33, 39, 25, 17,
26, 39, 36, 37, 11, 25, 42, 17, 8, 31, 41, 21, 22, 2, 2, 24, 19, 21, 31, 34, 2, 39,
39, 18, 1, 33, 28, 11, 34, 40, 17, 42,
};
int[] types;
long next;
long mask;
@Setup
public void initDataSet() {
types = DATASET;
next = 0;
mask = types.length - 1;
if (Integer.bitCount(types.length) != 1) {
throw new AssertionError("The data set should contains power of 2 items");
}
}
@Benchmark
public MqttPropertyType getViaArray() {
long next = this.next;
int nextIndex = (int) (next & mask);
MqttPropertyType type = MqttPropertyType.valueOf(types[nextIndex]);
this.next = next + 1;
return type;
}
@Benchmark
public MqttPropertyType getViaSwitch() {
long next = this.next;
int nextIndex = (int) (next & mask);
MqttPropertyType type = switchValueOf(types[nextIndex]);
this.next = next + 1;
return type;
}
private static MqttPropertyType switchValueOf(int type) {
switch (type) {
case 1:
return MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
case 2:
return MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL;
case 3:
return MqttPropertyType.CONTENT_TYPE;
case 8:
return MqttPropertyType.RESPONSE_TOPIC;
case 9:
return MqttPropertyType.CORRELATION_DATA;
case 11:
return MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
case 17:
return MqttPropertyType.SESSION_EXPIRY_INTERVAL;
case 18:
return MqttPropertyType.ASSIGNED_CLIENT_IDENTIFIER;
case 19:
return MqttPropertyType.SERVER_KEEP_ALIVE;
case 21:
return MqttPropertyType.AUTHENTICATION_METHOD;
case 22:
return MqttPropertyType.AUTHENTICATION_DATA;
case 23:
return MqttPropertyType.REQUEST_PROBLEM_INFORMATION;
case 24:
return MqttPropertyType.WILL_DELAY_INTERVAL;
case 25:
return MqttPropertyType.REQUEST_RESPONSE_INFORMATION;
case 26:
return MqttPropertyType.RESPONSE_INFORMATION;
case 28:
return MqttPropertyType.SERVER_REFERENCE;
case 31:
return MqttPropertyType.REASON_STRING;
case 33:
return MqttPropertyType.RECEIVE_MAXIMUM;
case 34:
return MqttPropertyType.TOPIC_ALIAS_MAXIMUM;
case 35:
return MqttPropertyType.TOPIC_ALIAS;
case 36:
return MqttPropertyType.MAXIMUM_QOS;
case 37:
return MqttPropertyType.RETAIN_AVAILABLE;
case 38:
return MqttPropertyType.USER_PROPERTY;
case 39:
return MqttPropertyType.MAXIMUM_PACKET_SIZE;
case 40:
return MqttPropertyType.WILDCARD_SUBSCRIPTION_AVAILABLE;
case 41:
return MqttPropertyType.SUBSCRIPTION_IDENTIFIER_AVAILABLE;
case 42:
return MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE;
default:
throw new IllegalArgumentException("unknown message type: " + type);
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.Warmup;
import java.util.concurrent.TimeUnit;
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 5, time = 5)
@Measurement(iterations = 5, time = 5)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MqttQoSValueOfBench extends AbstractMicrobenchmark {
private static final int[] DATASET = new int[] {
2, 0, 2, 2, 2, 0, 1, 2, 0, 1, 0, 1, 128, 1, 1, 2, 0, 2, 2, 1, 2, 1, 2, 0, 1, 2, 1, 0, 2, 1, 1, 2, 1, 128,
2, 2, 1, 2, 2, 1, 1, 128, 0, 128, 2, 2, 1, 1, 2, 128, 128, 128, 128, 128, 0, 2, 128, 128, 2, 1, 128, 1, 0,
2, 1, 0, 2, 0, 2, 0, 2, 128, 2, 2, 128, 0, 128, 2, 128, 0, 0, 1, 2, 1, 128, 0, 2, 0, 2, 0, 0, 128, 2, 0,
0, 2, 2, 0, 2, 128, 0, 2, 1, 1, 0, 2, 2, 128, 1, 128, 0, 0, 1, 128, 0, 2, 128, 0, 2, 2, 128, 2, 128, 128,
0, 2, 128, 0, 1, 2, 2, 1, 128, 0, 1, 128, 1, 2, 0, 0, 2, 0, 128, 0, 1, 0, 0, 1, 0, 1, 2, 128, 128, 1, 2,
128, 0, 1, 1, 1, 1, 0, 2, 1, 1, 128, 1, 2, 128, 128, 1, 0, 128, 1, 2, 2, 128, 2, 0, 0, 2, 2, 128, 1, 1,
0, 0, 2, 0, 2, 2, 2, 1, 2, 0, 128, 0, 1, 128, 0, 2, 128, 1, 2, 1, 1, 1, 1, 1, 1, 1, 128, 1, 128, 1, 2,
0, 0, 128, 128, 1, 2, 2, 0, 0, 0, 2, 128, 128, 0, 1, 128, 1, 128, 1, 2, 128, 2, 0, 0, 2, 128, 128, 0, 2,
1, 1, 0, 0, 1, 128, 1, 128, 2, 128, 1, 1, 128, 1, 1, 1, 2, 2, 2, 128, 128, 0, 1, 1, 2, 128, 1, 0, 2, 2,
2, 2, 1, 1, 0, 128, 1, 128, 2, 1, 1, 0, 128, 0, 2, 128, 128, 2, 0, 128, 128, 1, 128, 0, 1, 128, 0, 128,
1, 1, 2, 128, 0, 2, 128, 2, 0, 128, 0, 1, 0, 0, 0, 2, 0, 2, 1, 2, 1, 2, 0, 2, 2, 128, 128, 2, 0, 2, 1,
128, 1, 128, 1, 0, 128, 0, 0, 128, 1, 0, 128, 1, 0, 128, 0, 128, 1, 2, 1, 128, 2, 0, 1, 0, 1, 2, 1, 0,
1, 2, 2, 2, 1, 0, 0, 128, 1, 2, 128, 0, 2, 128, 0, 1, 2, 128, 2, 1, 1, 2, 2, 0, 0, 2, 1, 1, 128, 0, 2,
1, 128, 128, 128, 1, 1, 0, 128, 0, 0, 1, 128, 0, 2, 1, 2, 1, 0, 1, 0, 0, 1, 128, 1, 2, 128, 128, 2, 1, 1,
0, 1, 0, 1, 2, 0, 128, 2, 0, 2, 1, 2, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 128, 0, 1, 2, 0, 0, 2, 2, 2, 0, 0,
128, 1, 0, 0, 0, 1, 2, 128, 2, 0, 128, 128, 128, 128, 2, 2, 0, 1, 1, 0, 1, 128, 128, 1, 0, 0, 128, 128,
1, 2, 128, 2, 128, 128, 128, 2, 0, 1, 128, 1, 0, 1, 2, 128, 128, 2, 2, 0, 0, 1, 1, 0, 1, 1, 0, 128, 0,
0, 0, 128, 0, 0, 0, 128, 2, 1, 0, 1, 1, 0, 2, 1, 1, 0, 2, 1, 2, 1, 2, 0, 0, 2, 128, 128, 0, 0, 2, 128,
0, 128, 2, 2, 128, 0, 128, 1, 1, 0, 0, 128, 2, 1, 2, 128, 0, 2, 128, 1, 0, 1, 2, 0, 0, 128, 128, 0, 1,
2, 128, 1, 2, 2, 0, 128, 2, 1, 0, 1, 128, 128, 2, 0, 1, 0, 0, 2, 0, 128, 2, 2, 1, 128, 128, 128, 1, 0,
128, 2, 128, 1, 128, 1, 1, 2, 0, 128, 0, 1, 0, 128, 0, 2, 2, 0, 0, 2, 128, 128, 2, 0, 128, 1, 128, 128,
2, 1, 1, 2, 0, 2, 128, 0, 0, 0, 2, 2, 2, 0, 128, 0, 1, 2, 1, 128, 1, 2, 2, 0, 128, 1, 1, 1, 128, 2, 2,
128, 0, 0, 1, 2, 1, 1, 2, 0, 1, 2, 128, 2, 2, 128, 128, 0, 128, 1, 1, 128, 128, 128, 2, 1, 1, 1, 2, 1,
1, 0, 1, 128, 0, 2, 2, 0, 1, 2, 128, 128, 128, 2, 128, 128, 128, 2, 2, 2, 0, 128, 2, 128, 1, 0, 128,
128, 2, 128, 0, 2, 1, 128, 128, 0, 1, 1, 0, 128, 0, 0, 2, 1, 0, 2, 1, 2, 128, 0, 128, 1, 128, 1, 0, 2,
1, 1, 1, 2, 1, 0, 1, 128, 2, 2, 0, 1, 128, 1, 0, 1, 0, 0, 128, 0, 128, 2, 2, 128, 128, 1, 0, 128, 1, 0,
2, 1, 128, 128, 0, 0, 0, 0, 128, 2, 2, 1, 128, 1, 0, 1, 128, 0, 128, 128, 1, 128, 0, 2, 2, 2, 0, 2, 0,
1, 1, 2, 1, 1, 1, 128, 0, 2, 2, 2, 0, 2, 0, 1, 1, 1, 128, 128, 128, 128, 2, 128, 1, 0, 1, 1, 1, 1, 2, 2,
1, 0, 128, 2, 128, 128, 0, 1, 128, 128, 128, 128, 128, 1, 0, 2, 0, 128, 0, 0, 2, 2, 0, 1, 2, 1, 0, 2, 128,
0, 2, 2, 2, 0, 0, 1, 128, 2, 1, 128, 128, 1, 128, 0, 1, 128, 1, 1, 1, 2, 0, 128, 1, 128, 2, 2, 0, 2, 0,
0, 0, 0, 0, 2, 128, 0, 1, 2, 0, 0, 2, 2, 2, 0, 0, 0, 1, 0, 128, 1, 0, 1, 1, 128, 128, 128, 1, 128, 0, 128,
2, 1, 2, 1, 0, 1, 2, 128, 2, 1, 1, 2, 1, 128, 1, 2, 0, 2, 128, 2, 128, 2, 2, 1, 1, 128, 2, 2, 0, 128, 0, 2,
1, 128, 128, 128, 0, 1, 2, 2, 2, 0, 0, 128, 1, 2, 2, 128, 128, 128, 1, 128, 0, 2, 2, 1, 2, 2, 2, 0, 0, 2,
2, 0, 2, 2, 128, 0, 2, 128, 1, 0, 1, 128, 1, 0, 128, 1, 128, 1, 0, 1, 2, 1, 128, 1, 128, 2, 128, 128, 1,
2, 128, 1, 2, 0, 2
};
int[] types;
long next;
long mask;
@Setup
public void initDataSet() {
types = DATASET;
next = 0;
mask = types.length - 1;
if (Integer.bitCount(types.length) != 1) {
throw new AssertionError("The data set should contains power of 2 items");
}
}
@Benchmark
public MqttQoS getViaArray() {
long next = this.next;
int nextIndex = (int) (next & mask);
MqttQoS mqttQoS = arrayValueOf(types[nextIndex]);
this.next = next + 1;
return mqttQoS;
}
@Benchmark
public MqttQoS getViaSwitch() {
long next = this.next;
int nextIndex = (int) (next & mask);
MqttQoS mqttQoS = MqttQoS.valueOf(types[nextIndex]);
this.next = next + 1;
return mqttQoS;
}
private static final MqttQoS[] VALUES;
static {
VALUES = new MqttQoS[129];
for (MqttQoS value : MqttQoS.values()) {
VALUES[value.value()] = value;
}
}
public static MqttQoS arrayValueOf(int value) {
MqttQoS mqttQoS = null;
try {
mqttQoS = VALUES[value];
return mqttQoS;
} catch (ArrayIndexOutOfBoundsException ignored) {
// nop
}
if (mqttQoS == null) {
throw new IllegalArgumentException("invalid QoS: " + value);
}
return mqttQoS;
}
}