diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java index d1ad30fc9c..b4854cbb9e 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java @@ -98,7 +98,7 @@ public final class MqttDecoder extends ReplayingDecoder { final Result decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader); variableHeader = decodedVariableHeader.value; if (bytesRemainingInVariablePart > maxBytesInMessage) { - buffer.skipBytes(buffer.readableBytes()); + buffer.skipBytes(actualReadableBytes()); throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes"); } bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java index 735050b368..b820e7f895 100644 --- a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java @@ -25,12 +25,17 @@ import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.EncoderException; import io.netty.util.Attribute; import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -70,6 +75,8 @@ public class MqttCodecTest { @Mock private final Attribute versionAttrMock = mock(Attribute.class); + private final List out = new ArrayList(); + private final MqttDecoder mqttDecoder = new MqttDecoder(); /** @@ -78,11 +85,28 @@ public class MqttCodecTest { private final MqttDecoder mqttDecoderLimitedMessageSize = new MqttDecoder(1); @Before - public void setup() { + public void setup() throws Exception { MockitoAnnotations.initMocks(this); when(ctx.channel()).thenReturn(channel); when(ctx.alloc()).thenReturn(ALLOCATOR); + when(ctx.fireChannelRead(any())).then(new Answer() { + @Override + public ChannelHandlerContext answer(InvocationOnMock invocation) { + out.add(invocation.getArguments()[0]); + return ctx; + } + }); when(channel.attr(MqttCodecUtil.MQTT_VERSION_KEY)).thenReturn(versionAttrMock); + mqttDecoder.handlerAdded(ctx); + mqttDecoderLimitedMessageSize.handlerAdded(ctx); + } + + @After + public void after() { + for (Object o : out) { + ReferenceCountUtil.release(o); + } + out.clear(); } @Test @@ -91,7 +115,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttConnectMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttConnectMessage decodedMessage = captor.getValue(); @@ -107,7 +131,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttConnectMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttConnectMessage decodedMessage = captor.getValue(); @@ -121,22 +145,18 @@ public class MqttCodecTest { public void testConnectMessageWithNonZeroReservedFlagForMqtt311() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - try { - // Set the reserved flag in the CONNECT Packet to 1 - byteBuf.setByte(9, byteBuf.getByte(9) | 0x1); + // Set the reserved flag in the CONNECT Packet to 1 + byteBuf.setByte(9, byteBuf.getByte(9) | 0x1); - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoder.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - final MqttMessage decodedMessage = captor.getValue(); - assertTrue(decodedMessage.decoderResult().isFailure()); - Throwable cause = decodedMessage.decoderResult().cause(); - assertTrue(cause instanceof DecoderException); - assertEquals("non-zero reserved flag", cause.getMessage()); - } finally { - byteBuf.release(); - } + final MqttMessage decodedMessage = captor.getValue(); + assertTrue(decodedMessage.decoderResult().isFailure()); + Throwable cause = decodedMessage.decoderResult().cause(); + assertTrue(cause instanceof DecoderException); + assertEquals("non-zero reserved flag", cause.getMessage()); } @Test @@ -161,7 +181,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttConnAckMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttConnAckMessage decodedMessage = captor.getValue(); @@ -175,7 +195,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttPublishMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttPublishMessage decodedMessage = captor.getValue(); @@ -210,7 +230,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubscribeMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttSubscribeMessage decodedMessage = captor.getValue(); @@ -225,7 +245,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubAckMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttSubAckMessage decodedMessage = captor.getValue(); @@ -246,7 +266,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubAckMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); MqttSubAckMessage decodedMessage = captor.getValue(); @@ -263,7 +283,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttUnsubscribeMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttUnsubscribeMessage decodedMessage = captor.getValue(); @@ -298,44 +318,35 @@ public class MqttCodecTest { final MqttMessage message = createMessageWithFixedHeader(MqttMessageType.PINGREQ); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - try { - // setting an invalid message type (15, reserved and forbidden by MQTT 3.1.1 spec) - byteBuf.setByte(0, 0xF0); + // setting an invalid message type (15, reserved and forbidden by MQTT 3.1.1 spec) + byteBuf.setByte(0, 0xF0); - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoder.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - final MqttMessage decodedMessage = captor.getValue(); - assertTrue(decodedMessage.decoderResult().isFailure()); - Throwable cause = decodedMessage.decoderResult().cause(); - assertTrue(cause instanceof DecoderException); - assertEquals("AUTH message requires at least MQTT 5", cause.getMessage()); - } finally { - byteBuf.release(); - } + final MqttMessage decodedMessage = captor.getValue(); + assertTrue(decodedMessage.decoderResult().isFailure()); + Throwable cause = decodedMessage.decoderResult().cause(); + assertTrue(cause instanceof DecoderException); + assertEquals("AUTH message requires at least MQTT 5", cause.getMessage()); } @Test public void testConnectMessageForMqtt31TooLarge() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); - - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validateConnectVariableHeader(message.variableHeader(), - (MqttConnectVariableHeader) decodedMessage.variableHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateConnectVariableHeader(message.variableHeader(), + (MqttConnectVariableHeader) decodedMessage.variableHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test @@ -343,63 +354,49 @@ public class MqttCodecTest { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validateConnectVariableHeader(message.variableHeader(), - (MqttConnectVariableHeader) decodedMessage.variableHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateConnectVariableHeader(message.variableHeader(), + (MqttConnectVariableHeader) decodedMessage.variableHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test public void testConnAckMessageTooLarge() throws Exception { final MqttConnAckMessage message = createConnAckMessage(); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); - - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test public void testPublishMessageTooLarge() throws Exception { final MqttPublishMessage message = createPublishMessage(); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); - - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validatePublishVariableHeader(message.variableHeader(), - (MqttPublishVariableHeader) decodedMessage.variableHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePublishVariableHeader(message.variableHeader(), + (MqttPublishVariableHeader) decodedMessage.variableHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test @@ -407,21 +404,17 @@ public class MqttCodecTest { final MqttSubscribeMessage message = createSubscribeMessage(); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validateMessageIdVariableHeader(message.variableHeader(), - (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), + (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test @@ -429,21 +422,17 @@ public class MqttCodecTest { final MqttSubAckMessage message = createSubAckMessage(); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validateMessageIdVariableHeader(message.variableHeader(), - (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), + (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test @@ -451,21 +440,17 @@ public class MqttCodecTest { final MqttUnsubscribeMessage message = createUnsubscribeMessage(); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - try { - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoderLimitedMessageSize.decode(ctx, byteBuf); - verify(ctx).fireChannelRead(captor.capture()); + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); + mqttDecoderLimitedMessageSize.channelRead(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); - final MqttMessage decodedMessage = captor.getValue(); - assertEquals(0, byteBuf.readableBytes()); + final MqttMessage decodedMessage = captor.getValue(); + assertEquals(0, byteBuf.readableBytes()); - validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); - validateMessageIdVariableHeader(message.variableHeader(), - (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); - validateDecoderExceptionTooLargeMessage(decodedMessage); - } finally { - byteBuf.release(); - } + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), + (MqttMessageIdVariableHeader) decodedMessage.variableHeader()); + validateDecoderExceptionTooLargeMessage(decodedMessage); } @Test @@ -480,7 +465,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttConnectMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttConnectMessage decodedMessage = captor.getValue(); @@ -500,7 +485,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttConnAckMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttConnAckMessage decodedMessage = captor.getValue(); @@ -529,7 +514,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttPublishMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttPublishMessage decodedMessage = captor.getValue(); @@ -549,7 +534,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue(); @@ -566,7 +551,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue(); @@ -584,7 +569,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubAckMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttSubAckMessage decodedMessage = captor.getValue(); @@ -617,7 +602,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubscribeMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttSubscribeMessage decodedMessage = captor.getValue(); @@ -648,10 +633,8 @@ public class MqttCodecTest { .build(); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); - final List out = new LinkedList(); - ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubscribeMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttSubscribeMessage decodedMessage = captor.getValue(); @@ -682,7 +665,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttUnsubAckMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttUnsubAckMessage decodedMessage = captor.getValue(); @@ -708,7 +691,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue(); @@ -729,7 +712,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue(); @@ -753,7 +736,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue(); @@ -775,7 +758,7 @@ public class MqttCodecTest { clearInvocations(versionAttrMock); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttConnectMessage.class); - mqttDecoder.decode(ctx, connectByteBuf); + mqttDecoder.channelRead(ctx, connectByteBuf); verify(ctx).fireChannelRead(captor.capture()); verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5); @@ -792,7 +775,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue(); @@ -806,7 +789,7 @@ public class MqttCodecTest { ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ArgumentCaptor captor = ArgumentCaptor.forClass(MqttMessage.class); - mqttDecoder.decode(ctx, byteBuf); + mqttDecoder.channelRead(ctx, byteBuf); verify(ctx).fireChannelRead(captor.capture()); final MqttMessage decodedMessage = captor.getValue();