diff --git a/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java index 636336b559..d5f6ccfc90 100644 --- a/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/DelimiterBasedFrameDecoder.java @@ -55,7 +55,7 @@ import io.netty.channel.ChannelInboundHandlerContext; * * @apiviz.uses io.netty.handler.codec.frame.Delimiters - - useful */ -public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder { +public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder { private final ChannelBuffer[] delimiters; private final int maxFrameLength; @@ -187,7 +187,7 @@ public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder ctx, ChannelBuffer buffer) throws Exception { + public Object decode(ChannelInboundHandlerContext ctx, ChannelBuffer buffer) throws Exception { // Try all delimiters and choose the delimiter which yields the shortest frame. int minFrameLength = Integer.MAX_VALUE; ChannelBuffer minDelim = null; diff --git a/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java index a8eb60adfa..dfac1f07c4 100644 --- a/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/FixedLengthFrameDecoder.java @@ -37,7 +37,7 @@ import io.netty.channel.ChannelInboundHandlerContext; * +-----+-----+-----+ * */ -public class FixedLengthFrameDecoder extends StreamToMessageDecoder { +public class FixedLengthFrameDecoder extends StreamToMessageDecoder { private final int frameLength; private final boolean allocateFullBuffer; @@ -75,7 +75,7 @@ public class FixedLengthFrameDecoder extends StreamToMessageDecoder ctx, ChannelBuffer in) throws Exception { + public Object decode(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { diff --git a/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java index b1a4c7cc58..60700ce130 100644 --- a/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/LengthFieldBasedFrameDecoder.java @@ -180,7 +180,7 @@ import io.netty.handler.codec.serialization.ObjectDecoder; * * @see LengthFieldPrepender */ -public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder { +public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder { private final int maxFrameLength; private final int lengthFieldOffset; @@ -308,7 +308,7 @@ public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder ctx, ChannelBuffer in) throws Exception { + public Object decode(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes()); diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index f40236c99e..2e51d26717 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -45,7 +45,22 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda boolean decoded = false; for (;;) { try { - if (unfoldAndAdd(ctx, ctx.nextIn(), decode(ctx, in))) { + int oldInputLength = in.readableBytes(); + O o = decode(ctx, in); + if (o == null) { + if (oldInputLength == in.readableBytes()) { + break; + } else { + continue; + } + } else { + if (oldInputLength == in.readableBytes()) { + throw new IllegalStateException( + "decode() did not read anything but decoded a message."); + } + } + + if (unfoldAndAdd(ctx, ctx.nextIn(), o)) { decoded = true; } else { break; diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 9d30684bac..07db4691e7 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -18,7 +18,6 @@ package io.netty.handler.codec.embedder; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerContext; @@ -84,14 +83,11 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { @SuppressWarnings("unchecked") private E product(Object p) { + if (p instanceof CodecEmbedderException) { + throw (CodecEmbedderException) p; + } if (p instanceof Throwable) { - if (p instanceof RuntimeException) { - throw (RuntimeException) p; - } - if (p instanceof Error) { - throw (Error) p; - } - throw new ChannelException((Throwable) p); + throw new CodecEmbedderException((Throwable) p); } return (E) p; } @@ -157,6 +153,11 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { return ChannelBufferHolders.messageBuffer(productQueue); } + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + // NOOP + } + @Override public void exceptionCaught(ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { productQueue.add(cause); diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java index cf73447e61..b798736b45 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java @@ -37,7 +37,7 @@ import com.google.protobuf.CodedInputStream; * * @see com.google.protobuf.CodedInputStream */ -public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder { +public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder { // TODO maxFrameLength + safe skip + fail-fast option // (just like LengthFieldBasedFrameDecoder) @@ -49,7 +49,7 @@ public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder ctx, ChannelBuffer in) throws Exception { + public Object decode(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception { in.markReaderIndex(); final byte[] buf = new byte[5]; for (int i = 0; i < buf.length; i ++) { diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java deleted file mode 100644 index d1717d7786..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoder.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.serialization; - -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamConstants; - -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferInputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ReplayingDecoder; - -/** - * A decoder which deserializes the received {@link ChannelBuffer}s into Java - * objects (interoperability version). - *

- * This decoder is interoperable with the standard Java object - * streams such as {@link ObjectInputStream} and {@link ObjectOutputStream}. - *

- * However, this decoder might perform worse than {@link ObjectDecoder} if - * the serialized object is big and complex. Also, it does not limit the - * maximum size of the object, and consequently your application might face - * the risk of DoS attack. - * Please use {@link ObjectEncoder} and {@link ObjectDecoder} if you are not - * required to keep the interoperability with the standard object streams. - * - * @deprecated This decoder has a known critical bug which fails to decode and - * raises a random exception in some circumstances. Avoid to use - * it whenever you can. The only workaround is to replace - * {@link CompatibleObjectEncoder}, {@link CompatibleObjectDecoder}, - * {@link ObjectInputStream}, and {@link ObjectOutputStream} with - * {@link ObjectEncoder}, {@link ObjectDecoder}, - * {@link ObjectEncoderOutputStream}, and - * {@link ObjectDecoderInputStream} respectively. This workaround - * requires both a client and a server to be modified. - */ -@Deprecated -public class CompatibleObjectDecoder extends ReplayingDecoder { - - private final SwitchableInputStream bin = new SwitchableInputStream(); - private ObjectInputStream oin; - - /** - * Creates a new decoder. - */ - public CompatibleObjectDecoder() { - super(CompatibleObjectDecoderState.READ_HEADER); - } - - /** - * Creates a new {@link ObjectInputStream} which wraps the specified - * {@link InputStream}. Override this method to use a subclass of the - * {@link ObjectInputStream}. - */ - protected ObjectInputStream newObjectInputStream(InputStream in) throws Exception { - return new ObjectInputStream(in); - } - - @Override - protected Object decode( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, CompatibleObjectDecoderState state) throws Exception { - bin.switchStream(new ChannelBufferInputStream(buffer)); - switch (state) { - case READ_HEADER: - oin = newObjectInputStream(bin); - checkpoint(CompatibleObjectDecoderState.READ_OBJECT); - case READ_OBJECT: - return oin.readObject(); - default: - throw new IllegalStateException("Unknown state: " + state); - } - } - - @Override - protected Object decodeLast(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer, CompatibleObjectDecoderState state) - throws Exception { - switch (buffer.readableBytes()) { - case 0: - return null; - case 1: - // Ignore the last TC_RESET - if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) { - buffer.skipBytes(1); - oin.close(); - return null; - } - } - - Object decoded = decode(ctx, channel, buffer, state); - oin.close(); - return decoded; - } -} diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoderState.java b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoderState.java deleted file mode 100644 index a98fbc99cc..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectDecoderState.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.serialization; - -enum CompatibleObjectDecoderState { - READ_HEADER, - READ_OBJECT, -} diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java index 05b2b9d2a9..e4bcf65ae3 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java @@ -15,18 +15,16 @@ */ package io.netty.handler.codec.serialization; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferOutputStream; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToStreamEncoder; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicReference; - -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.buffer.ChannelBufferOutputStream; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.oneone.OneToOneEncoder; /** * An encoder which serializes a Java object into a {@link ChannelBuffer} @@ -35,12 +33,13 @@ import io.netty.handler.codec.oneone.OneToOneEncoder; * This encoder is interoperable with the standard Java object streams such as * {@link ObjectInputStream} and {@link ObjectOutputStream}. */ -public class CompatibleObjectEncoder extends OneToOneEncoder { +public class CompatibleObjectEncoder extends MessageToStreamEncoder { + + private static final AttributeKey OOS = + new AttributeKey( + CompatibleObjectEncoder.class.getName() + ".oos", ObjectOutputStream.class); - private final AtomicReference buffer = - new AtomicReference(); private final int resetInterval; - private volatile ObjectOutputStream oout; private int writtenObjects; /** @@ -77,44 +76,31 @@ public class CompatibleObjectEncoder extends OneToOneEncoder { } @Override - protected Object encode(ChannelHandlerContext context, Channel channel, Object msg) throws Exception { - ChannelBuffer buffer = buffer(context); - ObjectOutputStream oout = this.oout; - if (resetInterval != 0) { - // Resetting will prevent OOM on the receiving side. - writtenObjects ++; - if (writtenObjects % resetInterval == 0) { - oout.reset(); - - // Also discard the byproduct to avoid OOM on the sending side. - buffer.discardReadBytes(); + public void encode(ChannelOutboundHandlerContext ctx, Object msg, ChannelBuffer out) throws Exception { + Attribute oosAttr = ctx.attr(OOS); + ObjectOutputStream oos = oosAttr.get(); + if (oos == null) { + oos = newObjectOutputStream(new ChannelBufferOutputStream(out)); + ObjectOutputStream newOos = oosAttr.setIfAbsent(oos); + if (newOos != null) { + oos = newOos; } } - oout.writeObject(msg); - oout.flush(); - return buffer.readBytes(buffer.readableBytes()); - } + synchronized (oos) { + if (resetInterval != 0) { + // Resetting will prevent OOM on the receiving side. + writtenObjects ++; + if (writtenObjects % resetInterval == 0) { + oos.reset(); - private ChannelBuffer buffer(ChannelHandlerContext ctx) throws Exception { - ChannelBuffer buf = buffer.get(); - if (buf == null) { - ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory(); - buf = ChannelBuffers.dynamicBuffer(factory); - if (buffer.compareAndSet(null, buf)) { - boolean success = false; - try { - oout = newObjectOutputStream(new ChannelBufferOutputStream(buf)); - success = true; - } finally { - if (!success) { - oout = null; - } + // Also discard the byproduct to avoid OOM on the sending side. + out.discardReadBytes(); } - } else { - buf = buffer.get(); } + + oos.writeObject(msg); + oos.flush(); } - return buf; } } diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java index 1cf322cf5a..6792835891 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java @@ -15,15 +15,14 @@ */ package io.netty.handler.codec.serialization; -import java.io.ObjectOutputStream; -import java.io.StreamCorruptedException; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBufferInputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import java.io.ObjectOutputStream; +import java.io.StreamCorruptedException; + /** * A decoder which deserializes the received {@link ChannelBuffer}s into Java * objects. @@ -44,40 +43,12 @@ public class ObjectDecoder extends LengthFieldBasedFrameDecoder { * bytes. If the size of the received object is greater than * {@code 1048576} bytes, a {@link StreamCorruptedException} will be * raised. - * - * @deprecated use {@link #ObjectDecoder(ClassResolver)} - */ - @Deprecated - public ObjectDecoder() { - this(1048576); - } - - - /** - * Creates a new decoder whose maximum object size is {@code 1048576} - * bytes. If the size of the received object is greater than - * {@code 1048576} bytes, a {@link StreamCorruptedException} will be - * raised. - * + * * @param classResolver the {@link ClassResolver} to use for this decoder */ public ObjectDecoder(ClassResolver classResolver) { this(1048576, classResolver); } - - /** - * Creates a new decoder with the specified maximum object size. - * - * @param maxObjectSize the maximum byte length of the serialized object. - * if the length of the received object is greater - * than this value, {@link StreamCorruptedException} - * will be raised. - * @deprecated use {@link #ObjectDecoder(int, ClassResolver)} - */ - @Deprecated - public ObjectDecoder(int maxObjectSize) { - this(maxObjectSize, ClassResolvers.weakCachingResolver(null)); - } /** * Creates a new decoder with the specified maximum object size. @@ -94,27 +65,9 @@ public class ObjectDecoder extends LengthFieldBasedFrameDecoder { this.classResolver = classResolver; } - - /** - * Create a new decoder with the specified maximum object size and the {@link ClassLoader} wrapped in {@link ClassResolvers#weakCachingResolver(ClassLoader)} - * - * @param maxObjectSize the maximum byte length of the serialized object. - * if the length of the received object is greater - * than this value, {@link StreamCorruptedException} - * will be raised. - * @param classLoader the the classloader to use - * @deprecated use {@link #ObjectDecoder(int, ClassResolver)} - */ - @Deprecated - public ObjectDecoder(int maxObjectSize, ClassLoader classLoader) { - this(maxObjectSize, ClassResolvers.weakCachingResolver(classLoader)); - } - @Override - protected Object decode( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { - - ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer); + public Object decode(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception { + ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, in); if (frame == null) { return null; } diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java index dc28fa61c3..8c3515a739 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java @@ -15,18 +15,15 @@ */ package io.netty.handler.codec.serialization; -import static io.netty.buffer.ChannelBuffers.*; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferOutputStream; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.handler.codec.MessageToStreamEncoder; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.handler.codec.oneone.OneToOneEncoder; - /** * An encoder which serializes a Java object into a {@link ChannelBuffer}. *

@@ -38,7 +35,7 @@ import io.netty.handler.codec.oneone.OneToOneEncoder; * @apiviz.has io.netty.handler.codec.serialization.ObjectEncoderOutputStream - - - compatible with */ @Sharable -public class ObjectEncoder extends OneToOneEncoder { +public class ObjectEncoder extends MessageToStreamEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; private final int estimatedLength; @@ -70,18 +67,18 @@ public class ObjectEncoder extends OneToOneEncoder { } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - ChannelBufferOutputStream bout = - new ChannelBufferOutputStream(dynamicBuffer( - estimatedLength, ctx.channel().getConfig().getBufferFactory())); + public void encode(ChannelOutboundHandlerContext ctx, Object msg, ChannelBuffer out) throws Exception { + int startIdx = out.writerIndex(); + + ChannelBufferOutputStream bout = new ChannelBufferOutputStream(out); bout.write(LENGTH_PLACEHOLDER); ObjectOutputStream oout = new CompactObjectOutputStream(bout); oout.writeObject(msg); oout.flush(); oout.close(); - ChannelBuffer encoded = bout.buffer(); - encoded.setInt(0, encoded.writerIndex() - 4); - return encoded; + int endIdx = out.writerIndex(); + + out.setInt(startIdx, endIdx - startIdx - 4); } } diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java index a76f8d03db..b2ab0273a2 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringDecoder.java @@ -15,18 +15,16 @@ */ package io.netty.handler.codec.string; -import java.nio.charset.Charset; - import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.MessageEvent; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; -import io.netty.handler.codec.FrameDecoder; -import io.netty.handler.codec.oneone.OneToOneDecoder; +import io.netty.handler.codec.MessageToMessageDecoder; + +import java.nio.charset.Charset; /** * Decodes a received {@link ChannelBuffer} into a {@link String}. Please @@ -55,7 +53,7 @@ import io.netty.handler.codec.oneone.OneToOneDecoder; * @apiviz.landmark */ @Sharable -public class StringDecoder extends OneToOneDecoder { +public class StringDecoder extends MessageToMessageDecoder { // TODO Use CharsetDecoder instead. private final Charset charset; @@ -78,11 +76,7 @@ public class StringDecoder extends OneToOneDecoder { } @Override - protected Object decode( - ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } - return ((ChannelBuffer) msg).toString(charset); + public String decode(ChannelInboundHandlerContext ctx, ChannelBuffer msg) throws Exception { + return msg.toString(charset); } } diff --git a/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java b/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java index 3dd75f9712..b893e4b61c 100644 --- a/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/string/StringEncoder.java @@ -15,19 +15,17 @@ */ package io.netty.handler.codec.string; -import static io.netty.buffer.ChannelBuffers.*; - -import java.nio.charset.Charset; - import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.MessageEvent; +import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; -import io.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.nio.charset.Charset; /** * Encodes the requested {@link String} into a {@link ChannelBuffer}. @@ -53,7 +51,7 @@ import io.netty.handler.codec.oneone.OneToOneEncoder; * @apiviz.landmark */ @Sharable -public class StringEncoder extends OneToOneEncoder { +public class StringEncoder extends MessageToMessageEncoder { // TODO Use CharsetEncoder instead. private final Charset charset; @@ -76,11 +74,7 @@ public class StringEncoder extends OneToOneEncoder { } @Override - protected Object encode( - ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - if (!(msg instanceof String)) { - return msg; - } - return copiedBuffer((String) msg, charset); + public ChannelBuffer encode(ChannelOutboundHandlerContext ctx, String msg) throws Exception { + return ChannelBuffers.copiedBuffer(msg, charset); } } diff --git a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java index a3797a25ab..4b63d82e16 100644 --- a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java @@ -18,6 +18,7 @@ package io.netty.handler.codec.bytes; import static io.netty.buffer.ChannelBuffers.*; import static org.hamcrest.core.Is.*; import static org.junit.Assert.*; +import io.netty.handler.codec.embedder.CodecEmbedderException; import io.netty.handler.codec.embedder.DecoderEmbedder; import java.util.Random; @@ -58,8 +59,9 @@ public class ByteArrayDecoderTest { try { embedder.poll(); fail(); - } catch (ClassCastException e) { + } catch (CodecEmbedderException e) { // Expected + assertTrue(e.getCause() instanceof ClassCastException); } } } diff --git a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java index 6d5e778177..2f0fd31707 100644 --- a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.core.Is.*; import static org.hamcrest.core.IsNull.*; import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; +import io.netty.handler.codec.embedder.CodecEmbedderException; import io.netty.handler.codec.embedder.EncoderEmbedder; import java.util.Random; @@ -60,8 +61,9 @@ public class ByteArrayEncoderTest { try { embedder.poll(); fail(); - } catch (ClassCastException e) { + } catch (CodecEmbedderException e) { // Expected + assertTrue(e.getCause() instanceof ClassCastException); } } diff --git a/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java index 4f57243d2d..2058d19121 100644 --- a/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java @@ -15,6 +15,7 @@ */ package io.netty.handler.codec.frame; +import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.handler.codec.DelimiterBasedFrameDecoder; @@ -23,6 +24,7 @@ import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.embedder.CodecEmbedderException; import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.util.CharsetUtil; + import org.junit.Assert; import org.junit.Test; @@ -35,7 +37,8 @@ public class DelimiterBasedFrameDecoderTest { for (int i = 0; i < 2; i ++) { embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })); try { - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 })); + assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }))); + embedder.poll(); Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised."); } catch (CodecEmbedderException e) { Assert.assertTrue(e.getCause() instanceof TooLongFrameException); @@ -55,7 +58,8 @@ public class DelimiterBasedFrameDecoderTest { for (int i = 0; i < 2; i ++) { try { - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })); + assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }))); + embedder.poll(); Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised."); } catch (CodecEmbedderException e) { Assert.assertTrue(e.getCause() instanceof TooLongFrameException); diff --git a/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java index 7ec6c8ece0..74c2387faa 100644 --- a/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java @@ -15,6 +15,7 @@ */ package io.netty.handler.codec.frame; +import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -22,6 +23,7 @@ import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.embedder.CodecEmbedderException; import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.util.CharsetUtil; + import org.junit.Assert; import org.junit.Test; @@ -32,9 +34,10 @@ public class LengthFieldBasedFrameDecoderTest { new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false)); for (int i = 0; i < 2; i ++) { - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 })); + assertFalse(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); try { - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 })); + assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 }))); + embedder.poll(); Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised."); } catch (CodecEmbedderException e) { Assert.assertTrue(e.getCause() instanceof TooLongFrameException); @@ -54,7 +57,8 @@ public class LengthFieldBasedFrameDecoderTest { for (int i = 0; i < 2; i ++) { try { - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 })); + assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); + embedder.poll(); Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised."); } catch (CodecEmbedderException e) { Assert.assertTrue(e.getCause() instanceof TooLongFrameException);