Overall clean-up on codec-mqtt

- Use simple string concatenation instead of String.format()
- Rewrite exception messages so that it follows our style
- Merge MqttCommonUtil and MqttValidationUtil into MqttCodecUtil
- Hide MqttCodecUtil from users
- Rename MqttConnectReturnCode.value to byteValue
- Rename MqttMessageFactory.create*() to new*()
- Rename QoS to MqttQoS
- Make MqttSubAckPayload.grantedQoSLevels immutable and add more useful
  constructor
This commit is contained in:
Trustin Lee 2014-06-21 16:46:09 +09:00
parent 9325b9121c
commit a49c4518c0
15 changed files with 123 additions and 146 deletions

View File

@ -16,9 +16,45 @@
package io.netty.handler.codec.mqtt;
public final class MqttCommonUtil {
import io.netty.handler.codec.DecoderException;
public static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
final class MqttCodecUtil {
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
private static final int MAX_CLIENT_ID_LENGTH = 23;
static boolean isValidPublishTopicName(String topicName) {
// publish topic name must not contain any wildcard
for (char c : TOPIC_WILDCARDS) {
if (topicName.indexOf(c) >= 0) {
return false;
}
}
return true;
}
static boolean isValidMessageId(int messageId) {
return messageId != 0;
}
static boolean isValidClientId(String clientId) {
return clientId != null && clientId.length() <= MAX_CLIENT_ID_LENGTH;
}
static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBREL:
case SUBSCRIBE:
case UNSUBSCRIBE:
if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
}
default:
return mqttFixedHeader;
}
}
static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case CONNECT:
case CONNACK:
@ -31,12 +67,12 @@ public final class MqttCommonUtil {
case PINGRESP:
case DISCONNECT:
if (mqttFixedHeader.isDup() ||
mqttFixedHeader.qosLevel() != QoS.AT_MOST_ONCE ||
mqttFixedHeader.isRetain()) {
mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
mqttFixedHeader.messageType(),
false,
QoS.AT_MOST_ONCE,
MqttQoS.AT_MOST_ONCE,
false,
mqttFixedHeader.remainingLength());
}
@ -58,5 +94,5 @@ public final class MqttCommonUtil {
}
}
private MqttCommonUtil() { }
private MqttCodecUtil() { }
}

View File

@ -29,5 +29,4 @@ public final class MqttConnAckMessage extends MqttMessage {
public MqttConnAckVariableHeader variableHeader() {
return (MqttConnAckVariableHeader) super.variableHeader();
}
}

View File

@ -19,7 +19,7 @@ package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
/**
* Variable header of {@link MqttConnectMessage }
* Variable header of {@link MqttConnectMessage}
*/
public class MqttConnAckVariableHeader {

View File

@ -35,26 +35,26 @@ public enum MqttConnectReturnCode {
static {
final Map<Byte, MqttConnectReturnCode> valueMap = new HashMap<Byte, MqttConnectReturnCode>();
for (MqttConnectReturnCode code : values()) {
valueMap.put(code.value, code);
for (MqttConnectReturnCode code: values()) {
valueMap.put(code.byteValue, code);
}
valueToCodeMap = Collections.unmodifiableMap(valueMap);
}
private final byte value;
private final byte byteValue;
MqttConnectReturnCode(byte value) {
this.value = value;
MqttConnectReturnCode(byte byteValue) {
this.byteValue = byteValue;
}
public byte value() {
return value;
public byte byteValue() {
return byteValue;
}
public static MqttConnectReturnCode valueOf(byte b) {
if (valueToCodeMap.containsKey(b)) {
return valueToCodeMap.get(b);
}
throw new IllegalArgumentException("connect retirn code " + b + " unsupported");
throw new IllegalArgumentException("unknown connect return code: " + (b & 0xFF));
}
}

View File

@ -26,9 +26,7 @@ import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;
import static io.netty.handler.codec.mqtt.MqttValidationUtil.*;
import static io.netty.handler.codec.mqtt.MqttCommonUtil.*;
import static io.netty.handler.codec.mqtt.MqttCodecUtil.*;
import static io.netty.handler.codec.mqtt.MqttVersion.*;
/**
@ -79,10 +77,7 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
case READ_VARIABLE_HEADER: try {
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException(
String.format(
"Client tried to send very large message: %d bytes",
bytesRemainingInVariablePart));
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
@ -105,15 +100,11 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
if (bytesRemainingInVariablePart != 0) {
throw new DecoderException(
String.format(
"Non-zero bytes remaining (%d) should never happen. "
+ "Message type: %s. Channel: %s",
bytesRemainingInVariablePart,
mqttFixedHeader.messageType(),
ctx.channel()));
"non-zero remaining payload bytes: " +
bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
}
checkpoint(DecoderState.READ_FIXED_HEADER);
MqttMessage message = MqttMessageFactory.create(mqttFixedHeader, variableHeader, payload);
MqttMessage message = MqttMessageFactory.newMessage(mqttFixedHeader, variableHeader, payload);
mqttFixedHeader = null;
variableHeader = null;
payload = null;
@ -130,13 +121,14 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
break;
default:
throw new Error("Shouldn't reach here");
// Shouldn't reach here.
throw new Error();
}
}
private MqttMessage invalidMessage(Throwable cause) {
checkpoint(DecoderState.BAD_MESSAGE);
return MqttMessageFactory.createInvalidMessage(cause);
return MqttMessageFactory.newInvalidMessage(cause);
}
/**
@ -166,15 +158,10 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
// MQTT protocol limits Remaining Length to 4 bytes
if (loops == 4 && (digit & 128) != 0) {
throw new DecoderException(
String.format(
"Failed to read fixed header of MQTT message. " +
"It has more than 4 digits for Remaining Length. " +
"MqttMessageType: %s.",
messageType));
throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
}
MqttFixedHeader decodedFixedHeader =
new MqttFixedHeader(messageType, dupFlag, QoS.valueOf(qosLevel), retain, remainingLength);
new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
}
@ -217,7 +204,7 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
final Result<String> protoString = decodeString(buffer);
if (!PROTOCOL_NAME.equals(protoString.value)) {
throw new DecoderException(PROTOCOL_NAME + " signature is missing. Closing channel.");
throw new DecoderException("missing " + PROTOCOL_NAME + " signature");
}
int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
@ -270,7 +257,7 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
MqttFixedHeader mqttFixedHeader) {
final Result<String> decodedTopic = decodeString(buffer);
if (!isValidPublishTopicName(decodedTopic.value)) {
throw new DecoderException(String.format("Publish topic name %s contains wildcards.", decodedTopic.value));
throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
}
int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
@ -288,7 +275,7 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
private static Result<Integer> decodeMessageId(ByteBuf buffer) {
final Result<Integer> messageId = decodeMsbLsb(buffer);
if (!isValidMessageId(messageId.value)) {
throw new DecoderException(String.format("Invalid messageId %d", messageId.value));
throw new DecoderException("invalid messageId: " + messageId.value);
}
return messageId;
}
@ -335,9 +322,7 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
final Result<String> decodedClientId = decodeString(buffer);
final String decodedClientIdValue = decodedClientId.value;
if (!isValidClientId(decodedClientIdValue)) {
throw new DecoderException(
String.format("Invalid clientIdentifier %s ",
decodedClientIdValue != null ? decodedClientIdValue : "null"));
throw new DecoderException("invalid clientIdentifier: " + decodedClientIdValue);
}
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
@ -380,7 +365,7 @@ public class MqttDecoder extends ReplayingDecoder<DecoderState> {
numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
int qos = buffer.readUnsignedByte() & 0x03;
numberOfBytesConsumed++;
subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, QoS.valueOf(qos)));
subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, MqttQoS.valueOf(qos)));
}
return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
}

View File

@ -102,8 +102,7 @@ public class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
String clientIdentifier = payload.clientIdentifier();
if (!isValidClientIdentifier(clientIdentifier)) {
throw new IllegalArgumentException(
String.format("Invalid clientIdentifier %s. Must be less than 23 chars long",
clientIdentifier != null ? clientIdentifier : "null"));
"invalid clientIdentifier: " + clientIdentifier + " (expected: less than 23 chars long)");
}
byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
payloadBufferSize += 2 + clientIdentifierBytes.length;
@ -191,7 +190,7 @@ public class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
buf.writeByte(2);
buf.writeByte(0);
buf.writeByte(message.variableHeader().connectReturnCode().value());
buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
return buf;
}
@ -391,5 +390,4 @@ public class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
int length = clientIdentifier.length();
return length >= 1 && length <= 23;
}
}

View File

@ -26,14 +26,14 @@ public class MqttFixedHeader {
private final MqttMessageType messageType;
private final boolean isDup;
private final QoS qosLevel;
private final MqttQoS qosLevel;
private final boolean isRetain;
private final int remainingLength;
public MqttFixedHeader(
MqttMessageType messageType,
boolean isDup,
QoS qosLevel,
MqttQoS qosLevel,
boolean isRetain,
int remainingLength) {
this.messageType = messageType;
@ -51,7 +51,7 @@ public class MqttFixedHeader {
return isDup;
}
public QoS qosLevel() {
public MqttQoS qosLevel() {
return qosLevel;
}
@ -74,5 +74,4 @@ public class MqttFixedHeader {
builder.append(']');
return builder.toString();
}
}

View File

@ -24,7 +24,7 @@ import io.netty.handler.codec.DecoderResult;
*/
public final class MqttMessageFactory {
public static MqttMessage create(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
public static MqttMessage newMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
switch (mqttFixedHeader.messageType()) {
case CONNECT :
return new MqttConnectMessage(
@ -77,11 +77,11 @@ public final class MqttMessageFactory {
return new MqttMessage(mqttFixedHeader);
default:
throw new IllegalArgumentException("Unknown message type: " + mqttFixedHeader.messageType());
throw new IllegalArgumentException("unknown message type: " + mqttFixedHeader.messageType());
}
}
public static MqttMessage createInvalidMessage(Throwable cause) {
public static MqttMessage newInvalidMessage(Throwable cause) {
return new MqttMessage(null, null, null, DecoderResult.failure(cause));
}

View File

@ -28,9 +28,7 @@ public final class MqttMessageIdVariableHeader {
public static MqttMessageIdVariableHeader from(int messageId) {
if (messageId < 1 || messageId > 0xffff) {
throw new IllegalArgumentException(
String.format("Message id must be in the range of 1 - 0xffff but %d given ",
messageId));
throw new IllegalArgumentException("messageId: " + messageId + " (expected: 1 ~ 65535)");
}
return new MqttMessageIdVariableHeader(messageId);
}

View File

@ -51,7 +51,7 @@ public enum MqttMessageType {
return t;
}
}
throw new IllegalArgumentException("message type " + type + " unsupported");
throw new IllegalArgumentException("unknown message type: " + type);
}
}

View File

@ -15,13 +15,14 @@
package io.netty.handler.codec.mqtt;
public enum QoS {
public enum MqttQoS {
AT_MOST_ONCE(0),
AT_LEAST_ONCE(1),
EXACTLY_ONCE(2);
private final int value;
QoS(int value) {
MqttQoS(int value) {
this.value = value;
}
@ -29,12 +30,12 @@ public enum QoS {
return value;
}
public static QoS valueOf(int value) {
for (QoS q : values()) {
public static MqttQoS valueOf(int value) {
for (MqttQoS q: values()) {
if (q.value == value) {
return q;
}
}
throw new IllegalArgumentException(String.format("Invalid QoS: %d", value));
throw new IllegalArgumentException("invalid QoS: " + value);
}
}

View File

@ -18,6 +18,8 @@ package io.netty.handler.codec.mqtt;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -27,8 +29,30 @@ public class MqttSubAckPayload {
private final List<Integer> grantedQoSLevels;
public MqttSubAckPayload(List<Integer> grantedQoSLevels) {
this.grantedQoSLevels = grantedQoSLevels;
public MqttSubAckPayload(int... grantedQoSLevels) {
if (grantedQoSLevels == null) {
throw new NullPointerException("grantedQoSLevels");
}
List<Integer> list = new ArrayList<Integer>(grantedQoSLevels.length);
for (int v: grantedQoSLevels) {
list.add(v);
}
this.grantedQoSLevels = Collections.unmodifiableList(list);
}
public MqttSubAckPayload(Iterable<Integer> grantedQoSLevels) {
if (grantedQoSLevels == null) {
throw new NullPointerException("grantedQoSLevels");
}
List<Integer> list = new ArrayList<Integer>();
for (Integer v: grantedQoSLevels) {
if (v == null) {
break;
}
list.add(v);
}
this.grantedQoSLevels = Collections.unmodifiableList(list);
}
public List<Integer> grantedQoSLevels() {

View File

@ -25,9 +25,9 @@ import io.netty.util.internal.StringUtil;
public class MqttTopicSubscription {
private final String topicFilter;
private final QoS qualityOfService;
private final MqttQoS qualityOfService;
public MqttTopicSubscription(String topicFilter, QoS qualityOfService) {
public MqttTopicSubscription(String topicFilter, MqttQoS qualityOfService) {
this.topicFilter = topicFilter;
this.qualityOfService = qualityOfService;
}
@ -36,7 +36,7 @@ public class MqttTopicSubscription {
return topicFilter;
}
public QoS qualityOfService() {
public MqttQoS qualityOfService() {
return qualityOfService;
}

View File

@ -1,59 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.handler.codec.DecoderException;
public final class MqttValidationUtil {
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
private static final int MAX_CLIENT_ID_LENGTH = 23;
public static boolean isValidPublishTopicName(String topicName) {
// publish topic name must not contain any wildcard
for (char c : TOPIC_WILDCARDS) {
if (topicName.indexOf(c) >= 0) {
return false;
}
}
return true;
}
public static boolean isValidMessageId(int messageId) {
return messageId != 0;
}
public static boolean isValidClientId(String clientId) {
return clientId != null && clientId.length() <= MAX_CLIENT_ID_LENGTH;
}
public static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBREL:
case SUBSCRIBE:
case UNSUBSCRIBE:
if (mqttFixedHeader.qosLevel() != QoS.AT_LEAST_ONCE) {
throw new DecoderException(String.format("%s message must have QoS 1",
mqttFixedHeader.messageType().name()));
}
default:
return mqttFixedHeader;
}
}
private MqttValidationUtil() { }
}

View File

@ -235,7 +235,7 @@ public class MqttCodecTest {
// Message types to help testing
private static MqttMessage createMessageWithFixedHeader(MqttMessageType messageType) {
return new MqttMessage(new MqttFixedHeader(messageType, false, QoS.AT_MOST_ONCE, false, 0));
return new MqttMessage(new MqttFixedHeader(messageType, false, MqttQoS.AT_MOST_ONCE, false, 0));
}
private static MqttMessage createMessageWithFixedHeaderAndMessageIdVariableHeader(MqttMessageType messageType) {
@ -243,7 +243,7 @@ public class MqttCodecTest {
new MqttFixedHeader(
messageType,
false,
messageType == MqttMessageType.PUBREL ? QoS.AT_LEAST_ONCE : QoS.AT_MOST_ONCE,
messageType == MqttMessageType.PUBREL ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE,
false,
0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
@ -252,7 +252,7 @@ public class MqttCodecTest {
private static MqttConnectMessage createConnectMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNECT, false, QoS.AT_MOST_ONCE, false, 0);
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnectVariableHeader mqttConnectVariableHeader =
new MqttConnectVariableHeader(
PROTOCOL_NAME,
@ -272,7 +272,7 @@ public class MqttCodecTest {
private static MqttConnAckMessage createConnAckMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, QoS.AT_MOST_ONCE, false, 0);
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
@ -280,7 +280,7 @@ public class MqttCodecTest {
private static MqttPublishMessage createPublishMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, QoS.AT_LEAST_ONCE, true, 0);
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", 1234);
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes("whatever".getBytes(CharsetUtil.UTF_8));
@ -289,13 +289,13 @@ public class MqttCodecTest {
private static MqttSubscribeMessage createSubscribeMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, QoS.AT_LEAST_ONCE, true, 0);
new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
List<MqttTopicSubscription> topicSubscriptions = new LinkedList<MqttTopicSubscription>();
topicSubscriptions.add(new MqttTopicSubscription("/abc", QoS.AT_LEAST_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/def", QoS.AT_LEAST_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/xyz", QoS.EXACTLY_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/abc", MqttQoS.AT_LEAST_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/def", MqttQoS.AT_LEAST_ONCE));
topicSubscriptions.add(new MqttTopicSubscription("/xyz", MqttQoS.EXACTLY_ONCE));
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(topicSubscriptions);
return new MqttSubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload);
@ -303,19 +303,15 @@ public class MqttCodecTest {
private static MqttSubAckMessage createSubAckMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, QoS.AT_MOST_ONCE, false, 0);
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
List<Integer> grantedQosLevels = new LinkedList<Integer>();
grantedQosLevels.add(1);
grantedQosLevels.add(2);
grantedQosLevels.add(0);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQosLevels);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(1, 2, 0);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
}
private static MqttUnsubscribeMessage createUnsubscribeMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, QoS.AT_LEAST_ONCE, true, 0);
new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
List<String> topics = new LinkedList<String>();