Add MQTT protocol codec

MQTT is a open source protocol on top of TCP which is widely used in
mobile communication and also for IoT (Internet of Things) today. This
will add an open source implementation of MQTT so that it becomes easier
for Netty users to implement an MQTT application.

For more information about the MQTT protocol, read this:

http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
This commit is contained in:
Mousom Dhar Gupta 2014-04-16 14:59:09 -07:00 committed by Trustin Lee
parent e274a6549c
commit 1ba5fa4b4b
33 changed files with 2826 additions and 1 deletions

View File

@ -147,6 +147,13 @@
<scope>compile</scope> <scope>compile</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>netty-codec-socks</artifactId> <artifactId>netty-codec-socks</artifactId>

48
codec-mqtt/pom.xml Normal file
View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>5.0.0.Alpha2-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-mqtt</artifactId>
<packaging>jar</packaging>
<name>Netty/Codec/Mqtt</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
public final class MqttCommonUtil {
public static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case CONNECT:
case CONNACK:
case PUBACK:
case PUBREC:
case PUBCOMP:
case SUBACK:
case UNSUBACK:
case PINGREQ:
case PINGRESP:
case DISCONNECT:
if (mqttFixedHeader.isDup() ||
mqttFixedHeader.qosLevel() != QoS.AT_MOST_ONCE ||
mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
mqttFixedHeader.messageType(),
false,
QoS.AT_MOST_ONCE,
false,
mqttFixedHeader.remainingLength());
}
return mqttFixedHeader;
case PUBREL:
case SUBSCRIBE:
case UNSUBSCRIBE:
if (mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
mqttFixedHeader.messageType(),
mqttFixedHeader.isDup(),
mqttFixedHeader.qosLevel(),
false,
mqttFixedHeader.remainingLength());
}
return mqttFixedHeader;
default:
return mqttFixedHeader;
}
}
private MqttCommonUtil() { }
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connack">MQTTV3.1/connack</a>
*/
public final class MqttConnAckMessage extends MqttMessage {
public MqttConnAckMessage(MqttFixedHeader mqttFixedHeader, MqttConnAckVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader);
}
@Override
public MqttConnAckVariableHeader variableHeader() {
return (MqttConnAckVariableHeader) super.variableHeader();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable header of {@link MqttConnectMessage }
*/
public class MqttConnAckVariableHeader {
private final MqttConnectReturnCode connectReturnCode;
public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode) {
this.connectReturnCode = connectReturnCode;
}
public MqttConnectReturnCode connectReturnCode() {
return connectReturnCode;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("connectReturnCode=").append(connectReturnCode);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect">MQTTV3.1/connect</a>
*/
public final class MqttConnectMessage extends MqttMessage {
public MqttConnectMessage(
MqttFixedHeader mqttFixedHeader,
MqttConnectVariableHeader variableHeader,
MqttConnectPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
@Override
public MqttConnectVariableHeader variableHeader() {
return (MqttConnectVariableHeader) super.variableHeader();
}
@Override
public MqttConnectPayload payload() {
return (MqttConnectPayload) super.payload();
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Payload of {@link MqttConnectMessage}
*/
public class MqttConnectPayload {
private final String clientIdentifier;
private final String willTopic;
private final String willMessage;
private final String userName;
private final String password;
public MqttConnectPayload(
String clientIdentifier,
String willTopic,
String willMessage,
String userName,
String password) {
this.clientIdentifier = clientIdentifier;
this.willTopic = willTopic;
this.willMessage = willMessage;
this.userName = userName;
this.password = password;
}
public String clientIdentifier() {
return clientIdentifier;
}
public String willTopic() {
return willTopic;
}
public String willMessage() {
return willMessage;
}
public String userName() {
return userName;
}
public String password() {
return password;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("clientIdentifier=").append(clientIdentifier);
builder.append(", willTopic=").append(willTopic);
builder.append(", willMessage=").append(willMessage);
builder.append(", userName=").append(userName);
builder.append(", password=").append(password);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Return Code of {@link io.netty.handler.codec.mqtt.MqttConnAckMessage}
*/
public enum MqttConnectReturnCode {
CONNECTION_ACCEPTED((byte) 0x00),
CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01),
CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02),
CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03),
CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04),
CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05);
private static final Map<Byte, MqttConnectReturnCode> valueToCodeMap;
static {
final Map<Byte, MqttConnectReturnCode> valueMap = new HashMap<Byte, MqttConnectReturnCode>();
for (MqttConnectReturnCode code : values()) {
valueMap.put(code.value, code);
}
valueToCodeMap = Collections.unmodifiableMap(valueMap);
}
private final byte value;
MqttConnectReturnCode(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static MqttConnectReturnCode valueOf(byte b) {
if (valueToCodeMap.containsKey(b)) {
return valueToCodeMap.get(b);
}
throw new IllegalArgumentException("connect retirn code " + b + " unsupported");
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable Header for the {@link MqttConnectMessage}
*/
public class MqttConnectVariableHeader {
private final String name;
private final int version;
private final boolean hasUserName;
private final boolean hasPassword;
private final boolean isWillRetain;
private final int willQos;
private final boolean isWillFlag;
private final boolean isCleanSession;
private final int keepAliveTimeSeconds;
public MqttConnectVariableHeader(
String name,
int version,
boolean hasUserName,
boolean hasPassword,
boolean isWillRetain,
int willQos,
boolean isWillFlag,
boolean isCleanSession,
int keepAliveTimeSeconds) {
this.name = name;
this.version = version;
this.hasUserName = hasUserName;
this.hasPassword = hasPassword;
this.isWillRetain = isWillRetain;
this.willQos = willQos;
this.isWillFlag = isWillFlag;
this.isCleanSession = isCleanSession;
this.keepAliveTimeSeconds = keepAliveTimeSeconds;
}
public String name() {
return name;
}
public int version() {
return version;
}
public boolean hasUserName() {
return hasUserName;
}
public boolean hasPassword() {
return hasPassword;
}
public boolean isWillRetain() {
return isWillRetain;
}
public int willQos() {
return willQos;
}
public boolean isWillFlag() {
return isWillFlag;
}
public boolean isCleanSession() {
return isCleanSession;
}
public int keepAliveTimeSeconds() {
return keepAliveTimeSeconds;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("name=").append(name);
builder.append(", version=").append(version);
builder.append(", hasUserName=").append(hasUserName);
builder.append(", hasPassword=").append(hasPassword);
builder.append(", isWillRetain=").append(isWillRetain);
builder.append(", isWillFlag=").append(isWillFlag);
builder.append(", isCleanSession=").append(isCleanSession);
builder.append(", keepAliveTimeSeconds=").append(keepAliveTimeSeconds);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,475 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;
import static io.netty.handler.codec.mqtt.MqttValidationUtil.*;
import static io.netty.handler.codec.mqtt.MqttCommonUtil.*;
import static io.netty.handler.codec.mqtt.MqttVersion.*;
/**
* Decodes Mqtt messages from bytes, following
* <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">
* the MQTT protocl specification v3.1</a>
*/
public class MqttDecoder extends ReplayingDecoder<DecoderState> {
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
/**
* States of the decoder.
* We start at READ_FIXED_HEADER, followed by
* READ_VARIABLE_HEADER and finally READ_PAYLOAD.
*/
enum DecoderState {
READ_FIXED_HEADER,
READ_VARIABLE_HEADER,
READ_PAYLOAD,
BAD_MESSAGE,
}
private MqttFixedHeader mqttFixedHeader;
private Object variableHeader;
private Object payload;
private int bytesRemainingInVariablePart;
private final int maxBytesInMessage;
public MqttDecoder() {
this(DEFAULT_MAX_BYTES_IN_MESSAGE);
}
public MqttDecoder(int maxBytesInMessage) {
super(DecoderState.READ_FIXED_HEADER);
this.maxBytesInMessage = maxBytesInMessage;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
switch (state()) {
case READ_FIXED_HEADER:
mqttFixedHeader = decodeFixedHeader(buffer);
bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
checkpoint(DecoderState.READ_VARIABLE_HEADER);
// fall through
case READ_VARIABLE_HEADER: try {
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException(
String.format(
"Client tried to send very large message: %d bytes",
bytesRemainingInVariablePart));
}
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case READ_PAYLOAD: try {
final Result<?> decodedPayload =
decodePayload(
buffer,
mqttFixedHeader.messageType(),
bytesRemainingInVariablePart,
variableHeader);
payload = decodedPayload.value;
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
if (bytesRemainingInVariablePart != 0) {
throw new DecoderException(
String.format(
"Non-zero bytes remaining (%d) should never happen. "
+ "Message type: %s. Channel: %s",
bytesRemainingInVariablePart,
mqttFixedHeader.messageType(),
ctx.channel()));
}
checkpoint(DecoderState.READ_FIXED_HEADER);
MqttMessage message = MqttMessageFactory.create(mqttFixedHeader, variableHeader, payload);
mqttFixedHeader = null;
variableHeader = null;
payload = null;
out.add(message);
break;
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case BAD_MESSAGE:
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
break;
default:
throw new Error("Shouldn't reach here");
}
}
private MqttMessage invalidMessage(Throwable cause) {
checkpoint(DecoderState.BAD_MESSAGE);
return MqttMessageFactory.createInvalidMessage(cause);
}
/**
* Decodes the fixed header. It's one byte for the flags and then variable bytes for the remaining length.
*
* @param buffer the buffer to decode from
* @return the fixed header
*/
private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
short b1 = buffer.readUnsignedByte();
MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
boolean dupFlag = (b1 & 0x08) == 0x08;
int qosLevel = (b1 & 0x06) >> 1;
boolean retain = (b1 & 0x01) != 0;
int remainingLength = 0;
int multiplier = 1;
short digit;
int loops = 0;
do {
digit = buffer.readUnsignedByte();
remainingLength += (digit & 127) * multiplier;
multiplier *= 128;
loops++;
} while ((digit & 128) != 0 && loops < 4);
// MQTT protocol limits Remaining Length to 4 bytes
if (loops == 4 && (digit & 128) != 0) {
throw new DecoderException(
String.format(
"Failed to read fixed header of MQTT message. " +
"It has more than 4 digits for Remaining Length. " +
"MqttMessageType: %s.",
messageType));
}
MqttFixedHeader decodedFixedHeader =
new MqttFixedHeader(messageType, dupFlag, QoS.valueOf(qosLevel), retain, remainingLength);
return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
}
/**
* Decodes the variable header (if any)
* @param buffer the buffer to decode from
* @param mqttFixedHeader MqttFixedHeader of the same message
* @return the variable header
*/
private static Result<?> decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case CONNECT:
return decodeConnectionVariableHeader(buffer);
case CONNACK:
return decodeConnAckVariableHeader(buffer);
case SUBSCRIBE:
case UNSUBSCRIBE:
case SUBACK:
case UNSUBACK:
case PUBACK:
case PUBREC:
case PUBCOMP:
case PUBREL:
return decodeMessageIdVariableHeader(buffer);
case PUBLISH:
return decodePublishVariableHeader(buffer, mqttFixedHeader);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
// Empty variable header
return new Result<Object>(null, 0);
}
return new Result<Object>(null, 0); //should never reach here
}
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
final Result<String> protoString = decodeString(buffer);
if (!PROTOCOL_NAME.equals(protoString.value)) {
throw new DecoderException(PROTOCOL_NAME + " signature is missing. Closing channel.");
}
int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
final byte version = buffer.readByte();
final int b1 = buffer.readUnsignedByte();
numberOfBytesConsumed += 2;
final Result<Integer> keepAlive = decodeMsbLsb(buffer);
numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
final boolean hasUserName = (b1 & 0x80) == 0x80;
final boolean hasPassword = (b1 & 0x40) == 0x40;
final boolean willRetain = (b1 & 0x20) == 0x20;
final int willQos = (b1 & 0x18) >> 3;
final boolean willFlag = (b1 & 0x04) == 0x04;
final boolean cleanSession = (b1 & 0x02) == 0x02;
final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
version,
hasUserName,
hasPassword,
willRetain,
willQos,
willFlag,
cleanSession,
keepAlive.value);
return new Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
}
private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuf buffer) {
buffer.readUnsignedByte(); // reserved byte
byte returnCode = buffer.readByte();
final int numberOfBytesConsumed = 2;
final MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode));
return new Result<MqttConnAckVariableHeader>(mqttConnAckVariableHeader, numberOfBytesConsumed);
}
private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuf buffer) {
final Result<Integer> messageId = decodeMessageId(buffer);
return new Result<MqttMessageIdVariableHeader>(
MqttMessageIdVariableHeader.from(messageId.value),
messageId.numberOfBytesConsumed);
}
private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(
ByteBuf buffer,
MqttFixedHeader mqttFixedHeader) {
final Result<String> decodedTopic = decodeString(buffer);
if (!isValidPublishTopicName(decodedTopic.value)) {
throw new DecoderException(String.format("Publish topic name %s contains wildcards.", decodedTopic.value));
}
int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
int messageId = -1;
if (mqttFixedHeader.qosLevel().value() > 0) {
final Result<Integer> decodedMessageId = decodeMessageId(buffer);
messageId = decodedMessageId.value;
numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
}
final MqttPublishVariableHeader mqttPublishVariableHeader =
new MqttPublishVariableHeader(decodedTopic.value, messageId);
return new Result<MqttPublishVariableHeader>(mqttPublishVariableHeader, numberOfBytesConsumed);
}
private static Result<Integer> decodeMessageId(ByteBuf buffer) {
final Result<Integer> messageId = decodeMsbLsb(buffer);
if (!isValidMessageId(messageId.value)) {
throw new DecoderException(String.format("Invalid messageId %d", messageId.value));
}
return messageId;
}
/**
* Decodes the payload.
*
* @param buffer the buffer to decode from
* @param messageType type of the message being decoded
* @param bytesRemainingInVariablePart bytes remaining
* @param variableHeader variable header of the same message
* @return the payload
*/
private static Result<?> decodePayload(
ByteBuf buffer,
MqttMessageType messageType,
int bytesRemainingInVariablePart,
Object variableHeader) {
switch (messageType) {
case CONNECT:
return decodeConnectionPayload(buffer, (MqttConnectVariableHeader) variableHeader);
case SUBSCRIBE:
return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
case SUBACK:
return decodeSubackPayload(buffer, bytesRemainingInVariablePart);
case UNSUBSCRIBE:
return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
case PUBLISH:
return decodePublishPayload(buffer, bytesRemainingInVariablePart);
default:
// unknown payload , no byte consumed
return new Result<Object>(null, 0);
}
}
private static Result<MqttConnectPayload> decodeConnectionPayload(
ByteBuf buffer,
MqttConnectVariableHeader mqttConnectVariableHeader) {
final Result<String> decodedClientId = decodeString(buffer);
final String decodedClientIdValue = decodedClientId.value;
if (!isValidClientId(decodedClientIdValue)) {
throw new DecoderException(
String.format("Invalid clientIdentifier %s ",
decodedClientIdValue != null ? decodedClientIdValue : "null"));
}
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
Result<String> decodedWillTopic = null;
Result<String> decodedWillMessage = null;
if (mqttConnectVariableHeader.isWillFlag()) {
decodedWillTopic = decodeString(buffer, 0, 32767);
numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
decodedWillMessage = decodeAsciiString(buffer);
numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
}
Result<String> decodedUserName = null;
Result<String> decodedPassword = null;
if (mqttConnectVariableHeader.hasUserName()) {
decodedUserName = decodeString(buffer);
numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
}
if (mqttConnectVariableHeader.hasPassword()) {
decodedPassword = decodeString(buffer);
numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
}
final MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(
decodedClientId.value,
decodedWillTopic.value,
decodedWillMessage.value,
decodedUserName.value,
decodedPassword.value);
return new Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
}
private static Result<MqttSubscribePayload> decodeSubscribePayload(
ByteBuf buffer,
int bytesRemainingInVariablePart) {
final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
final Result<String> decodedTopicName = decodeString(buffer);
numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
int qos = buffer.readUnsignedByte() & 0x03;
numberOfBytesConsumed++;
subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, QoS.valueOf(qos)));
}
return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
}
private static Result<MqttSubAckPayload> decodeSubackPayload(
ByteBuf buffer,
int bytesRemainingInVariablePart) {
final List<Integer> grantedQos = new ArrayList<Integer>();
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
int qos = buffer.readUnsignedByte() & 0x03;
numberOfBytesConsumed++;
grantedQos.add(qos);
}
return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
}
private static Result<MqttUnsubscribePayload> decodeUnsubscribePayload(
ByteBuf buffer,
int bytesRemainingInVariablePart) {
final List<String> unsubscribeTopics = new ArrayList<String>();
int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
final Result<String> decodedTopicName = decodeString(buffer);
numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
unsubscribeTopics.add(decodedTopicName.value);
}
return new Result<MqttUnsubscribePayload>(
new MqttUnsubscribePayload(unsubscribeTopics),
numberOfBytesConsumed);
}
private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
ByteBuf b = buffer.readSlice(bytesRemainingInVariablePart).retain();
return new Result<ByteBuf>(b, bytesRemainingInVariablePart);
}
private static Result<String> decodeString(ByteBuf buffer) {
return decodeString(buffer, 0, Integer.MAX_VALUE);
}
private static Result<String> decodeAsciiString(ByteBuf buffer) {
Result<String> result = decodeString(buffer, 0, Integer.MAX_VALUE);
final String s = result.value;
for (int i = 0; i < s.length(); i++) {
if (s.charAt(i) > 127) {
return new Result<String>(null, result.numberOfBytesConsumed);
}
}
return new Result<String>(s, result.numberOfBytesConsumed);
}
private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
final Result<Integer> decodedSize = decodeMsbLsb(buffer);
int size = decodedSize.value;
int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
if (size < minBytes || size > maxBytes) {
buffer.skipBytes(size);
numberOfBytesConsumed += size;
return new Result<String>(null, numberOfBytesConsumed);
}
ByteBuf buf = buffer.readBytes(size);
numberOfBytesConsumed += size;
return new Result<String>(buf.toString(CharsetUtil.UTF_8), numberOfBytesConsumed);
}
private static Result<Integer> decodeMsbLsb(ByteBuf buffer) {
return decodeMsbLsb(buffer, 0, 65535);
}
private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
short msbSize = buffer.readUnsignedByte();
short lsbSize = buffer.readUnsignedByte();
final int numberOfBytesConsumed = 2;
int result = msbSize << 8 | lsbSize;
if (result < min || result > max) {
result = -1;
}
return new Result<Integer>(result, numberOfBytesConsumed);
}
private static final class Result<T> {
private final T value;
private final int numberOfBytesConsumed;
Result(T value, int numberOfBytesConsumed) {
this.value = value;
this.numberOfBytesConsumed = numberOfBytesConsumed;
}
}
}

View File

@ -0,0 +1,395 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* Encodes Mqtt messages into bytes following the protocl specification v3.1
* as described here <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html">MQTTV3.1</a>
*/
public class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
public static final MqttEncoder DEFAUL_ENCODER = new MqttEncoder();
private static final byte[] EMPTY = new byte[0];
private static final byte[] CONNECT_VARIABLE_HEADER_START = {0, 6, 'M', 'Q', 'I', 's', 'd', 'p'};
@Override
protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
out.add(doEncode(ctx.alloc(), msg));
}
/**
* This is the main encoding method.
* It's only visible for testing.
*
* @param byteBufAllocator Allocates ByteBuf
* @param message MQTT message to encode
* @return ByteBuf with encoded bytes
*/
static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {
switch (message.fixedHeader().messageType()) {
case CONNECT:
return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);
case CONNACK:
return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);
case PUBLISH:
return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);
case SUBSCRIBE:
return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);
case UNSUBSCRIBE:
return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);
case SUBACK:
return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);
case UNSUBACK:
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);
default:
throw new IllegalArgumentException(
"Unknown message type: " + message.fixedHeader().messageType().value());
}
}
private static ByteBuf encodeConnectMessage(
ByteBufAllocator byteBufAllocator,
MqttConnectMessage message) {
int variableHeaderBufferSize = 12;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttConnectVariableHeader variableHeader = message.variableHeader();
MqttConnectPayload payload = message.payload();
// Client id
String clientIdentifier = payload.clientIdentifier();
if (!isValidClientIdentifier(clientIdentifier)) {
throw new IllegalArgumentException(
String.format("Invalid clientIdentifier %s. Must be less than 23 chars long",
clientIdentifier != null ? clientIdentifier : "null"));
}
byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
payloadBufferSize += 2 + clientIdentifierBytes.length;
// Will topic and message
String willTopic = payload.willTopic();
byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EMPTY;
String willMessage = payload.willMessage();
byte[] willMessageBytes = willMessage != null ? encodeStringUtf8(willMessage) : EMPTY;
if (variableHeader.isWillFlag()) {
payloadBufferSize += 2 + willTopicBytes.length;
payloadBufferSize += 2 + willMessageBytes.length;
}
String userName = payload.userName();
byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EMPTY;
if (variableHeader.hasUserName()) {
payloadBufferSize += 2 + userNameBytes.length;
}
String password = payload.password();
byte[] passwordBytes = password != null ? encodeStringUtf8(password) : EMPTY;
if (variableHeader.hasPassword()) {
payloadBufferSize += 2 + passwordBytes.length;
}
// Fixed header
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.writeBytes(CONNECT_VARIABLE_HEADER_START);
buf.writeByte(variableHeader.version());
buf.writeByte(getConnVariableHeaderFlag(variableHeader));
buf.writeShort(variableHeader.keepAliveTimeSeconds());
// Payload
buf.writeShort(clientIdentifierBytes.length);
buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
if (variableHeader.isWillFlag()) {
buf.writeShort(willTopicBytes.length);
buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
buf.writeShort(willMessageBytes.length);
buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
}
if (variableHeader.hasUserName()) {
buf.writeShort(userNameBytes.length);
buf.writeBytes(userNameBytes, 0, userNameBytes.length);
}
if (variableHeader.hasPassword()) {
buf.writeShort(passwordBytes.length);
buf.writeBytes(passwordBytes, 0, passwordBytes.length);
}
return buf;
}
private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
int flagByte = 0;
if (variableHeader.hasUserName()) {
flagByte |= 0x80;
}
if (variableHeader.hasPassword()) {
flagByte |= 0x40;
}
if (variableHeader.isWillRetain()) {
flagByte |= 0x20;
}
flagByte |= (variableHeader.willQos() & 0x03) << 3;
if (variableHeader.isWillFlag()) {
flagByte |= 0x04;
}
if (variableHeader.isCleanSession()) {
flagByte |= 0x02;
}
return flagByte;
}
private static ByteBuf encodeConnAckMessage(
ByteBufAllocator byteBufAllocator,
MqttConnAckMessage message) {
ByteBuf buf = byteBufAllocator.buffer(4);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
buf.writeByte(2);
buf.writeByte(0);
buf.writeByte(message.variableHeader().connectReturnCode().value());
return buf;
}
private static ByteBuf encodeSubscribeMessage(
ByteBufAllocator byteBufAllocator,
MqttSubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttSubscribePayload payload = message.payload();
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
payloadBufferSize += 1;
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
// Payload
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
buf.writeByte(topic.qualityOfService().value());
}
return buf;
}
private static ByteBuf encodeUnsubscribeMessage(
ByteBufAllocator byteBufAllocator,
MqttUnsubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttUnsubscribePayload payload = message.payload();
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
// Payload
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
}
return buf;
}
private static ByteBuf encodeSubAckMessage(
ByteBufAllocator byteBufAllocator,
MqttSubAckMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = message.payload().grantedQoSLevels().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(message.variableHeader().messageId());
for (int qos : message.payload().grantedQoSLevels()) {
buf.writeByte(qos);
}
return buf;
}
private static ByteBuf encodePublishMessage(
ByteBufAllocator byteBufAllocator,
MqttPublishMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuf payload = message.payload().duplicate();
String topicName = variableHeader.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
int variableHeaderBufferSize = 2 + topicNameBytes.length +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
int payloadBufferSize = payload.readableBytes();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes);
if (mqttFixedHeader.qosLevel().value() > 0) {
buf.writeShort(variableHeader.messageId());
}
buf.writeBytes(payload);
return buf;
}
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
ByteBufAllocator byteBufAllocator,
MqttMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int msgId = variableHeader.messageId();
int variableHeaderBufferSize = 2; // variable part only has a message id
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
buf.writeShort(msgId);
return buf;
}
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
ByteBufAllocator byteBufAllocator,
MqttMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
ByteBuf buf = byteBufAllocator.buffer(2);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
buf.writeByte(0);
return buf;
}
private static int getFixedHeaderByte1(MqttFixedHeader header) {
int ret = 0;
ret |= header.messageType().value() << 4;
if (header.isDup()) {
ret |= 0x08;
}
ret |= header.qosLevel().value() << 1;
if (header.isRetain()) {
ret |= 0x01;
}
return ret;
}
private static void writeVariableLengthInt(ByteBuf buf, int num) {
do {
int digit = num % 128;
num /= 128;
if (num > 0) {
digit |= 0x80;
}
buf.writeByte(digit);
} while (num > 0);
}
private static int getVariableLengthInt(int num) {
int count = 0;
do {
num /= 128;
count++;
} while (num > 0);
return count;
}
private static byte[] encodeStringUtf8(String s) {
return s.getBytes(CharsetUtil.UTF_8);
}
private static boolean isValidClientIdentifier(String clientIdentifier) {
if (clientIdentifier == null) {
return false;
}
int length = clientIdentifier.length();
return length >= 1 && length <= 23;
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#fixed-header">
* MQTTV3.1/fixed-header</a>
*/
public class MqttFixedHeader {
private final MqttMessageType messageType;
private final boolean isDup;
private final QoS qosLevel;
private final boolean isRetain;
private final int remainingLength;
public MqttFixedHeader(
MqttMessageType messageType,
boolean isDup,
QoS qosLevel,
boolean isRetain,
int remainingLength) {
this.messageType = messageType;
this.isDup = isDup;
this.qosLevel = qosLevel;
this.isRetain = isRetain;
this.remainingLength = remainingLength;
}
public MqttMessageType messageType() {
return messageType;
}
public boolean isDup() {
return isDup;
}
public QoS qosLevel() {
return qosLevel;
}
public boolean isRetain() {
return isRetain;
}
public int remainingLength() {
return remainingLength;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("messageType=").append(messageType);
builder.append(", isDup=").append(isDup);
builder.append(", qosLevel=").append(qosLevel);
builder.append(", isRetain=").append(isRetain);
builder.append(", remainingLength=").append(remainingLength);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.handler.codec.DecoderResult;
import io.netty.util.internal.StringUtil;
/**
* Base class for all MQTT message types.
*/
public class MqttMessage {
private final MqttFixedHeader mqttFixedHeader;
private final Object variableHeader;
private final Object payload;
private final DecoderResult decoderResult;
public MqttMessage(MqttFixedHeader mqttFixedHeader) {
this(mqttFixedHeader, null, null);
}
public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader) {
this(mqttFixedHeader, variableHeader, null);
}
public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
this(mqttFixedHeader, variableHeader, payload, DecoderResult.SUCCESS);
}
public MqttMessage(
MqttFixedHeader mqttFixedHeader,
Object variableHeader,
Object payload,
DecoderResult decoderResult) {
this.mqttFixedHeader = mqttFixedHeader;
this.variableHeader = variableHeader;
this.payload = payload;
this.decoderResult = decoderResult;
}
public MqttFixedHeader fixedHeader() {
return mqttFixedHeader;
}
public Object variableHeader() {
return variableHeader;
}
public Object payload() {
return payload;
}
public DecoderResult decoderResult() {
return decoderResult;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("fixedHeader=").append(fixedHeader() != null ? fixedHeader().toString() : "");
builder.append(", variableHeader=").append(variableHeader() != null ? variableHeader.toString() : "");
builder.append(", payload=").append(payload() != null ? payload.toString() : "");
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.DecoderResult;
/**
* Utility class with factory methods to create different types of MQTT messages.
*/
public final class MqttMessageFactory {
public static MqttMessage create(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
switch (mqttFixedHeader.messageType()) {
case CONNECT :
return new MqttConnectMessage(
mqttFixedHeader,
(MqttConnectVariableHeader) variableHeader,
(MqttConnectPayload) payload);
case CONNACK:
return new MqttConnAckMessage(mqttFixedHeader, (MqttConnAckVariableHeader) variableHeader);
case SUBSCRIBE:
return new MqttSubscribeMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttSubscribePayload) payload);
case SUBACK:
return new MqttSubAckMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttSubAckPayload) payload);
case UNSUBACK:
return new MqttUnsubAckMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader);
case UNSUBSCRIBE:
return new MqttUnsubscribeMessage(
mqttFixedHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttUnsubscribePayload) payload);
case PUBLISH:
return new MqttPublishMessage(
mqttFixedHeader,
(MqttPublishVariableHeader) variableHeader,
(ByteBuf) payload);
case PUBACK:
return new MqttPubAckMessage(mqttFixedHeader, (MqttMessageIdVariableHeader) variableHeader);
case PUBREC:
case PUBREL:
case PUBCOMP:
return new MqttMessage(mqttFixedHeader, variableHeader);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
return new MqttMessage(mqttFixedHeader);
default:
throw new IllegalArgumentException("Unknown message type: " + mqttFixedHeader.messageType());
}
}
public static MqttMessage createInvalidMessage(Throwable cause) {
return new MqttMessage(null, null, null, DecoderResult.failure(cause));
}
private MqttMessageFactory() { }
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable Header containing only Message Id
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#msg-id">MQTTV3.1/msg-id</a>
*/
public final class MqttMessageIdVariableHeader {
private final int messageId;
public static MqttMessageIdVariableHeader from(int messageId) {
if (messageId < 1 || messageId > 0xffff) {
throw new IllegalArgumentException(
String.format("Message id must be in the range of 1 - 0xffff but %d given ",
messageId));
}
return new MqttMessageIdVariableHeader(messageId);
}
private MqttMessageIdVariableHeader(int messageId) {
this.messageId = messageId;
}
public int messageId() {
return messageId;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("messageId=").append(messageId);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* MQTT Message Types.
*/
public enum MqttMessageType {
CONNECT(1),
CONNACK(2),
PUBLISH(3),
PUBACK(4),
PUBREC(5),
PUBREL(6),
PUBCOMP(7),
SUBSCRIBE(8),
SUBACK(9),
UNSUBSCRIBE(10),
UNSUBACK(11),
PINGREQ(12),
PINGRESP(13),
DISCONNECT(14);
private final int value;
MqttMessageType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static MqttMessageType valueOf(int type) {
for (MqttMessageType t : values()) {
if (t.value == type) {
return t;
}
}
throw new IllegalArgumentException("message type " + type + " unsupported");
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#puback">MQTTV3.1/puback</a>
*/
public class MqttPubAckMessage extends MqttMessage {
public MqttPubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.IllegalReferenceCountException;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#publish">MQTTV3.1/publish</a>
*/
public class MqttPublishMessage extends MqttMessage implements ByteBufHolder {
public MqttPublishMessage(
MqttFixedHeader mqttFixedHeader,
MqttPublishVariableHeader variableHeader,
ByteBuf payload) {
super(mqttFixedHeader, variableHeader, payload);
}
@Override
public MqttPublishVariableHeader variableHeader() {
return (MqttPublishVariableHeader) super.variableHeader();
}
@Override
public ByteBuf payload() {
return content();
}
@Override
public ByteBuf content() {
final ByteBuf data = (ByteBuf) super.payload();
if (data.refCnt() <= 0) {
throw new IllegalReferenceCountException(data.refCnt());
}
return data;
}
@Override
public MqttPublishMessage copy() {
return new MqttPublishMessage(fixedHeader(), variableHeader(), content().copy());
}
@Override
public MqttPublishMessage duplicate() {
return new MqttPublishMessage(fixedHeader(), variableHeader(), content().duplicate());
}
@Override
public int refCnt() {
return content().refCnt();
}
@Override
public MqttPublishMessage retain() {
content().retain();
return this;
}
@Override
public MqttPublishMessage retain(int increment) {
content().retain(increment);
return this;
}
@Override
public MqttPublishMessage touch() {
content().touch();
return this;
}
@Override
public MqttPublishMessage touch(Object hint) {
content().touch(hint);
return this;
}
@Override
public boolean release() {
return content().release();
}
@Override
public boolean release(int decrement) {
return content().release(decrement);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable Header of the {@link MqttPublishMessage}
*/
public class MqttPublishVariableHeader {
private final String topicName;
private final int messageId;
public MqttPublishVariableHeader(String topicName, int messageId) {
this.topicName = topicName;
this.messageId = messageId;
}
public String topicName() {
return topicName;
}
public int messageId() {
return messageId;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("topicName=").append(topicName);
builder.append(", messageId=").append(messageId);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#suback">MQTTV3.1/suback</a>
*/
public class MqttSubAckMessage extends MqttMessage {
public MqttSubAckMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubAckPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
@Override
public MqttSubAckPayload payload() {
return (MqttSubAckPayload) super.payload();
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
import java.util.List;
/**
* Payload of the {@link MqttSubAckMessage}
*/
public class MqttSubAckPayload {
private final List<Integer> grantedQoSLevels;
public MqttSubAckPayload(List<Integer> grantedQoSLevels) {
this.grantedQoSLevels = grantedQoSLevels;
}
public List<Integer> grantedQoSLevels() {
return grantedQoSLevels;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("grantedQoSLevels=").append(grantedQoSLevels);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#subscribe">
* MQTTV3.1/subscribe</a>
*/
public class MqttSubscribeMessage extends MqttMessage {
public MqttSubscribeMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubscribePayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
@Override
public MqttSubscribePayload payload() {
return (MqttSubscribePayload) super.payload();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
import java.util.Collections;
import java.util.List;
/**
* Payload of the {@link MqttSubscribeMessage}
*/
public class MqttSubscribePayload {
private final List<MqttTopicSubscription> topicSubscriptions;
public MqttSubscribePayload(List<MqttTopicSubscription> topicSubscriptions) {
this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions);
}
public List<MqttTopicSubscription> topicSubscriptions() {
return topicSubscriptions;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topicSubscriptions.size() - 1; i++) {
builder.append(topicSubscriptions.get(i)).append(", ");
}
builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1));
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Contains a topic name and Qos Level.
* This is part of the {@link MqttSubscribePayload}
*/
public class MqttTopicSubscription {
private final String topicFilter;
private final QoS qualityOfService;
public MqttTopicSubscription(String topicFilter, QoS qualityOfService) {
this.topicFilter = topicFilter;
this.qualityOfService = qualityOfService;
}
public String topicName() {
return topicFilter;
}
public QoS qualityOfService() {
return qualityOfService;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
builder.append("topicFilter=").append(topicFilter);
builder.append(", qualityOfService=").append(qualityOfService);
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#unsuback">MQTTV3.1/unsuback</a>
*/
public class MqttUnsubAckMessage extends MqttMessage {
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) {
super(mqttFixedHeader, variableHeader, null);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* See <a href="http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#unsubscribe">
* MQTTV3.1/unsubscribe</a>
*/
public class MqttUnsubscribeMessage extends MqttMessage {
public MqttUnsubscribeMessage(
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttUnsubscribePayload payload) {
super(mqttFixedHeader, variableHeader, payload);
}
@Override
public MqttMessageIdVariableHeader variableHeader() {
return (MqttMessageIdVariableHeader) super.variableHeader();
}
@Override
public MqttUnsubscribePayload payload() {
return (MqttUnsubscribePayload) super.payload();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
import java.util.Collections;
import java.util.List;
/**
* Pyaload of the {@link MqttUnsubscribeMessage}
*/
public class MqttUnsubscribePayload {
private final List<String> topics;
public MqttUnsubscribePayload(List<String> topics) {
this.topics = Collections.unmodifiableList(topics);
}
public List<String> topics() {
return topics;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('[');
for (int i = 0; i < topics.size() - 1; i++) {
builder.append("topicName = " + topics.get(i)).append(", ");
}
builder.append("topicName = " + topics.get(topics.size() - 1));
builder.append(']');
return builder.toString();
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.handler.codec.DecoderException;
public final class MqttValidationUtil {
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
private static final int MAX_CLIENT_ID_LENGTH = 23;
public static boolean isValidPublishTopicName(String topicName) {
// publish topic name must not contain any wildcard
for (char c : TOPIC_WILDCARDS) {
if (topicName.indexOf(c) >= 0) {
return false;
}
}
return true;
}
public static boolean isValidMessageId(int messageId) {
return messageId != 0;
}
public static boolean isValidClientId(String clientId) {
return clientId != null && clientId.length() <= MAX_CLIENT_ID_LENGTH;
}
public static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBREL:
case SUBSCRIBE:
case UNSUBSCRIBE:
if (mqttFixedHeader.qosLevel() != QoS.AT_LEAST_ONCE) {
throw new DecoderException(String.format("%s message must have QoS 1",
mqttFixedHeader.messageType().name()));
}
default:
return mqttFixedHeader;
}
}
private MqttValidationUtil() { }
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
/**
* Holds Constant values used by multiple classes in mqtt-codec.
*/
final class MqttVersion {
static final String PROTOCOL_NAME = "MQIsdp";
private MqttVersion() { }
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.mqtt;
public enum QoS {
AT_MOST_ONCE(0),
AT_LEAST_ONCE(1),
EXACTLY_ONCE(2);
private final int value;
QoS(int value) {
this.value = value;
}
public int value() {
return value;
}
public static QoS valueOf(int value) {
for (QoS q : values()) {
if (q.value == value) {
return q;
}
}
throw new IllegalArgumentException(String.format("Invalid QoS: %d", value));
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
/**
* Encoder, decoder and different Message Types for MQTT.
*/
package io.netty.handler.codec.mqtt;

View File

@ -0,0 +1,435 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import java.util.LinkedList;
import java.util.List;
import static io.netty.handler.codec.mqtt.MqttVersion.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Unit tests for MqttEncoder and MqttDecoder.
*/
public class MqttCodecTest {
private static final String CLIENT_ID = "RANDOM_TEST_CLIENT";
private static final String WILL_TOPIC = "/my_will";
private static final String WILL_MESSAGE = "gone";
private static final String USER_NAME = "happy_user";
private static final String PASSWORD = "123_or_no_pwd";
private static final int PROTOCOL_VERSION = 3;
private static final int KEEP_ALIVE_SECONDS = 600;
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
@Mock
private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
@Mock
private final Channel channel = mock(Channel.class);
private final MqttDecoder mqttDecoder = new MqttDecoder();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(ctx.channel()).thenReturn(channel);
}
@Test
public void testConnectMessage() throws Exception {
final MqttConnectMessage message = createConnectMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttConnectMessage decodedMessage = (MqttConnectMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
vlidateConnectVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validateConnectPayload(message.payload(), decodedMessage.payload());
}
@Test
public void testConnAckMessage() throws Exception {
final MqttConnAckMessage message = createConnAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttConnAckMessage decodedMessage = (MqttConnAckMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnAckVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
}
@Test
public void testPublishMessage() throws Exception {
final MqttPublishMessage message = createPublishMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttPublishMessage decodedMessage = (MqttPublishMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePublishVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validatePublishPayload(message.payload(), decodedMessage.payload());
}
@Test
public void testPubAckMessage() throws Exception {
testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBACK);
}
@Test
public void testPubRecMessage() throws Exception {
testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBREC);
}
@Test
public void testPubRelMessage() throws Exception {
testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBREL);
}
@Test
public void testPubCompMessage() throws Exception {
testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBCOMP);
}
@Test
public void testSubscribeMessage() throws Exception {
final MqttSubscribeMessage message = createSubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttSubscribeMessage decodedMessage = (MqttSubscribeMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validateSubscribePayload(message.payload(), decodedMessage.payload());
}
@Test
public void testSubAckMessage() throws Exception {
final MqttSubAckMessage message = createSubAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttSubAckMessage decodedMessage = (MqttSubAckMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validateSubAckPayload(message.payload(), decodedMessage.payload());
}
@Test
public void testUnSubscribeMessage() throws Exception {
final MqttUnsubscribeMessage message = createUnsubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttUnsubscribeMessage decodedMessage = (MqttUnsubscribeMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader());
validateUnsubscribePayload(message.payload(), decodedMessage.payload());
}
@Test
public void testUnsubAckMessage() throws Exception {
testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.UNSUBACK);
}
@Test
public void testPingReqMessage() throws Exception {
testMessageWithOnlyFixedHeader(MqttMessageType.PINGREQ);
}
@Test
public void testPingRespMessage() throws Exception {
testMessageWithOnlyFixedHeader(MqttMessageType.PINGRESP);
}
@Test
public void testDisconnectMessage() throws Exception {
testMessageWithOnlyFixedHeader(MqttMessageType.DISCONNECT);
}
private void testMessageWithOnlyFixedHeader(MqttMessageType messageType) throws Exception {
MqttMessage message = createMessageWithFixedHeader(messageType);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
}
private void testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType messageType)
throws Exception {
MqttMessage message = createMessageWithFixedHeaderAndMessageIdVariableHeader(messageType);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object bout got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(
(MqttMessageIdVariableHeader) message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
}
// Factory methods of different MQTT
// Message types to help testing
private static MqttMessage createMessageWithFixedHeader(MqttMessageType messageType) {
return new MqttMessage(new MqttFixedHeader(messageType, false, QoS.AT_MOST_ONCE, false, 0));
}
private static MqttMessage createMessageWithFixedHeaderAndMessageIdVariableHeader(MqttMessageType messageType) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(
messageType,
false,
messageType == MqttMessageType.PUBREL ? QoS.AT_LEAST_ONCE : QoS.AT_MOST_ONCE,
false,
0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
}
private static MqttConnectMessage createConnectMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNECT, false, QoS.AT_MOST_ONCE, false, 0);
MqttConnectVariableHeader mqttConnectVariableHeader =
new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
true,
true,
true,
1,
true,
true,
KEEP_ALIVE_SECONDS);
MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(CLIENT_ID, WILL_TOPIC, WILL_MESSAGE, USER_NAME, PASSWORD);
return new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader, mqttConnectPayload);
}
private static MqttConnAckMessage createConnAckMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, QoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}
private static MqttPublishMessage createPublishMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, QoS.AT_LEAST_ONCE, true, 0);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", 1234);
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes("whatever".getBytes(CharsetUtil.UTF_8));
return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
}
private static MqttSubscribeMessage createSubscribeMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, QoS.AT_LEAST_ONCE, true, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
List<MqttTopicSubscription> topicSubscriptions = new LinkedList<MqttTopicSubscription>();
topicSubscriptions.add(new MqttTopicSubscription("/abc", QoS.AT_LEAST_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/def", QoS.AT_LEAST_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/xyz", QoS.EXACTLY_ONCE));
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(topicSubscriptions);
return new MqttSubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload);
}
private static MqttSubAckMessage createSubAckMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, QoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
List<Integer> grantedQosLevels = new LinkedList<Integer>();
grantedQosLevels.add(1);
grantedQosLevels.add(2);
grantedQosLevels.add(0);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQosLevels);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
}
private static MqttUnsubscribeMessage createUnsubscribeMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, QoS.AT_LEAST_ONCE, true, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
List<String> topics = new LinkedList<String>();
topics.add("/abc");
topics.add("/def");
topics.add("/xyz");
MqttUnsubscribePayload mqttUnsubscribePayload = new MqttUnsubscribePayload(topics);
return new MqttUnsubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttUnsubscribePayload);
}
// Helper methdos to compare expected and actual
// MQTT messages
private static void validateFixedHeaders(MqttFixedHeader expected, MqttFixedHeader actual) {
assertEquals("MqttFixedHeader MqttMessageType mismatch ", expected.messageType(), actual.messageType());
assertEquals("MqttFixedHeader Qos mismatch ", expected.qosLevel(), actual.qosLevel());
}
private static void vlidateConnectVariableHeader(
MqttConnectVariableHeader expected,
MqttConnectVariableHeader actual) {
assertEquals("MqttConnectVariableHeader Name mismatch ", expected.name(), actual.name());
assertEquals(
"MqttConnectVariableHeader KeepAliveTimeSeconds mismatch ",
expected.keepAliveTimeSeconds(),
actual.keepAliveTimeSeconds());
assertEquals("MqttConnectVariableHeader Version mismatch ", expected.version(), actual.version());
assertEquals("MqttConnectVariableHeader WillQos mismatch ", expected.willQos(), actual.willQos());
assertEquals("MqttConnectVariableHeader HasUserName mismatch ", expected.hasUserName(), actual.hasUserName());
assertEquals("MqttConnectVariableHeader HasPassword mismatch ", expected.hasPassword(), actual.hasPassword());
assertEquals(
"MqttConnectVariableHeader IsCleanSession mismatch ",
expected.isCleanSession(),
actual.isCleanSession());
assertEquals("MqttConnectVariableHeader IsWillFlag mismatch ", expected.isWillFlag(), actual.isWillFlag());
assertEquals(
"MqttConnectVariableHeader IsWillRetain mismatch ",
expected.isWillRetain(),
actual.isWillRetain());
}
private static void validateConnectPayload(MqttConnectPayload expected, MqttConnectPayload actual) {
assertEquals(
"MqttConnectPayload ClientIdentifier mismatch ",
expected.clientIdentifier(),
actual.clientIdentifier());
assertEquals("MqttConnectPayload UserName mismatch ", expected.userName(), actual.userName());
assertEquals("MqttConnectPayload Password mismatch ", expected.password(), actual.password());
assertEquals("MqttConnectPayload WillMessage mismatch ", expected.willMessage(), actual.willMessage());
assertEquals("MqttConnectPayload WillTopic mismatch ", expected.willTopic(), actual.willTopic());
}
private static void validateConnAckVariableHeader(
MqttConnAckVariableHeader expected,
MqttConnAckVariableHeader actual) {
assertEquals(
"MqttConnAckVariableHeader MqttConnectReturnCode mismatch",
expected.connectReturnCode(),
actual.connectReturnCode());
}
private static void validatePublishVariableHeader(
MqttPublishVariableHeader expected,
MqttPublishVariableHeader actual) {
assertEquals("MqttPublishVariableHeader TopicName mismatch ", expected.topicName(), actual.topicName());
assertEquals("MqttPublishVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId());
}
private static void validatePublishPayload(ByteBuf expected, ByteBuf actual) {
assertEquals("PublishPayload mismatch ", 0, expected.compareTo(actual));
}
private static void validateMessageIdVariableHeader(
MqttMessageIdVariableHeader expected,
MqttMessageIdVariableHeader actual) {
assertEquals("MqttMessageIdVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId());
}
private static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();
assertEquals(
"MqttSubscribePayload TopicSubscriptionList size mismatch ",
expectedTopicSubscriptions.size(),
actualTopicSubscriptions.size());
for (int i = 0; i < expectedTopicSubscriptions.size(); i++) {
validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i));
}
}
private static void validateTopicSubscription(
MqttTopicSubscription expected,
MqttTopicSubscription actual) {
assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName());
assertEquals(
"MqttTopicSubscription Qos mismatch ",
expected.qualityOfService(),
actual.qualityOfService());
}
private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) {
assertArrayEquals(
"MqttSubAckPayload GrantedQosLevels mismatch ",
expected.grantedQoSLevels().toArray(),
actual.grantedQoSLevels().toArray());
}
private static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) {
assertArrayEquals(
"MqttUnsubscribePayload TopicList mismatch ",
expected.topics().toArray(),
actual.topics().toArray());
}
}

11
pom.xml
View File

@ -339,8 +339,9 @@
<module>codec-haproxy</module> <module>codec-haproxy</module>
<module>codec-http</module> <module>codec-http</module>
<module>codec-memcache</module> <module>codec-memcache</module>
<module>codec-stomp</module> <module>codec-mqtt</module>
<module>codec-socks</module> <module>codec-socks</module>
<module>codec-stomp</module>
<module>transport</module> <module>transport</module>
<module>transport-rxtx</module> <module>transport-rxtx</module>
<module>transport-sctp</module> <module>transport-sctp</module>
@ -534,6 +535,14 @@
<version>0.5-rc1</version> <version>0.5-rc1</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- Test dependencies for MQTT -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>