From c2d2f0b254e92d11b15db48d8ae4a6c695a9ddb1 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 28 Feb 2012 16:30:00 -0800 Subject: [PATCH] Optimize FrameDecoder and ReplayingDecoder * Overall code cleanup on FrameDecoder and ReplayingDecoder * FrameDecoder discards readableBytes only when it has to * Replaced createCumulationDynamicBuffer with newCumulationBuffer with an additional hint * ReplayingDecoder does not perform memory copy if possible --- .../codec/frame/FixedLengthFrameDecoder.java | 9 +- .../handler/codec/frame/FrameDecoder.java | 60 +++++------ .../codec/replay/ReplayingDecoder.java | 99 ++++++++++++------- .../replay/UnsafeDynamicChannelBuffer.java | 4 +- 4 files changed, 91 insertions(+), 81 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/frame/FixedLengthFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/frame/FixedLengthFrameDecoder.java index 21537efced..57863f3208 100644 --- a/codec/src/main/java/io/netty/handler/codec/frame/FixedLengthFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/frame/FixedLengthFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.frame; import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferFactory; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -74,11 +75,13 @@ public class FixedLengthFrameDecoder extends FrameDecoder { } @Override - protected ChannelBuffer createCumulationDynamicBuffer(ChannelHandlerContext ctx) { + protected ChannelBuffer newCumulationBuffer(ChannelHandlerContext ctx, int minimumCapacity) { + ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); if (allocateFullBuffer) { - return ChannelBuffers.dynamicBuffer(frameLength, ctx.getChannel().getConfig().getBufferFactory()); + return ChannelBuffers.dynamicBuffer( + factory.getDefaultOrder(), frameLength, ctx.getChannel().getConfig().getBufferFactory()); } - return super.createCumulationDynamicBuffer(ctx); + return super.newCumulationBuffer(ctx, minimumCapacity); } } diff --git a/codec/src/main/java/io/netty/handler/codec/frame/FrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/frame/FrameDecoder.java index 73c797191b..4e2238d356 100644 --- a/codec/src/main/java/io/netty/handler/codec/frame/FrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/frame/FrameDecoder.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.frame; import java.net.SocketAddress; import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferFactory; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; @@ -206,23 +207,20 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); if (input.readable()) { // seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it - ChannelBuffer cumulation = cumulation(ctx); - cumulation.writeBytes(input); + (this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input); } } else { - ChannelBuffer cumulation = cumulation(ctx); - if (cumulation.readable()) { + ChannelBuffer cumulation = this.cumulation; + assert cumulation.readable(); + if (cumulation.writableBytes() < input.readableBytes()) { cumulation.discardReadBytes(); - cumulation.writeBytes(input); - callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); - } else { - callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); - if (input.readable()) { - cumulation.writeBytes(input); - } + } + cumulation.writeBytes(input); + callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); + if (!cumulation.readable()) { + this.cumulation = null; } } - } @Override @@ -303,10 +301,6 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { unfoldAndFireMessageReceived(context, remoteAddress, frame); } - - if (!cumulation.readable()) { - this.cumulation = null; - } } private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { @@ -333,10 +327,10 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { ChannelBuffer cumulation = this.cumulation; if (cumulation == null) { return; - } else { - this.cumulation = null; } + this.cumulation = null; + if (cumulation.readable()) { // Make sure all frames are read before notifying a closed channel. callDecode(ctx, ctx.getChannel(), cumulation, null); @@ -355,28 +349,18 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { } /** - * Get the currently used {@link ChannelBuffer} for cumulation or create one in a lazy fashion if none exist yet - * - * @param ctx the {@link ChannelHandlerContext} for this handler - * @return buffer the {@link ChannelBuffer} which is used for cumulation - */ - private ChannelBuffer cumulation(ChannelHandlerContext ctx) { - ChannelBuffer c = cumulation; - if (c == null) { - c = createCumulationDynamicBuffer(ctx); - cumulation = c; - } - return c; - } - - /** - * Create a new {@link ChannelBuffer} which is used for the cumulation. Be aware that this MUST be a dynamic buffer. Sub-classes may override this to provide a - * dynamic {@link ChannelBuffer} which has some prelocated size that better fit their need. - * + * Create a new {@link ChannelBuffer} which is used for the cumulation. + * Be aware that this MUST be a dynamic buffer. Sub-classes may override + * this to provide a dynamic {@link ChannelBuffer} which has some + * pre-allocated size that better fit their need. + * * @param ctx {@link ChannelHandlerContext} for this handler * @return buffer the {@link ChannelBuffer} which is used for cumulation */ - protected ChannelBuffer createCumulationDynamicBuffer(ChannelHandlerContext ctx) { - return ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory()); + protected ChannelBuffer newCumulationBuffer( + ChannelHandlerContext ctx, int minimumCapacity) { + ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); + return ChannelBuffers.dynamicBuffer( + factory.getDefaultOrder(), minimumCapacity, factory); } } diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java index 058f046625..0964c4e2f4 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java @@ -18,7 +18,6 @@ package io.netty.handler.codec.replay; import java.net.SocketAddress; import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; @@ -289,7 +288,6 @@ public abstract class ReplayingDecoder> private ChannelBuffer cumulation; - private boolean needsCleanup; private final boolean unfold; private ReplayingDecoderBuffer replayable; private T state; @@ -430,11 +428,53 @@ public abstract class ReplayingDecoder> return; } - ChannelBuffer cumulation = cumulation(ctx); - needsCleanup = true; - cumulation.discardReadBytes(); - cumulation.writeBytes(input); - callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); + if (cumulation == null) { + // the cumulation buffer is not created yet so just pass the input + // to callDecode(...) method + this.cumulation = input; + replayable = new ReplayingDecoderBuffer(input); + + int oldReaderIndex = input.readerIndex(); + int inputSize = input.readableBytes(); + callDecode( + ctx, e.getChannel(), + input, replayable, + e.getRemoteAddress()); + + if (input.readable()) { + // seems like there is something readable left in the input buffer + // or decoder wants a replay - create the cumulation buffer and + // copy the input into it + if (checkpoint >= 0) { + ChannelBuffer cumulation = this.cumulation = + newCumulationBuffer(ctx, inputSize); + cumulation.writeBytes(input, oldReaderIndex, inputSize); + cumulation.readerIndex(input.readerIndex()); + replayable = new ReplayingDecoderBuffer(cumulation); + } else { + System.out.println("B"); + ChannelBuffer cumulation = this.cumulation = + newCumulationBuffer(ctx, input.readableBytes()); + cumulation.writeBytes(input); + replayable = new ReplayingDecoderBuffer(cumulation); + } + } else { + this.cumulation = null; + replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; + } + } else { + ChannelBuffer cumulation = this.cumulation; + assert cumulation.readable(); + if (cumulation.writableBytes() < input.readableBytes()) { + cumulation.discardReadBytes(); + } + cumulation.writeBytes(input); + callDecode(ctx, e.getChannel(), cumulation, replayable, e.getRemoteAddress()); + if (!cumulation.readable()) { + this.cumulation = null; + replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; + } + } } @Override @@ -455,15 +495,15 @@ public abstract class ReplayingDecoder> ctx.sendUpstream(e); } - private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { - while (cumulation.readable()) { - int oldReaderIndex = checkpoint = cumulation.readerIndex(); + private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception { + while (input.readable()) { + int oldReaderIndex = checkpoint = input.readerIndex(); Object result = null; T oldState = state; try { - result = decode(context, channel, replayable, state); + result = decode(context, channel, replayableInput, state); if (result == null) { - if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { + if (oldReaderIndex == input.readerIndex() && oldState == state) { throw new IllegalStateException( "null cannot be returned if no data is consumed and state didn't change."); } else { @@ -476,7 +516,7 @@ public abstract class ReplayingDecoder> // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { - cumulation.readerIndex(checkpoint); + input.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. @@ -489,7 +529,7 @@ public abstract class ReplayingDecoder> break; } - if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { + if (oldReaderIndex == input.readerIndex() && oldState == state) { throw new IllegalStateException( "decode() method must consume at least one byte " + "if it returned a decoded message (caused by: " + @@ -498,11 +538,6 @@ public abstract class ReplayingDecoder> // A successful decode unfoldAndFireMessageReceived(context, result, remoteAddress); - - if (!cumulation.readable()) { - this.cumulation = null; - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; - } } } @@ -528,19 +563,17 @@ public abstract class ReplayingDecoder> private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { try { - if (!needsCleanup) { + ChannelBuffer cumulation = this.cumulation; + if (cumulation == null) { return; - } else { - needsCleanup = false; } - ChannelBuffer cumulation = this.cumulation; this.cumulation = null; replayable.terminate(); if (cumulation != null && cumulation.readable()) { // Make sure all data was read before notifying a closed channel. - callDecode(ctx, e.getChannel(), cumulation, null); + callDecode(ctx, e.getChannel(), cumulation, replayable, null); } // Call decodeLast() finally. Please note that decodeLast() is @@ -558,19 +591,9 @@ public abstract class ReplayingDecoder> } } - private ChannelBuffer cumulation(ChannelHandlerContext ctx) { - ChannelBuffer buf = this.cumulation; - if (buf == null) { - - if (cumulation == null) { - ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); - buf = new UnsafeDynamicChannelBuffer(factory); - cumulation = buf; - replayable = new ReplayingDecoderBuffer(buf); - } else { - buf = cumulation; - } - } - return buf; + private ChannelBuffer newCumulationBuffer( + ChannelHandlerContext ctx, int minimumCapacity) { + return new UnsafeDynamicChannelBuffer( + ctx.getChannel().getConfig().getBufferFactory(), minimumCapacity); } } diff --git a/codec/src/main/java/io/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java b/codec/src/main/java/io/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java index 57b5be4fc7..2a181ed99e 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java @@ -20,8 +20,8 @@ import io.netty.buffer.DynamicChannelBuffer; class UnsafeDynamicChannelBuffer extends DynamicChannelBuffer { - UnsafeDynamicChannelBuffer(ChannelBufferFactory factory) { - super(factory.getDefaultOrder(), 256, factory); + UnsafeDynamicChannelBuffer(ChannelBufferFactory factory, int minimumCapacity) { + super(factory.getDefaultOrder(), minimumCapacity, factory); } @Override