From 1ba5fa4b4bdd923d8db7a5adef5f59d4e6d06e61 Mon Sep 17 00:00:00 2001 From: Mousom Dhar Gupta Date: Wed, 16 Apr 2014 14:59:09 -0700 Subject: [PATCH] Add MQTT protocol codec MQTT is a open source protocol on top of TCP which is widely used in mobile communication and also for IoT (Internet of Things) today. This will add an open source implementation of MQTT so that it becomes easier for Netty users to implement an MQTT application. For more information about the MQTT protocol, read this: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html --- all/pom.xml | 7 + codec-mqtt/pom.xml | 48 ++ .../handler/codec/mqtt/MqttCommonUtil.java | 62 +++ .../codec/mqtt/MqttConnAckMessage.java | 33 ++ .../codec/mqtt/MqttConnAckVariableHeader.java | 43 ++ .../codec/mqtt/MqttConnectMessage.java | 40 ++ .../codec/mqtt/MqttConnectPayload.java | 76 +++ .../codec/mqtt/MqttConnectReturnCode.java | 60 +++ .../codec/mqtt/MqttConnectVariableHeader.java | 107 ++++ .../netty/handler/codec/mqtt/MqttDecoder.java | 475 ++++++++++++++++++ .../netty/handler/codec/mqtt/MqttEncoder.java | 395 +++++++++++++++ .../handler/codec/mqtt/MqttFixedHeader.java | 78 +++ .../netty/handler/codec/mqtt/MqttMessage.java | 80 +++ .../codec/mqtt/MqttMessageFactory.java | 89 ++++ .../mqtt/MqttMessageIdVariableHeader.java | 53 ++ .../handler/codec/mqtt/MqttMessageType.java | 57 +++ .../handler/codec/mqtt/MqttPubAckMessage.java | 32 ++ .../codec/mqtt/MqttPublishMessage.java | 103 ++++ .../codec/mqtt/MqttPublishVariableHeader.java | 50 ++ .../handler/codec/mqtt/MqttSubAckMessage.java | 40 ++ .../handler/codec/mqtt/MqttSubAckPayload.java | 45 ++ .../codec/mqtt/MqttSubscribeMessage.java | 41 ++ .../codec/mqtt/MqttSubscribePayload.java | 49 ++ .../codec/mqtt/MqttTopicSubscription.java | 51 ++ .../codec/mqtt/MqttUnsubAckMessage.java | 32 ++ .../codec/mqtt/MqttUnsubscribeMessage.java | 41 ++ .../codec/mqtt/MqttUnsubscribePayload.java | 49 ++ .../codec/mqtt/MqttValidationUtil.java | 59 +++ .../netty/handler/codec/mqtt/MqttVersion.java | 27 + .../java/io/netty/handler/codec/mqtt/QoS.java | 40 ++ .../handler/codec/mqtt/package-info.java | 19 + .../handler/codec/mqtt/MqttCodecTest.java | 435 ++++++++++++++++ pom.xml | 11 +- 33 files changed, 2826 insertions(+), 1 deletion(-) create mode 100644 codec-mqtt/pom.xml create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCommonUtil.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttFixedHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubAckMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckPayload.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribePayload.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribePayload.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttValidationUtil.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/QoS.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/package-info.java create mode 100644 codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java diff --git a/all/pom.xml b/all/pom.xml index 49d6731264..5248ad0e5b 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -147,6 +147,13 @@ compile true + + ${project.groupId} + netty-codec-mqtt + ${project.version} + compile + true + ${project.groupId} netty-codec-socks diff --git a/codec-mqtt/pom.xml b/codec-mqtt/pom.xml new file mode 100644 index 0000000000..395966c587 --- /dev/null +++ b/codec-mqtt/pom.xml @@ -0,0 +1,48 @@ + + + + + 4.0.0 + + io.netty + netty-parent + 5.0.0.Alpha2-SNAPSHOT + + + netty-codec-mqtt + jar + + Netty/Codec/Mqtt + + + + ${project.groupId} + netty-codec + ${project.version} + + + ${project.groupId} + netty-handler + ${project.version} + + + org.mockito + mockito-all + test + + + diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCommonUtil.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCommonUtil.java new file mode 100644 index 0000000000..cd56f43b0f --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCommonUtil.java @@ -0,0 +1,62 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +public final class MqttCommonUtil { + + public static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) { + switch (mqttFixedHeader.messageType()) { + case CONNECT: + case CONNACK: + case PUBACK: + case PUBREC: + case PUBCOMP: + case SUBACK: + case UNSUBACK: + case PINGREQ: + case PINGRESP: + case DISCONNECT: + if (mqttFixedHeader.isDup() || + mqttFixedHeader.qosLevel() != QoS.AT_MOST_ONCE || + mqttFixedHeader.isRetain()) { + return new MqttFixedHeader( + mqttFixedHeader.messageType(), + false, + QoS.AT_MOST_ONCE, + false, + mqttFixedHeader.remainingLength()); + } + return mqttFixedHeader; + case PUBREL: + case SUBSCRIBE: + case UNSUBSCRIBE: + if (mqttFixedHeader.isRetain()) { + return new MqttFixedHeader( + mqttFixedHeader.messageType(), + mqttFixedHeader.isDup(), + mqttFixedHeader.qosLevel(), + false, + mqttFixedHeader.remainingLength()); + } + return mqttFixedHeader; + default: + return mqttFixedHeader; + } + } + + private MqttCommonUtil() { } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckMessage.java new file mode 100644 index 0000000000..639dad997f --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckMessage.java @@ -0,0 +1,33 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See MQTTV3.1/connack + */ +public final class MqttConnAckMessage extends MqttMessage { + + public MqttConnAckMessage(MqttFixedHeader mqttFixedHeader, MqttConnAckVariableHeader variableHeader) { + super(mqttFixedHeader, variableHeader); + } + + @Override + public MqttConnAckVariableHeader variableHeader() { + return (MqttConnAckVariableHeader) super.variableHeader(); + } + +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java new file mode 100644 index 0000000000..2167fcdd7c --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java @@ -0,0 +1,43 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable header of {@link MqttConnectMessage } + */ +public class MqttConnAckVariableHeader { + + private final MqttConnectReturnCode connectReturnCode; + + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode) { + this.connectReturnCode = connectReturnCode; + } + + public MqttConnectReturnCode connectReturnCode() { + return connectReturnCode; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("connectReturnCode=").append(connectReturnCode); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectMessage.java new file mode 100644 index 0000000000..ffc42b5d61 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectMessage.java @@ -0,0 +1,40 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See MQTTV3.1/connect + */ +public final class MqttConnectMessage extends MqttMessage { + + public MqttConnectMessage( + MqttFixedHeader mqttFixedHeader, + MqttConnectVariableHeader variableHeader, + MqttConnectPayload payload) { + super(mqttFixedHeader, variableHeader, payload); + } + + @Override + public MqttConnectVariableHeader variableHeader() { + return (MqttConnectVariableHeader) super.variableHeader(); + } + + @Override + public MqttConnectPayload payload() { + return (MqttConnectPayload) super.payload(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java new file mode 100644 index 0000000000..a2a70af67e --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java @@ -0,0 +1,76 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Payload of {@link MqttConnectMessage} + */ +public class MqttConnectPayload { + + private final String clientIdentifier; + private final String willTopic; + private final String willMessage; + private final String userName; + private final String password; + + public MqttConnectPayload( + String clientIdentifier, + String willTopic, + String willMessage, + String userName, + String password) { + this.clientIdentifier = clientIdentifier; + this.willTopic = willTopic; + this.willMessage = willMessage; + this.userName = userName; + this.password = password; + } + + public String clientIdentifier() { + return clientIdentifier; + } + + public String willTopic() { + return willTopic; + } + + public String willMessage() { + return willMessage; + } + + public String userName() { + return userName; + } + + public String password() { + return password; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("clientIdentifier=").append(clientIdentifier); + builder.append(", willTopic=").append(willTopic); + builder.append(", willMessage=").append(willMessage); + builder.append(", userName=").append(userName); + builder.append(", password=").append(password); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java new file mode 100644 index 0000000000..f024010fa0 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java @@ -0,0 +1,60 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Return Code of {@link io.netty.handler.codec.mqtt.MqttConnAckMessage} + */ +public enum MqttConnectReturnCode { + CONNECTION_ACCEPTED((byte) 0x00), + CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01), + CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02), + CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03), + CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04), + CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05); + + private static final Map valueToCodeMap; + + static { + final Map valueMap = new HashMap(); + for (MqttConnectReturnCode code : values()) { + valueMap.put(code.value, code); + } + valueToCodeMap = Collections.unmodifiableMap(valueMap); + } + + private final byte value; + + MqttConnectReturnCode(byte value) { + this.value = value; + } + + public byte value() { + return value; + } + + public static MqttConnectReturnCode valueOf(byte b) { + if (valueToCodeMap.containsKey(b)) { + return valueToCodeMap.get(b); + } + throw new IllegalArgumentException("connect retirn code " + b + " unsupported"); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java new file mode 100644 index 0000000000..3e0266e7d0 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java @@ -0,0 +1,107 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable Header for the {@link MqttConnectMessage} + */ +public class MqttConnectVariableHeader { + + private final String name; + private final int version; + private final boolean hasUserName; + private final boolean hasPassword; + private final boolean isWillRetain; + private final int willQos; + private final boolean isWillFlag; + private final boolean isCleanSession; + private final int keepAliveTimeSeconds; + + public MqttConnectVariableHeader( + String name, + int version, + boolean hasUserName, + boolean hasPassword, + boolean isWillRetain, + int willQos, + boolean isWillFlag, + boolean isCleanSession, + int keepAliveTimeSeconds) { + this.name = name; + this.version = version; + this.hasUserName = hasUserName; + this.hasPassword = hasPassword; + this.isWillRetain = isWillRetain; + this.willQos = willQos; + this.isWillFlag = isWillFlag; + this.isCleanSession = isCleanSession; + this.keepAliveTimeSeconds = keepAliveTimeSeconds; + } + + public String name() { + return name; + } + + public int version() { + return version; + } + + public boolean hasUserName() { + return hasUserName; + } + + public boolean hasPassword() { + return hasPassword; + } + + public boolean isWillRetain() { + return isWillRetain; + } + + public int willQos() { + return willQos; + } + + public boolean isWillFlag() { + return isWillFlag; + } + + public boolean isCleanSession() { + return isCleanSession; + } + + public int keepAliveTimeSeconds() { + return keepAliveTimeSeconds; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("name=").append(name); + builder.append(", version=").append(version); + builder.append(", hasUserName=").append(hasUserName); + builder.append(", hasPassword=").append(hasPassword); + builder.append(", isWillRetain=").append(isWillRetain); + builder.append(", isWillFlag=").append(isWillFlag); + builder.append(", isCleanSession=").append(isCleanSession); + builder.append(", keepAliveTimeSeconds=").append(keepAliveTimeSeconds); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java new file mode 100644 index 0000000000..9d6cc43c4b --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java @@ -0,0 +1,475 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.ReplayingDecoder; +import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState; +import io.netty.util.CharsetUtil; + +import java.util.ArrayList; +import java.util.List; + +import static io.netty.handler.codec.mqtt.MqttValidationUtil.*; +import static io.netty.handler.codec.mqtt.MqttCommonUtil.*; + +import static io.netty.handler.codec.mqtt.MqttVersion.*; + +/** + * Decodes Mqtt messages from bytes, following + * + * the MQTT protocl specification v3.1 + */ +public class MqttDecoder extends ReplayingDecoder { + + private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092; + + /** + * States of the decoder. + * We start at READ_FIXED_HEADER, followed by + * READ_VARIABLE_HEADER and finally READ_PAYLOAD. + */ + enum DecoderState { + READ_FIXED_HEADER, + READ_VARIABLE_HEADER, + READ_PAYLOAD, + BAD_MESSAGE, + } + + private MqttFixedHeader mqttFixedHeader; + private Object variableHeader; + private Object payload; + private int bytesRemainingInVariablePart; + + private final int maxBytesInMessage; + + public MqttDecoder() { + this(DEFAULT_MAX_BYTES_IN_MESSAGE); + } + + public MqttDecoder(int maxBytesInMessage) { + super(DecoderState.READ_FIXED_HEADER); + this.maxBytesInMessage = maxBytesInMessage; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + switch (state()) { + case READ_FIXED_HEADER: + mqttFixedHeader = decodeFixedHeader(buffer); + bytesRemainingInVariablePart = mqttFixedHeader.remainingLength(); + checkpoint(DecoderState.READ_VARIABLE_HEADER); + // fall through + + case READ_VARIABLE_HEADER: try { + if (bytesRemainingInVariablePart > maxBytesInMessage) { + throw new DecoderException( + String.format( + "Client tried to send very large message: %d bytes", + bytesRemainingInVariablePart)); + } + final Result decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader); + variableHeader = decodedVariableHeader.value; + bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; + checkpoint(DecoderState.READ_PAYLOAD); + // fall through + } catch (Exception cause) { + out.add(invalidMessage(cause)); + return; + } + + case READ_PAYLOAD: try { + final Result decodedPayload = + decodePayload( + buffer, + mqttFixedHeader.messageType(), + bytesRemainingInVariablePart, + variableHeader); + payload = decodedPayload.value; + bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed; + if (bytesRemainingInVariablePart != 0) { + throw new DecoderException( + String.format( + "Non-zero bytes remaining (%d) should never happen. " + + "Message type: %s. Channel: %s", + bytesRemainingInVariablePart, + mqttFixedHeader.messageType(), + ctx.channel())); + } + checkpoint(DecoderState.READ_FIXED_HEADER); + MqttMessage message = MqttMessageFactory.create(mqttFixedHeader, variableHeader, payload); + mqttFixedHeader = null; + variableHeader = null; + payload = null; + out.add(message); + break; + } catch (Exception cause) { + out.add(invalidMessage(cause)); + return; + } + + case BAD_MESSAGE: + // Keep discarding until disconnection. + buffer.skipBytes(actualReadableBytes()); + break; + + default: + throw new Error("Shouldn't reach here"); + } + } + + private MqttMessage invalidMessage(Throwable cause) { + checkpoint(DecoderState.BAD_MESSAGE); + return MqttMessageFactory.createInvalidMessage(cause); + } + + /** + * Decodes the fixed header. It's one byte for the flags and then variable bytes for the remaining length. + * + * @param buffer the buffer to decode from + * @return the fixed header + */ + private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) { + short b1 = buffer.readUnsignedByte(); + + MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4); + boolean dupFlag = (b1 & 0x08) == 0x08; + int qosLevel = (b1 & 0x06) >> 1; + boolean retain = (b1 & 0x01) != 0; + + int remainingLength = 0; + int multiplier = 1; + short digit; + int loops = 0; + do { + digit = buffer.readUnsignedByte(); + remainingLength += (digit & 127) * multiplier; + multiplier *= 128; + loops++; + } while ((digit & 128) != 0 && loops < 4); + + // MQTT protocol limits Remaining Length to 4 bytes + if (loops == 4 && (digit & 128) != 0) { + throw new DecoderException( + String.format( + "Failed to read fixed header of MQTT message. " + + "It has more than 4 digits for Remaining Length. " + + "MqttMessageType: %s.", + messageType)); + } + MqttFixedHeader decodedFixedHeader = + new MqttFixedHeader(messageType, dupFlag, QoS.valueOf(qosLevel), retain, remainingLength); + return validateFixedHeader(resetUnusedFields(decodedFixedHeader)); + } + + /** + * Decodes the variable header (if any) + * @param buffer the buffer to decode from + * @param mqttFixedHeader MqttFixedHeader of the same message + * @return the variable header + */ + private static Result decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) { + switch (mqttFixedHeader.messageType()) { + case CONNECT: + return decodeConnectionVariableHeader(buffer); + + case CONNACK: + return decodeConnAckVariableHeader(buffer); + + case SUBSCRIBE: + case UNSUBSCRIBE: + case SUBACK: + case UNSUBACK: + case PUBACK: + case PUBREC: + case PUBCOMP: + case PUBREL: + return decodeMessageIdVariableHeader(buffer); + + case PUBLISH: + return decodePublishVariableHeader(buffer, mqttFixedHeader); + + case PINGREQ: + case PINGRESP: + case DISCONNECT: + // Empty variable header + return new Result(null, 0); + } + return new Result(null, 0); //should never reach here + } + + private static Result decodeConnectionVariableHeader(ByteBuf buffer) { + final Result protoString = decodeString(buffer); + if (!PROTOCOL_NAME.equals(protoString.value)) { + throw new DecoderException(PROTOCOL_NAME + " signature is missing. Closing channel."); + } + + int numberOfBytesConsumed = protoString.numberOfBytesConsumed; + + final byte version = buffer.readByte(); + final int b1 = buffer.readUnsignedByte(); + numberOfBytesConsumed += 2; + + final Result keepAlive = decodeMsbLsb(buffer); + numberOfBytesConsumed += keepAlive.numberOfBytesConsumed; + + final boolean hasUserName = (b1 & 0x80) == 0x80; + final boolean hasPassword = (b1 & 0x40) == 0x40; + final boolean willRetain = (b1 & 0x20) == 0x20; + final int willQos = (b1 & 0x18) >> 3; + final boolean willFlag = (b1 & 0x04) == 0x04; + final boolean cleanSession = (b1 & 0x02) == 0x02; + + final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader( + PROTOCOL_NAME, + version, + hasUserName, + hasPassword, + willRetain, + willQos, + willFlag, + cleanSession, + keepAlive.value); + return new Result(mqttConnectVariableHeader, numberOfBytesConsumed); + } + + private static Result decodeConnAckVariableHeader(ByteBuf buffer) { + buffer.readUnsignedByte(); // reserved byte + byte returnCode = buffer.readByte(); + final int numberOfBytesConsumed = 2; + final MqttConnAckVariableHeader mqttConnAckVariableHeader = + new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode)); + return new Result(mqttConnAckVariableHeader, numberOfBytesConsumed); + } + + private static Result decodeMessageIdVariableHeader(ByteBuf buffer) { + final Result messageId = decodeMessageId(buffer); + return new Result( + MqttMessageIdVariableHeader.from(messageId.value), + messageId.numberOfBytesConsumed); + } + + private static Result decodePublishVariableHeader( + ByteBuf buffer, + MqttFixedHeader mqttFixedHeader) { + final Result decodedTopic = decodeString(buffer); + if (!isValidPublishTopicName(decodedTopic.value)) { + throw new DecoderException(String.format("Publish topic name %s contains wildcards.", decodedTopic.value)); + } + int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed; + + int messageId = -1; + if (mqttFixedHeader.qosLevel().value() > 0) { + final Result decodedMessageId = decodeMessageId(buffer); + messageId = decodedMessageId.value; + numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed; + } + final MqttPublishVariableHeader mqttPublishVariableHeader = + new MqttPublishVariableHeader(decodedTopic.value, messageId); + return new Result(mqttPublishVariableHeader, numberOfBytesConsumed); + } + + private static Result decodeMessageId(ByteBuf buffer) { + final Result messageId = decodeMsbLsb(buffer); + if (!isValidMessageId(messageId.value)) { + throw new DecoderException(String.format("Invalid messageId %d", messageId.value)); + } + return messageId; + } + + /** + * Decodes the payload. + * + * @param buffer the buffer to decode from + * @param messageType type of the message being decoded + * @param bytesRemainingInVariablePart bytes remaining + * @param variableHeader variable header of the same message + * @return the payload + */ + private static Result decodePayload( + ByteBuf buffer, + MqttMessageType messageType, + int bytesRemainingInVariablePart, + Object variableHeader) { + switch (messageType) { + case CONNECT: + return decodeConnectionPayload(buffer, (MqttConnectVariableHeader) variableHeader); + + case SUBSCRIBE: + return decodeSubscribePayload(buffer, bytesRemainingInVariablePart); + + case SUBACK: + return decodeSubackPayload(buffer, bytesRemainingInVariablePart); + + case UNSUBSCRIBE: + return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart); + + case PUBLISH: + return decodePublishPayload(buffer, bytesRemainingInVariablePart); + + default: + // unknown payload , no byte consumed + return new Result(null, 0); + } + } + + private static Result decodeConnectionPayload( + ByteBuf buffer, + MqttConnectVariableHeader mqttConnectVariableHeader) { + final Result decodedClientId = decodeString(buffer); + final String decodedClientIdValue = decodedClientId.value; + if (!isValidClientId(decodedClientIdValue)) { + throw new DecoderException( + String.format("Invalid clientIdentifier %s ", + decodedClientIdValue != null ? decodedClientIdValue : "null")); + } + int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed; + + Result decodedWillTopic = null; + Result decodedWillMessage = null; + if (mqttConnectVariableHeader.isWillFlag()) { + decodedWillTopic = decodeString(buffer, 0, 32767); + numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed; + decodedWillMessage = decodeAsciiString(buffer); + numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed; + } + Result decodedUserName = null; + Result decodedPassword = null; + if (mqttConnectVariableHeader.hasUserName()) { + decodedUserName = decodeString(buffer); + numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed; + } + if (mqttConnectVariableHeader.hasPassword()) { + decodedPassword = decodeString(buffer); + numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed; + } + + final MqttConnectPayload mqttConnectPayload = + new MqttConnectPayload( + decodedClientId.value, + decodedWillTopic.value, + decodedWillMessage.value, + decodedUserName.value, + decodedPassword.value); + return new Result(mqttConnectPayload, numberOfBytesConsumed); + } + + private static Result decodeSubscribePayload( + ByteBuf buffer, + int bytesRemainingInVariablePart) { + final List subscribeTopics = new ArrayList(); + int numberOfBytesConsumed = 0; + while (numberOfBytesConsumed < bytesRemainingInVariablePart) { + final Result decodedTopicName = decodeString(buffer); + numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed; + int qos = buffer.readUnsignedByte() & 0x03; + numberOfBytesConsumed++; + subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, QoS.valueOf(qos))); + } + return new Result(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed); + } + + private static Result decodeSubackPayload( + ByteBuf buffer, + int bytesRemainingInVariablePart) { + final List grantedQos = new ArrayList(); + int numberOfBytesConsumed = 0; + while (numberOfBytesConsumed < bytesRemainingInVariablePart) { + int qos = buffer.readUnsignedByte() & 0x03; + numberOfBytesConsumed++; + grantedQos.add(qos); + } + return new Result(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed); + } + + private static Result decodeUnsubscribePayload( + ByteBuf buffer, + int bytesRemainingInVariablePart) { + final List unsubscribeTopics = new ArrayList(); + int numberOfBytesConsumed = 0; + while (numberOfBytesConsumed < bytesRemainingInVariablePart) { + final Result decodedTopicName = decodeString(buffer); + numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed; + unsubscribeTopics.add(decodedTopicName.value); + } + return new Result( + new MqttUnsubscribePayload(unsubscribeTopics), + numberOfBytesConsumed); + } + + private static Result decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) { + ByteBuf b = buffer.readSlice(bytesRemainingInVariablePart).retain(); + return new Result(b, bytesRemainingInVariablePart); + } + + private static Result decodeString(ByteBuf buffer) { + return decodeString(buffer, 0, Integer.MAX_VALUE); + } + + private static Result decodeAsciiString(ByteBuf buffer) { + Result result = decodeString(buffer, 0, Integer.MAX_VALUE); + final String s = result.value; + for (int i = 0; i < s.length(); i++) { + if (s.charAt(i) > 127) { + return new Result(null, result.numberOfBytesConsumed); + } + } + return new Result(s, result.numberOfBytesConsumed); + } + + private static Result decodeString(ByteBuf buffer, int minBytes, int maxBytes) { + final Result decodedSize = decodeMsbLsb(buffer); + int size = decodedSize.value; + int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed; + if (size < minBytes || size > maxBytes) { + buffer.skipBytes(size); + numberOfBytesConsumed += size; + return new Result(null, numberOfBytesConsumed); + } + ByteBuf buf = buffer.readBytes(size); + numberOfBytesConsumed += size; + return new Result(buf.toString(CharsetUtil.UTF_8), numberOfBytesConsumed); + } + + private static Result decodeMsbLsb(ByteBuf buffer) { + return decodeMsbLsb(buffer, 0, 65535); + } + + private static Result decodeMsbLsb(ByteBuf buffer, int min, int max) { + short msbSize = buffer.readUnsignedByte(); + short lsbSize = buffer.readUnsignedByte(); + final int numberOfBytesConsumed = 2; + int result = msbSize << 8 | lsbSize; + if (result < min || result > max) { + result = -1; + } + return new Result(result, numberOfBytesConsumed); + } + + private static final class Result { + + private final T value; + private final int numberOfBytesConsumed; + + Result(T value, int numberOfBytesConsumed) { + this.value = value; + this.numberOfBytesConsumed = numberOfBytesConsumed; + } + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java new file mode 100644 index 0000000000..4e7bd6d112 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java @@ -0,0 +1,395 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.util.CharsetUtil; + +import java.util.List; + +/** + * Encodes Mqtt messages into bytes following the protocl specification v3.1 + * as described here MQTTV3.1 + */ +public class MqttEncoder extends MessageToMessageEncoder { + + public static final MqttEncoder DEFAUL_ENCODER = new MqttEncoder(); + + private static final byte[] EMPTY = new byte[0]; + + private static final byte[] CONNECT_VARIABLE_HEADER_START = {0, 6, 'M', 'Q', 'I', 's', 'd', 'p'}; + + @Override + protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List out) throws Exception { + out.add(doEncode(ctx.alloc(), msg)); + } + + /** + * This is the main encoding method. + * It's only visible for testing. + * + * @param byteBufAllocator Allocates ByteBuf + * @param message MQTT message to encode + * @return ByteBuf with encoded bytes + */ + static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) { + + switch (message.fixedHeader().messageType()) { + case CONNECT: + return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message); + + case CONNACK: + return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message); + + case PUBLISH: + return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message); + + case SUBSCRIBE: + return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message); + + case UNSUBSCRIBE: + return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message); + + case SUBACK: + return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message); + + case UNSUBACK: + case PUBACK: + case PUBREC: + case PUBREL: + case PUBCOMP: + return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message); + + case PINGREQ: + case PINGRESP: + case DISCONNECT: + return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message); + + default: + throw new IllegalArgumentException( + "Unknown message type: " + message.fixedHeader().messageType().value()); + } + } + + private static ByteBuf encodeConnectMessage( + ByteBufAllocator byteBufAllocator, + MqttConnectMessage message) { + int variableHeaderBufferSize = 12; + int payloadBufferSize = 0; + + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttConnectVariableHeader variableHeader = message.variableHeader(); + MqttConnectPayload payload = message.payload(); + + // Client id + String clientIdentifier = payload.clientIdentifier(); + if (!isValidClientIdentifier(clientIdentifier)) { + throw new IllegalArgumentException( + String.format("Invalid clientIdentifier %s. Must be less than 23 chars long", + clientIdentifier != null ? clientIdentifier : "null")); + } + byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier); + payloadBufferSize += 2 + clientIdentifierBytes.length; + + // Will topic and message + String willTopic = payload.willTopic(); + byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EMPTY; + String willMessage = payload.willMessage(); + byte[] willMessageBytes = willMessage != null ? encodeStringUtf8(willMessage) : EMPTY; + if (variableHeader.isWillFlag()) { + payloadBufferSize += 2 + willTopicBytes.length; + payloadBufferSize += 2 + willMessageBytes.length; + } + + String userName = payload.userName(); + byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EMPTY; + if (variableHeader.hasUserName()) { + payloadBufferSize += 2 + userNameBytes.length; + } + + String password = payload.password(); + byte[] passwordBytes = password != null ? encodeStringUtf8(password) : EMPTY; + if (variableHeader.hasPassword()) { + payloadBufferSize += 2 + passwordBytes.length; + } + + // Fixed header + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + + buf.writeBytes(CONNECT_VARIABLE_HEADER_START); + + buf.writeByte(variableHeader.version()); + buf.writeByte(getConnVariableHeaderFlag(variableHeader)); + buf.writeShort(variableHeader.keepAliveTimeSeconds()); + + // Payload + buf.writeShort(clientIdentifierBytes.length); + buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length); + if (variableHeader.isWillFlag()) { + buf.writeShort(willTopicBytes.length); + buf.writeBytes(willTopicBytes, 0, willTopicBytes.length); + buf.writeShort(willMessageBytes.length); + buf.writeBytes(willMessageBytes, 0, willMessageBytes.length); + } + if (variableHeader.hasUserName()) { + buf.writeShort(userNameBytes.length); + buf.writeBytes(userNameBytes, 0, userNameBytes.length); + } + if (variableHeader.hasPassword()) { + buf.writeShort(passwordBytes.length); + buf.writeBytes(passwordBytes, 0, passwordBytes.length); + } + return buf; + } + + private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) { + int flagByte = 0; + if (variableHeader.hasUserName()) { + flagByte |= 0x80; + } + if (variableHeader.hasPassword()) { + flagByte |= 0x40; + } + if (variableHeader.isWillRetain()) { + flagByte |= 0x20; + } + flagByte |= (variableHeader.willQos() & 0x03) << 3; + if (variableHeader.isWillFlag()) { + flagByte |= 0x04; + } + if (variableHeader.isCleanSession()) { + flagByte |= 0x02; + } + return flagByte; + } + + private static ByteBuf encodeConnAckMessage( + ByteBufAllocator byteBufAllocator, + MqttConnAckMessage message) { + ByteBuf buf = byteBufAllocator.buffer(4); + buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); + buf.writeByte(2); + buf.writeByte(0); + buf.writeByte(message.variableHeader().connectReturnCode().value()); + + return buf; + } + + private static ByteBuf encodeSubscribeMessage( + ByteBufAllocator byteBufAllocator, + MqttSubscribeMessage message) { + int variableHeaderBufferSize = 2; + int payloadBufferSize = 0; + + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = message.variableHeader(); + MqttSubscribePayload payload = message.payload(); + + for (MqttTopicSubscription topic : payload.topicSubscriptions()) { + String topicName = topic.topicName(); + byte[] topicNameBytes = encodeStringUtf8(topicName); + payloadBufferSize += 2 + topicNameBytes.length; + payloadBufferSize += 1; + } + + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + + ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + + // Variable Header + int messageId = variableHeader.messageId(); + buf.writeShort(messageId); + + // Payload + for (MqttTopicSubscription topic : payload.topicSubscriptions()) { + String topicName = topic.topicName(); + byte[] topicNameBytes = encodeStringUtf8(topicName); + buf.writeShort(topicNameBytes.length); + buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); + buf.writeByte(topic.qualityOfService().value()); + } + + return buf; + } + + private static ByteBuf encodeUnsubscribeMessage( + ByteBufAllocator byteBufAllocator, + MqttUnsubscribeMessage message) { + int variableHeaderBufferSize = 2; + int payloadBufferSize = 0; + + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = message.variableHeader(); + MqttUnsubscribePayload payload = message.payload(); + + for (String topicName : payload.topics()) { + byte[] topicNameBytes = encodeStringUtf8(topicName); + payloadBufferSize += 2 + topicNameBytes.length; + } + + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + + ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + + // Variable Header + int messageId = variableHeader.messageId(); + buf.writeShort(messageId); + + // Payload + for (String topicName : payload.topics()) { + byte[] topicNameBytes = encodeStringUtf8(topicName); + buf.writeShort(topicNameBytes.length); + buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); + } + + return buf; + } + + private static ByteBuf encodeSubAckMessage( + ByteBufAllocator byteBufAllocator, + MqttSubAckMessage message) { + int variableHeaderBufferSize = 2; + int payloadBufferSize = message.payload().grantedQoSLevels().size(); + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); + writeVariableLengthInt(buf, variablePartSize); + buf.writeShort(message.variableHeader().messageId()); + for (int qos : message.payload().grantedQoSLevels()) { + buf.writeByte(qos); + } + + return buf; + } + + private static ByteBuf encodePublishMessage( + ByteBufAllocator byteBufAllocator, + MqttPublishMessage message) { + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttPublishVariableHeader variableHeader = message.variableHeader(); + ByteBuf payload = message.payload().duplicate(); + + String topicName = variableHeader.topicName(); + byte[] topicNameBytes = encodeStringUtf8(topicName); + + int variableHeaderBufferSize = 2 + topicNameBytes.length + + (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0); + int payloadBufferSize = payload.readableBytes(); + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + + ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + buf.writeShort(topicNameBytes.length); + buf.writeBytes(topicNameBytes); + if (mqttFixedHeader.qosLevel().value() > 0) { + buf.writeShort(variableHeader.messageId()); + } + buf.writeBytes(payload); + + return buf; + } + + private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId( + ByteBufAllocator byteBufAllocator, + MqttMessage message) { + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); + int msgId = variableHeader.messageId(); + + int variableHeaderBufferSize = 2; // variable part only has a message id + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize); + ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variableHeaderBufferSize); + buf.writeShort(msgId); + + return buf; + } + + private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader( + ByteBufAllocator byteBufAllocator, + MqttMessage message) { + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + ByteBuf buf = byteBufAllocator.buffer(2); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + buf.writeByte(0); + + return buf; + } + + private static int getFixedHeaderByte1(MqttFixedHeader header) { + int ret = 0; + ret |= header.messageType().value() << 4; + if (header.isDup()) { + ret |= 0x08; + } + ret |= header.qosLevel().value() << 1; + if (header.isRetain()) { + ret |= 0x01; + } + return ret; + } + + private static void writeVariableLengthInt(ByteBuf buf, int num) { + do { + int digit = num % 128; + num /= 128; + if (num > 0) { + digit |= 0x80; + } + buf.writeByte(digit); + } while (num > 0); + } + + private static int getVariableLengthInt(int num) { + int count = 0; + do { + num /= 128; + count++; + } while (num > 0); + return count; + } + + private static byte[] encodeStringUtf8(String s) { + return s.getBytes(CharsetUtil.UTF_8); + } + + private static boolean isValidClientIdentifier(String clientIdentifier) { + if (clientIdentifier == null) { + return false; + } + int length = clientIdentifier.length(); + return length >= 1 && length <= 23; + } + +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttFixedHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttFixedHeader.java new file mode 100644 index 0000000000..c239d5e858 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttFixedHeader.java @@ -0,0 +1,78 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * See + * MQTTV3.1/fixed-header + */ +public class MqttFixedHeader { + + private final MqttMessageType messageType; + private final boolean isDup; + private final QoS qosLevel; + private final boolean isRetain; + private final int remainingLength; + + public MqttFixedHeader( + MqttMessageType messageType, + boolean isDup, + QoS qosLevel, + boolean isRetain, + int remainingLength) { + this.messageType = messageType; + this.isDup = isDup; + this.qosLevel = qosLevel; + this.isRetain = isRetain; + this.remainingLength = remainingLength; + } + + public MqttMessageType messageType() { + return messageType; + } + + public boolean isDup() { + return isDup; + } + + public QoS qosLevel() { + return qosLevel; + } + + public boolean isRetain() { + return isRetain; + } + + public int remainingLength() { + return remainingLength; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("messageType=").append(messageType); + builder.append(", isDup=").append(isDup); + builder.append(", qosLevel=").append(qosLevel); + builder.append(", isRetain=").append(isRetain); + builder.append(", remainingLength=").append(remainingLength); + builder.append(']'); + return builder.toString(); + } + +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessage.java new file mode 100644 index 0000000000..c92aa015cf --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessage.java @@ -0,0 +1,80 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.handler.codec.DecoderResult; +import io.netty.util.internal.StringUtil; + +/** + * Base class for all MQTT message types. + */ +public class MqttMessage { + + private final MqttFixedHeader mqttFixedHeader; + private final Object variableHeader; + private final Object payload; + private final DecoderResult decoderResult; + + public MqttMessage(MqttFixedHeader mqttFixedHeader) { + this(mqttFixedHeader, null, null); + } + + public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader) { + this(mqttFixedHeader, variableHeader, null); + } + + public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) { + this(mqttFixedHeader, variableHeader, payload, DecoderResult.SUCCESS); + } + + public MqttMessage( + MqttFixedHeader mqttFixedHeader, + Object variableHeader, + Object payload, + DecoderResult decoderResult) { + this.mqttFixedHeader = mqttFixedHeader; + this.variableHeader = variableHeader; + this.payload = payload; + this.decoderResult = decoderResult; + } + + public MqttFixedHeader fixedHeader() { + return mqttFixedHeader; + } + + public Object variableHeader() { + return variableHeader; + } + + public Object payload() { + return payload; + } + + public DecoderResult decoderResult() { + return decoderResult; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("fixedHeader=").append(fixedHeader() != null ? fixedHeader().toString() : ""); + builder.append(", variableHeader=").append(variableHeader() != null ? variableHeader.toString() : ""); + builder.append(", payload=").append(payload() != null ? payload.toString() : ""); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java new file mode 100644 index 0000000000..ee8848ca9d --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java @@ -0,0 +1,89 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.DecoderResult; + +/** + * Utility class with factory methods to create different types of MQTT messages. + */ +public final class MqttMessageFactory { + + public static MqttMessage create(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) { + switch (mqttFixedHeader.messageType()) { + case CONNECT : + return new MqttConnectMessage( + mqttFixedHeader, + (MqttConnectVariableHeader) variableHeader, + (MqttConnectPayload) payload); + + case CONNACK: + return new MqttConnAckMessage(mqttFixedHeader, (MqttConnAckVariableHeader) variableHeader); + + case SUBSCRIBE: + return new MqttSubscribeMessage( + mqttFixedHeader, + (MqttMessageIdVariableHeader) variableHeader, + (MqttSubscribePayload) payload); + + case SUBACK: + return new MqttSubAckMessage( + mqttFixedHeader, + (MqttMessageIdVariableHeader) variableHeader, + (MqttSubAckPayload) payload); + + case UNSUBACK: + return new MqttUnsubAckMessage( + mqttFixedHeader, + (MqttMessageIdVariableHeader) variableHeader); + + case UNSUBSCRIBE: + return new MqttUnsubscribeMessage( + mqttFixedHeader, + (MqttMessageIdVariableHeader) variableHeader, + (MqttUnsubscribePayload) payload); + + case PUBLISH: + return new MqttPublishMessage( + mqttFixedHeader, + (MqttPublishVariableHeader) variableHeader, + (ByteBuf) payload); + + case PUBACK: + return new MqttPubAckMessage(mqttFixedHeader, (MqttMessageIdVariableHeader) variableHeader); + case PUBREC: + case PUBREL: + case PUBCOMP: + return new MqttMessage(mqttFixedHeader, variableHeader); + + case PINGREQ: + case PINGRESP: + case DISCONNECT: + return new MqttMessage(mqttFixedHeader); + + default: + throw new IllegalArgumentException("Unknown message type: " + mqttFixedHeader.messageType()); + } + } + + public static MqttMessage createInvalidMessage(Throwable cause) { + return new MqttMessage(null, null, null, DecoderResult.failure(cause)); + } + + private MqttMessageFactory() { } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java new file mode 100644 index 0000000000..081ccd3838 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java @@ -0,0 +1,53 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable Header containing only Message Id + * See MQTTV3.1/msg-id + */ +public final class MqttMessageIdVariableHeader { + + private final int messageId; + + public static MqttMessageIdVariableHeader from(int messageId) { + if (messageId < 1 || messageId > 0xffff) { + throw new IllegalArgumentException( + String.format("Message id must be in the range of 1 - 0xffff but %d given ", + messageId)); + } + return new MqttMessageIdVariableHeader(messageId); + } + + private MqttMessageIdVariableHeader(int messageId) { + this.messageId = messageId; + } + + public int messageId() { + return messageId; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("messageId=").append(messageId); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java new file mode 100644 index 0000000000..06ec11744b --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java @@ -0,0 +1,57 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * MQTT Message Types. + */ +public enum MqttMessageType { + CONNECT(1), + CONNACK(2), + PUBLISH(3), + PUBACK(4), + PUBREC(5), + PUBREL(6), + PUBCOMP(7), + SUBSCRIBE(8), + SUBACK(9), + UNSUBSCRIBE(10), + UNSUBACK(11), + PINGREQ(12), + PINGRESP(13), + DISCONNECT(14); + + private final int value; + + MqttMessageType(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static MqttMessageType valueOf(int type) { + for (MqttMessageType t : values()) { + if (t.value == type) { + return t; + } + } + throw new IllegalArgumentException("message type " + type + " unsupported"); + } +} + diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubAckMessage.java new file mode 100644 index 0000000000..a22414edd9 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubAckMessage.java @@ -0,0 +1,32 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See MQTTV3.1/puback + */ +public class MqttPubAckMessage extends MqttMessage { + + public MqttPubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) { + super(mqttFixedHeader, variableHeader); + } + + @Override + public MqttMessageIdVariableHeader variableHeader() { + return (MqttMessageIdVariableHeader) super.variableHeader(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishMessage.java new file mode 100644 index 0000000000..57f96665d7 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishMessage.java @@ -0,0 +1,103 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.util.IllegalReferenceCountException; + +/** + * See MQTTV3.1/publish + */ +public class MqttPublishMessage extends MqttMessage implements ByteBufHolder { + + public MqttPublishMessage( + MqttFixedHeader mqttFixedHeader, + MqttPublishVariableHeader variableHeader, + ByteBuf payload) { + super(mqttFixedHeader, variableHeader, payload); + } + + @Override + public MqttPublishVariableHeader variableHeader() { + return (MqttPublishVariableHeader) super.variableHeader(); + } + + @Override + public ByteBuf payload() { + return content(); + } + + @Override + public ByteBuf content() { + final ByteBuf data = (ByteBuf) super.payload(); + if (data.refCnt() <= 0) { + throw new IllegalReferenceCountException(data.refCnt()); + } + return data; + } + + @Override + public MqttPublishMessage copy() { + return new MqttPublishMessage(fixedHeader(), variableHeader(), content().copy()); + } + + @Override + public MqttPublishMessage duplicate() { + return new MqttPublishMessage(fixedHeader(), variableHeader(), content().duplicate()); + } + + @Override + public int refCnt() { + return content().refCnt(); + } + + @Override + public MqttPublishMessage retain() { + content().retain(); + return this; + } + + @Override + public MqttPublishMessage retain(int increment) { + content().retain(increment); + return this; + } + + @Override + public MqttPublishMessage touch() { + content().touch(); + return this; + } + + @Override + public MqttPublishMessage touch(Object hint) { + content().touch(hint); + return this; + } + + @Override + public boolean release() { + return content().release(); + } + + @Override + public boolean release(int decrement) { + return content().release(decrement); + } + +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java new file mode 100644 index 0000000000..df7e744c07 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java @@ -0,0 +1,50 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable Header of the {@link MqttPublishMessage} + */ +public class MqttPublishVariableHeader { + + private final String topicName; + private final int messageId; + + public MqttPublishVariableHeader(String topicName, int messageId) { + this.topicName = topicName; + this.messageId = messageId; + } + + public String topicName() { + return topicName; + } + + public int messageId() { + return messageId; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("topicName=").append(topicName); + builder.append(", messageId=").append(messageId); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java new file mode 100644 index 0000000000..95be4e2109 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java @@ -0,0 +1,40 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See MQTTV3.1/suback + */ +public class MqttSubAckMessage extends MqttMessage { + + public MqttSubAckMessage( + MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttSubAckPayload payload) { + super(mqttFixedHeader, variableHeader, payload); + } + + @Override + public MqttMessageIdVariableHeader variableHeader() { + return (MqttMessageIdVariableHeader) super.variableHeader(); + } + + @Override + public MqttSubAckPayload payload() { + return (MqttSubAckPayload) super.payload(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckPayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckPayload.java new file mode 100644 index 0000000000..954faf41a2 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckPayload.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +import java.util.List; + +/** + * Payload of the {@link MqttSubAckMessage} + */ +public class MqttSubAckPayload { + + private final List grantedQoSLevels; + + public MqttSubAckPayload(List grantedQoSLevels) { + this.grantedQoSLevels = grantedQoSLevels; + } + + public List grantedQoSLevels() { + return grantedQoSLevels; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("grantedQoSLevels=").append(grantedQoSLevels); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java new file mode 100644 index 0000000000..12dcd30229 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See + * MQTTV3.1/subscribe + */ +public class MqttSubscribeMessage extends MqttMessage { + + public MqttSubscribeMessage( + MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttSubscribePayload payload) { + super(mqttFixedHeader, variableHeader, payload); + } + + @Override + public MqttMessageIdVariableHeader variableHeader() { + return (MqttMessageIdVariableHeader) super.variableHeader(); + } + + @Override + public MqttSubscribePayload payload() { + return (MqttSubscribePayload) super.payload(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribePayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribePayload.java new file mode 100644 index 0000000000..dbdb3de181 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribePayload.java @@ -0,0 +1,49 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +import java.util.Collections; +import java.util.List; + +/** + * Payload of the {@link MqttSubscribeMessage} + */ +public class MqttSubscribePayload { + + private final List topicSubscriptions; + + public MqttSubscribePayload(List topicSubscriptions) { + this.topicSubscriptions = Collections.unmodifiableList(topicSubscriptions); + } + + public List topicSubscriptions() { + return topicSubscriptions; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + for (int i = 0; i < topicSubscriptions.size() - 1; i++) { + builder.append(topicSubscriptions.get(i)).append(", "); + } + builder.append(topicSubscriptions.get(topicSubscriptions.size() - 1)); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java new file mode 100644 index 0000000000..46389febe2 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java @@ -0,0 +1,51 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Contains a topic name and Qos Level. + * This is part of the {@link MqttSubscribePayload} + */ +public class MqttTopicSubscription { + + private final String topicFilter; + private final QoS qualityOfService; + + public MqttTopicSubscription(String topicFilter, QoS qualityOfService) { + this.topicFilter = topicFilter; + this.qualityOfService = qualityOfService; + } + + public String topicName() { + return topicFilter; + } + + public QoS qualityOfService() { + return qualityOfService; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + builder.append("topicFilter=").append(topicFilter); + builder.append(", qualityOfService=").append(qualityOfService); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java new file mode 100644 index 0000000000..032188406d --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java @@ -0,0 +1,32 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See MQTTV3.1/unsuback + */ +public class MqttUnsubAckMessage extends MqttMessage { + + public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) { + super(mqttFixedHeader, variableHeader, null); + } + + @Override + public MqttMessageIdVariableHeader variableHeader() { + return (MqttMessageIdVariableHeader) super.variableHeader(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java new file mode 100644 index 0000000000..bbf770a602 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * See + * MQTTV3.1/unsubscribe + */ +public class MqttUnsubscribeMessage extends MqttMessage { + + public MqttUnsubscribeMessage( + MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttUnsubscribePayload payload) { + super(mqttFixedHeader, variableHeader, payload); + } + + @Override + public MqttMessageIdVariableHeader variableHeader() { + return (MqttMessageIdVariableHeader) super.variableHeader(); + } + + @Override + public MqttUnsubscribePayload payload() { + return (MqttUnsubscribePayload) super.payload(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribePayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribePayload.java new file mode 100644 index 0000000000..b203760e07 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribePayload.java @@ -0,0 +1,49 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +import java.util.Collections; +import java.util.List; + +/** + * Pyaload of the {@link MqttUnsubscribeMessage} + */ +public class MqttUnsubscribePayload { + + private final List topics; + + public MqttUnsubscribePayload(List topics) { + this.topics = Collections.unmodifiableList(topics); + } + + public List topics() { + return topics; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(StringUtil.simpleClassName(this)).append('['); + for (int i = 0; i < topics.size() - 1; i++) { + builder.append("topicName = " + topics.get(i)).append(", "); + } + builder.append("topicName = " + topics.get(topics.size() - 1)); + builder.append(']'); + return builder.toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttValidationUtil.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttValidationUtil.java new file mode 100644 index 0000000000..82f57c2f8f --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttValidationUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.handler.codec.DecoderException; + +public final class MqttValidationUtil { + + private static final char[] TOPIC_WILDCARDS = {'#', '+'}; + private static final int MAX_CLIENT_ID_LENGTH = 23; + + public static boolean isValidPublishTopicName(String topicName) { + // publish topic name must not contain any wildcard + for (char c : TOPIC_WILDCARDS) { + if (topicName.indexOf(c) >= 0) { + return false; + } + } + return true; + } + + public static boolean isValidMessageId(int messageId) { + return messageId != 0; + } + + public static boolean isValidClientId(String clientId) { + return clientId != null && clientId.length() <= MAX_CLIENT_ID_LENGTH; + } + + public static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) { + switch (mqttFixedHeader.messageType()) { + case PUBREL: + case SUBSCRIBE: + case UNSUBSCRIBE: + if (mqttFixedHeader.qosLevel() != QoS.AT_LEAST_ONCE) { + throw new DecoderException(String.format("%s message must have QoS 1", + mqttFixedHeader.messageType().name())); + } + default: + return mqttFixedHeader; + } + } + + private MqttValidationUtil() { } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java new file mode 100644 index 0000000000..2608362058 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java @@ -0,0 +1,27 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +/** + * Holds Constant values used by multiple classes in mqtt-codec. + */ +final class MqttVersion { + + static final String PROTOCOL_NAME = "MQIsdp"; + + private MqttVersion() { } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/QoS.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/QoS.java new file mode 100644 index 0000000000..79e328712e --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/QoS.java @@ -0,0 +1,40 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package io.netty.handler.codec.mqtt; + +public enum QoS { + AT_MOST_ONCE(0), + AT_LEAST_ONCE(1), + EXACTLY_ONCE(2); + + private final int value; + QoS(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static QoS valueOf(int value) { + for (QoS q : values()) { + if (q.value == value) { + return q; + } + } + throw new IllegalArgumentException(String.format("Invalid QoS: %d", value)); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/package-info.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/package-info.java new file mode 100644 index 0000000000..56004883e2 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Encoder, decoder and different Message Types for MQTT. + */ +package io.netty.handler.codec.mqtt; diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java new file mode 100644 index 0000000000..bd96a77941 --- /dev/null +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java @@ -0,0 +1,435 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.CharsetUtil; +import org.easymock.Mock; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockitoAnnotations; + +import java.util.LinkedList; +import java.util.List; + +import static io.netty.handler.codec.mqtt.MqttVersion.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for MqttEncoder and MqttDecoder. + */ +public class MqttCodecTest { + + private static final String CLIENT_ID = "RANDOM_TEST_CLIENT"; + private static final String WILL_TOPIC = "/my_will"; + private static final String WILL_MESSAGE = "gone"; + private static final String USER_NAME = "happy_user"; + private static final String PASSWORD = "123_or_no_pwd"; + + private static final int PROTOCOL_VERSION = 3; + private static final int KEEP_ALIVE_SECONDS = 600; + + private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + + @Mock + private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + @Mock + private final Channel channel = mock(Channel.class); + + private final MqttDecoder mqttDecoder = new MqttDecoder(); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + when(ctx.channel()).thenReturn(channel); + } + + @Test + public void testConnectMessage() throws Exception { + final MqttConnectMessage message = createConnectMessage(); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttConnectMessage decodedMessage = (MqttConnectMessage) out.get(0); + + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + vlidateConnectVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validateConnectPayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testConnAckMessage() throws Exception { + final MqttConnAckMessage message = createConnAckMessage(); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttConnAckMessage decodedMessage = (MqttConnAckMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateConnAckVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + } + + @Test + public void testPublishMessage() throws Exception { + final MqttPublishMessage message = createPublishMessage(); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttPublishMessage decodedMessage = (MqttPublishMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePublishVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validatePublishPayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testPubAckMessage() throws Exception { + testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBACK); + } + + @Test + public void testPubRecMessage() throws Exception { + testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBREC); + } + + @Test + public void testPubRelMessage() throws Exception { + testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBREL); + } + + @Test + public void testPubCompMessage() throws Exception { + testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.PUBCOMP); + } + + @Test + public void testSubscribeMessage() throws Exception { + final MqttSubscribeMessage message = createSubscribeMessage(); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttSubscribeMessage decodedMessage = (MqttSubscribeMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validateSubscribePayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testSubAckMessage() throws Exception { + final MqttSubAckMessage message = createSubAckMessage(); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttSubAckMessage decodedMessage = (MqttSubAckMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validateSubAckPayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testUnSubscribeMessage() throws Exception { + final MqttUnsubscribeMessage message = createUnsubscribeMessage(); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttUnsubscribeMessage decodedMessage = (MqttUnsubscribeMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validateUnsubscribePayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testUnsubAckMessage() throws Exception { + testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType.UNSUBACK); + } + + @Test + public void testPingReqMessage() throws Exception { + testMessageWithOnlyFixedHeader(MqttMessageType.PINGREQ); + } + + @Test + public void testPingRespMessage() throws Exception { + testMessageWithOnlyFixedHeader(MqttMessageType.PINGRESP); + } + + @Test + public void testDisconnectMessage() throws Exception { + testMessageWithOnlyFixedHeader(MqttMessageType.DISCONNECT); + } + + private void testMessageWithOnlyFixedHeader(MqttMessageType messageType) throws Exception { + MqttMessage message = createMessageWithFixedHeader(messageType); + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + } + + private void testMessageWithOnlyFixedHeaderAndMessageIdVariableHeader(MqttMessageType messageType) + throws Exception { + MqttMessage message = createMessageWithFixedHeaderAndMessageIdVariableHeader(messageType); + + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object bout got " + out.size(), 1, out.size()); + + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader( + (MqttMessageIdVariableHeader) message.variableHeader(), + (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); + } + + // Factory methods of different MQTT + // Message types to help testing + + private static MqttMessage createMessageWithFixedHeader(MqttMessageType messageType) { + return new MqttMessage(new MqttFixedHeader(messageType, false, QoS.AT_MOST_ONCE, false, 0)); + } + + private static MqttMessage createMessageWithFixedHeaderAndMessageIdVariableHeader(MqttMessageType messageType) { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader( + messageType, + false, + messageType == MqttMessageType.PUBREL ? QoS.AT_LEAST_ONCE : QoS.AT_MOST_ONCE, + false, + 0); + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); + return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); + } + + private static MqttConnectMessage createConnectMessage() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.CONNECT, false, QoS.AT_MOST_ONCE, false, 0); + MqttConnectVariableHeader mqttConnectVariableHeader = + new MqttConnectVariableHeader( + PROTOCOL_NAME, + PROTOCOL_VERSION, + true, + true, + true, + 1, + true, + true, + KEEP_ALIVE_SECONDS); + MqttConnectPayload mqttConnectPayload = + new MqttConnectPayload(CLIENT_ID, WILL_TOPIC, WILL_MESSAGE, USER_NAME, PASSWORD); + + return new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader, mqttConnectPayload); + } + + private static MqttConnAckMessage createConnAckMessage() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.CONNACK, false, QoS.AT_MOST_ONCE, false, 0); + MqttConnAckVariableHeader mqttConnAckVariableHeader = + new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED); + return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); + } + + private static MqttPublishMessage createPublishMessage() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.PUBLISH, false, QoS.AT_LEAST_ONCE, true, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", 1234); + ByteBuf payload = ALLOCATOR.buffer(); + payload.writeBytes("whatever".getBytes(CharsetUtil.UTF_8)); + return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload); + } + + private static MqttSubscribeMessage createSubscribeMessage() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, QoS.AT_LEAST_ONCE, true, 0); + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); + + List topicSubscriptions = new LinkedList(); + topicSubscriptions.add(new MqttTopicSubscription("/abc", QoS.AT_LEAST_ONCE)); + topicSubscriptions.add(new MqttTopicSubscription("/def", QoS.AT_LEAST_ONCE)); + topicSubscriptions.add(new MqttTopicSubscription("/xyz", QoS.EXACTLY_ONCE)); + + MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(topicSubscriptions); + return new MqttSubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload); + } + + private static MqttSubAckMessage createSubAckMessage() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.SUBACK, false, QoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); + List grantedQosLevels = new LinkedList(); + grantedQosLevels.add(1); + grantedQosLevels.add(2); + grantedQosLevels.add(0); + MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQosLevels); + return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload); + } + + private static MqttUnsubscribeMessage createUnsubscribeMessage() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, QoS.AT_LEAST_ONCE, true, 0); + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); + + List topics = new LinkedList(); + topics.add("/abc"); + topics.add("/def"); + topics.add("/xyz"); + + MqttUnsubscribePayload mqttUnsubscribePayload = new MqttUnsubscribePayload(topics); + return new MqttUnsubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttUnsubscribePayload); + } + + // Helper methdos to compare expected and actual + // MQTT messages + + private static void validateFixedHeaders(MqttFixedHeader expected, MqttFixedHeader actual) { + assertEquals("MqttFixedHeader MqttMessageType mismatch ", expected.messageType(), actual.messageType()); + assertEquals("MqttFixedHeader Qos mismatch ", expected.qosLevel(), actual.qosLevel()); + } + + private static void vlidateConnectVariableHeader( + MqttConnectVariableHeader expected, + MqttConnectVariableHeader actual) { + assertEquals("MqttConnectVariableHeader Name mismatch ", expected.name(), actual.name()); + assertEquals( + "MqttConnectVariableHeader KeepAliveTimeSeconds mismatch ", + expected.keepAliveTimeSeconds(), + actual.keepAliveTimeSeconds()); + assertEquals("MqttConnectVariableHeader Version mismatch ", expected.version(), actual.version()); + assertEquals("MqttConnectVariableHeader WillQos mismatch ", expected.willQos(), actual.willQos()); + + assertEquals("MqttConnectVariableHeader HasUserName mismatch ", expected.hasUserName(), actual.hasUserName()); + assertEquals("MqttConnectVariableHeader HasPassword mismatch ", expected.hasPassword(), actual.hasPassword()); + assertEquals( + "MqttConnectVariableHeader IsCleanSession mismatch ", + expected.isCleanSession(), + actual.isCleanSession()); + assertEquals("MqttConnectVariableHeader IsWillFlag mismatch ", expected.isWillFlag(), actual.isWillFlag()); + assertEquals( + "MqttConnectVariableHeader IsWillRetain mismatch ", + expected.isWillRetain(), + actual.isWillRetain()); + } + + private static void validateConnectPayload(MqttConnectPayload expected, MqttConnectPayload actual) { + assertEquals( + "MqttConnectPayload ClientIdentifier mismatch ", + expected.clientIdentifier(), + actual.clientIdentifier()); + assertEquals("MqttConnectPayload UserName mismatch ", expected.userName(), actual.userName()); + assertEquals("MqttConnectPayload Password mismatch ", expected.password(), actual.password()); + assertEquals("MqttConnectPayload WillMessage mismatch ", expected.willMessage(), actual.willMessage()); + assertEquals("MqttConnectPayload WillTopic mismatch ", expected.willTopic(), actual.willTopic()); + } + + private static void validateConnAckVariableHeader( + MqttConnAckVariableHeader expected, + MqttConnAckVariableHeader actual) { + assertEquals( + "MqttConnAckVariableHeader MqttConnectReturnCode mismatch", + expected.connectReturnCode(), + actual.connectReturnCode()); + } + + private static void validatePublishVariableHeader( + MqttPublishVariableHeader expected, + MqttPublishVariableHeader actual) { + assertEquals("MqttPublishVariableHeader TopicName mismatch ", expected.topicName(), actual.topicName()); + assertEquals("MqttPublishVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId()); + } + + private static void validatePublishPayload(ByteBuf expected, ByteBuf actual) { + assertEquals("PublishPayload mismatch ", 0, expected.compareTo(actual)); + } + + private static void validateMessageIdVariableHeader( + MqttMessageIdVariableHeader expected, + MqttMessageIdVariableHeader actual) { + assertEquals("MqttMessageIdVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId()); + } + + private static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) { + List expectedTopicSubscriptions = expected.topicSubscriptions(); + List actualTopicSubscriptions = actual.topicSubscriptions(); + + assertEquals( + "MqttSubscribePayload TopicSubscriptionList size mismatch ", + expectedTopicSubscriptions.size(), + actualTopicSubscriptions.size()); + for (int i = 0; i < expectedTopicSubscriptions.size(); i++) { + validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i)); + } + } + + private static void validateTopicSubscription( + MqttTopicSubscription expected, + MqttTopicSubscription actual) { + assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName()); + assertEquals( + "MqttTopicSubscription Qos mismatch ", + expected.qualityOfService(), + actual.qualityOfService()); + } + + private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) { + assertArrayEquals( + "MqttSubAckPayload GrantedQosLevels mismatch ", + expected.grantedQoSLevels().toArray(), + actual.grantedQoSLevels().toArray()); + } + + private static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) { + assertArrayEquals( + "MqttUnsubscribePayload TopicList mismatch ", + expected.topics().toArray(), + actual.topics().toArray()); + } +} diff --git a/pom.xml b/pom.xml index feb92f236d..5d469c9ecf 100644 --- a/pom.xml +++ b/pom.xml @@ -339,8 +339,9 @@ codec-haproxy codec-http codec-memcache - codec-stomp + codec-mqtt codec-socks + codec-stomp transport transport-rxtx transport-sctp @@ -534,6 +535,14 @@ 0.5-rc1 test + + + + org.mockito + mockito-all + 1.9.5 + test +