MQTT5 support for netty-codec-mqtt (#10483)

Motivation:

 MQTT Specification version 5 was released over a year ago,
 netty-codec-mqtt should be changed to support it.

Modifications:

  Added more message and header types in `io.netty.handler.codec.mqtt`
  package in `netty-coded-mqtt` subproject,
  changed `MqttEncoder` and `MqttDecoder` to handle them properly,
  added attribute `NETTY_CODEC_MQTT_VERSION` to track protocol version

Result:

  `netty-codec-mqtt` supports both MQTT5 and MQTT3 now.
This commit is contained in:
Paul Lysak 2020-08-31 10:16:40 +03:00 committed by Norman Maurer
parent 079b15eee1
commit fb4a57826b
27 changed files with 2319 additions and 238 deletions

View File

@ -16,7 +16,10 @@
package io.netty.handler.codec.mqtt;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
final class MqttCodecUtil {
@ -24,6 +27,22 @@ final class MqttCodecUtil {
private static final int MIN_CLIENT_ID_LENGTH = 1;
private static final int MAX_CLIENT_ID_LENGTH = 23;
static final AttributeKey<MqttVersion> MQTT_VERSION_KEY = AttributeKey.valueOf("NETTY_CODEC_MQTT_VERSION");
static MqttVersion getMqttVersion(ChannelHandlerContext ctx) {
Attribute<MqttVersion> attr = ctx.channel().attr(MQTT_VERSION_KEY);
MqttVersion version = attr.get();
if (version == null) {
return MqttVersion.MQTT_3_1_1;
}
return version;
}
static void setMqttVersion(ChannelHandlerContext ctx, MqttVersion version) {
Attribute<MqttVersion> attr = ctx.channel().attr(MQTT_VERSION_KEY);
attr.set(version);
}
static boolean isValidPublishTopicName(String topicName) {
// publish topic name must not contain any wildcard
for (char c : TOPIC_WILDCARDS) {
@ -43,15 +62,15 @@ final class MqttCodecUtil {
return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
clientId.length() <= MAX_CLIENT_ID_LENGTH;
}
if (mqttVersion == MqttVersion.MQTT_3_1_1) {
// In 3.1.3.1 Client Identifier of MQTT 3.1.1 specification, The Server MAY allow ClientIds
if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_5) {
// In 3.1.3.1 Client Identifier of MQTT 3.1.1 and 5.0 specifications, The Server MAY allow ClientIds
// that contain more than 23 encoded bytes. And, The Server MAY allow zero-length ClientId.
return clientId != null;
}
throw new IllegalArgumentException(mqttVersion + " is unknown mqtt version");
}
static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
static MqttFixedHeader validateFixedHeader(ChannelHandlerContext ctx, MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBREL:
case SUBSCRIBE:
@ -59,6 +78,12 @@ final class MqttCodecUtil {
if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
}
return mqttFixedHeader;
case AUTH:
if (MqttCodecUtil.getMqttVersion(ctx) != MqttVersion.MQTT_5) {
throw new DecoderException("AUTH message requires at least MQTT 5");
}
return mqttFixedHeader;
default:
return mqttFixedHeader;
}

View File

@ -27,9 +27,17 @@ public final class MqttConnAckVariableHeader {
private final boolean sessionPresent;
private final MqttProperties properties;
public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent) {
this(connectReturnCode, sessionPresent, MqttProperties.NO_PROPERTIES);
}
public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent,
MqttProperties properties) {
this.connectReturnCode = connectReturnCode;
this.sessionPresent = sessionPresent;
this.properties = MqttProperties.withEmptyDefaults(properties);
}
public MqttConnectReturnCode connectReturnCode() {
@ -40,6 +48,10 @@ public final class MqttConnAckVariableHeader {
return sessionPresent;
}
public MqttProperties properties() {
return properties;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))

View File

@ -27,13 +27,15 @@ import io.netty.util.internal.StringUtil;
public final class MqttConnectPayload {
private final String clientIdentifier;
private final MqttProperties willProperties;
private final String willTopic;
private final byte[] willMessage;
private final String userName;
private final byte[] password;
/**
* @deprecated use {@link MqttConnectPayload#MqttConnectPayload(String, String, byte[], String, byte[])} instead
* @deprecated use {@link MqttConnectPayload#MqttConnectPayload(String,
* MqttProperties, String, byte[], String, byte[])} instead
*/
@Deprecated
public MqttConnectPayload(
@ -44,6 +46,7 @@ public final class MqttConnectPayload {
String password) {
this(
clientIdentifier,
MqttProperties.NO_PROPERTIES,
willTopic,
willMessage.getBytes(CharsetUtil.UTF_8),
userName,
@ -56,7 +59,23 @@ public final class MqttConnectPayload {
byte[] willMessage,
String userName,
byte[] password) {
this(clientIdentifier,
MqttProperties.NO_PROPERTIES,
willTopic,
willMessage,
userName,
password);
}
public MqttConnectPayload(
String clientIdentifier,
MqttProperties willProperties,
String willTopic,
byte[] willMessage,
String userName,
byte[] password) {
this.clientIdentifier = clientIdentifier;
this.willProperties = MqttProperties.withEmptyDefaults(willProperties);
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
@ -67,6 +86,10 @@ public final class MqttConnectPayload {
return clientIdentifier;
}
public MqttProperties willProperties() {
return willProperties;
}
public String willTopic() {
return willTopic;
}

View File

@ -25,11 +25,34 @@ import java.util.Map;
*/
public enum MqttConnectReturnCode {
CONNECTION_ACCEPTED((byte) 0x00),
//MQTT 3 codes
CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01),
CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02),
CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03),
CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04),
CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05);
CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05),
//MQTT 5 codes
CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80),
CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81),
CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82),
CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83),
CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84),
CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85),
CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86),
CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87),
CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88),
CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89),
CONNECTION_REFUSED_BANNED((byte) 0x8A),
CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C),
CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90),
CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95),
CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97),
CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99),
CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A),
CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B),
CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C),
CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D),
CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F);
private static final Map<Byte, MqttConnectReturnCode> VALUE_TO_CODE_MAP;

View File

@ -32,6 +32,7 @@ public final class MqttConnectVariableHeader {
private final boolean isWillFlag;
private final boolean isCleanSession;
private final int keepAliveTimeSeconds;
private final MqttProperties properties;
public MqttConnectVariableHeader(
String name,
@ -43,6 +44,29 @@ public final class MqttConnectVariableHeader {
boolean isWillFlag,
boolean isCleanSession,
int keepAliveTimeSeconds) {
this(name,
version,
hasUserName,
hasPassword,
isWillRetain,
willQos,
isWillFlag,
isCleanSession,
keepAliveTimeSeconds,
MqttProperties.NO_PROPERTIES);
}
public MqttConnectVariableHeader(
String name,
int version,
boolean hasUserName,
boolean hasPassword,
boolean isWillRetain,
int willQos,
boolean isWillFlag,
boolean isCleanSession,
int keepAliveTimeSeconds,
MqttProperties properties) {
this.name = name;
this.version = version;
this.hasUserName = hasUserName;
@ -52,6 +76,7 @@ public final class MqttConnectVariableHeader {
this.isWillFlag = isWillFlag;
this.isCleanSession = isCleanSession;
this.keepAliveTimeSeconds = keepAliveTimeSeconds;
this.properties = MqttProperties.withEmptyDefaults(properties);
}
public String name() {
@ -90,6 +115,10 @@ public final class MqttConnectVariableHeader {
return keepAliveTimeSeconds;
}
public MqttProperties properties() {
return properties;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
import io.netty.util.CharsetUtil;
@ -31,11 +32,15 @@ import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
/**
* Decodes Mqtt messages from bytes, following
* <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">
* the MQTT protocol specification v3.1</a>
* the MQTT protocol specification
* <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">v3.1</a>
* or
* <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">v5.0</a>, depending on the
* version specified in the CONNECT message that first goes through the channel.
*/
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
@ -72,7 +77,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
switch (state()) {
case READ_FIXED_HEADER: try {
mqttFixedHeader = decodeFixedHeader(buffer);
mqttFixedHeader = decodeFixedHeader(ctx, buffer);
bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
checkpoint(DecoderState.READ_VARIABLE_HEADER);
// fall through
@ -82,10 +87,10 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
}
case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
final Result<?> decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
@ -98,6 +103,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
case READ_PAYLOAD: try {
final Result<?> decodedPayload =
decodePayload(
ctx,
buffer,
mqttFixedHeader.messageType(),
bytesRemainingInVariablePart,
@ -142,7 +148,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
* @param buffer the buffer to decode from
* @return the fixed header
*/
private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
short b1 = buffer.readUnsignedByte();
MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
@ -167,7 +173,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
}
MqttFixedHeader decodedFixedHeader =
new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
}
/**
@ -176,44 +182,54 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
* @param mqttFixedHeader MqttFixedHeader of the same message
* @return the variable header
*/
private static Result<?> decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
private Result<?> decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case CONNECT:
return decodeConnectionVariableHeader(buffer);
return decodeConnectionVariableHeader(ctx, buffer);
case CONNACK:
return decodeConnAckVariableHeader(buffer);
return decodeConnAckVariableHeader(ctx, buffer);
case SUBSCRIBE:
case UNSUBSCRIBE:
case SUBSCRIBE:
case SUBACK:
case UNSUBACK:
return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
case PUBACK:
case PUBREC:
case PUBCOMP:
case PUBREL:
return decodeMessageIdVariableHeader(buffer);
return decodePubReplyMessage(buffer);
case PUBLISH:
return decodePublishVariableHeader(buffer, mqttFixedHeader);
return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
case DISCONNECT:
case AUTH:
return decodeReasonCodeAndPropertiesVariableHeader(buffer);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
// Empty variable header
return new Result<>(null, 0);
default:
//shouldn't reach here
throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
}
return new Result<>(null, 0); //should never reach here
}
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(
ChannelHandlerContext ctx,
ByteBuf buffer) {
final Result<String> protoString = decodeString(buffer);
int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
final byte protocolLevel = buffer.readByte();
numberOfBytesConsumed += 1;
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
MqttCodecUtil.setMqttVersion(ctx, version);
final int b1 = buffer.readUnsignedByte();
numberOfBytesConsumed += 1;
@ -227,7 +243,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
final int willQos = (b1 & 0x18) >> 3;
final boolean willFlag = (b1 & 0x04) == 0x04;
final boolean cleanSession = (b1 & 0x02) == 0x02;
if (mqttVersion == MqttVersion.MQTT_3_1_1) {
if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
if (!zeroReservedFlag) {
// MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
@ -237,25 +253,48 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
}
}
final MqttProperties properties;
if (version == MqttVersion.MQTT_5) {
final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
properties = propertiesResult.value;
numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
} else {
properties = MqttProperties.NO_PROPERTIES;
}
final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
mqttVersion.protocolName(),
mqttVersion.protocolLevel(),
version.protocolName(),
version.protocolLevel(),
hasUserName,
hasPassword,
willRetain,
willQos,
willFlag,
cleanSession,
keepAlive.value);
keepAlive.value,
properties);
return new Result<>(mqttConnectVariableHeader, numberOfBytesConsumed);
}
private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuf buffer) {
private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(
ChannelHandlerContext ctx,
ByteBuf buffer) {
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
byte returnCode = buffer.readByte();
final int numberOfBytesConsumed = 2;
int numberOfBytesConsumed = 2;
final MqttProperties properties;
if (mqttVersion == MqttVersion.MQTT_5) {
final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
properties = propertiesResult.value;
numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
} else {
properties = MqttProperties.NO_PROPERTIES;
}
final MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent);
new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
return new Result<>(mqttConnAckVariableHeader, numberOfBytesConsumed);
}
@ -266,9 +305,89 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
messageId.numberOfBytesConsumed);
}
private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(
private static Result<MqttMessageIdAndPropertiesVariableHeader> decodeMessageIdAndPropertiesVariableHeader(
ChannelHandlerContext ctx,
ByteBuf buffer) {
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
final Result<Integer> packetId = decodeMessageId(buffer);
final MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader;
final int mqtt5Consumed;
if (mqttVersion == MqttVersion.MQTT_5) {
final Result<MqttProperties> properties = decodeProperties(buffer);
mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId.value, properties.value);
mqtt5Consumed = properties.numberOfBytesConsumed;
} else {
mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId.value,
MqttProperties.NO_PROPERTIES);
mqtt5Consumed = 0;
}
return new Result<MqttMessageIdAndPropertiesVariableHeader>(mqttVariableHeader,
packetId.numberOfBytesConsumed + mqtt5Consumed);
}
private Result<MqttPubReplyMessageVariableHeader> decodePubReplyMessage(ByteBuf buffer) {
final Result<Integer> packetId = decodeMessageId(buffer);
final MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader;
final int consumed;
if (bytesRemainingInVariablePart > 3) {
final byte reasonCode = buffer.readByte();
final Result<MqttProperties> properties = decodeProperties(buffer);
mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value,
reasonCode,
properties.value);
consumed = packetId.numberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
} else if (bytesRemainingInVariablePart > 2) {
final byte reasonCode = buffer.readByte();
mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value,
reasonCode,
MqttProperties.NO_PROPERTIES);
consumed = packetId.numberOfBytesConsumed + 1;
} else {
mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value,
(byte) 0,
MqttProperties.NO_PROPERTIES);
consumed = packetId.numberOfBytesConsumed;
}
return new Result<MqttPubReplyMessageVariableHeader>(mqttPubAckVariableHeader, consumed);
}
private Result<MqttReasonCodeAndPropertiesVariableHeader> decodeReasonCodeAndPropertiesVariableHeader(
ByteBuf buffer) {
final byte reasonCode;
final MqttProperties properties;
final int consumed;
if (bytesRemainingInVariablePart > 1) {
reasonCode = buffer.readByte();
final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
properties = propertiesResult.value;
consumed = 1 + propertiesResult.numberOfBytesConsumed;
} else if (bytesRemainingInVariablePart > 0) {
reasonCode = buffer.readByte();
properties = MqttProperties.NO_PROPERTIES;
consumed = 1;
} else {
reasonCode = 0;
properties = MqttProperties.NO_PROPERTIES;
consumed = 0;
}
final MqttReasonCodeAndPropertiesVariableHeader mqttReasonAndPropsVariableHeader =
new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
return new Result<MqttReasonCodeAndPropertiesVariableHeader>(
mqttReasonAndPropsVariableHeader,
consumed);
}
private Result<MqttPublishVariableHeader> decodePublishVariableHeader(
ChannelHandlerContext ctx,
ByteBuf buffer,
MqttFixedHeader mqttFixedHeader) {
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
final Result<String> decodedTopic = decodeString(buffer);
if (!isValidPublishTopicName(decodedTopic.value)) {
throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
@ -281,8 +400,18 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
messageId = decodedMessageId.value;
numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
}
final MqttProperties properties;
if (mqttVersion == MqttVersion.MQTT_5) {
final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
properties = propertiesResult.value;
numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
} else {
properties = MqttProperties.NO_PROPERTIES;
}
final MqttPublishVariableHeader mqttPublishVariableHeader =
new MqttPublishVariableHeader(decodedTopic.value, messageId);
new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
return new Result<>(mqttPublishVariableHeader, numberOfBytesConsumed);
}
@ -304,6 +433,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
* @return the payload
*/
private static Result<?> decodePayload(
ChannelHandlerContext ctx,
ByteBuf buffer,
MqttMessageType messageType,
int bytesRemainingInVariablePart,
@ -321,6 +451,9 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
case UNSUBSCRIBE:
return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
case UNSUBACK:
return decodeUnsubAckPayload(ctx, buffer, bytesRemainingInVariablePart);
case PUBLISH:
return decodePublishPayload(buffer, bytesRemainingInVariablePart);
@ -344,11 +477,22 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
Result<String> decodedWillTopic = null;
Result<byte[]> decodedWillMessage = null;
final MqttProperties willProperties;
if (mqttConnectVariableHeader.isWillFlag()) {
if (mqttVersion == MqttVersion.MQTT_5) {
final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
willProperties = propertiesResult.value;
numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
} else {
willProperties = MqttProperties.NO_PROPERTIES;
}
decodedWillTopic = decodeString(buffer, 0, 32767);
numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
decodedWillMessage = decodeByteArray(buffer);
numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
} else {
willProperties = MqttProperties.NO_PROPERTIES;
}
Result<String> decodedUserName = null;
Result<byte[]> decodedPassword = null;
@ -364,6 +508,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
final MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(
decodedClientId.value,
willProperties,
decodedWillTopic != null ? decodedWillTopic.value : null,
decodedWillMessage != null ? decodedWillMessage.value : null,
decodedUserName != null ? decodedUserName.value : null,
@ -379,9 +524,21 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
final Result<String> decodedTopicName = decodeString(buffer);
numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
int qos = buffer.readUnsignedByte() & 0x03;
//See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details
final short optionByte = buffer.readUnsignedByte();
MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
noLocal,
retainAsPublished,
retainHandling);
numberOfBytesConsumed++;
subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, MqttQoS.valueOf(qos)));
subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
}
return new Result<>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
}
@ -402,6 +559,20 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
return new Result<>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
}
private static Result<MqttUnsubAckPayload> decodeUnsubAckPayload(
ChannelHandlerContext ctx,
ByteBuf buffer,
int bytesRemainingInVariablePart) {
final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
short reasonCode = buffer.readUnsignedByte();
numberOfBytesConsumed++;
reasonCodes.add(reasonCode);
}
return new Result<MqttUnsubAckPayload>(new MqttUnsubAckPayload(reasonCodes), numberOfBytesConsumed);
}
private static Result<MqttUnsubscribePayload> decodeUnsubscribePayload(
ByteBuf buffer,
int bytesRemainingInVariablePart) {
@ -464,6 +635,31 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
return new Result<>(result, numberOfBytesConsumed);
}
/**
* See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules
*
* @param buffer the buffer to decode from
* @return decoded integer
*/
private static Result<Integer> decodeVariableByteInteger(ByteBuf buffer) {
int remainingLength = 0;
int multiplier = 1;
short digit;
int loops = 0;
do {
digit = buffer.readUnsignedByte();
remainingLength += (digit & 127) * multiplier;
multiplier *= 128;
loops++;
} while ((digit & 128) != 0 && loops < 4);
// MQTT protocol limits Remaining Length to 4 bytes
if (loops == 4 && (digit & 128) != 0) {
return null;
}
return new Result<Integer>(remainingLength, loops);
}
private static final class Result<T> {
private final T value;
@ -474,4 +670,82 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
this.numberOfBytesConsumed = numberOfBytesConsumed;
}
}
private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
final Result<Integer> propertiesLength = decodeVariableByteInteger(buffer);
int totalPropertiesLength = propertiesLength.value;
int numberOfBytesConsumed = propertiesLength.numberOfBytesConsumed;
MqttProperties decodedProperties = new MqttProperties();
while (numberOfBytesConsumed < totalPropertiesLength) {
Result<Integer> propertyId = decodeVariableByteInteger(buffer);
numberOfBytesConsumed += propertyId.numberOfBytesConsumed;
MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyId.value);
switch (propertyType) {
case PAYLOAD_FORMAT_INDICATOR:
case REQUEST_PROBLEM_INFORMATION:
case REQUEST_RESPONSE_INFORMATION:
case MAXIMUM_QOS:
case RETAIN_AVAILABLE:
case WILDCARD_SUBSCRIPTION_AVAILABLE:
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
case SHARED_SUBSCRIPTION_AVAILABLE:
final int b1 = buffer.readUnsignedByte();
numberOfBytesConsumed++;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, b1));
break;
case SERVER_KEEP_ALIVE:
case RECEIVE_MAXIMUM:
case TOPIC_ALIAS_MAXIMUM:
case TOPIC_ALIAS:
final Result<Integer> int2BytesResult = decodeMsbLsb(buffer);
numberOfBytesConsumed += int2BytesResult.numberOfBytesConsumed;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, int2BytesResult.value));
break;
case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL:
case WILL_DELAY_INTERVAL:
case MAXIMUM_PACKET_SIZE:
final int maxPacketSize = buffer.readInt();
numberOfBytesConsumed += 4;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, maxPacketSize));
break;
case SUBSCRIPTION_IDENTIFIER:
Result<Integer> vbIntegerResult = decodeVariableByteInteger(buffer);
numberOfBytesConsumed += vbIntegerResult.numberOfBytesConsumed;
decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, vbIntegerResult.value));
break;
case CONTENT_TYPE:
case RESPONSE_TOPIC:
case ASSIGNED_CLIENT_IDENTIFIER:
case AUTHENTICATION_METHOD:
case RESPONSE_INFORMATION:
case SERVER_REFERENCE:
case REASON_STRING:
final Result<String> stringResult = decodeString(buffer);
numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
decodedProperties.add(new MqttProperties.StringProperty(propertyId.value, stringResult.value));
break;
case USER_PROPERTY:
final Result<String> keyResult = decodeString(buffer);
final Result<String> valueResult = decodeString(buffer);
numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
break;
case CORRELATION_DATA:
case AUTHENTICATION_DATA:
final Result<byte[]> binaryDataResult = decodeByteArray(buffer);
numberOfBytesConsumed += binaryDataResult.numberOfBytesConsumed;
decodedProperties.add(new MqttProperties.BinaryProperty(propertyId.value, binaryDataResult.value));
break;
default:
//shouldn't reach here
throw new DecoderException("Unknown property type: " + propertyType);
}
}
return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
}
}

View File

@ -18,9 +18,10 @@ package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.EmptyArrays;
@ -32,6 +33,8 @@ import static io.netty.handler.codec.mqtt.MqttCodecUtil.*;
/**
* Encodes Mqtt messages into bytes following the protocol specification v3.1
* as described here <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
* or v5.0 as described here <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html">MQTTv5.0</a> -
* depending on the version specified in the first CONNECT message that goes through the channel.
*/
@ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
@ -42,49 +45,57 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
out.add(doEncode(ctx.alloc(), msg));
out.add(doEncode(ctx, msg));
}
/**
* This is the main encoding method.
* It's only visible for testing.
*
* @param byteBufAllocator Allocates ByteBuf
* @param message MQTT message to encode
* @return ByteBuf with encoded bytes
*/
static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {
static ByteBuf doEncode(ChannelHandlerContext ctx,
MqttMessage message) {
switch (message.fixedHeader().messageType()) {
case CONNECT:
return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);
return encodeConnectMessage(ctx, (MqttConnectMessage) message);
case CONNACK:
return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);
return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
case PUBLISH:
return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);
return encodePublishMessage(ctx, (MqttPublishMessage) message);
case SUBSCRIBE:
return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);
return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
case UNSUBSCRIBE:
return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);
return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
case SUBACK:
return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);
return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
case UNSUBACK:
if (message instanceof MqttUnsubAckMessage) {
return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
}
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);
return encodePubReplyMessage(ctx, message);
case DISCONNECT:
case AUTH:
return encodeReasonCodePlusPropertiesMessage(ctx, message);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);
return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
default:
throw new IllegalArgumentException(
@ -93,7 +104,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
}
private static ByteBuf encodeConnectMessage(
ByteBufAllocator byteBufAllocator,
ChannelHandlerContext ctx,
MqttConnectMessage message) {
int payloadBufferSize = 0;
@ -102,10 +113,11 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
MqttConnectPayload payload = message.payload();
MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
(byte) variableHeader.version());
MqttCodecUtil.setMqttVersion(ctx, mqttVersion);
// as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0
if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
throw new DecoderException("Without a username, the password MUST be not set");
throw new EncoderException("Without a username, the password MUST be not set");
}
// Client id
@ -138,40 +150,62 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
payloadBufferSize += 2 + passwordBytes.length;
}
// Fixed header
// Fixed and variable header
byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(
mqttVersion,
ctx.alloc(),
message.variableHeader().properties());
try {
final ByteBuf willPropertiesBuf;
if (variableHeader.isWillFlag()) {
willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
payloadBufferSize += willPropertiesBuf.readableBytes();
} else {
willPropertiesBuf = Unpooled.EMPTY_BUFFER;
}
try {
int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
buf.writeShort(protocolNameBytes.length);
buf.writeBytes(protocolNameBytes);
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.writeByte(variableHeader.version());
buf.writeByte(getConnVariableHeaderFlag(variableHeader));
buf.writeShort(variableHeader.keepAliveTimeSeconds());
buf.writeShort(protocolNameBytes.length);
buf.writeBytes(protocolNameBytes);
// Payload
buf.writeShort(clientIdentifierBytes.length);
buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
if (variableHeader.isWillFlag()) {
buf.writeShort(willTopicBytes.length);
buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
buf.writeShort(willMessageBytes.length);
buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
buf.writeByte(variableHeader.version());
buf.writeByte(getConnVariableHeaderFlag(variableHeader));
buf.writeShort(variableHeader.keepAliveTimeSeconds());
buf.writeBytes(propertiesBuf);
// Payload
buf.writeShort(clientIdentifierBytes.length);
buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
if (variableHeader.isWillFlag()) {
buf.writeBytes(willPropertiesBuf);
buf.writeShort(willTopicBytes.length);
buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
buf.writeShort(willMessageBytes.length);
buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
}
if (variableHeader.hasUserName()) {
buf.writeShort(userNameBytes.length);
buf.writeBytes(userNameBytes, 0, userNameBytes.length);
}
if (variableHeader.hasPassword()) {
buf.writeShort(passwordBytes.length);
buf.writeBytes(passwordBytes, 0, passwordBytes.length);
}
return buf;
} finally {
willPropertiesBuf.release();
}
} finally {
propertiesBuf.release();
}
if (variableHeader.hasUserName()) {
buf.writeShort(userNameBytes.length);
buf.writeBytes(userNameBytes, 0, userNameBytes.length);
}
if (variableHeader.hasPassword()) {
buf.writeShort(passwordBytes.length);
buf.writeBytes(passwordBytes, 0, passwordBytes.length);
}
return buf;
}
private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
@ -196,114 +230,193 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
}
private static ByteBuf encodeConnAckMessage(
ByteBufAllocator byteBufAllocator,
ChannelHandlerContext ctx,
MqttConnAckMessage message) {
ByteBuf buf = byteBufAllocator.buffer(4);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
buf.writeByte(2);
buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(),
message.variableHeader().properties());
return buf;
try {
ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
buf.writeBytes(propertiesBuf);
return buf;
} finally {
propertiesBuf.release();
}
}
private static ByteBuf encodeSubscribeMessage(
ByteBufAllocator byteBufAllocator,
ChannelHandlerContext ctx,
MqttSubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(),
message.idAndPropertiesVariableHeader().properties());
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttSubscribePayload payload = message.payload();
try {
final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
int payloadBufferSize = 0;
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
payloadBufferSize += 1;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttSubscribePayload payload = message.payload();
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
payloadBufferSize += 1;
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
buf.writeBytes(propertiesBuf);
// Payload
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
writeUTF8String(buf, topic.topicName());
final MqttSubscriptionOption option = topic.option();
int optionEncoded = option.retainHandling().value() << 4;
if (option.isRetainAsPublished()) {
optionEncoded |= 0x08;
}
if (option.isNoLocal()) {
optionEncoded |= 0x04;
}
optionEncoded |= option.qos().value();
buf.writeByte(optionEncoded);
}
return buf;
} finally {
propertiesBuf.release();
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
// Payload
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
buf.writeByte(topic.qualityOfService().value());
}
return buf;
}
private static ByteBuf encodeUnsubscribeMessage(
ByteBufAllocator byteBufAllocator,
ChannelHandlerContext ctx,
MqttUnsubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(),
message.idAndPropertiesVariableHeader().properties());
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttUnsubscribePayload payload = message.payload();
try {
final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
int payloadBufferSize = 0;
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttUnsubscribePayload payload = message.payload();
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
buf.writeBytes(propertiesBuf);
// Payload
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
}
return buf;
} finally {
propertiesBuf.release();
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
// Payload
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
}
return buf;
}
private static ByteBuf encodeSubAckMessage(
ByteBufAllocator byteBufAllocator,
ChannelHandlerContext ctx,
MqttSubAckMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = message.payload().grantedQoSLevels().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(message.variableHeader().messageId());
for (int qos : message.payload().grantedQoSLevels()) {
buf.writeByte(qos);
}
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(),
message.idAndPropertiesVariableHeader().properties());
try {
int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
int payloadBufferSize = message.payload().grantedQoSLevels().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(message.variableHeader().messageId());
buf.writeBytes(propertiesBuf);
for (int qos : message.payload().grantedQoSLevels()) {
buf.writeByte(qos);
}
return buf;
return buf;
} finally {
propertiesBuf.release();
}
}
private static ByteBuf encodeUnsubAckMessage(
ChannelHandlerContext ctx,
MqttUnsubAckMessage message) {
if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(),
message.idAndPropertiesVariableHeader().properties());
try {
int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
int payloadBufferSize = message.payload().unsubscribeReasonCodes().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(message.variableHeader().messageId());
buf.writeBytes(propertiesBuf);
for (Short reasonCode : message.payload().unsubscribeReasonCodes()) {
buf.writeByte(reasonCode);
}
return buf;
} finally {
propertiesBuf.release();
}
} else {
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
}
}
private static ByteBuf encodePublishMessage(
ByteBufAllocator byteBufAllocator,
ChannelHandlerContext ctx,
MqttPublishMessage message) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuf payload = message.payload().duplicate();
@ -311,23 +424,76 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
String topicName = variableHeader.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
int variableHeaderBufferSize = 2 + topicNameBytes.length +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
int payloadBufferSize = payload.readableBytes();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
ctx.alloc(),
message.variableHeader().properties());
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes);
if (mqttFixedHeader.qosLevel().value() > 0) {
buf.writeShort(variableHeader.packetId());
try {
int variableHeaderBufferSize = 2 + topicNameBytes.length +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes();
int payloadBufferSize = payload.readableBytes();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes);
if (mqttFixedHeader.qosLevel().value() > 0) {
buf.writeShort(variableHeader.packetId());
}
buf.writeBytes(propertiesBuf);
buf.writeBytes(payload);
return buf;
} finally {
propertiesBuf.release();
}
buf.writeBytes(payload);
}
return buf;
private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
MqttMessage message) {
if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPubReplyMessageVariableHeader variableHeader =
(MqttPubReplyMessageVariableHeader) message.variableHeader();
int msgId = variableHeader.messageId();
final ByteBuf propertiesBuf;
final boolean includeReasonCode;
final int variableHeaderBufferSize;
final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
if (mqttVersion == MqttVersion.MQTT_5 &&
(variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
!variableHeader.properties().isEmpty())) {
propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
includeReasonCode = true;
variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
} else {
propertiesBuf = Unpooled.EMPTY_BUFFER;
includeReasonCode = false;
variableHeaderBufferSize = 2;
}
try {
final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
buf.writeShort(msgId);
if (includeReasonCode) {
buf.writeByte(variableHeader.reasonCode());
}
buf.writeBytes(propertiesBuf);
return buf;
} finally {
propertiesBuf.release();
}
} else {
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
}
}
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
@ -347,6 +513,49 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
return buf;
}
private static ByteBuf encodeReasonCodePlusPropertiesMessage(
ChannelHandlerContext ctx,
MqttMessage message) {
if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttReasonCodeAndPropertiesVariableHeader variableHeader =
(MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
final ByteBuf propertiesBuf;
final boolean includeReasonCode;
final int variableHeaderBufferSize;
if (mqttVersion == MqttVersion.MQTT_5 &&
(variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
!variableHeader.properties().isEmpty())) {
propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
includeReasonCode = true;
variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
} else {
propertiesBuf = Unpooled.EMPTY_BUFFER;
includeReasonCode = false;
variableHeaderBufferSize = 0;
}
try {
final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
if (includeReasonCode) {
buf.writeByte(variableHeader.reasonCode());
}
buf.writeBytes(propertiesBuf);
return buf;
} finally {
propertiesBuf.release();
}
} else {
return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
}
}
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
ByteBufAllocator byteBufAllocator,
MqttMessage message) {
@ -358,6 +567,104 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
return buf;
}
private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
ByteBufAllocator byteBufAllocator,
MqttProperties mqttProperties) {
if (mqttVersion == MqttVersion.MQTT_5) {
return encodeProperties(byteBufAllocator, mqttProperties);
}
return Unpooled.EMPTY_BUFFER;
}
private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
MqttProperties mqttProperties) {
ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
// encode also the Properties part
try {
ByteBuf propertiesBuf = byteBufAllocator.buffer();
try {
for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
MqttProperties.MqttPropertyType propertyType =
MqttProperties.MqttPropertyType.valueOf(property.propertyId);
switch (propertyType) {
case PAYLOAD_FORMAT_INDICATOR:
case REQUEST_PROBLEM_INFORMATION:
case REQUEST_RESPONSE_INFORMATION:
case MAXIMUM_QOS:
case RETAIN_AVAILABLE:
case WILDCARD_SUBSCRIPTION_AVAILABLE:
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
case SHARED_SUBSCRIPTION_AVAILABLE:
writeVariableLengthInt(propertiesBuf, property.propertyId);
final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
propertiesBuf.writeByte(bytePropValue);
break;
case SERVER_KEEP_ALIVE:
case RECEIVE_MAXIMUM:
case TOPIC_ALIAS_MAXIMUM:
case TOPIC_ALIAS:
writeVariableLengthInt(propertiesBuf, property.propertyId);
final short twoBytesInPropValue =
((MqttProperties.IntegerProperty) property).value.shortValue();
propertiesBuf.writeShort(twoBytesInPropValue);
break;
case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL:
case WILL_DELAY_INTERVAL:
case MAXIMUM_PACKET_SIZE:
writeVariableLengthInt(propertiesBuf, property.propertyId);
final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
propertiesBuf.writeInt(fourBytesIntPropValue);
break;
case SUBSCRIPTION_IDENTIFIER:
writeVariableLengthInt(propertiesBuf, property.propertyId);
final int vbi = ((MqttProperties.IntegerProperty) property).value;
writeVariableLengthInt(propertiesBuf, vbi);
break;
case CONTENT_TYPE:
case RESPONSE_TOPIC:
case ASSIGNED_CLIENT_IDENTIFIER:
case AUTHENTICATION_METHOD:
case RESPONSE_INFORMATION:
case SERVER_REFERENCE:
case REASON_STRING:
writeVariableLengthInt(propertiesBuf, property.propertyId);
writeUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
break;
case USER_PROPERTY:
final List<MqttProperties.StringPair> pairs =
((MqttProperties.UserProperties) property).value;
for (MqttProperties.StringPair pair : pairs) {
writeVariableLengthInt(propertiesBuf, property.propertyId);
writeUTF8String(propertiesBuf, pair.key);
writeUTF8String(propertiesBuf, pair.value);
}
break;
case CORRELATION_DATA:
case AUTHENTICATION_DATA:
writeVariableLengthInt(propertiesBuf, property.propertyId);
final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
propertiesBuf.writeShort(binaryPropValue.length);
propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
break;
default:
//shouldn't reach here
throw new EncoderException("Unknown property type: " + propertyType);
}
}
writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
propertiesHeaderBuf.writeBytes(propertiesBuf);
return propertiesHeaderBuf;
} finally {
propertiesBuf.release();
}
} catch (RuntimeException e) {
propertiesHeaderBuf.release();
throw e;
}
}
private static int getFixedHeaderByte1(MqttFixedHeader header) {
int ret = 0;
ret |= header.messageType().value() << 4;
@ -382,6 +689,12 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
} while (num > 0);
}
static void writeUTF8String(ByteBuf buf, String s) {
byte[] sBytes = encodeStringUtf8(s);
buf.writeShort(sBytes.length);
buf.writeBytes(sBytes, 0, sBytes.length);
}
private static int getVariableLengthInt(int num) {
int count = 0;
do {

View File

@ -20,6 +20,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public final class MqttMessageBuilders {
@ -30,6 +31,7 @@ public final class MqttMessageBuilders {
private MqttQoS qos;
private ByteBuf payload;
private int messageId;
private MqttProperties mqttProperties;
PublishBuilder() {
}
@ -59,9 +61,15 @@ public final class MqttMessageBuilders {
return this;
}
public PublishBuilder properties(MqttProperties properties) {
this.mqttProperties = properties;
return this;
}
public MqttPublishMessage build() {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0);
MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId);
MqttPublishVariableHeader mqttVariableHeader =
new MqttPublishVariableHeader(topic, messageId, mqttProperties);
return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, Unpooled.buffer().writeBytes(payload));
}
}
@ -74,6 +82,7 @@ public final class MqttMessageBuilders {
private boolean hasUser;
private boolean hasPassword;
private int keepAliveSecs;
private MqttProperties willProperties = MqttProperties.NO_PROPERTIES;
private boolean willFlag;
private boolean willRetain;
private MqttQoS willQos = MqttQoS.AT_MOST_ONCE;
@ -81,6 +90,7 @@ public final class MqttMessageBuilders {
private byte[] willMessage;
private String username;
private byte[] password;
private MqttProperties properties = MqttProperties.NO_PROPERTIES;
ConnectBuilder() {
}
@ -139,6 +149,11 @@ public final class MqttMessageBuilders {
return this;
}
public ConnectBuilder willProperties(MqttProperties willProperties) {
this.willProperties = willProperties;
return this;
}
public ConnectBuilder hasUser(boolean value) {
this.hasUser = value;
return this;
@ -170,6 +185,11 @@ public final class MqttMessageBuilders {
return this;
}
public ConnectBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttConnectMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
@ -183,9 +203,10 @@ public final class MqttMessageBuilders {
willQos.value(),
willFlag,
cleanSession,
keepAliveSecs);
keepAliveSecs,
properties);
MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(clientId, willTopic, willMessage, username, password);
new MqttConnectPayload(clientId, willProperties, willTopic, willMessage, username, password);
return new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
}
}
@ -194,36 +215,54 @@ public final class MqttMessageBuilders {
private List<MqttTopicSubscription> subscriptions;
private int messageId;
private MqttProperties properties;
SubscribeBuilder() {
}
public SubscribeBuilder addSubscription(MqttQoS qos, String topic) {
if (subscriptions == null) {
subscriptions = new ArrayList<>(5);
}
ensureSubscriptionsExist();
subscriptions.add(new MqttTopicSubscription(topic, qos));
return this;
}
public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) {
ensureSubscriptionsExist();
subscriptions.add(new MqttTopicSubscription(topic, option));
return this;
}
public SubscribeBuilder messageId(int messageId) {
this.messageId = messageId;
return this;
}
public SubscribeBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttSubscribeMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttVariableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader =
new MqttMessageIdAndPropertiesVariableHeader(messageId, properties);
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions);
return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
}
private void ensureSubscriptionsExist() {
if (subscriptions == null) {
subscriptions = new ArrayList<MqttTopicSubscription>(5);
}
}
}
public static final class UnsubscribeBuilder {
private List<String> topicFilters;
private int messageId;
private MqttProperties properties;
UnsubscribeBuilder() {
}
@ -241,10 +280,16 @@ public final class MqttMessageBuilders {
return this;
}
public UnsubscribeBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttUnsubscribeMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttVariableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader =
new MqttMessageIdAndPropertiesVariableHeader(messageId, properties);
MqttUnsubscribePayload mqttSubscribePayload = new MqttUnsubscribePayload(topicFilters);
return new MqttUnsubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload);
}
@ -254,6 +299,7 @@ public final class MqttMessageBuilders {
private MqttConnectReturnCode returnCode;
private boolean sessionPresent;
private MqttProperties properties = MqttProperties.NO_PROPERTIES;
ConnAckBuilder() {
}
@ -268,15 +314,196 @@ public final class MqttMessageBuilders {
return this;
}
public ConnAckBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttConnAckMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(returnCode, sessionPresent);
new MqttConnAckVariableHeader(returnCode, sessionPresent, properties);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}
}
public static final class PubAckBuilder {
private short packetId;
private byte reasonCode;
private MqttProperties properties;
PubAckBuilder() {
}
public PubAckBuilder reasonCode(byte reasonCode) {
this.reasonCode = reasonCode;
return this;
}
public PubAckBuilder packetId(short packetId) {
this.packetId = packetId;
return this;
}
public PubAckBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader =
new MqttPubReplyMessageVariableHeader(packetId, reasonCode, properties);
return new MqttMessage(mqttFixedHeader, mqttPubAckVariableHeader);
}
}
public static final class SubAckBuilder {
private short packetId;
private MqttProperties properties;
private final List<MqttQoS> grantedQoses = new ArrayList<MqttQoS>();
SubAckBuilder() {
}
public SubAckBuilder packetId(short packetId) {
this.packetId = packetId;
return this;
}
public SubAckBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public SubAckBuilder addGrantedQos(MqttQoS qos) {
this.grantedQoses.add(qos);
return this;
}
public SubAckBuilder addGrantedQoses(MqttQoS... qoses) {
this.grantedQoses.addAll(Arrays.asList(qoses));
return this;
}
public MqttSubAckMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdAndPropertiesVariableHeader mqttSubAckVariableHeader =
new MqttMessageIdAndPropertiesVariableHeader(packetId, properties);
//transform to primitive types
int[] grantedQoses = new int[this.grantedQoses.size()];
int i = 0;
for (MqttQoS grantedQos : this.grantedQoses) {
grantedQoses[i++] = grantedQos.value();
}
MqttSubAckPayload subAckPayload = new MqttSubAckPayload(grantedQoses);
return new MqttSubAckMessage(mqttFixedHeader, mqttSubAckVariableHeader, subAckPayload);
}
}
public static final class UnsubAckBuilder {
private short packetId;
private MqttProperties properties;
private final List<Short> reasonCodes = new ArrayList<Short>();
UnsubAckBuilder() {
}
public UnsubAckBuilder packetId(short packetId) {
this.packetId = packetId;
return this;
}
public UnsubAckBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public UnsubAckBuilder addReasonCode(short reasonCode) {
this.reasonCodes.add(reasonCode);
return this;
}
public UnsubAckBuilder addReasonCodes(Short... reasonCodes) {
this.reasonCodes.addAll(Arrays.asList(reasonCodes));
return this;
}
public MqttUnsubAckMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdAndPropertiesVariableHeader mqttSubAckVariableHeader =
new MqttMessageIdAndPropertiesVariableHeader(packetId, properties);
MqttUnsubAckPayload subAckPayload = new MqttUnsubAckPayload(reasonCodes);
return new MqttUnsubAckMessage(mqttFixedHeader, mqttSubAckVariableHeader, subAckPayload);
}
}
public static final class DisconnectBuilder {
private MqttProperties properties;
private byte reasonCode;
DisconnectBuilder() {
}
public DisconnectBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public DisconnectBuilder reasonCode(byte reasonCode) {
this.reasonCode = reasonCode;
return this;
}
public MqttMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttReasonCodeAndPropertiesVariableHeader mqttDisconnectVariableHeader =
new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
return new MqttMessage(mqttFixedHeader, mqttDisconnectVariableHeader);
}
}
public static final class AuthBuilder {
private MqttProperties properties;
private byte reasonCode;
AuthBuilder() {
}
public AuthBuilder properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public AuthBuilder reasonCode(byte reasonCode) {
this.reasonCode = reasonCode;
return this;
}
public MqttMessage build() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.AUTH, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttReasonCodeAndPropertiesVariableHeader mqttAuthVariableHeader =
new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
return new MqttMessage(mqttFixedHeader, mqttAuthVariableHeader);
}
}
public static ConnectBuilder connect() {
return new ConnectBuilder();
}
@ -297,6 +524,26 @@ public final class MqttMessageBuilders {
return new UnsubscribeBuilder();
}
public static PubAckBuilder pubAck() {
return new PubAckBuilder();
}
public static SubAckBuilder subAck() {
return new SubAckBuilder();
}
public static UnsubAckBuilder unsubAck() {
return new UnsubAckBuilder();
}
public static DisconnectBuilder disconnect() {
return new DisconnectBuilder();
}
public static AuthBuilder auth() {
return new AuthBuilder();
}
private MqttMessageBuilders() {
}
}

View File

@ -38,7 +38,7 @@ public final class MqttMessageFactory {
case SUBSCRIBE:
return new MqttSubscribeMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttMessageIdAndPropertiesVariableHeader) variableHeader,
(MqttSubscribePayload) payload);
case SUBACK:
@ -50,12 +50,13 @@ public final class MqttMessageFactory {
case UNSUBACK:
return new MqttUnsubAckMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader);
(MqttMessageIdVariableHeader) variableHeader,
(MqttUnsubAckPayload) payload);
case UNSUBSCRIBE:
return new MqttUnsubscribeMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttMessageIdAndPropertiesVariableHeader) variableHeader,
(MqttUnsubscribePayload) payload);
case PUBLISH:
@ -65,17 +66,24 @@ public final class MqttMessageFactory {
(ByteBuf) payload);
case PUBACK:
//Having MqttPubReplyMessageVariableHeader or MqttMessageIdVariableHeader
return new MqttPubAckMessage(mqttFixedHeader, (MqttMessageIdVariableHeader) variableHeader);
case PUBREC:
case PUBREL:
case PUBCOMP:
//Having MqttPubReplyMessageVariableHeader or MqttMessageIdVariableHeader
return new MqttMessage(mqttFixedHeader, variableHeader);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
return new MqttMessage(mqttFixedHeader);
case DISCONNECT:
case AUTH:
//Having MqttReasonCodeAndPropertiesVariableHeader
return new MqttMessage(mqttFixedHeader,
(MqttReasonCodeAndPropertiesVariableHeader) variableHeader);
default:
throw new IllegalArgumentException("unknown message type: " + mqttFixedHeader.messageType());
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable Header containing, Packet Id and Properties as in MQTT v5 spec.
*/
public final class MqttMessageIdAndPropertiesVariableHeader extends MqttMessageIdVariableHeader {
private final MqttProperties properties;
public MqttMessageIdAndPropertiesVariableHeader(int messageId, MqttProperties properties) {
super(messageId);
if (messageId < 1 || messageId > 0xffff) {
throw new IllegalArgumentException("messageId: " + messageId + " (expected: 1 ~ 65535)");
}
this.properties = MqttProperties.withEmptyDefaults(properties);
}
public MqttProperties properties() {
return properties;
}
@Override
public String toString() {
return StringUtil.simpleClassName(this) + "[" +
"messageId=" + messageId() +
", properties=" + properties +
']';
}
}

View File

@ -22,7 +22,7 @@ import io.netty.util.internal.StringUtil;
* Variable Header containing only Message Id
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#msg-id">MQTTV3.1/msg-id</a>
*/
public final class MqttMessageIdVariableHeader {
public class MqttMessageIdVariableHeader {
private final int messageId;
@ -33,7 +33,7 @@ public final class MqttMessageIdVariableHeader {
return new MqttMessageIdVariableHeader(messageId);
}
private MqttMessageIdVariableHeader(int messageId) {
protected MqttMessageIdVariableHeader(int messageId) {
this.messageId = messageId;
}
@ -49,4 +49,8 @@ public final class MqttMessageIdVariableHeader {
.append(']')
.toString();
}
public MqttMessageIdAndPropertiesVariableHeader withEmptyProperties() {
return new MqttMessageIdAndPropertiesVariableHeader(messageId, MqttProperties.NO_PROPERTIES);
}
}

View File

@ -33,7 +33,8 @@ public enum MqttMessageType {
UNSUBACK(11),
PINGREQ(12),
PINGRESP(13),
DISCONNECT(14);
DISCONNECT(14),
AUTH(15);
private final int value;

View File

@ -0,0 +1,232 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
/**
* MQTT Properties container
* */
public final class MqttProperties {
public enum MqttPropertyType {
// single byte properties
PAYLOAD_FORMAT_INDICATOR(0x01),
REQUEST_PROBLEM_INFORMATION(0x17),
REQUEST_RESPONSE_INFORMATION(0x19),
MAXIMUM_QOS(0x24),
RETAIN_AVAILABLE(0x25),
WILDCARD_SUBSCRIPTION_AVAILABLE(0x28),
SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29),
SHARED_SUBSCRIPTION_AVAILABLE(0x2A),
// two bytes properties
SERVER_KEEP_ALIVE(0x13),
RECEIVE_MAXIMUM(0x21),
TOPIC_ALIAS_MAXIMUM(0x22),
TOPIC_ALIAS(0x23),
// four bytes properties
PUBLICATION_EXPIRY_INTERVAL(0x02),
SESSION_EXPIRY_INTERVAL(0x11),
WILL_DELAY_INTERVAL(0x18),
MAXIMUM_PACKET_SIZE(0x27),
// Variable Byte Integer
SUBSCRIPTION_IDENTIFIER(0x0B),
// UTF-8 Encoded String properties
CONTENT_TYPE(0x03),
RESPONSE_TOPIC(0x08),
ASSIGNED_CLIENT_IDENTIFIER(0x12),
AUTHENTICATION_METHOD(0x15),
RESPONSE_INFORMATION(0x1A),
SERVER_REFERENCE(0x1C),
REASON_STRING(0x1F),
USER_PROPERTY(0x26),
// Binary Data
CORRELATION_DATA(0x09),
AUTHENTICATION_DATA(0x16);
private final int value;
MqttPropertyType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static MqttPropertyType valueOf(int type) {
for (MqttPropertyType t : values()) {
if (t.value == type) {
return t;
}
}
throw new IllegalArgumentException("unknown property type: " + type);
}
}
public static final MqttProperties NO_PROPERTIES = new MqttProperties(
Collections.unmodifiableMap(new HashMap<Integer, MqttProperty>())
);
static MqttProperties withEmptyDefaults(MqttProperties properties) {
if (properties == null) {
return MqttProperties.NO_PROPERTIES;
}
return properties;
}
public abstract static class MqttProperty<T> {
final T value;
final int propertyId;
protected MqttProperty(int propertyId, T value) {
this.propertyId = propertyId;
this.value = value;
}
}
public static final class IntegerProperty extends MqttProperty<Integer> {
public IntegerProperty(int propertyId, Integer value) {
super(propertyId, value);
}
}
public static final class StringProperty extends MqttProperty<String> {
public StringProperty(int propertyId, String value) {
super(propertyId, value);
}
}
public static final class StringPair {
public final String key;
public final String value;
public StringPair(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public int hashCode() {
return key.hashCode() + 31 * value.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
StringPair that = (StringPair) obj;
return that.key.equals(this.key) && that.value.equals(this.value);
}
}
//User properties are the only properties that may be included multiple times and
//are the only properties where ordering is required. Therefore, they need a special handling
public static final class UserProperties extends MqttProperty<List<StringPair>> {
public UserProperties() {
super(MqttPropertyType.USER_PROPERTY.value, new ArrayList<StringPair>());
}
/**
* Create user properties from the collection of the String pair values
*
* @param values string pairs. Collection entries are copied, collection itself isn't shared
*/
public UserProperties(Collection<StringPair> values) {
this();
this.value.addAll(values);
}
public void add(StringPair pair) {
this.value.add(pair);
}
public void add(String key, String value) {
this.value.add(new StringPair(key, value));
}
}
public static final class UserProperty extends MqttProperty<StringPair> {
public UserProperty(String key, String value) {
super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value));
}
}
public static final class BinaryProperty extends MqttProperty<byte[]> {
public BinaryProperty(int propertyId, byte[] value) {
super(propertyId, value);
}
}
public MqttProperties() {
this(new HashMap<Integer, MqttProperty>());
}
private MqttProperties(Map<Integer, MqttProperty> props) {
this.props = props;
}
private final Map<Integer, MqttProperty> props;
public void add(MqttProperty property) {
if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) {
UserProperties userProps = (UserProperties) props.get(property.propertyId);
if (userProps == null) {
userProps = new UserProperties();
props.put(property.propertyId, userProps);
}
if (property instanceof UserProperty) {
userProps.add(((UserProperty) property).value);
} else {
for (StringPair pair: ((UserProperties) property).value) {
userProps.add(pair);
}
}
} else {
props.put(property.propertyId, property);
}
}
public Collection<? extends MqttProperty> listAll() {
return props.values();
}
public boolean isEmpty() {
return props.isEmpty();
}
public MqttProperty getProperty(int propertyId) {
return props.get(propertyId);
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable Header containing Packet Id, reason code and Properties as in MQTT v5 spec.
*/
public final class MqttPubReplyMessageVariableHeader extends MqttMessageIdVariableHeader {
private final byte reasonCode;
private final MqttProperties properties;
public static final byte REASON_CODE_OK = 0;
public MqttPubReplyMessageVariableHeader(int messageId, byte reasonCode, MqttProperties properties) {
super(messageId);
if (messageId < 1 || messageId > 0xffff) {
throw new IllegalArgumentException("messageId: " + messageId + " (expected: 1 ~ 65535)");
}
this.reasonCode = reasonCode;
this.properties = MqttProperties.withEmptyDefaults(properties);
}
public byte reasonCode() {
return reasonCode;
}
public MqttProperties properties() {
return properties;
}
@Override
public String toString() {
return StringUtil.simpleClassName(this) + "[" +
"messageId=" + messageId() +
", reasonCode=" + reasonCode +
", properties=" + properties +
']';
}
}

View File

@ -25,10 +25,16 @@ public final class MqttPublishVariableHeader {
private final String topicName;
private final int packetId;
private final MqttProperties properties;
public MqttPublishVariableHeader(String topicName, int packetId) {
this(topicName, packetId, MqttProperties.NO_PROPERTIES);
}
public MqttPublishVariableHeader(String topicName, int packetId, MqttProperties properties) {
this.topicName = topicName;
this.packetId = packetId;
this.properties = MqttProperties.withEmptyDefaults(properties);
}
public String topicName() {
@ -47,6 +53,10 @@ public final class MqttPublishVariableHeader {
return packetId;
}
public MqttProperties properties() {
return properties;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))

View File

@ -0,0 +1,54 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable Header for AUTH and DISCONNECT messages represented by {@link MqttMessage}
*/
public final class MqttReasonCodeAndPropertiesVariableHeader {
private final byte reasonCode;
private final MqttProperties properties;
public static final byte REASON_CODE_OK = 0;
public MqttReasonCodeAndPropertiesVariableHeader(byte reasonCode,
MqttProperties properties) {
this.reasonCode = reasonCode;
this.properties = MqttProperties.withEmptyDefaults(properties);
}
public byte reasonCode() {
return reasonCode;
}
public MqttProperties properties() {
return properties;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("reasonCode=").append(reasonCode)
.append(", properties=").append(properties)
.append(']')
.toString();
}
}

View File

@ -23,16 +23,27 @@ public final class MqttSubAckMessage extends MqttMessage {
public MqttSubAckMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttMessageIdAndPropertiesVariableHeader variableHeader,
MqttSubAckPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
public MqttSubAckMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubAckPayload payload) {
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() {
return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader();
}
@Override
public MqttSubAckPayload payload() {
return (MqttSubAckPayload) super.payload();

View File

@ -24,16 +24,27 @@ public final class MqttSubscribeMessage extends MqttMessage {
public MqttSubscribeMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttMessageIdAndPropertiesVariableHeader variableHeader,
MqttSubscribePayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
public MqttSubscribeMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubscribePayload payload) {
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() {
return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader();
}
@Override
public MqttSubscribePayload payload() {
return (MqttSubscribePayload) super.payload();

View File

@ -0,0 +1,124 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* Model the SubscriptionOption used in Subscribe MQTT v5 packet
*/
public final class MqttSubscriptionOption {
enum RetainedHandlingPolicy {
SEND_AT_SUBSCRIBE(0),
SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS(1),
DONT_SEND_AT_SUBSCRIBE(2);
private final int value;
RetainedHandlingPolicy(int value) {
this.value = value;
}
public int value() {
return value;
}
public static RetainedHandlingPolicy valueOf(int value) {
for (RetainedHandlingPolicy q: values()) {
if (q.value == value) {
return q;
}
}
throw new IllegalArgumentException("invalid RetainedHandlingPolicy: " + value);
}
}
private final MqttQoS qos;
private final boolean noLocal;
private final boolean retainAsPublished;
private final RetainedHandlingPolicy retainHandling;
public static MqttSubscriptionOption onlyFromQos(MqttQoS qos) {
return new MqttSubscriptionOption(qos, false, false, RetainedHandlingPolicy.SEND_AT_SUBSCRIBE);
}
public MqttSubscriptionOption(MqttQoS qos,
boolean noLocal,
boolean retainAsPublished,
RetainedHandlingPolicy retainHandling) {
this.qos = qos;
this.noLocal = noLocal;
this.retainAsPublished = retainAsPublished;
this.retainHandling = retainHandling;
}
public MqttQoS qos() {
return qos;
}
public boolean isNoLocal() {
return noLocal;
}
public boolean isRetainAsPublished() {
return retainAsPublished;
}
public RetainedHandlingPolicy retainHandling() {
return retainHandling;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttSubscriptionOption that = (MqttSubscriptionOption) o;
if (noLocal != that.noLocal) {
return false;
}
if (retainAsPublished != that.retainAsPublished) {
return false;
}
if (qos != that.qos) {
return false;
}
return retainHandling == that.retainHandling;
}
@Override
public int hashCode() {
int result = qos.hashCode();
result = 31 * result + (noLocal ? 1 : 0);
result = 31 * result + (retainAsPublished ? 1 : 0);
result = 31 * result + retainHandling.hashCode();
return result;
}
@Override
public String toString() {
return "SubscriptionOption[" +
"qos=" + qos +
", noLocal=" + noLocal +
", retainAsPublished=" + retainAsPublished +
", retainHandling=" + retainHandling +
']';
}
}

View File

@ -25,11 +25,16 @@ import io.netty.util.internal.StringUtil;
public final class MqttTopicSubscription {
private final String topicFilter;
private final MqttQoS qualityOfService;
private final MqttSubscriptionOption option;
public MqttTopicSubscription(String topicFilter, MqttQoS qualityOfService) {
this.topicFilter = topicFilter;
this.qualityOfService = qualityOfService;
this.option = MqttSubscriptionOption.onlyFromQos(qualityOfService);
}
public MqttTopicSubscription(String topicFilter, MqttSubscriptionOption option) {
this.topicFilter = topicFilter;
this.option = option;
}
public String topicName() {
@ -37,7 +42,11 @@ public final class MqttTopicSubscription {
}
public MqttQoS qualityOfService() {
return qualityOfService;
return option.qos();
}
public MqttSubscriptionOption option() {
return option;
}
@Override
@ -45,7 +54,7 @@ public final class MqttTopicSubscription {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("topicFilter=").append(topicFilter)
.append(", qualityOfService=").append(qualityOfService)
.append(", option=").append(this.option)
.append(']')
.toString();
}

View File

@ -21,12 +21,41 @@ package io.netty.handler.codec.mqtt;
*/
public final class MqttUnsubAckMessage extends MqttMessage {
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader, null);
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdAndPropertiesVariableHeader variableHeader,
MqttUnsubAckPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttUnsubAckPayload payload) {
this(mqttFixedHeader, fallbackVariableHeader(variableHeader), payload);
}
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader) {
this(mqttFixedHeader, variableHeader, null);
}
private static MqttMessageIdAndPropertiesVariableHeader fallbackVariableHeader(
MqttMessageIdVariableHeader variableHeader) {
if (variableHeader instanceof MqttMessageIdAndPropertiesVariableHeader) {
return (MqttMessageIdAndPropertiesVariableHeader) variableHeader;
}
return new MqttMessageIdAndPropertiesVariableHeader(variableHeader.messageId(), MqttProperties.NO_PROPERTIES);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() {
return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader();
}
@Override
public MqttUnsubAckPayload payload() {
return (MqttUnsubAckPayload) super.payload();
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Payload for MQTT unsuback message as in V5.
*/
public final class MqttUnsubAckPayload {
private final List<Short> unsubscribeReasonCodes;
public MqttUnsubAckPayload(short... unsubscribeReasonCodes) {
ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes");
List<Short> list = new ArrayList<Short>(unsubscribeReasonCodes.length);
for (Short v: unsubscribeReasonCodes) {
list.add(v);
}
this.unsubscribeReasonCodes = Collections.unmodifiableList(list);
}
public MqttUnsubAckPayload(Iterable<Short> unsubscribeReasonCodes) {
ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes");
List<Short> list = new ArrayList<Short>();
for (Short v: unsubscribeReasonCodes) {
ObjectUtil.checkNotNull(v, "unsubscribeReasonCode");
list.add(v);
}
this.unsubscribeReasonCodes = Collections.unmodifiableList(list);
}
public List<Short> unsubscribeReasonCodes() {
return unsubscribeReasonCodes;
}
@Override
public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this))
.append('[')
.append("unsubscribeReasonCodes=").append(unsubscribeReasonCodes)
.append(']')
.toString();
}
}

View File

@ -24,16 +24,27 @@ public final class MqttUnsubscribeMessage extends MqttMessage {
public MqttUnsubscribeMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttMessageIdAndPropertiesVariableHeader variableHeader,
MqttUnsubscribePayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
public MqttUnsubscribeMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttUnsubscribePayload payload) {
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() {
return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader();
}
@Override
public MqttUnsubscribePayload payload() {
return (MqttUnsubscribePayload) super.payload();

View File

@ -25,7 +25,8 @@ import io.netty.util.CharsetUtil;
*/
public enum MqttVersion {
MQTT_3_1("MQIsdp", (byte) 3),
MQTT_3_1_1("MQTT", (byte) 4);
MQTT_3_1_1("MQTT", (byte) 4),
MQTT_5("MQTT", (byte) 5);
private final String name;
private final byte level;
@ -49,8 +50,8 @@ public enum MqttVersion {
public static MqttVersion fromProtocolNameAndLevel(String protocolName, byte protocolLevel) {
for (MqttVersion mv : values()) {
if (mv.name.equals(protocolName)) {
if (mv.level == protocolLevel) {
if (mv.level == protocolLevel) {
if (mv.name.equals(protocolName)) {
return mv;
} else {
throw new MqttUnacceptableProtocolVersionException(protocolName + " and " +

View File

@ -18,10 +18,13 @@ package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Attribute;
import io.netty.util.CharsetUtil;
import org.junit.Before;
import org.junit.Test;
@ -33,6 +36,9 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.*;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@ -51,12 +57,17 @@ public class MqttCodecTest {
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
@Mock
private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
@Mock
private final Channel channel = mock(Channel.class);
@Mock
private final Attribute<MqttVersion> versionAttrMock = mock(Attribute.class);
private final MqttDecoder mqttDecoder = new MqttDecoder();
/**
@ -68,12 +79,14 @@ public class MqttCodecTest {
public void setup() {
MockitoAnnotations.initMocks(this);
when(ctx.channel()).thenReturn(channel);
when(ctx.alloc()).thenReturn(ALLOCATOR);
when(channel.attr(MqttCodecUtil.MQTT_VERSION_KEY)).thenReturn(versionAttrMock);
}
@Test
public void testConnectMessageForMqtt31() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttConnectMessage> captor = ArgumentCaptor.forClass(MqttConnectMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -89,7 +102,7 @@ public class MqttCodecTest {
@Test
public void testConnectMessageForMqtt311() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttConnectMessage> captor = ArgumentCaptor.forClass(MqttConnectMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -105,7 +118,7 @@ public class MqttCodecTest {
@Test
public void testConnectMessageWithNonZeroReservedFlagForMqtt311() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
// Set the reserved flag in the CONNECT Packet to 1
byteBuf.setByte(9, byteBuf.getByte(9) | 0x1);
@ -126,19 +139,24 @@ public class MqttCodecTest {
@Test
public void testConnectMessageNoPassword() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1, null, PASSWORD);
final MqttConnectMessage message = createConnectMessage(
MqttVersion.MQTT_3_1_1,
null,
PASSWORD,
MqttProperties.NO_PROPERTIES,
MqttProperties.NO_PROPERTIES);
try {
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
} catch (Exception cause) {
assertTrue(cause instanceof DecoderException);
assertTrue(cause instanceof EncoderException);
}
}
@Test
public void testConnAckMessage() throws Exception {
final MqttConnAckMessage message = createConnAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttConnAckMessage> captor = ArgumentCaptor.forClass(MqttConnAckMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -152,7 +170,7 @@ public class MqttCodecTest {
@Test
public void testPublishMessage() throws Exception {
final MqttPublishMessage message = createPublishMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttPublishMessage> captor = ArgumentCaptor.forClass(MqttPublishMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -187,7 +205,7 @@ public class MqttCodecTest {
@Test
public void testSubscribeMessage() throws Exception {
final MqttSubscribeMessage message = createSubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttSubscribeMessage> captor = ArgumentCaptor.forClass(MqttSubscribeMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -202,7 +220,7 @@ public class MqttCodecTest {
@Test
public void testSubAckMessage() throws Exception {
final MqttSubAckMessage message = createSubAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttSubAckMessage> captor = ArgumentCaptor.forClass(MqttSubAckMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -223,7 +241,7 @@ public class MqttCodecTest {
MqttSubAckMessage message =
new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttSubAckMessage> captor = ArgumentCaptor.forClass(MqttSubAckMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -240,7 +258,7 @@ public class MqttCodecTest {
@Test
public void testUnSubscribeMessage() throws Exception {
final MqttUnsubscribeMessage message = createUnsubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttUnsubscribeMessage> captor = ArgumentCaptor.forClass(MqttUnsubscribeMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -272,11 +290,12 @@ public class MqttCodecTest {
testMessageWithOnlyFixedHeader(MqttMessage.DISCONNECT);
}
//All 0..F message type codes are valid in MQTT 5
@Test
public void testUnknownMessageType() throws Exception {
final MqttMessage message = createMessageWithFixedHeader(MqttMessageType.PINGREQ);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
// setting an invalid message type (15, reserved and forbidden by MQTT 3.1.1 spec)
byteBuf.setByte(0, 0xF0);
@ -288,8 +307,8 @@ public class MqttCodecTest {
final MqttMessage decodedMessage = captor.getValue();
assertTrue(decodedMessage.decoderResult().isFailure());
Throwable cause = decodedMessage.decoderResult().cause();
assertTrue(cause instanceof IllegalArgumentException);
assertEquals("unknown message type: 15", cause.getMessage());
assertTrue(cause instanceof DecoderException);
assertEquals("AUTH message requires at least MQTT 5", cause.getMessage());
} finally {
byteBuf.release();
}
@ -298,7 +317,7 @@ public class MqttCodecTest {
@Test
public void testConnectMessageForMqtt31TooLarge() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -319,7 +338,7 @@ public class MqttCodecTest {
@Test
public void testConnectMessageForMqtt311TooLarge() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -340,7 +359,7 @@ public class MqttCodecTest {
@Test
public void testConnAckMessageTooLarge() throws Exception {
final MqttConnAckMessage message = createConnAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -358,7 +377,7 @@ public class MqttCodecTest {
@Test
public void testPublishMessageTooLarge() throws Exception {
final MqttPublishMessage message = createPublishMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -379,7 +398,7 @@ public class MqttCodecTest {
@Test
public void testSubscribeMessageTooLarge() throws Exception {
final MqttSubscribeMessage message = createSubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -399,7 +418,7 @@ public class MqttCodecTest {
@Test
public void testSubAckMessageTooLarge() throws Exception {
final MqttSubAckMessage message = createSubAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -419,7 +438,7 @@ public class MqttCodecTest {
@Test
public void testUnSubscribeMessageTooLarge() throws Exception {
final MqttUnsubscribeMessage message = createUnsubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
@ -436,8 +455,279 @@ public class MqttCodecTest {
}
}
@Test
public void testConnectMessageForMqtt5() throws Exception {
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(SESSION_EXPIRY_INTERVAL.value(), 10));
props.add(new MqttProperties.StringProperty(AUTHENTICATION_METHOD.value(), "Plain"));
MqttProperties willProps = new MqttProperties();
willProps.add(new MqttProperties.IntegerProperty(WILL_DELAY_INTERVAL.value(), 100));
final MqttConnectMessage message =
createConnectMessage(MqttVersion.MQTT_5, USER_NAME, PASSWORD, props, willProps);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttConnectMessage> captor = ArgumentCaptor.forClass(MqttConnectMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttConnectMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validateConnectPayload(message.payload(), decodedMessage.payload());
}
@Test
public void testConnAckMessageForMqtt5() throws Exception {
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(SESSION_EXPIRY_INTERVAL.value(), 10));
props.add(new MqttProperties.IntegerProperty(MAXIMUM_QOS.value(), 1));
props.add(new MqttProperties.IntegerProperty(MAXIMUM_PACKET_SIZE.value(), 1000));
final MqttConnAckMessage message = createConnAckMessage(props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttConnAckMessage> captor = ArgumentCaptor.forClass(MqttConnAckMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttConnAckMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnAckVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
}
@Test
public void testPublishMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
props.add(new MqttProperties.UserProperty("isSecret", "true"));
props.add(new MqttProperties.UserProperty("isUrgent", "false"));
assertEquals("User properties count mismatch",
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value.size(), 2);
final MqttPublishMessage message = createPublishMessage(props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttPublishMessage> captor = ArgumentCaptor.forClass(MqttPublishMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttPublishMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePublishVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validatePublishPayload(message.payload(), decodedMessage.payload());
}
@Test
public void testPubAckMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
//0x87 - Not Authorized
final MqttMessage message = createPubAckMessage((byte) 0x87, props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePubReplyVariableHeader((MqttPubReplyMessageVariableHeader) message.variableHeader(),
(MqttPubReplyMessageVariableHeader) decodedMessage.variableHeader());
}
@Test
public void testPubAckMessageSkipCodeForMqtt5() throws Exception {
//Code 0 (Success) and no properties - skip encoding code and properties
final MqttMessage message = createPubAckMessage((byte) 0, MqttProperties.NO_PROPERTIES);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePubReplyVariableHeader((MqttPubReplyMessageVariableHeader) message.variableHeader(),
(MqttPubReplyMessageVariableHeader) decodedMessage.variableHeader());
}
@Test
public void testSubAckMessageForMqtt5() throws Exception {
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
final MqttSubAckMessage message = createSubAckMessage(props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttSubAckMessage> captor = ArgumentCaptor.forClass(MqttSubAckMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttSubAckMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePacketIdAndPropertiesVariableHeader(
(MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(),
(MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader());
}
@Test
public void testSubscribeMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
final MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
.messageId((short) 1)
.properties(props)
.addSubscription("/topic", new MqttSubscriptionOption(AT_LEAST_ONCE,
true,
true,
SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS))
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttSubscribeMessage> captor = ArgumentCaptor.forClass(MqttSubscribeMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttSubscribeMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
final MqttMessageIdAndPropertiesVariableHeader expectedHeader =
(MqttMessageIdAndPropertiesVariableHeader) message.variableHeader();
final MqttMessageIdAndPropertiesVariableHeader actualHeader =
(MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader();
validatePacketIdAndPropertiesVariableHeader(expectedHeader, actualHeader);
validateSubscribePayload(message.payload(), decodedMessage.payload());
}
@Test
public void testUnsubAckMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
final MqttUnsubAckMessage message = MqttMessageBuilders.unsubAck()
.packetId((short) 1)
.properties(props)
.addReasonCode((short) 0x83)
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttUnsubAckMessage> captor = ArgumentCaptor.forClass(MqttUnsubAckMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttUnsubAckMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePacketIdAndPropertiesVariableHeader(
(MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(),
(MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader());
assertEquals("Reason code list doesn't match", message.payload().unsubscribeReasonCodes(),
decodedMessage.payload().unsubscribeReasonCodes());
}
@Test
public void testDisconnectMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(SESSION_EXPIRY_INTERVAL.value(), 6));
final MqttMessage message = MqttMessageBuilders.disconnect()
.reasonCode((byte) 0x96) // Message rate too high
.properties(props)
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateReasonCodeAndPropertiesVariableHeader(
(MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(),
(MqttReasonCodeAndPropertiesVariableHeader) decodedMessage.variableHeader());
}
@Test
public void testDisconnectMessageSkipCodeForMqtt5() throws Exception {
//code 0 and no properties - skip encoding code and properties
final MqttMessage message = MqttMessageBuilders.disconnect()
.reasonCode((byte) 0) // ok
.properties(MqttProperties.NO_PROPERTIES)
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateReasonCodeAndPropertiesVariableHeader(
(MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(),
(MqttReasonCodeAndPropertiesVariableHeader) decodedMessage.variableHeader());
}
@Test
public void testAuthMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.BinaryProperty(AUTHENTICATION_DATA.value(), "secret".getBytes(CharsetUtil.UTF_8)));
final MqttMessage message = MqttMessageBuilders.auth()
.reasonCode((byte) 0x18) // Continue authentication
.properties(props)
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue();
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateReasonCodeAndPropertiesVariableHeader(
(MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(),
(MqttReasonCodeAndPropertiesVariableHeader) decodedMessage.variableHeader());
}
@Test
public void testMqttVersionDetection() throws Exception {
clearInvocations(versionAttrMock);
//Encode CONNECT message so that encoder would initialize its version
final MqttConnectMessage connectMessage = createConnectMessage(MqttVersion.MQTT_5);
ByteBuf connectByteBuf = MqttEncoder.doEncode(ctx, connectMessage);
verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5);
clearInvocations(versionAttrMock);
ArgumentCaptor<MqttConnectMessage> captor = ArgumentCaptor.forClass(MqttConnectMessage.class);
mqttDecoder.decode(ctx, connectByteBuf);
verify(ctx).fireChannelRead(captor.capture());
verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5);
final MqttConnectMessage decodedConnectMessage = captor.getValue();
validateFixedHeaders(connectMessage.fixedHeader(), decodedConnectMessage.fixedHeader());
validateConnectVariableHeader(connectMessage.variableHeader(), decodedConnectMessage.variableHeader());
validateConnectPayload(connectMessage.payload(), decodedConnectMessage.payload());
verifyNoMoreInteractions(versionAttrMock);
}
private void testMessageWithOnlyFixedHeader(MqttMessage message) throws Exception {
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -451,7 +741,7 @@ public class MqttCodecTest {
throws Exception {
MqttMessage message = createMessageWithFixedHeaderAndMessageIdVariableHeader(messageType);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
ArgumentCaptor<MqttMessage> captor = ArgumentCaptor.forClass(MqttMessage.class);
mqttDecoder.decode(ctx, byteBuf);
@ -484,36 +774,57 @@ public class MqttCodecTest {
}
private static MqttConnectMessage createConnectMessage(MqttVersion mqttVersion) {
return createConnectMessage(mqttVersion, USER_NAME, PASSWORD);
return createConnectMessage(mqttVersion,
USER_NAME,
PASSWORD,
MqttProperties.NO_PROPERTIES,
MqttProperties.NO_PROPERTIES);
}
private static MqttConnectMessage createConnectMessage(MqttVersion mqttVersion, String username, String password) {
private static MqttConnectMessage createConnectMessage(MqttVersion mqttVersion,
String username,
String password,
MqttProperties properties,
MqttProperties willProperties) {
return MqttMessageBuilders.connect()
.clientId(CLIENT_ID)
.protocolVersion(mqttVersion)
.username(username)
.password(password)
.password(password.getBytes(CharsetUtil.UTF_8))
.properties(properties)
.willRetain(true)
.willQoS(MqttQoS.AT_LEAST_ONCE)
.willFlag(true)
.willTopic(WILL_TOPIC)
.willMessage(WILL_MESSAGE)
.willMessage(WILL_MESSAGE.getBytes(CharsetUtil.UTF_8))
.willProperties(willProperties)
.cleanSession(true)
.keepAlive(KEEP_ALIVE_SECONDS)
.build();
}
private static MqttConnAckMessage createConnAckMessage() {
return createConnAckMessage(MqttProperties.NO_PROPERTIES);
}
private static MqttConnAckMessage createConnAckMessage(MqttProperties properties) {
return MqttMessageBuilders.connAck()
.returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED)
.properties(properties)
.sessionPresent(true)
.build();
}
private static MqttPublishMessage createPublishMessage() {
return createPublishMessage(MqttProperties.NO_PROPERTIES);
}
private static MqttPublishMessage createPublishMessage(MqttProperties properties) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", 1234);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc",
1234,
properties);
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes("whatever".getBytes(CharsetUtil.UTF_8));
return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
@ -534,6 +845,10 @@ public class MqttCodecTest {
}
private static MqttSubAckMessage createSubAckMessage() {
return createSubAckMessage(MqttProperties.NO_PROPERTIES);
}
private static MqttSubAckMessage createSubAckMessage(MqttProperties properties) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
@ -555,6 +870,14 @@ public class MqttCodecTest {
return new MqttUnsubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttUnsubscribePayload);
}
private MqttMessage createPubAckMessage(byte reasonCode, MqttProperties properties) {
return MqttMessageBuilders.pubAck()
.packetId((short) 1)
.reasonCode(reasonCode)
.properties(properties)
.build();
}
// Helper methods to compare expected and actual
// MQTT messages
@ -572,6 +895,8 @@ public class MqttCodecTest {
expected.keepAliveTimeSeconds(),
actual.keepAliveTimeSeconds());
assertEquals("MqttConnectVariableHeader Version mismatch ", expected.version(), actual.version());
assertEquals("MqttConnectVariableHeader Version mismatch ", expected.version(), actual.version());
validateProperties(expected.properties(), actual.properties());
assertEquals("MqttConnectVariableHeader WillQos mismatch ", expected.willQos(), actual.willQos());
assertEquals("MqttConnectVariableHeader HasUserName mismatch ", expected.hasUserName(), actual.hasUserName());
@ -602,6 +927,7 @@ public class MqttCodecTest {
"MqttConnectPayload WillMessage bytes mismatch ",
Arrays.equals(expected.willMessageInBytes(), actual.willMessageInBytes()));
assertEquals("MqttConnectPayload WillTopic mismatch ", expected.willTopic(), actual.willTopic());
validateProperties(expected.willProperties(), actual.willProperties());
}
private static void validateConnAckVariableHeader(
@ -617,7 +943,8 @@ public class MqttCodecTest {
MqttPublishVariableHeader expected,
MqttPublishVariableHeader actual) {
assertEquals("MqttPublishVariableHeader TopicName mismatch ", expected.topicName(), actual.topicName());
assertEquals("MqttPublishVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId());
assertEquals("MqttPublishVariableHeader MessageId mismatch ", expected.packetId(), actual.packetId());
validateProperties(expected.properties(), actual.properties());
}
private static void validatePublishPayload(ByteBuf expected, ByteBuf actual) {
@ -651,6 +978,10 @@ public class MqttCodecTest {
"MqttTopicSubscription Qos mismatch ",
expected.qualityOfService(),
actual.qualityOfService());
assertEquals(
"MqttTopicSubscription options mismatch ",
expected.option(),
actual.option());
}
private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) {
@ -676,4 +1007,121 @@ public class MqttCodecTest {
assertTrue("MqttMessage DecoderResult cause reason expect to contain 'too large message' ",
cause.getMessage().contains("too large message:"));
}
private static void validatePubReplyVariableHeader(
MqttPubReplyMessageVariableHeader expected,
MqttPubReplyMessageVariableHeader actual) {
assertEquals("MqttPubReplyMessageVariableHeader MessageId mismatch ",
expected.messageId(), actual.messageId());
assertEquals("MqttPubReplyMessageVariableHeader reasonCode mismatch ",
expected.reasonCode(), actual.reasonCode());
final MqttProperties expectedProps = expected.properties();
final MqttProperties actualProps = actual.properties();
validateProperties(expectedProps, actualProps);
}
private void validatePacketIdAndPropertiesVariableHeader(MqttMessageIdAndPropertiesVariableHeader expected,
MqttMessageIdAndPropertiesVariableHeader actual) {
assertEquals("MqttMessageIdAndPropertiesVariableHeader MessageId mismatch ",
expected.messageId(), actual.messageId());
final MqttProperties expectedProps = expected.properties();
final MqttProperties actualProps = actual.properties();
validateProperties(expectedProps, actualProps);
}
private void validateReasonCodeAndPropertiesVariableHeader(MqttReasonCodeAndPropertiesVariableHeader expected,
MqttReasonCodeAndPropertiesVariableHeader actual) {
assertEquals("MqttReasonCodeAndPropertiesVariableHeader reason mismatch ",
expected.reasonCode(), actual.reasonCode());
final MqttProperties expectedProps = expected.properties();
final MqttProperties actualProps = actual.properties();
validateProperties(expectedProps, actualProps);
}
private static void validateProperties(MqttProperties expected, MqttProperties actual) {
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
// one byte value integer property
case PAYLOAD_FORMAT_INDICATOR:
case REQUEST_PROBLEM_INFORMATION:
case REQUEST_RESPONSE_INFORMATION:
case MAXIMUM_QOS:
case RETAIN_AVAILABLE:
case WILDCARD_SUBSCRIPTION_AVAILABLE:
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
case SHARED_SUBSCRIPTION_AVAILABLE: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("one byte property doesn't match", expectedValue, actualValue);
break;
}
// two byte value integer property
case SERVER_KEEP_ALIVE:
case RECEIVE_MAXIMUM:
case TOPIC_ALIAS_MAXIMUM:
case TOPIC_ALIAS: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("two byte property doesn't match", expectedValue, actualValue);
break;
}
// four byte value integer property
case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL:
case WILL_DELAY_INTERVAL:
case MAXIMUM_PACKET_SIZE: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("four byte property doesn't match", expectedValue, actualValue);
break;
}
// four byte value integer property
case SUBSCRIPTION_IDENTIFIER: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
break;
}
// UTF-8 string value integer property
case CONTENT_TYPE:
case RESPONSE_TOPIC:
case ASSIGNED_CLIENT_IDENTIFIER:
case AUTHENTICATION_METHOD:
case RESPONSE_INFORMATION:
case SERVER_REFERENCE:
case REASON_STRING: {
final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value;
final String actualValue = ((MqttProperties.StringProperty) actualProperty).value;
assertEquals("String property doesn't match", expectedValue, actualValue);
break;
}
// User property
case USER_PROPERTY: {
final List<MqttProperties.StringPair> expectedPairs =
((MqttProperties.UserProperties) expectedProperty).value;
final List<MqttProperties.StringPair> actualPairs =
((MqttProperties.UserProperties) actualProperty).value;
assertEquals("User properties count doesn't match", expectedPairs, actualPairs);
for (int i = 0; i < expectedPairs.size(); i++) {
assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i));
}
break;
}
// byte[] property
case CORRELATION_DATA:
case AUTHENTICATION_DATA: {
final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value;
final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value;
final String expectedHexDump = ByteBufUtil.hexDump(expectedValue);
final String actualHexDump = ByteBufUtil.hexDump(actualValue);
assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump);
break;
}
default:
fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId));
}
}
}
}

View File

@ -32,8 +32,12 @@ public class MqttConnectPayloadTest {
byte[] willMessage = null;
String userName = "userName";
byte[] password = "password".getBytes(CharsetUtil.UTF_8);
MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(clientIdentifier, willTopic, willMessage, userName, password);
MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(clientIdentifier,
MqttProperties.NO_PROPERTIES,
willTopic,
willMessage,
userName,
password);
assertNull(mqttConnectPayload.willMessageInBytes());
assertNull(mqttConnectPayload.willMessage());
@ -46,8 +50,12 @@ public class MqttConnectPayloadTest {
byte[] willMessage = "willMessage".getBytes(CharsetUtil.UTF_8);
String userName = "userName";
byte[] password = null;
MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(clientIdentifier, willTopic, willMessage, userName, password);
MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(clientIdentifier,
MqttProperties.NO_PROPERTIES,
willTopic,
willMessage,
userName,
password);
assertNull(mqttConnectPayload.passwordInBytes());
assertNull(mqttConnectPayload.password());

View File

@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
@ -54,8 +55,13 @@ public class MqttHeartBeatClientHandler implements ChannelHandler {
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnectVariableHeader connectVariableHeader =
new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false,
0, false, false, 20);
MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, null, null, userName, password);
0, false, false, 20, MqttProperties.NO_PROPERTIES);
MqttConnectPayload connectPayload = new MqttConnectPayload(clientId,
MqttProperties.NO_PROPERTIES,
null,
null,
userName,
password);
MqttConnectMessage connectMessage =
new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload);
ctx.writeAndFlush(connectMessage);