From 1bf0dfe64a592f21eee7714fda6d091e7b8c5dbc Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 17 May 2012 12:37:37 +0900 Subject: [PATCH] Retrofit ReplayingDecoder with the new API - Moved up to 'codec' from 'codec.replay' - Test passes with Redis client codec --- .../codec/http/HttpMessageDecoder.java | 2 +- .../websocketx/WebSocket00FrameDecoder.java | 4 +- .../websocketx/WebSocket08FrameDecoder.java | 2 +- .../handler/codec/spdy/SpdyFrameEncoder.java | 2 +- .../codec/spdy/SpdySessionHandler.java | 2 +- .../codec/{replay => }/ReplayError.java | 9 +- .../codec/{replay => }/ReplayingDecoder.java | 379 +++++------------- .../{replay => }/ReplayingDecoderBuffer.java | 2 +- .../handler/codec/StreamToMessageDecoder.java | 2 +- .../UnreplayableOperationException.java | 2 +- .../handler/codec/{replay => }/VoidEnum.java | 2 +- .../handler/codec/redis/RedisDecoder.java | 20 +- .../handler/codec/redis/RedisEncoder.java | 32 +- .../handler/codec/replay/package-info.java | 28 -- .../CompatibleObjectDecoder.java | 2 +- .../{replay => }/ReplayingDecoderTest.java | 21 +- 16 files changed, 154 insertions(+), 357 deletions(-) rename codec/src/main/java/io/netty/handler/codec/{replay => }/ReplayError.java (80%) rename codec/src/main/java/io/netty/handler/codec/{replay => }/ReplayingDecoder.java (54%) rename codec/src/main/java/io/netty/handler/codec/{replay => }/ReplayingDecoderBuffer.java (99%) rename codec/src/main/java/io/netty/handler/codec/{replay => }/UnreplayableOperationException.java (97%) rename codec/src/main/java/io/netty/handler/codec/{replay => }/VoidEnum.java (95%) delete mode 100644 codec/src/main/java/io/netty/handler/codec/replay/package-info.java rename codec/src/test/java/io/netty/handler/codec/{replay => }/ReplayingDecoderTest.java (79%) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java index 0c5c962493..95b9ed0ea8 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java @@ -22,8 +22,8 @@ import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.replay.ReplayingDecoder; /** * Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java index df008eed42..37d8c014d5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java @@ -18,9 +18,9 @@ package io.netty.handler.codec.http.websocketx; import io.netty.buffer.ChannelBuffer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.replay.ReplayingDecoder; -import io.netty.handler.codec.replay.VoidEnum; +import io.netty.handler.codec.VoidEnum; /** * Decodes {@link ChannelBuffer}s into {@link WebSocketFrame}s. diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java index 2aca19d07a..85a3a427b2 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java @@ -59,8 +59,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.replay.ReplayingDecoder; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 878dacf405..4b8e95393a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -57,7 +57,7 @@ public class SpdyFrameEncoder extends OneToOneEncoder { ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { if (evt instanceof ChannelStateEvent) { ChannelStateEvent e = (ChannelStateEvent) evt; - switch (e.getState()) { + switch (e.state()) { case OPEN: case CONNECTED: case BOUND: diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 95bb053625..3ed64fef5a 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -257,7 +257,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler throws Exception { if (evt instanceof ChannelStateEvent) { ChannelStateEvent e = (ChannelStateEvent) evt; - switch (e.getState()) { + switch (e.state()) { case OPEN: case CONNECTED: case BOUND: diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayError.java b/codec/src/main/java/io/netty/handler/codec/ReplayError.java similarity index 80% rename from codec/src/main/java/io/netty/handler/codec/replay/ReplayError.java rename to codec/src/main/java/io/netty/handler/codec/ReplayError.java index 42aabe8548..a523bf060b 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayError.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayError.java @@ -13,11 +13,14 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.replay; +package io.netty.handler.codec; - -class ReplayError extends Error { +final class ReplayError extends Error { private static final long serialVersionUID = 2666698631187527681L; + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } } diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java similarity index 54% rename from codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java rename to codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index f017f5c8bc..54ecb00e62 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -13,26 +13,21 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.replay; - -import java.net.SocketAddress; +package io.netty.handler.codec; +import static io.netty.handler.codec.MessageToMessageEncoder.*; import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.Channels; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.handler.codec.FrameDecoder; /** - * A specialized variation of {@link FrameDecoder} which enables implementation + * A specialized variation of {@link StreamToMessageDecoder} which enables implementation * of a non-blocking decoder in the blocking I/O paradigm. *

* The biggest difference between {@link ReplayingDecoder} and @@ -282,17 +277,16 @@ import io.netty.handler.codec.FrameDecoder; * the state type; use {@link VoidEnum} if state management is unused * * @apiviz.landmark - * @apiviz.has io.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws + * @apiviz.has io.netty.handler.codec.UnreplayableOperationException oneway - - throws */ -public abstract class ReplayingDecoder> - extends SimpleChannelUpstreamHandler { +public abstract class ReplayingDecoder> extends ChannelInboundHandlerAdapter { - - private ChannelBuffer cumulation; - private final boolean unfold; - private ReplayingDecoderBuffer replayable; - private T state; - private int checkpoint; + private final ChannelBufferHolder in = ChannelBufferHolders.byteBuffer(); + private final ChannelBuffer cumulation = in.byteBuffer(); + private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(cumulation); + private S state; + private int checkpoint = -1; + private volatile boolean inUse; /** * Creates a new instance with no initial state (i.e: {@code null}). @@ -301,48 +295,34 @@ public abstract class ReplayingDecoder> this(null); } - protected ReplayingDecoder(boolean unfold) { - this(null, unfold); - } - /** * Creates a new instance with the specified initial state. */ - protected ReplayingDecoder(T initialState) { - this(initialState, false); - } - - protected ReplayingDecoder(T initialState, boolean unfold) { + protected ReplayingDecoder(S initialState) { this.state = initialState; - this.unfold = unfold; } /** * Stores the internal cumulative buffer's reader position. */ protected void checkpoint() { - ChannelBuffer cumulation = this.cumulation; - if (cumulation != null) { - checkpoint = cumulation.readerIndex(); - } else { - checkpoint = -1; // buffer not available (already cleaned up) - } + checkpoint = cumulation.readerIndex(); } /** * Stores the internal cumulative buffer's reader position and updates * the current decoder state. */ - protected void checkpoint(T state) { + protected void checkpoint(S state) { checkpoint(); - setState(state); + state(state); } /** * Returns the current state of this decoder. * @return the current state of this decoder */ - protected T getState() { + protected S state() { return state; } @@ -350,41 +330,106 @@ public abstract class ReplayingDecoder> * Sets the current state of this decoder. * @return the old state of this decoder */ - protected T setState(T newState) { - T oldState = state; + protected S state(S newState) { + S oldState = state; state = newState; return oldState; } - /** - * Returns the actual number of readable bytes in the internal cumulative - * buffer of this decoder. You usually do not need to rely on this value - * to write a decoder. Use it only when you muse use it at your own risk. - * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. - */ - protected int actualReadableBytes() { - return internalBuffer().readableBytes(); + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + if (inUse) { + throw new IllegalStateException( + ReplayingDecoder.class.getSimpleName() + " cannot be shared."); + } + inUse = true; + return in; } - /** - * Returns the internal cumulative buffer of this decoder. You usually - * do not need to access the internal buffer directly to write a decoder. - * Use it only when you must use it at your own risk. - */ - protected ChannelBuffer internalBuffer() { - ChannelBuffer buf = this.cumulation; - if (buf == null) { - return ChannelBuffers.EMPTY_BUFFER; + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + callDecode(ctx); + } + + @Override + public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { + replayable.terminate(); + ChannelBuffer in = cumulation; + if (in.readable()) { + callDecode(ctx); + } + + try { + if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable, state))) { + in.discardReadBytes(); + ctx.fireInboundBufferUpdated(); + } + } catch (ReplayError replay) { + // Ignore + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } + + ctx.fireChannelInactive(); + } + + private void callDecode(ChannelInboundHandlerContext ctx) { + ChannelBuffer in = cumulation; + while (in.readable()) { + try { + int oldReaderIndex = checkpoint = in.readerIndex(); + Object result = null; + S oldState = state; + try { + result = decode(ctx, replayable, state); + if (result == null) { + if (oldReaderIndex == in.readerIndex() && oldState == state) { + throw new IllegalStateException( + "null cannot be returned if no data is consumed and state didn't change."); + } else { + // Previous data has been discarded or caused state transition. + // Probably it is reading on. + continue; + } + } + } catch (ReplayError replay) { + // Return to the checkpoint (or oldPosition) and retry. + int checkpoint = this.checkpoint; + if (checkpoint >= 0) { + in.readerIndex(checkpoint); + } else { + // Called by cleanup() - no need to maintain the readerIndex + // anymore because the buffer has been released already. + } + } + + if (result == null) { + // Seems like more data is required. + // Let us wait for the next notification. + break; + } + + if (oldReaderIndex == in.readerIndex() && oldState == state) { + throw new IllegalStateException( + "decode() method must consume at least one byte " + + "if it returned a decoded message (caused by: " + + getClass() + ")"); + } + + // A successful decode + MessageToMessageEncoder.unfoldAndAdd(ctx, ctx.nextIn(), result); + } catch (Throwable t) { + ctx.fireExceptionCaught(t); + } } - return buf; } /** * Decodes the received packets so far into a frame. * * @param ctx the context of this handler - * @param channel the current channel - * @param buffer the cumulative buffer of received packets so far. + * @param in the cumulative buffer of received packets so far. * Note that the buffer might be empty, which means you * should not make an assumption that the buffer contains * at least one byte in your decoder implementation. @@ -392,16 +437,14 @@ public abstract class ReplayingDecoder> * * @return the decoded frame */ - protected abstract Object decode(ChannelHandlerContext ctx, - Channel channel, ChannelBuffer buffer, T state) throws Exception; + public abstract O decode(ChannelInboundHandlerContext ctx, ChannelBuffer in, S state) throws Exception; /** * Decodes the received data so far into a frame when the channel is * disconnected. * * @param ctx the context of this handler - * @param channel the current channel - * @param buffer the cumulative buffer of received packets so far. + * @param in the cumulative buffer of received packets so far. * Note that the buffer might be empty, which means you * should not make an assumption that the buffer contains * at least one byte in your decoder implementation. @@ -409,207 +452,7 @@ public abstract class ReplayingDecoder> * * @return the decoded frame */ - protected Object decodeLast( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception { - return decode(ctx, channel, buffer, state); - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - - Object m = e.getMessage(); - if (!(m instanceof ChannelBuffer)) { - ctx.sendUpstream(e); - return; - } - - ChannelBuffer input = (ChannelBuffer) m; - if (!input.readable()) { - return; - } - - if (cumulation == null) { - // the cumulation buffer is not created yet so just pass the input - // to callDecode(...) method - this.cumulation = input; - replayable = new ReplayingDecoderBuffer(input); - - int oldReaderIndex = input.readerIndex(); - int inputSize = input.readableBytes(); - callDecode( - ctx, e.channel(), - input, replayable, - e.getRemoteAddress()); - - if (input.readable()) { - // seems like there is something readable left in the input buffer - // or decoder wants a replay - create the cumulation buffer and - // copy the input into it - ChannelBuffer cumulation; - if (checkpoint > 0) { - int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex); - cumulation = this.cumulation = - newCumulationBuffer(ctx, bytesToPreserve); - cumulation.writeBytes(input, checkpoint, bytesToPreserve); - } else if (checkpoint == 0) { - cumulation = this.cumulation = - newCumulationBuffer(ctx, inputSize); - cumulation.writeBytes(input, oldReaderIndex, inputSize); - cumulation.readerIndex(input.readerIndex()); - - } else { - cumulation = this.cumulation = - newCumulationBuffer(ctx, input.readableBytes()); - cumulation.writeBytes(input); - } - replayable = new ReplayingDecoderBuffer(cumulation); - } else { - this.cumulation = null; - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; - } - } else { - ChannelBuffer cumulation = this.cumulation; - assert cumulation.readable(); - if (cumulation.writableBytes() < input.readableBytes()) { - cumulation.discardReadBytes(); - } - cumulation.writeBytes(input); - callDecode(ctx, e.channel(), cumulation, replayable, e.getRemoteAddress()); - if (!cumulation.readable()) { - this.cumulation = null; - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; - } - } - } - - @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - cleanup(ctx, e); - } - - @Override - public void channelClosed(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - cleanup(ctx, e); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - ctx.sendUpstream(e); - } - - private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception { - while (input.readable()) { - int oldReaderIndex = checkpoint = input.readerIndex(); - Object result = null; - T oldState = state; - try { - result = decode(context, channel, replayableInput, state); - if (result == null) { - if (oldReaderIndex == input.readerIndex() && oldState == state) { - throw new IllegalStateException( - "null cannot be returned if no data is consumed and state didn't change."); - } else { - // Previous data has been discarded or caused state transition. - // Probably it is reading on. - continue; - } - } - } catch (ReplayError replay) { - // Return to the checkpoint (or oldPosition) and retry. - int checkpoint = this.checkpoint; - if (checkpoint >= 0) { - input.readerIndex(checkpoint); - } else { - // Called by cleanup() - no need to maintain the readerIndex - // anymore because the buffer has been released already. - } - } - - if (result == null) { - // Seems like more data is required. - // Let us wait for the next notification. - break; - } - - if (oldReaderIndex == input.readerIndex() && oldState == state) { - throw new IllegalStateException( - "decode() method must consume at least one byte " + - "if it returned a decoded message (caused by: " + - getClass() + ")"); - } - - // A successful decode - unfoldAndFireMessageReceived(context, result, remoteAddress); - } - } - - private void unfoldAndFireMessageReceived( - ChannelHandlerContext context, Object result, SocketAddress remoteAddress) { - if (unfold) { - if (result instanceof Object[]) { - for (Object r: (Object[]) result) { - Channels.fireMessageReceived(context, r, remoteAddress); - } - } else if (result instanceof Iterable) { - for (Object r: (Iterable) result) { - Channels.fireMessageReceived(context, r, remoteAddress); - } - } else { - Channels.fireMessageReceived(context, result, remoteAddress); - } - } else { - Channels.fireMessageReceived(context, result, remoteAddress); - } - } - - private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - try { - ChannelBuffer cumulation = this.cumulation; - if (cumulation == null) { - return; - } - - this.cumulation = null; - replayable.terminate(); - - if (cumulation != null && cumulation.readable()) { - // Make sure all data was read before notifying a closed channel. - callDecode(ctx, e.channel(), cumulation, replayable, null); - } - - // Call decodeLast() finally. Please note that decodeLast() is - // called even if there's nothing more to read from the buffer to - // notify a user that the connection was closed explicitly. - Object partiallyDecoded = decodeLast(ctx, e.channel(), replayable, state); - if (partiallyDecoded != null) { - unfoldAndFireMessageReceived(ctx, partiallyDecoded, null); - } - } catch (ReplayError replay) { - // Ignore - } finally { - replayable = ReplayingDecoderBuffer.EMPTY_BUFFER; - ctx.sendUpstream(e); - } - } - - /** - * Create a new {@link ChannelBuffer} which is used for the cumulation. - * Be aware that this MUST be a dynamic buffer. Sub-classes may override - * this to provide a dynamic {@link ChannelBuffer} which has some - * pre-allocated size that better fit their need. - * - * @param ctx {@link ChannelHandlerContext} for this handler - * @return buffer the {@link ChannelBuffer} which is used for cumulation - */ - protected ChannelBuffer newCumulationBuffer( - ChannelHandlerContext ctx, int minimumCapacity) { - ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory(); - return ChannelBuffers.dynamicBuffer( - factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory); + public O decodeLast(ChannelInboundHandlerContext ctx, ChannelBuffer in, S state) throws Exception { + return decode(ctx, in, state); } } diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java similarity index 99% rename from codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoderBuffer.java rename to codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index b258af7185..379e0ea41b 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.replay; +package io.netty.handler.codec; import java.io.IOException; import java.io.InputStream; diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index 06ec9c3da7..f40236c99e 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -23,7 +23,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { ChannelBuffer in = ctx.in().byteBuffer(); - if (!in.readable()) { + if (in.readable()) { callDecode(ctx); } diff --git a/codec/src/main/java/io/netty/handler/codec/replay/UnreplayableOperationException.java b/codec/src/main/java/io/netty/handler/codec/UnreplayableOperationException.java similarity index 97% rename from codec/src/main/java/io/netty/handler/codec/replay/UnreplayableOperationException.java rename to codec/src/main/java/io/netty/handler/codec/UnreplayableOperationException.java index daf1250937..e911c9e525 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/UnreplayableOperationException.java +++ b/codec/src/main/java/io/netty/handler/codec/UnreplayableOperationException.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.replay; +package io.netty.handler.codec; import io.netty.buffer.ChannelBuffer; diff --git a/codec/src/main/java/io/netty/handler/codec/replay/VoidEnum.java b/codec/src/main/java/io/netty/handler/codec/VoidEnum.java similarity index 95% rename from codec/src/main/java/io/netty/handler/codec/replay/VoidEnum.java rename to codec/src/main/java/io/netty/handler/codec/VoidEnum.java index e0b35cb61f..fa620f13c4 100644 --- a/codec/src/main/java/io/netty/handler/codec/replay/VoidEnum.java +++ b/codec/src/main/java/io/netty/handler/codec/VoidEnum.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.replay; +package io.netty.handler.codec; /** * A placeholder {@link Enum} which could be specified as a type parameter of diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java index 9bca7d6afc..0bafe413b6 100644 --- a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java @@ -17,23 +17,23 @@ package io.netty.handler.codec.redis; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBufferIndexFinder; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.replay.ReplayingDecoder; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import io.netty.handler.codec.VoidEnum; import java.io.IOException; /** * {@link ReplayingDecoder} which handles Redis protocol - * + * * */ -public class RedisDecoder extends ReplayingDecoder { +public class RedisDecoder extends ReplayingDecoder { private static final char CR = '\r'; private static final char LF = '\n'; private static final char ZERO = '0'; - + // We track the current multibulk reply in the case // where we do not get a complete reply in a single // decode invocation. @@ -42,7 +42,7 @@ public class RedisDecoder extends ReplayingDecoder { /** * Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer} * via the {@link #readInteger(ChannelBuffer)} method - * + * * @param is the {@link ChannelBuffer} to read from * @throws IOException is thrown if the line-ending is not CRLF */ @@ -96,7 +96,7 @@ public class RedisDecoder extends ReplayingDecoder { } @Override - protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception { + public Reply decode(ChannelInboundHandlerContext channelHandlerContext, ChannelBuffer channelBuffer, VoidEnum anEnum) throws Exception { if (reply != null) { reply.read(this, channelBuffer); Reply ret = reply; @@ -138,7 +138,3 @@ public class RedisDecoder extends ReplayingDecoder { return reply; } } - -enum State { - -} diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java index 2adaffca3e..060f7bca65 100644 --- a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java @@ -16,48 +16,34 @@ package io.netty.handler.codec.redis; import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelDownstreamHandler; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToStreamEncoder; /** * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s */ @Sharable -public class RedisEncoder extends SimpleChannelDownstreamHandler { +public class RedisEncoder extends MessageToStreamEncoder { @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - Object o = e.getMessage(); + public void encode(ChannelOutboundHandlerContext ctx, Object msg, ChannelBuffer out) throws Exception { + Object o = msg; if (o instanceof Command) { - ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); - ChannelFuture future = e.getFuture(); - Command command = (Command) o; - command.write(cb); - Channels.write(ctx, future, cb); - + command.write(out); } else if (o instanceof Iterable) { - ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); - ChannelFuture future = e.getFuture(); - // Useful for transactions and database select for (Object i : (Iterable) o) { if (i instanceof Command) { Command command = (Command) i; - command.write(cb); + command.write(out); } else { - super.writeRequested(ctx, e); - return; + break; } } - Channels.write(ctx, future, cb); } else { - super.writeRequested(ctx, e); + throw new IllegalArgumentException("unsupported message type: " + msg.getClass().getName()); } } } diff --git a/codec/src/main/java/io/netty/handler/codec/replay/package-info.java b/codec/src/main/java/io/netty/handler/codec/replay/package-info.java deleted file mode 100644 index 39da245c7d..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/replay/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * Specialized variation of {@link io.netty.handler.codec.FrameDecoder} - * which enables implementation of a non-blocking decoder in the blocking I/O - * paradigm. - * - * @apiviz.exclude ^java\.lang\. - * @apiviz.exclude \.SimpleChannelUpstreamHandler$ - * @apiviz.exclude \.VoidEnum$ - * @apiviz.exclude \.codec\.(?!replay)[a-z0-9]+\. - */ -package io.netty.handler.codec.replay; - diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java index 79544f571d..d1717d7786 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java @@ -24,7 +24,7 @@ import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBufferInputStream; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.replay.ReplayingDecoder; +import io.netty.handler.codec.ReplayingDecoder; /** * A decoder which deserializes the received {@link ChannelBuffer}s into Java diff --git a/codec/src/test/java/io/netty/handler/codec/replay/ReplayingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java similarity index 79% rename from codec/src/test/java/io/netty/handler/codec/replay/ReplayingDecoderTest.java rename to codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java index 1b8af565b8..441e7ffc41 100644 --- a/codec/src/test/java/io/netty/handler/codec/replay/ReplayingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java @@ -13,24 +13,22 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.replay; +package io.netty.handler.codec; import static org.junit.Assert.*; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBufferIndexFinder; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.embedder.DecoderEmbedder; + import org.junit.Test; public class ReplayingDecoderTest { @Test public void testLineProtocol() { - DecoderEmbedder e = new DecoderEmbedder( - new LineDecoder()); + DecoderEmbedder e = new DecoderEmbedder(new LineDecoder()); // Ordinary input e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' })); @@ -49,17 +47,16 @@ public class ReplayingDecoderTest { assertNull(e.poll()); } - private static final class LineDecoder extends ReplayingDecoder { + private static final class LineDecoder extends ReplayingDecoder { LineDecoder() { } @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer, VoidEnum state) throws Exception { - ChannelBuffer msg = buffer.readBytes( - buffer.bytesBefore(ChannelBufferIndexFinder.LF)); - buffer.skipBytes(1); + public ChannelBuffer decode(ChannelInboundHandlerContext ctx, + ChannelBuffer in, VoidEnum state) throws Exception { + ChannelBuffer msg = in.readBytes(in.bytesBefore(ChannelBufferIndexFinder.LF)); + in.skipBytes(1); return msg; } }