The MqttDecoder incorrectly skip bytes before throwing TooLongFrameException (#11362)

Motivation:

Commit c32c520edd incorrectly skip the bytes of the replay decoder buffer. The number of bytes to skip is determined by ByteBuf#readableBytes() instead of using ByteToMessageDecoder#actualReadableBytes(). As result it throws an exception because the ByteBuf provided will return a too large value (Integer.MAX_VALUE - reader index) causing a bound check error in the skipBytes method. This is not detected by the tests because most tests are calling the decode(...) method with a regular ByteBuf. In practice when this method is called with a specialized ByteBuf when channelRead(...) is called. Such tests should actually use channelRead with proper mocking of the ChannelHandlerContext

Modification:

- Rewrite the MqttCodecTest to use channelRead(...) instead of decode(...) and use proper mocking of ChannelHandlerContext to get the message emitted by the decoder.
- Use actualReadableBytes() instead of buff.readableBytes() to compute the number of bytes to skip

Result:

Skip correctly the number of bytes when a too large message is found and improve testing. See #11361

Signed-off-by: Julien Viet <julien@julienviet.com>
This commit is contained in:
Julien Viet 2021-06-10 15:05:25 +02:00 committed by GitHub
parent c78afbc4f1
commit 06f7deb030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 122 additions and 177 deletions

View File

@ -98,7 +98,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
final Result<?> decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
buffer.skipBytes(buffer.readableBytes());
buffer.skipBytes(actualReadableBytes());
throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;

View File

@ -25,11 +25,16 @@ import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Attribute;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@ -69,6 +74,8 @@ public class MqttCodecTest {
@Mock
private final Attribute<MqttVersion> versionAttrMock = mock(Attribute.class);
private final List<Object> out = new ArrayList<Object>();
private final MqttDecoder mqttDecoder = new MqttDecoder();
/**
@ -81,16 +88,30 @@ public class MqttCodecTest {
MockitoAnnotations.initMocks(this);
when(ctx.channel()).thenReturn(channel);
when(ctx.alloc()).thenReturn(ALLOCATOR);
when(ctx.fireChannelRead(any())).then(new Answer<ChannelHandlerContext>() {
@Override
public ChannelHandlerContext answer(InvocationOnMock invocation) {
out.add(invocation.getArguments()[0]);
return ctx;
}
});
when(channel.attr(MqttCodecUtil.MQTT_VERSION_KEY)).thenReturn(versionAttrMock);
}
@After
public void after() {
for (Object o : out) {
ReferenceCountUtil.release(o);
}
out.clear();
}
@Test
public void testConnectMessageForMqtt31() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -106,8 +127,7 @@ public class MqttCodecTest {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -122,22 +142,18 @@ public class MqttCodecTest {
public void testConnectMessageWithNonZeroReservedFlagForMqtt311() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
// Set the reserved flag in the CONNECT Packet to 1
byteBuf.setByte(9, byteBuf.getByte(9) | 0x1);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
// Set the reserved flag in the CONNECT Packet to 1
byteBuf.setByte(9, byteBuf.getByte(9) | 0x1);
mqttDecoder.channelRead(ctx, byteBuf);
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
assertTrue(decodedMessage.decoderResult().isFailure());
Throwable cause = decodedMessage.decoderResult().cause();
assertTrue(cause instanceof DecoderException);
assertEquals("non-zero reserved flag", cause.getMessage());
} finally {
byteBuf.release();
}
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
assertTrue(decodedMessage.decoderResult().isFailure());
Throwable cause = decodedMessage.decoderResult().cause();
assertTrue(cause instanceof DecoderException);
assertEquals("non-zero reserved flag", cause.getMessage());
}
@Test
@ -161,8 +177,7 @@ public class MqttCodecTest {
final MqttConnAckMessage message = createConnAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -176,8 +191,7 @@ public class MqttCodecTest {
final MqttPublishMessage message = createPublishMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -212,8 +226,7 @@ public class MqttCodecTest {
final MqttSubscribeMessage message = createSubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -228,8 +241,7 @@ public class MqttCodecTest {
final MqttSubAckMessage message = createSubAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -250,8 +262,7 @@ public class MqttCodecTest {
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -268,8 +279,7 @@ public class MqttCodecTest {
final MqttUnsubscribeMessage message = createUnsubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -305,22 +315,18 @@ public class MqttCodecTest {
final MqttMessage message = createMessageWithFixedHeader(MqttMessageType.PINGREQ);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
// setting an invalid message type (15, reserved and forbidden by MQTT 3.1.1 spec)
byteBuf.setByte(0, 0xF0);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
// setting an invalid message type (15, reserved and forbidden by MQTT 3.1.1 spec)
byteBuf.setByte(0, 0xF0);
mqttDecoder.channelRead(ctx, byteBuf);
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
assertTrue(decodedMessage.decoderResult().isFailure());
Throwable cause = decodedMessage.decoderResult().cause();
assertTrue(cause instanceof DecoderException);
assertEquals("AUTH message requires at least MQTT 5", cause.getMessage());
} finally {
byteBuf.release();
}
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
assertTrue(decodedMessage.decoderResult().isFailure());
Throwable cause = decodedMessage.decoderResult().cause();
assertTrue(cause instanceof DecoderException);
assertEquals("AUTH message requires at least MQTT 5", cause.getMessage());
}
@Test
@ -328,23 +334,18 @@ public class MqttCodecTest {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(),
(MqttConnectVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(),
(MqttConnectVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -352,23 +353,18 @@ public class MqttCodecTest {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(),
(MqttConnectVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(),
(MqttConnectVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -376,20 +372,15 @@ public class MqttCodecTest {
final MqttConnAckMessage message = createConnAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -397,23 +388,18 @@ public class MqttCodecTest {
final MqttPublishMessage message = createPublishMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePublishVariableHeader(message.variableHeader(),
(MqttPublishVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePublishVariableHeader(message.variableHeader(),
(MqttPublishVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -421,22 +407,17 @@ public class MqttCodecTest {
final MqttSubscribeMessage message = createSubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -444,22 +425,17 @@ public class MqttCodecTest {
final MqttSubAckMessage message = createSubAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -467,22 +443,17 @@ public class MqttCodecTest {
final MqttUnsubscribeMessage message = createUnsubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals("Expected one object but got " + out.size(), 1, out.size());
assertEquals(0, byteBuf.readableBytes());
assertEquals(0, byteBuf.readableBytes());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
}
@Test
@ -496,8 +467,7 @@ public class MqttCodecTest {
createConnectMessage(MqttVersion.MQTT_5, USER_NAME, PASSWORD, props, willProps);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -517,9 +487,7 @@ public class MqttCodecTest {
final MqttConnAckMessage message = createConnAckMessage(props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -547,9 +515,7 @@ public class MqttCodecTest {
final MqttPublishMessage message = createPublishMessage(props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -568,9 +534,7 @@ public class MqttCodecTest {
final MqttMessage message = createPubAckMessage((byte) 0x87, props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -586,9 +550,7 @@ public class MqttCodecTest {
final MqttMessage message = createPubAckMessage((byte) 0, MqttProperties.NO_PROPERTIES);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -605,9 +567,7 @@ public class MqttCodecTest {
final MqttSubAckMessage message = createSubAckMessage(props, new int[] {1, 2, 0, 0x87 /* not authorized */});
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -639,9 +599,7 @@ public class MqttCodecTest {
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttSubscribeMessage decodedMessage = (MqttSubscribeMessage) out.get(0);
@ -671,9 +629,7 @@ public class MqttCodecTest {
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttSubscribeMessage decodedMessage = (MqttSubscribeMessage) out.get(0);
@ -704,9 +660,7 @@ public class MqttCodecTest {
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -731,9 +685,7 @@ public class MqttCodecTest {
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
@ -752,9 +704,7 @@ public class MqttCodecTest {
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
@ -776,9 +726,7 @@ public class MqttCodecTest {
.build();
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
@ -798,14 +746,13 @@ public class MqttCodecTest {
verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5);
clearInvocations(versionAttrMock);
final List<Object> connectOut = new LinkedList<Object>();
mqttDecoder.decode(ctx, connectByteBuf, connectOut);
mqttDecoder.channelRead(ctx, connectByteBuf);
verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5);
assertEquals("Expected one CONNECT object but got " + connectOut.size(), 1, connectOut.size());
assertEquals("Expected one CONNECT object but got " + out.size(), 1, out.size());
final MqttConnectMessage decodedConnectMessage = (MqttConnectMessage) connectOut.get(0);
final MqttConnectMessage decodedConnectMessage = (MqttConnectMessage) out.get(0);
validateFixedHeaders(connectMessage.fixedHeader(), decodedConnectMessage.fixedHeader());
validateConnectVariableHeader(connectMessage.variableHeader(), decodedConnectMessage.variableHeader());
@ -817,8 +764,7 @@ public class MqttCodecTest {
private void testMessageWithOnlyFixedHeader(MqttMessage message) throws Exception {
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
@ -832,8 +778,7 @@ public class MqttCodecTest {
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>();
mqttDecoder.decode(ctx, byteBuf, out);
mqttDecoder.channelRead(ctx, byteBuf);
assertEquals("Expected one object but got " + out.size(), 1, out.size());