/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.handler.codec.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
import io.netty.util.CharsetUtil;

import java.util.ArrayList;
import java.util.List;

import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;

/**
 * Decodes Mqtt messages from bytes, following the MQTT protocol specification v3.1.
 *
 * <p>Decoding is a three-step state machine (fixed header, variable header, payload).
 * Any exception raised while decoding one of the steps is converted into an invalid
 * message (see {@link MqttMessageFactory#newInvalidMessage(Throwable)}) and moves the
 * decoder into {@link DecoderState#BAD_MESSAGE}, after which all further input is discarded.
 */
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {

    /** Limit applied by the no-arg constructor to the remaining length of a message. */
    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

    /**
     * States of the decoder.
     * We start at READ_FIXED_HEADER, followed by
     * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
     * BAD_MESSAGE is entered after a decoding failure.
     */
    enum DecoderState {
        READ_FIXED_HEADER,
        READ_VARIABLE_HEADER,
        READ_PAYLOAD,
        BAD_MESSAGE,
    }

    /** Fixed header of the message currently being decoded; reset to {@code null} once it is emitted. */
    private MqttFixedHeader mqttFixedHeader;

    /** Variable header of the message currently being decoded; may be {@code null} for header-less types. */
    private Object variableHeader;

    /** Bytes of the current message (as announced by the fixed header) that have not been accounted for yet. */
    private int bytesRemainingInVariablePart;

    /** Messages whose remaining length exceeds this value are rejected. */
    private final int maxBytesInMessage;

    /**
     * Creates a decoder using the default message size limit.
     */
    public MqttDecoder() {
        this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    /**
     * Creates a decoder with a custom message size limit.
     *
     * @param maxBytesInMessage largest accepted remaining length; larger messages are
     *                          reported as invalid messages
     */
    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

    /**
     * Decodes at most one message from {@code buffer} and appends it to {@code out}.
     * The cases intentionally fall through so a complete message is decoded in one call;
     * a checkpoint is set after every completed step.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER:
                try {
                    mqttFixedHeader = decodeFixedHeader(buffer);
                    bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                    checkpoint(DecoderState.READ_VARIABLE_HEADER);
                    // fall through
                } catch (Exception cause) {
                    out.add(invalidMessage(cause));
                    return;
                }

            case READ_VARIABLE_HEADER:
                try {
                    if (bytesRemainingInVariablePart > maxBytesInMessage) {
                        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                    }
                    final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                    variableHeader = decodedVariableHeader.value;
                    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                    checkpoint(DecoderState.READ_PAYLOAD);
                    // fall through
                } catch (Exception cause) {
                    out.add(invalidMessage(cause));
                    return;
                }

            case READ_PAYLOAD:
                try {
                    final Result<?> decodedPayload =
                            decodePayload(
                                    buffer,
                                    mqttFixedHeader.messageType(),
                                    bytesRemainingInVariablePart,
                                    variableHeader);
                    bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                    // Every byte announced by the fixed header must have been consumed by now.
                    if (bytesRemainingInVariablePart != 0) {
                        throw new DecoderException(
                                "non-zero remaining payload bytes: " +
                                        bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                    }
                    checkpoint(DecoderState.READ_FIXED_HEADER);
                    MqttMessage message = MqttMessageFactory.newMessage(
                            mqttFixedHeader, variableHeader, decodedPayload.value);
                    mqttFixedHeader = null;
                    variableHeader = null;
                    out.add(message);
                    break;
                } catch (Exception cause) {
                    out.add(invalidMessage(cause));
                    return;
                }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }

    /**
     * Switches the decoder to {@link DecoderState#BAD_MESSAGE} and builds the message
     * that reports the failure.
     *
     * @param cause the failure that interrupted decoding
     * @return the invalid message created by {@link MqttMessageFactory}
     */
    private MqttMessage invalidMessage(Throwable cause) {
        checkpoint(DecoderState.BAD_MESSAGE);
        return MqttMessageFactory.newInvalidMessage(cause);
    }

    /**
     * Decodes the fixed header. It's one byte for the flags and then variable bytes for the remaining length.
     *
     * @param buffer the buffer to decode from
     * @return the fixed header
     * @throws DecoderException if the remaining length is encoded with more than 4 bytes
     */
    private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
        short b1 = buffer.readUnsignedByte();

        MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
        boolean dupFlag = (b1 & 0x08) == 0x08;
        int qosLevel = (b1 & 0x06) >> 1;
        boolean retain = (b1 & 0x01) != 0;

        int remainingLength = 0;
        int multiplier = 1;
        short digit;
        int loops = 0;
        // Each byte carries 7 bits of length; the high bit signals that another byte follows.
        do {
            digit = buffer.readUnsignedByte();
            remainingLength += (digit & 127) * multiplier;
            multiplier *= 128;
            loops++;
        } while ((digit & 128) != 0 && loops < 4);

        // MQTT protocol limits Remaining Length to 4 bytes
        if (loops == 4 && (digit & 128) != 0) {
            throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
        }
        MqttFixedHeader decodedFixedHeader =
                new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
        return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
    }

    /**
     * Decodes the variable header (if any)
     *
     * @param buffer the buffer to decode from
     * @param mqttFixedHeader MqttFixedHeader of the same message
     * @return the variable header
     */
    private static Result<?> decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
        switch (mqttFixedHeader.messageType()) {
            case CONNECT:
                return decodeConnectionVariableHeader(buffer);

            case CONNACK:
                return decodeConnAckVariableHeader(buffer);

            case SUBSCRIBE:
            case UNSUBSCRIBE:
            case SUBACK:
            case UNSUBACK:
            case PUBACK:
            case PUBREC:
            case PUBCOMP:
            case PUBREL:
                return decodeMessageIdVariableHeader(buffer);

            case PUBLISH:
                return decodePublishVariableHeader(buffer, mqttFixedHeader);

            case PINGREQ:
            case PINGRESP:
            case DISCONNECT:
                // Empty variable header
                return new Result<Object>(null, 0);
        }
        return new Result<Object>(null, 0); //should never reach here
    }

    /**
     * Decodes the CONNECT variable header: protocol name, protocol level, connect flags
     * and keep-alive.
     *
     * @throws DecoderException if the protocol is MQTT 3.1.1 and the reserved flag bit is set
     */
    private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
        final Result<String> protoString = decodeString(buffer);
        int numberOfBytesConsumed = protoString.numberOfBytesConsumed;

        final byte protocolLevel = buffer.readByte();
        numberOfBytesConsumed += 1;

        final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);

        final int b1 = buffer.readUnsignedByte();
        numberOfBytesConsumed += 1;

        final Result<Integer> keepAlive = decodeMsbLsb(buffer);
        numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;

        final boolean hasUserName = (b1 & 0x80) == 0x80;
        final boolean hasPassword = (b1 & 0x40) == 0x40;
        final boolean willRetain = (b1 & 0x20) == 0x20;
        final int willQos = (b1 & 0x18) >> 3;
        final boolean willFlag = (b1 & 0x04) == 0x04;
        final boolean cleanSession = (b1 & 0x02) == 0x02;
        if (mqttVersion == MqttVersion.MQTT_3_1_1) {
            final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
            if (!zeroReservedFlag) {
                // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
                // set to zero and disconnect the Client if it is not zero.
                // See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
                throw new DecoderException("non-zero reserved flag");
            }
        }

        final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
                mqttVersion.protocolName(),
                mqttVersion.protocolLevel(),
                hasUserName,
                hasPassword,
                willRetain,
                willQos,
                willFlag,
                cleanSession,
                keepAlive.value);
        return new Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
    }

    /**
     * Decodes the two-byte CONNACK variable header: the session-present bit and the return code.
     */
    private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuf buffer) {
        final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
        byte returnCode = buffer.readByte();
        final int numberOfBytesConsumed = 2;
        final MqttConnAckVariableHeader mqttConnAckVariableHeader =
                new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent);
        return new Result<MqttConnAckVariableHeader>(mqttConnAckVariableHeader, numberOfBytesConsumed);
    }

    /**
     * Decodes a variable header that consists only of a message id.
     */
    private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuf buffer) {
        final Result<Integer> messageId = decodeMessageId(buffer);
        return new Result<MqttMessageIdVariableHeader>(
                MqttMessageIdVariableHeader.from(messageId.value),
                messageId.numberOfBytesConsumed);
    }

    /**
     * Decodes the PUBLISH variable header: the topic name, followed by a message id
     * when the QoS level is greater than 0 (the id is reported as -1 otherwise).
     *
     * @throws DecoderException if the topic name is rejected by
     *                          {@code MqttCodecUtil.isValidPublishTopicName}
     */
    private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(
            ByteBuf buffer,
            MqttFixedHeader mqttFixedHeader) {
        final Result<String> decodedTopic = decodeString(buffer);
        if (!isValidPublishTopicName(decodedTopic.value)) {
            throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
        }
        int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;

        int messageId = -1;
        if (mqttFixedHeader.qosLevel().value() > 0) {
            final Result<Integer> decodedMessageId = decodeMessageId(buffer);
            messageId = decodedMessageId.value;
            numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
        }
        final MqttPublishVariableHeader mqttPublishVariableHeader =
                new MqttPublishVariableHeader(decodedTopic.value, messageId);
        return new Result<MqttPublishVariableHeader>(mqttPublishVariableHeader, numberOfBytesConsumed);
    }

    /**
     * Decodes a two-byte message id and validates it.
     *
     * @throws DecoderException if the id is rejected by {@code MqttCodecUtil.isValidMessageId}
     */
    private static Result<Integer> decodeMessageId(ByteBuf buffer) {
        final Result<Integer> messageId = decodeMsbLsb(buffer);
        if (!isValidMessageId(messageId.value)) {
            throw new DecoderException("invalid messageId: " + messageId.value);
        }
        return messageId;
    }

    /**
     * Decodes the payload.
     *
     * @param buffer the buffer to decode from
     * @param messageType  type of the message being decoded
     * @param bytesRemainingInVariablePart bytes remaining
     * @param variableHeader variable header of the same message
     * @return the payload
     */
    private static Result<?> decodePayload(
            ByteBuf buffer,
            MqttMessageType messageType,
            int bytesRemainingInVariablePart,
            Object variableHeader) {
        switch (messageType) {
            case CONNECT:
                return decodeConnectionPayload(buffer, (MqttConnectVariableHeader) variableHeader);

            case SUBSCRIBE:
                return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);

            case SUBACK:
                return decodeSubackPayload(buffer, bytesRemainingInVariablePart);

            case UNSUBSCRIBE:
                return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);

            case PUBLISH:
                return decodePublishPayload(buffer, bytesRemainingInVariablePart);

            default:
                // unknown payload , no byte consumed
                return new Result<Object>(null, 0);
        }
    }

    /**
     * Decodes the CONNECT payload: client id, then (depending on the flags of the variable
     * header) will topic and will message, user name and password.
     *
     * @throws MqttIdentifierRejectedException if the client id is rejected by
     *                                         {@code MqttCodecUtil.isValidClientId}
     */
    private static Result<MqttConnectPayload> decodeConnectionPayload(
            ByteBuf buffer,
            MqttConnectVariableHeader mqttConnectVariableHeader) {
        final Result<String> decodedClientId = decodeString(buffer);
        final String decodedClientIdValue = decodedClientId.value;
        final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
                (byte) mqttConnectVariableHeader.version());
        if (!isValidClientId(mqttVersion, decodedClientIdValue)) {
            throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
        }
        int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;

        Result<String> decodedWillTopic = null;
        Result<byte[]> decodedWillMessage = null;
        if (mqttConnectVariableHeader.isWillFlag()) {
            decodedWillTopic = decodeString(buffer, 0, 32767);
            numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
            decodedWillMessage = decodeByteArray(buffer);
            numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
        }
        Result<String> decodedUserName = null;
        Result<byte[]> decodedPassword = null;
        if (mqttConnectVariableHeader.hasUserName()) {
            decodedUserName = decodeString(buffer);
            numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
        }
        if (mqttConnectVariableHeader.hasPassword()) {
            decodedPassword = decodeByteArray(buffer);
            numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
        }

        final MqttConnectPayload mqttConnectPayload =
                new MqttConnectPayload(
                        decodedClientId.value,
                        decodedWillTopic != null ? decodedWillTopic.value : null,
                        decodedWillMessage != null ? decodedWillMessage.value : null,
                        decodedUserName != null ? decodedUserName.value : null,
                        decodedPassword != null ? decodedPassword.value : null);
        return new Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
    }

    /**
     * Decodes the SUBSCRIBE payload: (topic name, requested QoS) pairs until
     * {@code bytesRemainingInVariablePart} bytes have been consumed.
     */
    private static Result<MqttSubscribePayload> decodeSubscribePayload(
            ByteBuf buffer,
            int bytesRemainingInVariablePart) {
        final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
        int numberOfBytesConsumed = 0;
        while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
            final Result<String> decodedTopicName = decodeString(buffer);
            numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
            // Only the two low bits of the options byte are used as the QoS.
            int qos = buffer.readUnsignedByte() & 0x03;
            numberOfBytesConsumed++;
            subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, MqttQoS.valueOf(qos)));
        }
        return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
    }

    /**
     * Decodes the SUBACK payload: one granted-QoS byte per subscription until
     * {@code bytesRemainingInVariablePart} bytes have been consumed.
     */
    private static Result<MqttSubAckPayload> decodeSubackPayload(
            ByteBuf buffer,
            int bytesRemainingInVariablePart) {
        final List<Integer> grantedQos = new ArrayList<Integer>();
        int numberOfBytesConsumed = 0;
        while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
            int qos = buffer.readUnsignedByte();
            // The failure code is kept as is; any other value is reduced to its two low bits.
            if (qos != MqttQoS.FAILURE.value()) {
                qos &= 0x03;
            }
            numberOfBytesConsumed++;
            grantedQos.add(qos);
        }
        return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
    }

    /**
     * Decodes the UNSUBSCRIBE payload: topic names until
     * {@code bytesRemainingInVariablePart} bytes have been consumed.
     */
    private static Result<MqttUnsubscribePayload> decodeUnsubscribePayload(
            ByteBuf buffer,
            int bytesRemainingInVariablePart) {
        final List<String> unsubscribeTopics = new ArrayList<String>();
        int numberOfBytesConsumed = 0;
        while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
            final Result<String> decodedTopicName = decodeString(buffer);
            numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
            unsubscribeTopics.add(decodedTopicName.value);
        }
        return new Result<MqttUnsubscribePayload>(
                new MqttUnsubscribePayload(unsubscribeTopics),
                numberOfBytesConsumed);
    }

    /**
     * Decodes the PUBLISH payload as a retained slice of {@code buffer} covering all
     * remaining bytes of the message.
     */
    private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
        ByteBuf b = buffer.readRetainedSlice(bytesRemainingInVariablePart);
        return new Result<ByteBuf>(b, bytesRemainingInVariablePart);
    }

    /**
     * Decodes a length-prefixed UTF-8 string without any restriction on its length.
     */
    private static Result<String> decodeString(ByteBuf buffer) {
        return decodeString(buffer, 0, Integer.MAX_VALUE);
    }

    /**
     * Decodes a UTF-8 string prefixed by a two-byte length.
     *
     * @param buffer the buffer to decode from
     * @param minBytes smallest accepted encoded size in bytes
     * @param maxBytes largest accepted encoded size in bytes
     * @return the decoded string, or a result holding {@code null} when the encoded size is
     *         outside {@code [minBytes, maxBytes]}; the bytes are consumed in both cases
     */
    private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
        final Result<Integer> decodedSize = decodeMsbLsb(buffer);
        int size = decodedSize.value;
        int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
        if (size < minBytes || size > maxBytes) {
            buffer.skipBytes(size);
            numberOfBytesConsumed += size;
            return new Result<String>(null, numberOfBytesConsumed);
        }
        String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
        buffer.skipBytes(size);
        numberOfBytesConsumed += size;
        return new Result<String>(s, numberOfBytesConsumed);
    }

    /**
     * Decodes a byte array prefixed by a two-byte length.
     */
    private static Result<byte[]> decodeByteArray(ByteBuf buffer) {
        final Result<Integer> decodedSize = decodeMsbLsb(buffer);
        int size = decodedSize.value;
        byte[] bytes = new byte[size];
        buffer.readBytes(bytes);
        return new Result<byte[]>(bytes, decodedSize.numberOfBytesConsumed + size);
    }

    /**
     * Decodes a two-byte unsigned integer, accepting the whole 0..65535 range.
     */
    private static Result<Integer> decodeMsbLsb(ByteBuf buffer) {
        return decodeMsbLsb(buffer, 0, 65535);
    }

    /**
     * Decodes a two-byte unsigned integer, most significant byte first.
     *
     * @param buffer the buffer to decode from
     * @param min smallest accepted value
     * @param max largest accepted value
     * @return the decoded value, or -1 when it is outside {@code [min, max]};
     *         two bytes are consumed in both cases
     */
    private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
        short msbSize = buffer.readUnsignedByte();
        short lsbSize = buffer.readUnsignedByte();
        final int numberOfBytesConsumed = 2;
        int result = msbSize << 8 | lsbSize;
        if (result < min || result > max) {
            result = -1;
        }
        return new Result<Integer>(result, numberOfBytesConsumed);
    }

    /**
     * A decoded value together with the number of bytes that were read to produce it.
     *
     * @param <T> type of the decoded value
     */
    private static final class Result<T> {

        private final T value;
        private final int numberOfBytesConsumed;

        Result(T value, int numberOfBytesConsumed) {
            this.value = value;
            this.numberOfBytesConsumed = numberOfBytesConsumed;
        }
    }
}