Finish porting the codec package to the new API
- Removed deprecated classes - Changed type parameter of StreamToMessageDecoder and MessageToMessageDecoder for more flexibility - Made all tests in the codec package pass
This commit is contained in:
parent
1bf0dfe64a
commit
5344dc242c
@ -55,7 +55,7 @@ import io.netty.channel.ChannelInboundHandlerContext;
|
||||
* </pre>
|
||||
* @apiviz.uses io.netty.handler.codec.frame.Delimiters - - useful
|
||||
*/
|
||||
public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
|
||||
public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder<Object> {
|
||||
|
||||
private final ChannelBuffer[] delimiters;
|
||||
private final int maxFrameLength;
|
||||
@ -187,7 +187,7 @@ public class DelimiterBasedFrameDecoder extends StreamToMessageDecoder<ChannelBu
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws Exception {
|
||||
public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer buffer) throws Exception {
|
||||
// Try all delimiters and choose the delimiter which yields the shortest frame.
|
||||
int minFrameLength = Integer.MAX_VALUE;
|
||||
ChannelBuffer minDelim = null;
|
||||
|
@ -37,7 +37,7 @@ import io.netty.channel.ChannelInboundHandlerContext;
|
||||
* +-----+-----+-----+
|
||||
* </pre>
|
||||
*/
|
||||
public class FixedLengthFrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
|
||||
public class FixedLengthFrameDecoder extends StreamToMessageDecoder<Object> {
|
||||
|
||||
private final int frameLength;
|
||||
private final boolean allocateFullBuffer;
|
||||
@ -75,7 +75,7 @@ public class FixedLengthFrameDecoder extends StreamToMessageDecoder<ChannelBuffe
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
if (in.readableBytes() < frameLength) {
|
||||
return null;
|
||||
} else {
|
||||
|
@ -180,7 +180,7 @@ import io.netty.handler.codec.serialization.ObjectDecoder;
|
||||
* </pre>
|
||||
* @see LengthFieldPrepender
|
||||
*/
|
||||
public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
|
||||
public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder<Object> {
|
||||
|
||||
private final int maxFrameLength;
|
||||
private final int lengthFieldOffset;
|
||||
@ -308,7 +308,7 @@ public class LengthFieldBasedFrameDecoder extends StreamToMessageDecoder<Channel
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
if (discardingTooLongFrame) {
|
||||
long bytesToDiscard = this.bytesToDiscard;
|
||||
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
|
||||
|
@ -45,7 +45,22 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
|
||||
boolean decoded = false;
|
||||
for (;;) {
|
||||
try {
|
||||
if (unfoldAndAdd(ctx, ctx.nextIn(), decode(ctx, in))) {
|
||||
int oldInputLength = in.readableBytes();
|
||||
O o = decode(ctx, in);
|
||||
if (o == null) {
|
||||
if (oldInputLength == in.readableBytes()) {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (oldInputLength == in.readableBytes()) {
|
||||
throw new IllegalStateException(
|
||||
"decode() did not read anything but decoded a message.");
|
||||
}
|
||||
}
|
||||
|
||||
if (unfoldAndAdd(ctx, ctx.nextIn(), o)) {
|
||||
decoded = true;
|
||||
} else {
|
||||
break;
|
||||
|
@ -18,7 +18,6 @@ package io.netty.handler.codec.embedder;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelBufferHolder;
|
||||
import io.netty.channel.ChannelBufferHolders;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInboundHandlerContext;
|
||||
@ -84,14 +83,11 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private E product(Object p) {
|
||||
if (p instanceof CodecEmbedderException) {
|
||||
throw (CodecEmbedderException) p;
|
||||
}
|
||||
if (p instanceof Throwable) {
|
||||
if (p instanceof RuntimeException) {
|
||||
throw (RuntimeException) p;
|
||||
}
|
||||
if (p instanceof Error) {
|
||||
throw (Error) p;
|
||||
}
|
||||
throw new ChannelException((Throwable) p);
|
||||
throw new CodecEmbedderException((Throwable) p);
|
||||
}
|
||||
return (E) p;
|
||||
}
|
||||
@ -157,6 +153,11 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
|
||||
return ChannelBufferHolders.messageBuffer(productQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx) throws Exception {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx, Throwable cause) throws Exception {
|
||||
productQueue.add(cause);
|
||||
|
@ -37,7 +37,7 @@ import com.google.protobuf.CodedInputStream;
|
||||
*
|
||||
* @see com.google.protobuf.CodedInputStream
|
||||
*/
|
||||
public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder<ChannelBuffer> {
|
||||
public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder<Object> {
|
||||
|
||||
// TODO maxFrameLength + safe skip + fail-fast option
|
||||
// (just like LengthFieldBasedFrameDecoder)
|
||||
@ -49,7 +49,7 @@ public class ProtobufVarint32FrameDecoder extends StreamToMessageDecoder<Channel
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
in.markReaderIndex();
|
||||
final byte[] buf = new byte[5];
|
||||
for (int i = 0; i < buf.length; i ++) {
|
||||
|
@ -1,110 +0,0 @@
|
||||
/*
|
||||
* Copyright 2011 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.serialization;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.ObjectStreamConstants;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBufferInputStream;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ReplayingDecoder;
|
||||
|
||||
/**
|
||||
* A decoder which deserializes the received {@link ChannelBuffer}s into Java
|
||||
* objects (interoperability version).
|
||||
* <p>
|
||||
* This decoder is interoperable with the standard Java object
|
||||
* streams such as {@link ObjectInputStream} and {@link ObjectOutputStream}.
|
||||
* <p>
|
||||
* However, this decoder might perform worse than {@link ObjectDecoder} if
|
||||
* the serialized object is big and complex. Also, it does not limit the
|
||||
* maximum size of the object, and consequently your application might face
|
||||
* the risk of <a href="http://en.wikipedia.org/wiki/DoS">DoS attack</a>.
|
||||
* Please use {@link ObjectEncoder} and {@link ObjectDecoder} if you are not
|
||||
* required to keep the interoperability with the standard object streams.
|
||||
*
|
||||
* @deprecated This decoder has a known critical bug which fails to decode and
|
||||
* raises a random exception in some circumstances. Avoid to use
|
||||
* it whenever you can. The only workaround is to replace
|
||||
* {@link CompatibleObjectEncoder}, {@link CompatibleObjectDecoder},
|
||||
* {@link ObjectInputStream}, and {@link ObjectOutputStream} with
|
||||
* {@link ObjectEncoder}, {@link ObjectDecoder},
|
||||
* {@link ObjectEncoderOutputStream}, and
|
||||
* {@link ObjectDecoderInputStream} respectively. This workaround
|
||||
* requires both a client and a server to be modified.
|
||||
*/
|
||||
@Deprecated
|
||||
public class CompatibleObjectDecoder extends ReplayingDecoder<CompatibleObjectDecoderState> {
|
||||
|
||||
private final SwitchableInputStream bin = new SwitchableInputStream();
|
||||
private ObjectInputStream oin;
|
||||
|
||||
/**
|
||||
* Creates a new decoder.
|
||||
*/
|
||||
public CompatibleObjectDecoder() {
|
||||
super(CompatibleObjectDecoderState.READ_HEADER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link ObjectInputStream} which wraps the specified
|
||||
* {@link InputStream}. Override this method to use a subclass of the
|
||||
* {@link ObjectInputStream}.
|
||||
*/
|
||||
protected ObjectInputStream newObjectInputStream(InputStream in) throws Exception {
|
||||
return new ObjectInputStream(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(
|
||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, CompatibleObjectDecoderState state) throws Exception {
|
||||
bin.switchStream(new ChannelBufferInputStream(buffer));
|
||||
switch (state) {
|
||||
case READ_HEADER:
|
||||
oin = newObjectInputStream(bin);
|
||||
checkpoint(CompatibleObjectDecoderState.READ_OBJECT);
|
||||
case READ_OBJECT:
|
||||
return oin.readObject();
|
||||
default:
|
||||
throw new IllegalStateException("Unknown state: " + state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decodeLast(ChannelHandlerContext ctx, Channel channel,
|
||||
ChannelBuffer buffer, CompatibleObjectDecoderState state)
|
||||
throws Exception {
|
||||
switch (buffer.readableBytes()) {
|
||||
case 0:
|
||||
return null;
|
||||
case 1:
|
||||
// Ignore the last TC_RESET
|
||||
if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) {
|
||||
buffer.skipBytes(1);
|
||||
oin.close();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Object decoded = decode(ctx, channel, buffer, state);
|
||||
oin.close();
|
||||
return decoded;
|
||||
}
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
/*
|
||||
* Copyright 2011 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.serialization;
|
||||
|
||||
enum CompatibleObjectDecoderState {
|
||||
READ_HEADER,
|
||||
READ_OBJECT,
|
||||
}
|
@ -15,18 +15,16 @@
|
||||
*/
|
||||
package io.netty.handler.codec.serialization;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBufferOutputStream;
|
||||
import io.netty.channel.ChannelOutboundHandlerContext;
|
||||
import io.netty.handler.codec.MessageToStreamEncoder;
|
||||
import io.netty.util.Attribute;
|
||||
import io.netty.util.AttributeKey;
|
||||
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBufferFactory;
|
||||
import io.netty.buffer.ChannelBufferOutputStream;
|
||||
import io.netty.buffer.ChannelBuffers;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
|
||||
/**
|
||||
* An encoder which serializes a Java object into a {@link ChannelBuffer}
|
||||
@ -35,12 +33,13 @@ import io.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
* This encoder is interoperable with the standard Java object streams such as
|
||||
* {@link ObjectInputStream} and {@link ObjectOutputStream}.
|
||||
*/
|
||||
public class CompatibleObjectEncoder extends OneToOneEncoder {
|
||||
public class CompatibleObjectEncoder extends MessageToStreamEncoder<Object> {
|
||||
|
||||
private static final AttributeKey<ObjectOutputStream> OOS =
|
||||
new AttributeKey<ObjectOutputStream>(
|
||||
CompatibleObjectEncoder.class.getName() + ".oos", ObjectOutputStream.class);
|
||||
|
||||
private final AtomicReference<ChannelBuffer> buffer =
|
||||
new AtomicReference<ChannelBuffer>();
|
||||
private final int resetInterval;
|
||||
private volatile ObjectOutputStream oout;
|
||||
private int writtenObjects;
|
||||
|
||||
/**
|
||||
@ -77,44 +76,31 @@ public class CompatibleObjectEncoder extends OneToOneEncoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object encode(ChannelHandlerContext context, Channel channel, Object msg) throws Exception {
|
||||
ChannelBuffer buffer = buffer(context);
|
||||
ObjectOutputStream oout = this.oout;
|
||||
if (resetInterval != 0) {
|
||||
// Resetting will prevent OOM on the receiving side.
|
||||
writtenObjects ++;
|
||||
if (writtenObjects % resetInterval == 0) {
|
||||
oout.reset();
|
||||
|
||||
// Also discard the byproduct to avoid OOM on the sending side.
|
||||
buffer.discardReadBytes();
|
||||
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
|
||||
Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS);
|
||||
ObjectOutputStream oos = oosAttr.get();
|
||||
if (oos == null) {
|
||||
oos = newObjectOutputStream(new ChannelBufferOutputStream(out));
|
||||
ObjectOutputStream newOos = oosAttr.setIfAbsent(oos);
|
||||
if (newOos != null) {
|
||||
oos = newOos;
|
||||
}
|
||||
}
|
||||
oout.writeObject(msg);
|
||||
oout.flush();
|
||||
|
||||
return buffer.readBytes(buffer.readableBytes());
|
||||
}
|
||||
synchronized (oos) {
|
||||
if (resetInterval != 0) {
|
||||
// Resetting will prevent OOM on the receiving side.
|
||||
writtenObjects ++;
|
||||
if (writtenObjects % resetInterval == 0) {
|
||||
oos.reset();
|
||||
|
||||
private ChannelBuffer buffer(ChannelHandlerContext ctx) throws Exception {
|
||||
ChannelBuffer buf = buffer.get();
|
||||
if (buf == null) {
|
||||
ChannelBufferFactory factory = ctx.channel().getConfig().getBufferFactory();
|
||||
buf = ChannelBuffers.dynamicBuffer(factory);
|
||||
if (buffer.compareAndSet(null, buf)) {
|
||||
boolean success = false;
|
||||
try {
|
||||
oout = newObjectOutputStream(new ChannelBufferOutputStream(buf));
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
oout = null;
|
||||
}
|
||||
// Also discard the byproduct to avoid OOM on the sending side.
|
||||
out.discardReadBytes();
|
||||
}
|
||||
} else {
|
||||
buf = buffer.get();
|
||||
}
|
||||
|
||||
oos.writeObject(msg);
|
||||
oos.flush();
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
|
@ -15,15 +15,14 @@
|
||||
*/
|
||||
package io.netty.handler.codec.serialization;
|
||||
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBufferInputStream;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerContext;
|
||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
/**
|
||||
* A decoder which deserializes the received {@link ChannelBuffer}s into Java
|
||||
* objects.
|
||||
@ -39,20 +38,6 @@ public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
|
||||
|
||||
private final ClassResolver classResolver;
|
||||
|
||||
/**
|
||||
* Creates a new decoder whose maximum object size is {@code 1048576}
|
||||
* bytes. If the size of the received object is greater than
|
||||
* {@code 1048576} bytes, a {@link StreamCorruptedException} will be
|
||||
* raised.
|
||||
*
|
||||
* @deprecated use {@link #ObjectDecoder(ClassResolver)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ObjectDecoder() {
|
||||
this(1048576);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new decoder whose maximum object size is {@code 1048576}
|
||||
* bytes. If the size of the received object is greater than
|
||||
@ -65,20 +50,6 @@ public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
|
||||
this(1048576, classResolver);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new decoder with the specified maximum object size.
|
||||
*
|
||||
* @param maxObjectSize the maximum byte length of the serialized object.
|
||||
* if the length of the received object is greater
|
||||
* than this value, {@link StreamCorruptedException}
|
||||
* will be raised.
|
||||
* @deprecated use {@link #ObjectDecoder(int, ClassResolver)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ObjectDecoder(int maxObjectSize) {
|
||||
this(maxObjectSize, ClassResolvers.weakCachingResolver(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new decoder with the specified maximum object size.
|
||||
*
|
||||
@ -94,27 +65,9 @@ public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
|
||||
this.classResolver = classResolver;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new decoder with the specified maximum object size and the {@link ClassLoader} wrapped in {@link ClassResolvers#weakCachingResolver(ClassLoader)}
|
||||
*
|
||||
* @param maxObjectSize the maximum byte length of the serialized object.
|
||||
* if the length of the received object is greater
|
||||
* than this value, {@link StreamCorruptedException}
|
||||
* will be raised.
|
||||
* @param classLoader the the classloader to use
|
||||
* @deprecated use {@link #ObjectDecoder(int, ClassResolver)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ObjectDecoder(int maxObjectSize, ClassLoader classLoader) {
|
||||
this(maxObjectSize, ClassResolvers.weakCachingResolver(classLoader));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(
|
||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
|
||||
ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);
|
||||
public Object decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||
ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, in);
|
||||
if (frame == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -15,18 +15,15 @@
|
||||
*/
|
||||
package io.netty.handler.codec.serialization;
|
||||
|
||||
import static io.netty.buffer.ChannelBuffers.*;
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBufferOutputStream;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelOutboundHandlerContext;
|
||||
import io.netty.handler.codec.MessageToStreamEncoder;
|
||||
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBufferOutputStream;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
|
||||
/**
|
||||
* An encoder which serializes a Java object into a {@link ChannelBuffer}.
|
||||
* <p>
|
||||
@ -38,7 +35,7 @@ import io.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
* @apiviz.has io.netty.handler.codec.serialization.ObjectEncoderOutputStream - - - compatible with
|
||||
*/
|
||||
@Sharable
|
||||
public class ObjectEncoder extends OneToOneEncoder {
|
||||
public class ObjectEncoder extends MessageToStreamEncoder<Object> {
|
||||
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
|
||||
|
||||
private final int estimatedLength;
|
||||
@ -70,18 +67,18 @@ public class ObjectEncoder extends OneToOneEncoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
|
||||
ChannelBufferOutputStream bout =
|
||||
new ChannelBufferOutputStream(dynamicBuffer(
|
||||
estimatedLength, ctx.channel().getConfig().getBufferFactory()));
|
||||
public void encode(ChannelOutboundHandlerContext<Object> ctx, Object msg, ChannelBuffer out) throws Exception {
|
||||
int startIdx = out.writerIndex();
|
||||
|
||||
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(out);
|
||||
bout.write(LENGTH_PLACEHOLDER);
|
||||
ObjectOutputStream oout = new CompactObjectOutputStream(bout);
|
||||
oout.writeObject(msg);
|
||||
oout.flush();
|
||||
oout.close();
|
||||
|
||||
ChannelBuffer encoded = bout.buffer();
|
||||
encoded.setInt(0, encoded.writerIndex() - 4);
|
||||
return encoded;
|
||||
int endIdx = out.writerIndex();
|
||||
|
||||
out.setInt(startIdx, endIdx - startIdx - 4);
|
||||
}
|
||||
}
|
||||
|
@ -15,18 +15,16 @@
|
||||
*/
|
||||
package io.netty.handler.codec.string;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.MessageEvent;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import io.netty.handler.codec.Delimiters;
|
||||
import io.netty.handler.codec.FrameDecoder;
|
||||
import io.netty.handler.codec.oneone.OneToOneDecoder;
|
||||
import io.netty.handler.codec.MessageToMessageDecoder;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/**
|
||||
* Decodes a received {@link ChannelBuffer} into a {@link String}. Please
|
||||
@ -55,7 +53,7 @@ import io.netty.handler.codec.oneone.OneToOneDecoder;
|
||||
* @apiviz.landmark
|
||||
*/
|
||||
@Sharable
|
||||
public class StringDecoder extends OneToOneDecoder {
|
||||
public class StringDecoder extends MessageToMessageDecoder<ChannelBuffer, String> {
|
||||
|
||||
// TODO Use CharsetDecoder instead.
|
||||
private final Charset charset;
|
||||
@ -78,11 +76,7 @@ public class StringDecoder extends OneToOneDecoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(
|
||||
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
|
||||
if (!(msg instanceof ChannelBuffer)) {
|
||||
return msg;
|
||||
}
|
||||
return ((ChannelBuffer) msg).toString(charset);
|
||||
public String decode(ChannelInboundHandlerContext<ChannelBuffer> ctx, ChannelBuffer msg) throws Exception {
|
||||
return msg.toString(charset);
|
||||
}
|
||||
}
|
||||
|
@ -15,19 +15,17 @@
|
||||
*/
|
||||
package io.netty.handler.codec.string;
|
||||
|
||||
import static io.netty.buffer.ChannelBuffers.*;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.MessageEvent;
|
||||
import io.netty.buffer.ChannelBuffers;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelOutboundHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import io.netty.handler.codec.Delimiters;
|
||||
import io.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
import io.netty.handler.codec.MessageToMessageEncoder;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/**
|
||||
* Encodes the requested {@link String} into a {@link ChannelBuffer}.
|
||||
@ -53,7 +51,7 @@ import io.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
* @apiviz.landmark
|
||||
*/
|
||||
@Sharable
|
||||
public class StringEncoder extends OneToOneEncoder {
|
||||
public class StringEncoder extends MessageToMessageEncoder<String, ChannelBuffer> {
|
||||
|
||||
// TODO Use CharsetEncoder instead.
|
||||
private final Charset charset;
|
||||
@ -76,11 +74,7 @@ public class StringEncoder extends OneToOneEncoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object encode(
|
||||
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
|
||||
if (!(msg instanceof String)) {
|
||||
return msg;
|
||||
}
|
||||
return copiedBuffer((String) msg, charset);
|
||||
public ChannelBuffer encode(ChannelOutboundHandlerContext<String> ctx, String msg) throws Exception {
|
||||
return ChannelBuffers.copiedBuffer(msg, charset);
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package io.netty.handler.codec.bytes;
|
||||
import static io.netty.buffer.ChannelBuffers.*;
|
||||
import static org.hamcrest.core.Is.*;
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.handler.codec.embedder.CodecEmbedderException;
|
||||
import io.netty.handler.codec.embedder.DecoderEmbedder;
|
||||
|
||||
import java.util.Random;
|
||||
@ -58,8 +59,9 @@ public class ByteArrayDecoderTest {
|
||||
try {
|
||||
embedder.poll();
|
||||
fail();
|
||||
} catch (ClassCastException e) {
|
||||
} catch (CodecEmbedderException e) {
|
||||
// Expected
|
||||
assertTrue(e.getCause() instanceof ClassCastException);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import static org.hamcrest.core.Is.*;
|
||||
import static org.hamcrest.core.IsNull.*;
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.handler.codec.embedder.CodecEmbedderException;
|
||||
import io.netty.handler.codec.embedder.EncoderEmbedder;
|
||||
|
||||
import java.util.Random;
|
||||
@ -60,8 +61,9 @@ public class ByteArrayEncoderTest {
|
||||
try {
|
||||
embedder.poll();
|
||||
fail();
|
||||
} catch (ClassCastException e) {
|
||||
} catch (CodecEmbedderException e) {
|
||||
// Expected
|
||||
assertTrue(e.getCause() instanceof ClassCastException);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package io.netty.handler.codec.frame;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBuffers;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
@ -23,6 +24,7 @@ import io.netty.handler.codec.TooLongFrameException;
|
||||
import io.netty.handler.codec.embedder.CodecEmbedderException;
|
||||
import io.netty.handler.codec.embedder.DecoderEmbedder;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -35,7 +37,8 @@ public class DelimiterBasedFrameDecoderTest {
|
||||
for (int i = 0; i < 2; i ++) {
|
||||
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }));
|
||||
try {
|
||||
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }));
|
||||
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 })));
|
||||
embedder.poll();
|
||||
Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised.");
|
||||
} catch (CodecEmbedderException e) {
|
||||
Assert.assertTrue(e.getCause() instanceof TooLongFrameException);
|
||||
@ -55,7 +58,8 @@ public class DelimiterBasedFrameDecoderTest {
|
||||
|
||||
for (int i = 0; i < 2; i ++) {
|
||||
try {
|
||||
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }));
|
||||
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })));
|
||||
embedder.poll();
|
||||
Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised.");
|
||||
} catch (CodecEmbedderException e) {
|
||||
Assert.assertTrue(e.getCause() instanceof TooLongFrameException);
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package io.netty.handler.codec.frame;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.buffer.ChannelBuffers;
|
||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
@ -22,6 +23,7 @@ import io.netty.handler.codec.TooLongFrameException;
|
||||
import io.netty.handler.codec.embedder.CodecEmbedderException;
|
||||
import io.netty.handler.codec.embedder.DecoderEmbedder;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -32,9 +34,10 @@ public class LengthFieldBasedFrameDecoderTest {
|
||||
new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false));
|
||||
|
||||
for (int i = 0; i < 2; i ++) {
|
||||
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }));
|
||||
assertFalse(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
|
||||
try {
|
||||
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 }));
|
||||
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 })));
|
||||
embedder.poll();
|
||||
Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised.");
|
||||
} catch (CodecEmbedderException e) {
|
||||
Assert.assertTrue(e.getCause() instanceof TooLongFrameException);
|
||||
@ -54,7 +57,8 @@ public class LengthFieldBasedFrameDecoderTest {
|
||||
|
||||
for (int i = 0; i < 2; i ++) {
|
||||
try {
|
||||
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }));
|
||||
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
|
||||
embedder.poll();
|
||||
Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised.");
|
||||
} catch (CodecEmbedderException e) {
|
||||
Assert.assertTrue(e.getCause() instanceof TooLongFrameException);
|
||||
|
Loading…
Reference in New Issue
Block a user