Retrofit ReplayingDecoder with the new API

- Moved up to 'codec' from 'codec.replay'
- Test passes with Redis client codec
This commit is contained in:
Trustin Lee 2012-05-17 12:37:37 +09:00
parent 2c99fda7b5
commit 1bf0dfe64a
16 changed files with 154 additions and 357 deletions

View File

@ -22,8 +22,8 @@ import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.replay.ReplayingDecoder;
/** /**
* Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and * Decodes {@link ChannelBuffer}s into {@link HttpMessage}s and

View File

@ -18,9 +18,9 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.replay.ReplayingDecoder; import io.netty.handler.codec.VoidEnum;
import io.netty.handler.codec.replay.VoidEnum;
/** /**
* Decodes {@link ChannelBuffer}s into {@link WebSocketFrame}s. * Decodes {@link ChannelBuffer}s into {@link WebSocketFrame}s.

View File

@ -59,8 +59,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException; import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.replay.ReplayingDecoder;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;

View File

@ -57,7 +57,7 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (evt instanceof ChannelStateEvent) { if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt; ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) { switch (e.state()) {
case OPEN: case OPEN:
case CONNECTED: case CONNECTED:
case BOUND: case BOUND:

View File

@ -257,7 +257,7 @@ public class SpdySessionHandler extends SimpleChannelUpstreamHandler
throws Exception { throws Exception {
if (evt instanceof ChannelStateEvent) { if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt; ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) { switch (e.state()) {
case OPEN: case OPEN:
case CONNECTED: case CONNECTED:
case BOUND: case BOUND:

View File

@ -13,11 +13,14 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay; package io.netty.handler.codec;
final class ReplayError extends Error {
class ReplayError extends Error {
private static final long serialVersionUID = 2666698631187527681L; private static final long serialVersionUID = 2666698631187527681L;
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
} }

View File

@ -13,26 +13,21 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay; package io.netty.handler.codec;
import java.net.SocketAddress;
import static io.netty.handler.codec.MessageToMessageEncoder.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.FrameDecoder;
/** /**
* A specialized variation of {@link FrameDecoder} which enables implementation * A specialized variation of {@link StreamToMessageDecoder} which enables implementation
* of a non-blocking decoder in the blocking I/O paradigm. * of a non-blocking decoder in the blocking I/O paradigm.
* <p> * <p>
* The biggest difference between {@link ReplayingDecoder} and * The biggest difference between {@link ReplayingDecoder} and
@ -282,17 +277,16 @@ import io.netty.handler.codec.FrameDecoder;
* the state type; use {@link VoidEnum} if state management is unused * the state type; use {@link VoidEnum} if state management is unused
* *
* @apiviz.landmark * @apiviz.landmark
* @apiviz.has io.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws * @apiviz.has io.netty.handler.codec.UnreplayableOperationException oneway - - throws
*/ */
public abstract class ReplayingDecoder<T extends Enum<T>> public abstract class ReplayingDecoder<O, S extends Enum<S>> extends ChannelInboundHandlerAdapter<Byte> {
extends SimpleChannelUpstreamHandler {
private final ChannelBufferHolder<Byte> in = ChannelBufferHolders.byteBuffer();
private ChannelBuffer cumulation; private final ChannelBuffer cumulation = in.byteBuffer();
private final boolean unfold; private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(cumulation);
private ReplayingDecoderBuffer replayable; private S state;
private T state; private int checkpoint = -1;
private int checkpoint; private volatile boolean inUse;
/** /**
* Creates a new instance with no initial state (i.e: {@code null}). * Creates a new instance with no initial state (i.e: {@code null}).
@ -301,48 +295,34 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
this(null); this(null);
} }
protected ReplayingDecoder(boolean unfold) {
this(null, unfold);
}
/** /**
* Creates a new instance with the specified initial state. * Creates a new instance with the specified initial state.
*/ */
protected ReplayingDecoder(T initialState) { protected ReplayingDecoder(S initialState) {
this(initialState, false);
}
protected ReplayingDecoder(T initialState, boolean unfold) {
this.state = initialState; this.state = initialState;
this.unfold = unfold;
} }
/** /**
* Stores the internal cumulative buffer's reader position. * Stores the internal cumulative buffer's reader position.
*/ */
protected void checkpoint() { protected void checkpoint() {
ChannelBuffer cumulation = this.cumulation; checkpoint = cumulation.readerIndex();
if (cumulation != null) {
checkpoint = cumulation.readerIndex();
} else {
checkpoint = -1; // buffer not available (already cleaned up)
}
} }
/** /**
* Stores the internal cumulative buffer's reader position and updates * Stores the internal cumulative buffer's reader position and updates
* the current decoder state. * the current decoder state.
*/ */
protected void checkpoint(T state) { protected void checkpoint(S state) {
checkpoint(); checkpoint();
setState(state); state(state);
} }
/** /**
* Returns the current state of this decoder. * Returns the current state of this decoder.
* @return the current state of this decoder * @return the current state of this decoder
*/ */
protected T getState() { protected S state() {
return state; return state;
} }
@ -350,41 +330,106 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
* Sets the current state of this decoder. * Sets the current state of this decoder.
* @return the old state of this decoder * @return the old state of this decoder
*/ */
protected T setState(T newState) { protected S state(S newState) {
T oldState = state; S oldState = state;
state = newState; state = newState;
return oldState; return oldState;
} }
/** @Override
* Returns the actual number of readable bytes in the internal cumulative public ChannelBufferHolder<Byte> newInboundBuffer(
* buffer of this decoder. You usually do not need to rely on this value ChannelInboundHandlerContext<Byte> ctx) throws Exception {
* to write a decoder. Use it only when you muse use it at your own risk. if (inUse) {
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. throw new IllegalStateException(
*/ ReplayingDecoder.class.getSimpleName() + " cannot be shared.");
protected int actualReadableBytes() { }
return internalBuffer().readableBytes(); inUse = true;
return in;
} }
/** @Override
* Returns the internal cumulative buffer of this decoder. You usually public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
* do not need to access the internal buffer directly to write a decoder. callDecode(ctx);
* Use it only when you must use it at your own risk. }
*/
protected ChannelBuffer internalBuffer() { @Override
ChannelBuffer buf = this.cumulation; public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
if (buf == null) { replayable.terminate();
return ChannelBuffers.EMPTY_BUFFER; ChannelBuffer in = cumulation;
if (in.readable()) {
callDecode(ctx);
}
try {
if (unfoldAndAdd(ctx, ctx.nextIn(), decodeLast(ctx, replayable, state))) {
in.discardReadBytes();
ctx.fireInboundBufferUpdated();
}
} catch (ReplayError replay) {
// Ignore
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
ctx.fireChannelInactive();
}
private void callDecode(ChannelInboundHandlerContext<Byte> ctx) {
ChannelBuffer in = cumulation;
while (in.readable()) {
try {
int oldReaderIndex = checkpoint = in.readerIndex();
Object result = null;
S oldState = state;
try {
result = decode(ctx, replayable, state);
if (result == null) {
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new IllegalStateException(
"null cannot be returned if no data is consumed and state didn't change.");
} else {
// Previous data has been discarded or caused state transition.
// Probably it is reading on.
continue;
}
}
} catch (ReplayError replay) {
// Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
}
if (result == null) {
// Seems like more data is required.
// Let us wait for the next notification.
break;
}
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new IllegalStateException(
"decode() method must consume at least one byte " +
"if it returned a decoded message (caused by: " +
getClass() + ")");
}
// A successful decode
MessageToMessageEncoder.unfoldAndAdd(ctx, ctx.nextIn(), result);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} }
return buf;
} }
/** /**
* Decodes the received packets so far into a frame. * Decodes the received packets so far into a frame.
* *
* @param ctx the context of this handler * @param ctx the context of this handler
* @param channel the current channel * @param in the cumulative buffer of received packets so far.
* @param buffer the cumulative buffer of received packets so far.
* Note that the buffer might be empty, which means you * Note that the buffer might be empty, which means you
* should not make an assumption that the buffer contains * should not make an assumption that the buffer contains
* at least one byte in your decoder implementation. * at least one byte in your decoder implementation.
@ -392,16 +437,14 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
* *
* @return the decoded frame * @return the decoded frame
*/ */
protected abstract Object decode(ChannelHandlerContext ctx, public abstract O decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in, S state) throws Exception;
Channel channel, ChannelBuffer buffer, T state) throws Exception;
/** /**
* Decodes the received data so far into a frame when the channel is * Decodes the received data so far into a frame when the channel is
* disconnected. * disconnected.
* *
* @param ctx the context of this handler * @param ctx the context of this handler
* @param channel the current channel * @param in the cumulative buffer of received packets so far.
* @param buffer the cumulative buffer of received packets so far.
* Note that the buffer might be empty, which means you * Note that the buffer might be empty, which means you
* should not make an assumption that the buffer contains * should not make an assumption that the buffer contains
* at least one byte in your decoder implementation. * at least one byte in your decoder implementation.
@ -409,207 +452,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
* *
* @return the decoded frame * @return the decoded frame
*/ */
protected Object decodeLast( public O decodeLast(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in, S state) throws Exception {
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception { return decode(ctx, in, state);
return decode(ctx, channel, buffer, state);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}
ChannelBuffer input = (ChannelBuffer) m;
if (!input.readable()) {
return;
}
if (cumulation == null) {
// the cumulation buffer is not created yet so just pass the input
// to callDecode(...) method
this.cumulation = input;
replayable = new ReplayingDecoderBuffer(input);
int oldReaderIndex = input.readerIndex();
int inputSize = input.readableBytes();
callDecode(
ctx, e.channel(),
input, replayable,
e.getRemoteAddress());
if (input.readable()) {
// seems like there is something readable left in the input buffer
// or decoder wants a replay - create the cumulation buffer and
// copy the input into it
ChannelBuffer cumulation;
if (checkpoint > 0) {
int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
cumulation = this.cumulation =
newCumulationBuffer(ctx, bytesToPreserve);
cumulation.writeBytes(input, checkpoint, bytesToPreserve);
} else if (checkpoint == 0) {
cumulation = this.cumulation =
newCumulationBuffer(ctx, inputSize);
cumulation.writeBytes(input, oldReaderIndex, inputSize);
cumulation.readerIndex(input.readerIndex());
} else {
cumulation = this.cumulation =
newCumulationBuffer(ctx, input.readableBytes());
cumulation.writeBytes(input);
}
replayable = new ReplayingDecoderBuffer(cumulation);
} else {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
} else {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) {
cumulation.discardReadBytes();
}
cumulation.writeBytes(input);
callDecode(ctx, e.channel(), cumulation, replayable, e.getRemoteAddress());
if (!cumulation.readable()) {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
}
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void channelClosed(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
ctx.sendUpstream(e);
}
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (input.readable()) {
int oldReaderIndex = checkpoint = input.readerIndex();
Object result = null;
T oldState = state;
try {
result = decode(context, channel, replayableInput, state);
if (result == null) {
if (oldReaderIndex == input.readerIndex() && oldState == state) {
throw new IllegalStateException(
"null cannot be returned if no data is consumed and state didn't change.");
} else {
// Previous data has been discarded or caused state transition.
// Probably it is reading on.
continue;
}
}
} catch (ReplayError replay) {
// Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
input.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
}
if (result == null) {
// Seems like more data is required.
// Let us wait for the next notification.
break;
}
if (oldReaderIndex == input.readerIndex() && oldState == state) {
throw new IllegalStateException(
"decode() method must consume at least one byte " +
"if it returned a decoded message (caused by: " +
getClass() + ")");
}
// A successful decode
unfoldAndFireMessageReceived(context, result, remoteAddress);
}
}
private void unfoldAndFireMessageReceived(
ChannelHandlerContext context, Object result, SocketAddress remoteAddress) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result instanceof Iterable<?>) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) {
return;
}
this.cumulation = null;
replayable.terminate();
if (cumulation != null && cumulation.readable()) {
// Make sure all data was read before notifying a closed channel.
callDecode(ctx, e.channel(), cumulation, replayable, null);
}
// Call decodeLast() finally. Please note that decodeLast() is
// called even if there's nothing more to read from the buffer to
// notify a user that the connection was closed explicitly.
Object partiallyDecoded = decodeLast(ctx, e.channel(), replayable, state);
if (partiallyDecoded != null) {
unfoldAndFireMessageReceived(ctx, partiallyDecoded, null);
}
} catch (ReplayError replay) {
// Ignore
} finally {
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
ctx.sendUpstream(e);
}
}
/**
* Create a new {@link ChannelBuffer} which is used for the cumulation.
* Be aware that this MUST be a dynamic buffer. Sub-classes may override
* this to provide a dynamic {@link ChannelBuffer} which has some
* pre-allocated size that better fit their need.
*
* @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation
*/
protected ChannelBuffer newCumulationBuffer(
ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory();
return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
} }
} }

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay; package io.netty.handler.codec;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;

View File

@ -23,7 +23,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
@Override @Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception { public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
ChannelBuffer in = ctx.in().byteBuffer(); ChannelBuffer in = ctx.in().byteBuffer();
if (!in.readable()) { if (in.readable()) {
callDecode(ctx); callDecode(ctx);
} }

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay; package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay; package io.netty.handler.codec;
/** /**
* A placeholder {@link Enum} which could be specified as a type parameter of * A placeholder {@link Enum} which could be specified as a type parameter of

View File

@ -17,23 +17,23 @@ package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferIndexFinder; import io.netty.buffer.ChannelBufferIndexFinder;
import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.replay.ReplayingDecoder; import io.netty.handler.codec.VoidEnum;
import java.io.IOException; import java.io.IOException;
/** /**
* {@link ReplayingDecoder} which handles Redis protocol * {@link ReplayingDecoder} which handles Redis protocol
* *
* *
*/ */
public class RedisDecoder extends ReplayingDecoder<State> { public class RedisDecoder extends ReplayingDecoder<Reply, VoidEnum> {
private static final char CR = '\r'; private static final char CR = '\r';
private static final char LF = '\n'; private static final char LF = '\n';
private static final char ZERO = '0'; private static final char ZERO = '0';
// We track the current multibulk reply in the case // We track the current multibulk reply in the case
// where we do not get a complete reply in a single // where we do not get a complete reply in a single
// decode invocation. // decode invocation.
@ -42,7 +42,7 @@ public class RedisDecoder extends ReplayingDecoder<State> {
/** /**
* Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer} * Return a byte array which contains only the content of the request. The size of the content is read from the given {@link ChannelBuffer}
* via the {@link #readInteger(ChannelBuffer)} method * via the {@link #readInteger(ChannelBuffer)} method
* *
* @param is the {@link ChannelBuffer} to read from * @param is the {@link ChannelBuffer} to read from
* @throws IOException is thrown if the line-ending is not CRLF * @throws IOException is thrown if the line-ending is not CRLF
*/ */
@ -96,7 +96,7 @@ public class RedisDecoder extends ReplayingDecoder<State> {
} }
@Override @Override
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer, State anEnum) throws Exception { public Reply decode(ChannelInboundHandlerContext<Byte> channelHandlerContext, ChannelBuffer channelBuffer, VoidEnum anEnum) throws Exception {
if (reply != null) { if (reply != null) {
reply.read(this, channelBuffer); reply.read(this, channelBuffer);
Reply ret = reply; Reply ret = reply;
@ -138,7 +138,3 @@ public class RedisDecoder extends ReplayingDecoder<State> {
return reply; return reply;
} }
} }
enum State {
}

View File

@ -16,48 +16,34 @@
package io.netty.handler.codec.redis; package io.netty.handler.codec.redis;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.Channels; import io.netty.handler.codec.MessageToStreamEncoder;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelDownstreamHandler;
/** /**
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s * {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
*/ */
@Sharable @Sharable
public class RedisEncoder extends SimpleChannelDownstreamHandler { public class RedisEncoder extends MessageToStreamEncoder<Object> {
@Override @Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
Object o = e.getMessage(); Object o = msg;
if (o instanceof Command) { if (o instanceof Command) {
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();
Command command = (Command) o; Command command = (Command) o;
command.write(cb); command.write(out);
Channels.write(ctx, future, cb);
} else if (o instanceof Iterable) { } else if (o instanceof Iterable) {
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();
// Useful for transactions and database select // Useful for transactions and database select
for (Object i : (Iterable<?>) o) { for (Object i : (Iterable<?>) o) {
if (i instanceof Command) { if (i instanceof Command) {
Command command = (Command) i; Command command = (Command) i;
command.write(cb); command.write(out);
} else { } else {
super.writeRequested(ctx, e); break;
return;
} }
} }
Channels.write(ctx, future, cb);
} else { } else {
super.writeRequested(ctx, e); throw new IllegalArgumentException("unsupported message type: " + msg.getClass().getName());
} }
} }
} }

View File

@ -1,28 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Specialized variation of {@link io.netty.handler.codec.FrameDecoder}
* which enables implementation of a non-blocking decoder in the blocking I/O
* paradigm.
*
* @apiviz.exclude ^java\.lang\.
* @apiviz.exclude \.SimpleChannelUpstreamHandler$
* @apiviz.exclude \.VoidEnum$
* @apiviz.exclude \.codec\.(?!replay)[a-z0-9]+\.
*/
package io.netty.handler.codec.replay;

View File

@ -24,7 +24,7 @@ import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferInputStream; import io.netty.buffer.ChannelBufferInputStream;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.replay.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
/** /**
* A decoder which deserializes the received {@link ChannelBuffer}s into Java * A decoder which deserializes the received {@link ChannelBuffer}s into Java

View File

@ -13,24 +13,22 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay; package io.netty.handler.codec;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferIndexFinder; import io.netty.buffer.ChannelBufferIndexFinder;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.handler.codec.embedder.DecoderEmbedder;
import org.junit.Test; import org.junit.Test;
public class ReplayingDecoderTest { public class ReplayingDecoderTest {
@Test @Test
public void testLineProtocol() { public void testLineProtocol() {
DecoderEmbedder<ChannelBuffer> e = new DecoderEmbedder<ChannelBuffer>( DecoderEmbedder<ChannelBuffer> e = new DecoderEmbedder<ChannelBuffer>(new LineDecoder());
new LineDecoder());
// Ordinary input // Ordinary input
e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' })); e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' }));
@ -49,17 +47,16 @@ public class ReplayingDecoderTest {
assertNull(e.poll()); assertNull(e.poll());
} }
private static final class LineDecoder extends ReplayingDecoder<VoidEnum> { private static final class LineDecoder extends ReplayingDecoder<ChannelBuffer, VoidEnum> {
LineDecoder() { LineDecoder() {
} }
@Override @Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx,
ChannelBuffer buffer, VoidEnum state) throws Exception { ChannelBuffer in, VoidEnum state) throws Exception {
ChannelBuffer msg = buffer.readBytes( ChannelBuffer msg = in.readBytes(in.bytesBefore(ChannelBufferIndexFinder.LF));
buffer.bytesBefore(ChannelBufferIndexFinder.LF)); in.skipBytes(1);
buffer.skipBytes(1);
return msg; return msg;
} }
} }