Retain AbstractBinaryMemcacheDecoder.currentMessage when sending it out and release it when it's not used.
Motivation: AbstractBinaryMemcacheDecoder.currentMessage is not retained after sending it out. Hence, if a message contains `extras`, `io.netty.util.IllegalReferenceCountException` will be thrown in `channelInactive`. Modifications: Retain AbstractBinaryMemcacheDecoder.currentMessage After putting it to `out` and release it when it's not used. Result: No IllegalReferenceCountException or leak.
This commit is contained in:
parent
78b508a7eb
commit
4606890513
@ -79,6 +79,7 @@ public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMess
|
|||||||
currentMessage = decodeHeader(in);
|
currentMessage = decodeHeader(in);
|
||||||
state = State.READ_EXTRAS;
|
state = State.READ_EXTRAS;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
resetDecoder();
|
||||||
out.add(invalidMessage(e));
|
out.add(invalidMessage(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -94,6 +95,7 @@ public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMess
|
|||||||
|
|
||||||
state = State.READ_KEY;
|
state = State.READ_KEY;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
resetDecoder();
|
||||||
out.add(invalidMessage(e));
|
out.add(invalidMessage(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -107,10 +109,10 @@ public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMess
|
|||||||
currentMessage.setKey(in.toString(in.readerIndex(), keyLength, CharsetUtil.UTF_8));
|
currentMessage.setKey(in.toString(in.readerIndex(), keyLength, CharsetUtil.UTF_8));
|
||||||
in.skipBytes(keyLength);
|
in.skipBytes(keyLength);
|
||||||
}
|
}
|
||||||
|
out.add(currentMessage.retain());
|
||||||
out.add(currentMessage);
|
|
||||||
state = State.READ_CONTENT;
|
state = State.READ_CONTENT;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
resetDecoder();
|
||||||
out.add(invalidMessage(e));
|
out.add(invalidMessage(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -150,9 +152,11 @@ public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMess
|
|||||||
out.add(LastMemcacheContent.EMPTY_LAST_CONTENT);
|
out.add(LastMemcacheContent.EMPTY_LAST_CONTENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resetDecoder();
|
||||||
state = State.READ_HEADER;
|
state = State.READ_HEADER;
|
||||||
return;
|
return;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
resetDecoder();
|
||||||
out.add(invalidChunk(e));
|
out.add(invalidChunk(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -200,10 +204,6 @@ public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMess
|
|||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
super.channelInactive(ctx);
|
super.channelInactive(ctx);
|
||||||
|
|
||||||
if (currentMessage != null) {
|
|
||||||
currentMessage.release();
|
|
||||||
}
|
|
||||||
|
|
||||||
resetDecoder();
|
resetDecoder();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,7 +211,10 @@ public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMess
|
|||||||
* Prepare for next decoding iteration.
|
* Prepare for next decoding iteration.
|
||||||
*/
|
*/
|
||||||
protected void resetDecoder() {
|
protected void resetDecoder() {
|
||||||
|
if (currentMessage != null) {
|
||||||
|
currentMessage.release();
|
||||||
currentMessage = null;
|
currentMessage = null;
|
||||||
|
}
|
||||||
alreadyReadChunkSize = 0;
|
alreadyReadChunkSize = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ import static org.hamcrest.CoreMatchers.*;
|
|||||||
import static org.hamcrest.MatcherAssert.*;
|
import static org.hamcrest.MatcherAssert.*;
|
||||||
import static org.hamcrest.core.IsNull.notNullValue;
|
import static org.hamcrest.core.IsNull.notNullValue;
|
||||||
import static org.hamcrest.core.IsNull.nullValue;
|
import static org.hamcrest.core.IsNull.nullValue;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verifies the correct functionality of the {@link AbstractBinaryMemcacheDecoder}.
|
* Verifies the correct functionality of the {@link AbstractBinaryMemcacheDecoder}.
|
||||||
@ -248,4 +249,24 @@ public class BinaryMemcacheDecoderTest {
|
|||||||
assertThat(content.content().toString(CharsetUtil.UTF_8), is(msgBody));
|
assertThat(content.content().toString(CharsetUtil.UTF_8), is(msgBody));
|
||||||
content.release();
|
content.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldRetainCurrentMessageWhenSendingItOut() {
|
||||||
|
channel = new EmbeddedChannel(
|
||||||
|
new BinaryMemcacheRequestEncoder(),
|
||||||
|
new BinaryMemcacheRequestDecoder());
|
||||||
|
|
||||||
|
String key = "Netty";
|
||||||
|
ByteBuf extras = Unpooled.copiedBuffer("extras", CharsetUtil.UTF_8);
|
||||||
|
BinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(key, extras);
|
||||||
|
request.setKeyLength((short) key.length());
|
||||||
|
request.setExtrasLength((byte) extras.readableBytes());
|
||||||
|
|
||||||
|
assertTrue(channel.writeOutbound(request));
|
||||||
|
assertTrue(channel.writeInbound(channel.outboundMessages().toArray()));
|
||||||
|
|
||||||
|
BinaryMemcacheRequest read = channel.readInbound();
|
||||||
|
read.release();
|
||||||
|
// tearDown will call "channel.finish()"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user