Give a choice for app to extend the length limitation of clientId even in mqtt v3.1 on the server side (#11205)
Motivation: In the mqtt v3.1 protocol, the default maximum Client Identifier length is 23.However, in (#11114), there are many cases, the server may still receive a client ID with a length greater than 23. Perhaps should consider letting the user decide whether accept client id greater than 23 on the server side Modification: - Allow to specify max length. Result: Give a choice for app to extend the length limitation of clientId even in mqtt v3.1 on the server side. Signed-off-by: xingrufei <xingrufei@sogou-inc.com> Co-authored-by: xingrufei <xingrufei@sogou-inc.com>
This commit is contained in:
parent
def8a3f17d
commit
d22a35628d
@ -21,11 +21,11 @@ import io.netty.handler.codec.DecoderException;
|
|||||||
import io.netty.util.Attribute;
|
import io.netty.util.Attribute;
|
||||||
import io.netty.util.AttributeKey;
|
import io.netty.util.AttributeKey;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.mqtt.MqttConstant.MIN_CLIENT_ID_LENGTH;
|
||||||
|
|
||||||
final class MqttCodecUtil {
|
final class MqttCodecUtil {
|
||||||
|
|
||||||
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
|
private static final char[] TOPIC_WILDCARDS = {'#', '+'};
|
||||||
private static final int MIN_CLIENT_ID_LENGTH = 1;
|
|
||||||
private static final int MAX_CLIENT_ID_LENGTH = 23;
|
|
||||||
|
|
||||||
static final AttributeKey<MqttVersion> MQTT_VERSION_KEY = AttributeKey.valueOf("NETTY_CODEC_MQTT_VERSION");
|
static final AttributeKey<MqttVersion> MQTT_VERSION_KEY = AttributeKey.valueOf("NETTY_CODEC_MQTT_VERSION");
|
||||||
|
|
||||||
@ -57,10 +57,10 @@ final class MqttCodecUtil {
|
|||||||
return messageId != 0;
|
return messageId != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isValidClientId(MqttVersion mqttVersion, String clientId) {
|
static boolean isValidClientId(MqttVersion mqttVersion, int maxClientIdLength, String clientId) {
|
||||||
if (mqttVersion == MqttVersion.MQTT_3_1) {
|
if (mqttVersion == MqttVersion.MQTT_3_1) {
|
||||||
return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
|
return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
|
||||||
clientId.length() <= MAX_CLIENT_ID_LENGTH;
|
clientId.length() <= maxClientIdLength;
|
||||||
}
|
}
|
||||||
if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_5) {
|
if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_5) {
|
||||||
// In 3.1.3.1 Client Identifier of MQTT 3.1.1 and 5.0 specifications, The Server MAY allow ClientId’s
|
// In 3.1.3.1 Client Identifier of MQTT 3.1.1 and 5.0 specifications, The Server MAY allow ClientId’s
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.mqtt;
|
||||||
|
|
||||||
|
public final class MqttConstant {
|
||||||
|
|
||||||
|
private MqttConstant() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default max bytes in message
|
||||||
|
*/
|
||||||
|
public static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* min client id length
|
||||||
|
*/
|
||||||
|
public static final int MIN_CLIENT_ID_LENGTH = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default max client id length,In the mqtt3.1 protocol,
|
||||||
|
* the default maximum Client Identifier length is 23
|
||||||
|
*/
|
||||||
|
public static final int DEFAULT_MAX_CLIENT_ID_LENGTH = 23;
|
||||||
|
|
||||||
|
}
|
@ -24,6 +24,7 @@ import io.netty.handler.codec.TooLongFrameException;
|
|||||||
import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
|
import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
|
||||||
import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
|
import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -33,6 +34,8 @@ import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
|
|||||||
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
|
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
|
||||||
import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
|
import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
|
||||||
import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
|
import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
|
||||||
|
import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
|
||||||
|
import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
|
||||||
import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
|
import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -45,8 +48,6 @@ import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlin
|
|||||||
*/
|
*/
|
||||||
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
|
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
|
||||||
|
|
||||||
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* States of the decoder.
|
* States of the decoder.
|
||||||
* We start at READ_FIXED_HEADER, followed by
|
* We start at READ_FIXED_HEADER, followed by
|
||||||
@ -64,14 +65,20 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
|
|||||||
private int bytesRemainingInVariablePart;
|
private int bytesRemainingInVariablePart;
|
||||||
|
|
||||||
private final int maxBytesInMessage;
|
private final int maxBytesInMessage;
|
||||||
|
private final int maxClientIdLength;
|
||||||
|
|
||||||
public MqttDecoder() {
|
public MqttDecoder() {
|
||||||
this(DEFAULT_MAX_BYTES_IN_MESSAGE);
|
this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MqttDecoder(int maxBytesInMessage) {
|
public MqttDecoder(int maxBytesInMessage) {
|
||||||
|
this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
|
||||||
super(DecoderState.READ_FIXED_HEADER);
|
super(DecoderState.READ_FIXED_HEADER);
|
||||||
this.maxBytesInMessage = maxBytesInMessage;
|
this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
|
||||||
|
this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -108,6 +115,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
|
|||||||
buffer,
|
buffer,
|
||||||
mqttFixedHeader.messageType(),
|
mqttFixedHeader.messageType(),
|
||||||
bytesRemainingInVariablePart,
|
bytesRemainingInVariablePart,
|
||||||
|
maxClientIdLength,
|
||||||
variableHeader);
|
variableHeader);
|
||||||
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
|
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
|
||||||
if (bytesRemainingInVariablePart != 0) {
|
if (bytesRemainingInVariablePart != 0) {
|
||||||
@ -434,10 +442,11 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
|
|||||||
ByteBuf buffer,
|
ByteBuf buffer,
|
||||||
MqttMessageType messageType,
|
MqttMessageType messageType,
|
||||||
int bytesRemainingInVariablePart,
|
int bytesRemainingInVariablePart,
|
||||||
|
int maxClientIdLength,
|
||||||
Object variableHeader) {
|
Object variableHeader) {
|
||||||
switch (messageType) {
|
switch (messageType) {
|
||||||
case CONNECT:
|
case CONNECT:
|
||||||
return decodeConnectionPayload(buffer, (MqttConnectVariableHeader) variableHeader);
|
return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
|
||||||
|
|
||||||
case SUBSCRIBE:
|
case SUBSCRIBE:
|
||||||
return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
|
return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
|
||||||
@ -462,12 +471,13 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
|
|||||||
|
|
||||||
private static Result<MqttConnectPayload> decodeConnectionPayload(
|
private static Result<MqttConnectPayload> decodeConnectionPayload(
|
||||||
ByteBuf buffer,
|
ByteBuf buffer,
|
||||||
|
int maxClientIdLength,
|
||||||
MqttConnectVariableHeader mqttConnectVariableHeader) {
|
MqttConnectVariableHeader mqttConnectVariableHeader) {
|
||||||
final Result<String> decodedClientId = decodeString(buffer);
|
final Result<String> decodedClientId = decodeString(buffer);
|
||||||
final String decodedClientIdValue = decodedClientId.value;
|
final String decodedClientIdValue = decodedClientId.value;
|
||||||
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
|
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
|
||||||
(byte) mqttConnectVariableHeader.version());
|
(byte) mqttConnectVariableHeader.version());
|
||||||
if (!isValidClientId(mqttVersion, decodedClientIdValue)) {
|
if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
|
||||||
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
|
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
|
||||||
}
|
}
|
||||||
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
|
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
|
||||||
|
@ -31,6 +31,7 @@ import static io.netty.buffer.ByteBufUtil.*;
|
|||||||
import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
|
import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
|
||||||
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
|
import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
|
||||||
import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
|
import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
|
||||||
|
import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encodes Mqtt messages into bytes following the protocol specification v3.1
|
* Encodes Mqtt messages into bytes following the protocol specification v3.1
|
||||||
@ -124,7 +125,7 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
|
|||||||
|
|
||||||
// Client id
|
// Client id
|
||||||
String clientIdentifier = payload.clientIdentifier();
|
String clientIdentifier = payload.clientIdentifier();
|
||||||
if (!isValidClientId(mqttVersion, clientIdentifier)) {
|
if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
|
||||||
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
|
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
|
||||||
}
|
}
|
||||||
int clientIdentifierBytes = utf8Bytes(clientIdentifier);
|
int clientIdentifierBytes = utf8Bytes(clientIdentifier);
|
||||||
|
Loading…
Reference in New Issue
Block a user