diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java
index 0c5c962493..95b9ed0ea8 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpMessageDecoder.java
@@ -22,8 +22,8 @@ import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.replay.ReplayingDecoder;
/**
* Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java
index df008eed42..37d8c014d5 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameDecoder.java
@@ -18,9 +18,9 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.replay.ReplayingDecoder;
-import io.netty.handler.codec.replay.VoidEnum;
+import io.netty.handler.codec.VoidEnum;
/**
* Decodes {@link ChannelBuffer}s into {@link WebSocketFrame}s.
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java
index 2aca19d07a..85a3a427b2 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket08FrameDecoder.java
@@ -59,8 +59,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.replay.ReplayingDecoder;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java
index 878dacf405..4b8e95393a 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java
@@ -57,7 +57,7 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
- switch (e.getState()) {
+ switch (e.state()) {
case OPEN:
case CONNECTED:
case BOUND:
diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java
index 95bb053625..3ed64fef5a 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java
@@ -257,7 +257,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
- switch (e.getState()) {
+ switch (e.state()) {
case OPEN:
case CONNECTED:
case BOUND:
diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayError.java b/codec/src/main/java/io/netty/handler/codec/ReplayError.java
similarity index 80%
rename from codec/src/main/java/io/netty/handler/codec/replay/ReplayError.java
rename to codec/src/main/java/io/netty/handler/codec/ReplayError.java
index 42aabe8548..a523bf060b 100644
--- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayError.java
+++ b/codec/src/main/java/io/netty/handler/codec/ReplayError.java
@@ -13,11 +13,14 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.netty.handler.codec.replay;
+package io.netty.handler.codec;
-
-class ReplayError extends Error {
+final class ReplayError extends Error {
private static final long serialVersionUID = 2666698631187527681L;
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
}
diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java
similarity index 54%
rename from codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java
rename to codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java
index f017f5c8bc..54ecb00e62 100644
--- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java
@@ -13,26 +13,21 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.netty.handler.codec.replay;
-
-import java.net.SocketAddress;
+package io.netty.handler.codec;
+import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.buffer.ChannelBuffer;
-import io.netty.buffer.ChannelBufferFactory;
-import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelBufferHolder;
+import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelStateEvent;
-import io.netty.channel.Channels;
-import io.netty.channel.ExceptionEvent;
-import io.netty.channel.MessageEvent;
-import io.netty.channel.SimpleChannelUpstreamHandler;
-import io.netty.handler.codec.FrameDecoder;
/**
- * A specialized variation of {@link FrameDecoder} which enables implementation
+ * A specialized variation of {@link StreamToMessageDecoder} which enables implementation
* of a non-blocking decoder in the blocking I/O paradigm.
*
* The biggest difference between {@link ReplayingDecoder} and
@@ -282,17 +277,16 @@ import io.netty.handler.codec.FrameDecoder;
* the state type; use {@link VoidEnum} if state management is unused
*
* @apiviz.landmark
- * @apiviz.has io.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
+ * @apiviz.has io.netty.handler.codec.UnreplayableOperationException oneway - - throws
*/
-public abstract class ReplayingDecoder>
- extends SimpleChannelUpstreamHandler {
+public abstract class ReplayingDecoder> extends ChannelInboundHandlerAdapter {
-
- private ChannelBuffer cumulation;
- private final boolean unfold;
- private ReplayingDecoderBuffer replayable;
- private T state;
- private int checkpoint;
+ private final ChannelBufferHolder in = ChannelBufferHolders.byteBuffer();
+ private final ChannelBuffer cumulation = in.byteBuffer();
+ private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(cumulation);
+ private S state;
+ private int checkpoint = -1;
+ private volatile boolean inUse;
/**
* Creates a new instance with no initial state (i.e: {@code null}).
@@ -301,48 +295,34 @@ public abstract class ReplayingDecoder>
this(null);
}
- protected ReplayingDecoder(boolean unfold) {
- this(null, unfold);
- }
-
/**
* Creates a new instance with the specified initial state.
*/
- protected ReplayingDecoder(T initialState) {
- this(initialState, false);
- }
-
- protected ReplayingDecoder(T initialState, boolean unfold) {
+ protected ReplayingDecoder(S initialState) {
this.state = initialState;
- this.unfold = unfold;
}
/**
* Stores the internal cumulative buffer's reader position.
*/
protected void checkpoint() {
- ChannelBuffer cumulation = this.cumulation;
- if (cumulation != null) {
- checkpoint = cumulation.readerIndex();
- } else {
- checkpoint = -1; // buffer not available (already cleaned up)
- }
+ checkpoint = cumulation.readerIndex();
}
/**
* Stores the internal cumulative buffer's reader position and updates
* the current decoder state.
*/
- protected void checkpoint(T state) {
+ protected void checkpoint(S state) {
checkpoint();
- setState(state);
+ state(state);
}
/**
* Returns the current state of this decoder.
* @return the current state of this decoder
*/
- protected T getState() {
+ protected S state() {
return state;
}
@@ -350,41 +330,106 @@ public abstract class ReplayingDecoder>
* Sets the current state of this decoder.
* @return the old state of this decoder
*/
- protected T setState(T newState) {
- T oldState = state;
+ protected S state(S newState) {
+ S oldState = state;
state = newState;
return oldState;
}
- /**
- * Returns the actual number of readable bytes in the internal cumulative
- * buffer of this decoder. You usually do not need to rely on this value
- * to write a decoder. Use it only when you muse use it at your own risk.
- * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
- */
- protected int actualReadableBytes() {
- return internalBuffer().readableBytes();
+ @Override
+ public ChannelBufferHolder newInboundBuffer(
+ ChannelInboundHandlerContext ctx) throws Exception {
+ if (inUse) {
+ throw new IllegalStateException(
+ ReplayingDecoder.class.getSimpleName() + " cannot be shared.");
+ }
+ inUse = true;
+ return in;
}
- /**
- * Returns the internal cumulative buffer of this decoder. You usually
- * do not need to access the internal buffer directly to write a decoder.
- * Use it only when you must use it at your own risk.
- */
- protected ChannelBuffer internalBuffer() {
- ChannelBuffer buf = this.cumulation;
- if (buf == null) {
- return ChannelBuffers.EMPTY_BUFFER;
+ @Override
+ public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception {
+ callDecode(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception {
+ replayable.terminate();
+ ChannelBuffer in = cumulation;
+ if (in.readable()) {
+ callDecode(ctx);
+ }
+
+ try {
+ if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable, state))) {
+ in.discardReadBytes();
+ ctx.fireInboundBufferUpdated();
+ }
+ } catch (ReplayError replay) {
+ // Ignore
+ } catch (Throwable t) {
+ ctx.fireExceptionCaught(t);
+ }
+
+ ctx.fireChannelInactive();
+ }
+
+ private void callDecode(ChannelInboundHandlerContext ctx) {
+ ChannelBuffer in = cumulation;
+ while (in.readable()) {
+ try {
+ int oldReaderIndex = checkpoint = in.readerIndex();
+ Object result = null;
+ S oldState = state;
+ try {
+ result = decode(ctx, replayable, state);
+ if (result == null) {
+ if (oldReaderIndex == in.readerIndex() && oldState == state) {
+ throw new IllegalStateException(
+ "null cannot be returned if no data is consumed and state didn't change.");
+ } else {
+ // Previous data has been discarded or caused state transition.
+ // Probably it is reading on.
+ continue;
+ }
+ }
+ } catch (ReplayError replay) {
+ // Return to the checkpoint (or oldPosition) and retry.
+ int checkpoint = this.checkpoint;
+ if (checkpoint >= 0) {
+ in.readerIndex(checkpoint);
+ } else {
+ // Called by cleanup() - no need to maintain the readerIndex
+ // anymore because the buffer has been released already.
+ }
+ }
+
+ if (result == null) {
+ // Seems like more data is required.
+ // Let us wait for the next notification.
+ break;
+ }
+
+ if (oldReaderIndex == in.readerIndex() && oldState == state) {
+ throw new IllegalStateException(
+ "decode() method must consume at least one byte " +
+ "if it returned a decoded message (caused by: " +
+ getClass() + ")");
+ }
+
+ // A successful decode
+ MessageToMessageEncoder.unfoldAndAdd(ctx, ctx.nextIn(), result);
+ } catch (Throwable t) {
+ ctx.fireExceptionCaught(t);
+ }
}
- return buf;
}
/**
* Decodes the received packets so far into a frame.
*
* @param ctx the context of this handler
- * @param channel the current channel
- * @param buffer the cumulative buffer of received packets so far.
+ * @param in the cumulative buffer of received packets so far.
* Note that the buffer might be empty, which means you
* should not make an assumption that the buffer contains
* at least one byte in your decoder implementation.
@@ -392,16 +437,14 @@ public abstract class ReplayingDecoder>
*
* @return the decoded frame
*/
- protected abstract Object decode(ChannelHandlerContext ctx,
- Channel channel, ChannelBuffer buffer, T state) throws Exception;
+ public abstract O decode(ChannelInboundHandlerContext ctx, ChannelBuffer in, S state) throws Exception;
/**
* Decodes the received data so far into a frame when the channel is
* disconnected.
*
* @param ctx the context of this handler
- * @param channel the current channel
- * @param buffer the cumulative buffer of received packets so far.
+ * @param in the cumulative buffer of received packets so far.
* Note that the buffer might be empty, which means you
* should not make an assumption that the buffer contains
* at least one byte in your decoder implementation.
@@ -409,207 +452,7 @@ public abstract class ReplayingDecoder>
*
* @return the decoded frame
*/
- protected Object decodeLast(
- ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
- return decode(ctx, channel, buffer, state);
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
-
- Object m = e.getMessage();
- if (!(m instanceof ChannelBuffer)) {
- ctx.sendUpstream(e);
- return;
- }
-
- ChannelBuffer input = (ChannelBuffer) m;
- if (!input.readable()) {
- return;
- }
-
- if (cumulation == null) {
- // the cumulation buffer is not created yet so just pass the input
- // to callDecode(...) method
- this.cumulation = input;
- replayable = new ReplayingDecoderBuffer(input);
-
- int oldReaderIndex = input.readerIndex();
- int inputSize = input.readableBytes();
- callDecode(
- ctx, e.channel(),
- input, replayable,
- e.getRemoteAddress());
-
- if (input.readable()) {
- // seems like there is something readable left in the input buffer
- // or decoder wants a replay - create the cumulation buffer and
- // copy the input into it
- ChannelBuffer cumulation;
- if (checkpoint > 0) {
- int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
- cumulation = this.cumulation =
- newCumulationBuffer(ctx, bytesToPreserve);
- cumulation.writeBytes(input, checkpoint, bytesToPreserve);
- } else if (checkpoint == 0) {
- cumulation = this.cumulation =
- newCumulationBuffer(ctx, inputSize);
- cumulation.writeBytes(input, oldReaderIndex, inputSize);
- cumulation.readerIndex(input.readerIndex());
-
- } else {
- cumulation = this.cumulation =
- newCumulationBuffer(ctx, input.readableBytes());
- cumulation.writeBytes(input);
- }
- replayable = new ReplayingDecoderBuffer(cumulation);
- } else {
- this.cumulation = null;
- replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
- }
- } else {
- ChannelBuffer cumulation = this.cumulation;
- assert cumulation.readable();
- if (cumulation.writableBytes() < input.readableBytes()) {
- cumulation.discardReadBytes();
- }
- cumulation.writeBytes(input);
- callDecode(ctx, e.channel(), cumulation, replayable, e.getRemoteAddress());
- if (!cumulation.readable()) {
- this.cumulation = null;
- replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
- }
- }
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- cleanup(ctx, e);
- }
-
- @Override
- public void channelClosed(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- cleanup(ctx, e);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- ctx.sendUpstream(e);
- }
-
- private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
- while (input.readable()) {
- int oldReaderIndex = checkpoint = input.readerIndex();
- Object result = null;
- T oldState = state;
- try {
- result = decode(context, channel, replayableInput, state);
- if (result == null) {
- if (oldReaderIndex == input.readerIndex() && oldState == state) {
- throw new IllegalStateException(
- "null cannot be returned if no data is consumed and state didn't change.");
- } else {
- // Previous data has been discarded or caused state transition.
- // Probably it is reading on.
- continue;
- }
- }
- } catch (ReplayError replay) {
- // Return to the checkpoint (or oldPosition) and retry.
- int checkpoint = this.checkpoint;
- if (checkpoint >= 0) {
- input.readerIndex(checkpoint);
- } else {
- // Called by cleanup() - no need to maintain the readerIndex
- // anymore because the buffer has been released already.
- }
- }
-
- if (result == null) {
- // Seems like more data is required.
- // Let us wait for the next notification.
- break;
- }
-
- if (oldReaderIndex == input.readerIndex() && oldState == state) {
- throw new IllegalStateException(
- "decode() method must consume at least one byte " +
- "if it returned a decoded message (caused by: " +
- getClass() + ")");
- }
-
- // A successful decode
- unfoldAndFireMessageReceived(context, result, remoteAddress);
- }
- }
-
- private void unfoldAndFireMessageReceived(
- ChannelHandlerContext context, Object result, SocketAddress remoteAddress) {
- if (unfold) {
- if (result instanceof Object[]) {
- for (Object r: (Object[]) result) {
- Channels.fireMessageReceived(context, r, remoteAddress);
- }
- } else if (result instanceof Iterable>) {
- for (Object r: (Iterable>) result) {
- Channels.fireMessageReceived(context, r, remoteAddress);
- }
- } else {
- Channels.fireMessageReceived(context, result, remoteAddress);
- }
- } else {
- Channels.fireMessageReceived(context, result, remoteAddress);
- }
- }
-
- private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- try {
- ChannelBuffer cumulation = this.cumulation;
- if (cumulation == null) {
- return;
- }
-
- this.cumulation = null;
- replayable.terminate();
-
- if (cumulation != null && cumulation.readable()) {
- // Make sure all data was read before notifying a closed channel.
- callDecode(ctx, e.channel(), cumulation, replayable, null);
- }
-
- // Call decodeLast() finally. Please note that decodeLast() is
- // called even if there's nothing more to read from the buffer to
- // notify a user that the connection was closed explicitly.
- Object partiallyDecoded = decodeLast(ctx, e.channel(), replayable, state);
- if (partiallyDecoded != null) {
- unfoldAndFireMessageReceived(ctx, partiallyDecoded, null);
- }
- } catch (ReplayError replay) {
- // Ignore
- } finally {
- replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
- ctx.sendUpstream(e);
- }
- }
-
- /**
- * Create a new {@link ChannelBuffer} which is used for the cumulation.
- * Be aware that this MUST be a dynamic buffer. Sub-classes may override
- * this to provide a dynamic {@link ChannelBuffer} which has some
- * pre-allocated size that better fit their need.
- *
- * @param ctx {@link ChannelHandlerContext} for this handler
- * @return buffer the {@link ChannelBuffer} which is used for cumulation
- */
- protected ChannelBuffer newCumulationBuffer(
- ChannelHandlerContext ctx, int minimumCapacity) {
- ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory();
- return ChannelBuffers.dynamicBuffer(
- factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
+ public O decodeLast(ChannelInboundHandlerContext ctx, ChannelBuffer in, S state) throws Exception {
+ return decode(ctx, in, state);
}
}
diff --git a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java
similarity index 99%
rename from codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoderBuffer.java
rename to codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java
index b258af7185..379e0ea41b 100644
--- a/codec/src/main/java/io/netty/handler/codec/replay/ReplayingDecoderBuffer.java
+++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.netty.handler.codec.replay;
+package io.netty.handler.codec;
import java.io.IOException;
import java.io.InputStream;
diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java
index 06ec9c3da7..f40236c99e 100644
--- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java
@@ -23,7 +23,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda
@Override
public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception {
ChannelBuffer in = ctx.in().byteBuffer();
- if (!in.readable()) {
+ if (in.readable()) {
callDecode(ctx);
}
diff --git a/codec/src/main/java/io/netty/handler/codec/replay/UnreplayableOperationException.java b/codec/src/main/java/io/netty/handler/codec/UnreplayableOperationException.java
similarity index 97%
rename from codec/src/main/java/io/netty/handler/codec/replay/UnreplayableOperationException.java
rename to codec/src/main/java/io/netty/handler/codec/UnreplayableOperationException.java
index daf1250937..e911c9e525 100644
--- a/codec/src/main/java/io/netty/handler/codec/replay/UnreplayableOperationException.java
+++ b/codec/src/main/java/io/netty/handler/codec/UnreplayableOperationException.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.netty.handler.codec.replay;
+package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer;
diff --git a/codec/src/main/java/io/netty/handler/codec/replay/VoidEnum.java b/codec/src/main/java/io/netty/handler/codec/VoidEnum.java
similarity index 95%
rename from codec/src/main/java/io/netty/handler/codec/replay/VoidEnum.java
rename to codec/src/main/java/io/netty/handler/codec/VoidEnum.java
index e0b35cb61f..fa620f13c4 100644
--- a/codec/src/main/java/io/netty/handler/codec/replay/VoidEnum.java
+++ b/codec/src/main/java/io/netty/handler/codec/VoidEnum.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.netty.handler.codec.replay;
+package io.netty.handler.codec;
/**
* A placeholder {@link Enum} which could be specified as a type parameter of
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
index 9bca7d6afc..0bafe413b6 100644
--- a/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
@@ -17,23 +17,23 @@ package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferIndexFinder;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.replay.ReplayingDecoder;
+import io.netty.channel.ChannelInboundHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import io.netty.handler.codec.VoidEnum;
import java.io.IOException;
/**
* {@link ReplayingDecoder} which handles Redis protocol
- *
+ *
*
*/
-public class RedisDecoder extends ReplayingDecoder {
+public class RedisDecoder extends ReplayingDecoder {
private static final char CR = '\r';
private static final char LF = '\n';
private static final char ZERO = '0';
-
+
// We track the current multibulk reply in the case
// where we do not get a complete reply in a single
// decode invocation.
@@ -42,7 +42,7 @@ public class RedisDecoder extends ReplayingDecoder {
/**
* Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
* via the {@link #readInteger(ChannelBuffer)} method
- *
+ *
* @param is the {@link ChannelBuffer} to read from
* @throws IOException is thrown if the line-ending is not CRLF
*/
@@ -96,7 +96,7 @@ public class RedisDecoder extends ReplayingDecoder {
}
@Override
- protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception {
+ public Reply decode(ChannelInboundHandlerContext channelHandlerContext, ChannelBuffer channelBuffer, VoidEnum anEnum) throws Exception {
if (reply != null) {
reply.read(this, channelBuffer);
Reply ret = reply;
@@ -138,7 +138,3 @@ public class RedisDecoder extends ReplayingDecoder {
return reply;
}
}
-
-enum State {
-
-}
diff --git a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java
index 2adaffca3e..060f7bca65 100644
--- a/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java
@@ -16,48 +16,34 @@
package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer;
-import io.netty.buffer.ChannelBuffers;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.Channels;
-import io.netty.channel.MessageEvent;
-import io.netty.channel.SimpleChannelDownstreamHandler;
+import io.netty.channel.ChannelOutboundHandlerContext;
+import io.netty.handler.codec.MessageToStreamEncoder;
/**
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
*/
@Sharable
-public class RedisEncoder extends SimpleChannelDownstreamHandler {
+public class RedisEncoder extends MessageToStreamEncoder