Follow the same pattern as we have in the serialization package. So we have some compatible jboss marshalling impl and one optimized. See #324
This commit is contained in:
parent
c5f6af6584
commit
6b0a6db3f5
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.marshalling;
|
||||
|
||||
import java.io.ObjectStreamConstants;
|
||||
|
||||
import org.jboss.marshalling.ByteInput;
|
||||
import org.jboss.marshalling.Unmarshaller;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
||||
import org.jboss.netty.handler.codec.replay.VoidEnum;
|
||||
|
||||
/**
|
||||
* {@link ReplayingDecoder} which use an {@link Unmarshaller} to read the Object out of the {@link ChannelBuffer}.
|
||||
*
|
||||
* If you can you should use {@link MarshallingDecoder}.
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class CompatibleMarshallingDecoder extends ReplayingDecoder<VoidEnum> {
|
||||
protected final UnmarshallerProvider provider;
|
||||
protected final long maxObjectSize;
|
||||
|
||||
/**
|
||||
* Create a new instance of {@link CompatibleMarshallingDecoder}.
|
||||
*
|
||||
* @param provider the {@link UnmarshallerProvider} which is used to obtain the {@link Unmarshaller} for the {@link Channel}
|
||||
* @param maxObjectSize the maximal size (in bytes) of the {@link Object} to unmarshal. Once the size is exceeded
|
||||
* the {@link Channel} will get closed. Use a a maxObjectSize of <= 0 to disable this.
|
||||
* You should only do this if you are sure that the received Objects will never be big and the
|
||||
* sending side are trusted, as this opens the possibility for a DOS-Attack due an {@link OutOfMemoryError}.
|
||||
*
|
||||
*/
|
||||
public CompatibleMarshallingDecoder(UnmarshallerProvider provider, long maxObjectSize) {
|
||||
this.provider = provider;
|
||||
this.maxObjectSize = maxObjectSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) throws Exception {
|
||||
Unmarshaller unmarshaller = provider.getUnmarshaller(channel);
|
||||
ByteInput input = new ChannelBufferByteInput(buffer);
|
||||
if (maxObjectSize > 0) {
|
||||
input = new LimitingByteInput(input, maxObjectSize);
|
||||
}
|
||||
try {
|
||||
unmarshaller.start(input);
|
||||
Object obj = unmarshaller.readObject();
|
||||
unmarshaller.finish();
|
||||
return obj;
|
||||
} catch (LimitingByteInput.TooBigObjectException e) {
|
||||
throw new TooLongFrameException("Object to big to unmarshal");
|
||||
} finally {
|
||||
// Call close in a finally block as the ReplayingDecoder will throw an Error if not enough bytes are
|
||||
// readable. This helps to be sure that we do not leak resource
|
||||
unmarshaller.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decodeLast(ChannelHandlerContext ctx, Channel channel,
|
||||
ChannelBuffer buffer, VoidEnum state)
|
||||
throws Exception {
|
||||
switch (buffer.readableBytes()) {
|
||||
case 0:
|
||||
return null;
|
||||
case 1:
|
||||
// Ignore the last TC_RESET
|
||||
if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) {
|
||||
buffer.skipBytes(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Object decoded = decode(ctx, channel, buffer, state);
|
||||
return decoded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link Channel#close()} if a TooLongFrameException was thrown
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
if (e.getCause() instanceof TooLongFrameException) {
|
||||
e.getChannel().close();
|
||||
|
||||
} else {
|
||||
super.exceptionCaught(ctx, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.marshalling;
|
||||
|
||||
import org.jboss.marshalling.Marshaller;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelHandler.Sharable;
|
||||
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
|
||||
|
||||
/**
|
||||
* {@link OneToOneEncoder} implementation which uses JBoss Marshalling to marshal
|
||||
* an Object.
|
||||
*
|
||||
* See <a href="http://www.jboss.org/jbossmarshalling">JBoss Marshalling website</a>
|
||||
* for more informations
|
||||
*
|
||||
* Use {@link MarshallingEncoder} if possible.
|
||||
*
|
||||
*/
|
||||
@Sharable
|
||||
public class CompatibleMarshallingEncoder extends OneToOneEncoder {
|
||||
|
||||
private final MarshallerProvider provider;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new instance of the {@link CompatibleMarshallingEncoder}
|
||||
*
|
||||
* @param provider the {@link MarshallerProvider} to use to get the {@link Marshaller} for a {@link Channel}
|
||||
*/
|
||||
public CompatibleMarshallingEncoder(MarshallerProvider provider) {
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
|
||||
Marshaller marshaller = provider.getMarshaller(channel);
|
||||
ChannelBufferByteOutput output = new ChannelBufferByteOutput(ctx.getChannel().getConfig().getBufferFactory(), 256);
|
||||
marshaller.start(output);
|
||||
marshaller.writeObject(msg);
|
||||
marshaller.finish();
|
||||
marshaller.close();
|
||||
|
||||
return output.getBuffer();
|
||||
}
|
||||
|
||||
}
|
|
@ -15,58 +15,67 @@
|
|||
*/
|
||||
package org.jboss.netty.handler.codec.marshalling;
|
||||
|
||||
import java.io.ObjectStreamConstants;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
import org.jboss.marshalling.ByteInput;
|
||||
import org.jboss.marshalling.Unmarshaller;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
|
||||
import org.jboss.netty.handler.codec.replay.VoidEnum;
|
||||
|
||||
/**
|
||||
* {@link ReplayingDecoder} which use an {@link Unmarshaller} to read the Object out of the {@link ChannelBuffer}.
|
||||
*
|
||||
* Most times you want to use {@link ThreadLocalMarshallingDecoder} to get a better performance and less overhead.
|
||||
* Decoder which MUST be used with {@link MarshallingEncoder}.
|
||||
*
|
||||
* A {@link LengthFieldBasedFrameDecoder} which use an {@link Unmarshaller} to read the Object out of the {@link ChannelBuffer}.
|
||||
*
|
||||
*/
|
||||
public class MarshallingDecoder extends ReplayingDecoder<VoidEnum> {
|
||||
protected final UnmarshallerProvider provider;
|
||||
protected final long maxObjectSize;
|
||||
|
||||
public class MarshallingDecoder extends LengthFieldBasedFrameDecoder {
|
||||
|
||||
private final UnmarshallerProvider provider;
|
||||
|
||||
/**
|
||||
* Create a new instance of {@link MarshallingDecoder}.
|
||||
* Creates a new decoder whose maximum object size is {@code 1048576}
|
||||
* bytes. If the size of the received object is greater than
|
||||
* {@code 1048576} bytes, a {@link StreamCorruptedException} will be
|
||||
* raised.
|
||||
*
|
||||
* @param provider the {@link UnmarshallerProvider} which is used to obtain the {@link Unmarshaller} for the {@link Channel}
|
||||
* @param maxObjectSize the maximal size (in bytes) of the {@link Object} to unmarshal. Once the size is exceeded
|
||||
* the {@link Channel} will get closed. Use a a maxObjectSize of <= 0 to disable this.
|
||||
* You should only do this if you are sure that the received Objects will never be big and the
|
||||
* sending side are trusted, as this opens the possibility for a DOS-Attack due an {@link OutOfMemoryError}.
|
||||
*
|
||||
*/
|
||||
public MarshallingDecoder(UnmarshallerProvider provider, long maxObjectSize) {
|
||||
this.provider = provider;
|
||||
this.maxObjectSize = maxObjectSize;
|
||||
public MarshallingDecoder(UnmarshallerProvider provider) {
|
||||
this(provider, 1048576);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new decoder with the specified maximum object size.
|
||||
*
|
||||
* @param maxObjectSize the maximum byte length of the serialized object.
|
||||
* if the length of the received object is greater
|
||||
* than this value, {@link TooLongFrameException}
|
||||
* will be raised.
|
||||
*/
|
||||
public MarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {
|
||||
super(maxObjectSize, 0, 4, 0, 4);
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) throws Exception {
|
||||
protected Object decode(
|
||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
|
||||
ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);
|
||||
if (frame == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Unmarshaller unmarshaller = provider.getUnmarshaller(channel);
|
||||
ByteInput input = new ChannelBufferByteInput(buffer);
|
||||
if (maxObjectSize > 0) {
|
||||
input = new LimitingByteInput(input, maxObjectSize);
|
||||
}
|
||||
|
||||
try {
|
||||
unmarshaller.start(input);
|
||||
Object obj = unmarshaller.readObject();
|
||||
unmarshaller.finish();
|
||||
return obj;
|
||||
} catch (LimitingByteInput.TooBigObjectException e) {
|
||||
throw new TooLongFrameException("Object to big to unmarshal");
|
||||
} finally {
|
||||
// Call close in a finally block as the ReplayingDecoder will throw an Error if not enough bytes are
|
||||
// readable. This helps to be sure that we do not leak resource
|
||||
|
@ -75,34 +84,7 @@ public class MarshallingDecoder extends ReplayingDecoder<VoidEnum> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Object decodeLast(ChannelHandlerContext ctx, Channel channel,
|
||||
ChannelBuffer buffer, VoidEnum state)
|
||||
throws Exception {
|
||||
switch (buffer.readableBytes()) {
|
||||
case 0:
|
||||
return null;
|
||||
case 1:
|
||||
// Ignore the last TC_RESET
|
||||
if (buffer.getByte(buffer.readerIndex()) == ObjectStreamConstants.TC_RESET) {
|
||||
buffer.skipBytes(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Object decoded = decode(ctx, channel, buffer, state);
|
||||
return decoded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link Channel#close()} if a TooLongFrameException was thrown
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
||||
if (e.getCause() instanceof TooLongFrameException) {
|
||||
e.getChannel().close();
|
||||
|
||||
} else {
|
||||
super.exceptionCaught(ctx, e);
|
||||
}
|
||||
protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {
|
||||
return buffer.slice(index, length);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
package org.jboss.netty.handler.codec.marshalling;
|
||||
|
||||
import org.jboss.marshalling.Marshaller;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelHandler.Sharable;
|
||||
|
@ -23,7 +24,11 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
|
|||
|
||||
/**
|
||||
* {@link OneToOneEncoder} implementation which uses JBoss Marshalling to marshal
|
||||
* an Object.
|
||||
* an Object. Be aware that this {@link OneToOneEncoder} is not compatible with
|
||||
* an other client that just use JBoss Marshalling as it includes the size of every
|
||||
* {@link Object} that gets serialized in front of the {@link Object} itself.
|
||||
*
|
||||
* Use this with {@link MarshallingDecoder}
|
||||
*
|
||||
* See <a href="http://www.jboss.org/jbossmarshalling">JBoss Marshalling website</a>
|
||||
* for more informations
|
||||
|
@ -31,30 +36,57 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
|
|||
*/
|
||||
@Sharable
|
||||
public class MarshallingEncoder extends OneToOneEncoder {
|
||||
|
||||
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
|
||||
private final MarshallerProvider provider;
|
||||
|
||||
private final int estimatedLength;
|
||||
|
||||
/**
|
||||
* Create a new instance of the {@link MarshallingEncoder}
|
||||
* Creates a new encoder with the estimated length of 512 bytes.
|
||||
*
|
||||
* @param provider the {@link MarshallerProvider} to use to get the {@link Marshaller} for a {@link Channel}
|
||||
* @param provider the {@link MarshallerProvider} to use
|
||||
*/
|
||||
public MarshallingEncoder(MarshallerProvider provider) {
|
||||
this(provider, 512);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new encoder.
|
||||
*
|
||||
* @param provider
|
||||
* the {@link MarshallerProvider} to use
|
||||
* @param estimatedLength
|
||||
* the estimated byte length of the serialized form of an object.
|
||||
* If the length of the serialized form exceeds this value, the
|
||||
* internal buffer will be expanded automatically at the cost of
|
||||
* memory bandwidth. If this value is too big, it will also waste
|
||||
* memory bandwidth. To avoid unnecessary memory copy or allocation
|
||||
* cost, please specify the properly estimated value.
|
||||
*/
|
||||
public MarshallingEncoder(MarshallerProvider provider, int estimatedLength) {
|
||||
if (estimatedLength < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"estimatedLength: " + estimatedLength);
|
||||
}
|
||||
this.estimatedLength = estimatedLength;
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
|
||||
Marshaller marshaller = provider.getMarshaller(channel);
|
||||
ChannelBufferByteOutput output = new ChannelBufferByteOutput(ctx.getChannel().getConfig().getBufferFactory(), 256);
|
||||
ChannelBufferByteOutput output = new ChannelBufferByteOutput(ctx.getChannel().getConfig().getBufferFactory(), estimatedLength);
|
||||
marshaller.start(output);
|
||||
marshaller.write(LENGTH_PLACEHOLDER);
|
||||
marshaller.writeObject(msg);
|
||||
marshaller.finish();
|
||||
marshaller.close();
|
||||
|
||||
return output.getBuffer();
|
||||
|
||||
ChannelBuffer encoded = output.getBuffer();
|
||||
encoded.setInt(0, encoded.writerIndex() - 4);
|
||||
|
||||
return encoded;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ public abstract class AbstractMarshallingDecoderTest {
|
|||
MarshallerFactory marshallerFactory = createMarshallerFactory();
|
||||
MarshallingConfiguration configuration = createMarshallingConfig();
|
||||
|
||||
MarshallingDecoder mDecoder = createDecoder(1);
|
||||
CompatibleMarshallingDecoder mDecoder = createDecoder(1);
|
||||
DecoderEmbedder<Object> decoder = new DecoderEmbedder<Object>(mDecoder);
|
||||
|
||||
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
||||
|
@ -124,8 +124,8 @@ public abstract class AbstractMarshallingDecoderTest {
|
|||
|
||||
}
|
||||
|
||||
private MarshallingDecoder createDecoder(long maxObjectSize) {
|
||||
return new MarshallingDecoder(createProvider(createMarshallerFactory(), createMarshallingConfig()), maxObjectSize);
|
||||
private CompatibleMarshallingDecoder createDecoder(long maxObjectSize) {
|
||||
return new CompatibleMarshallingDecoder(createProvider(createMarshallerFactory(), createMarshallingConfig()), maxObjectSize);
|
||||
}
|
||||
|
||||
protected UnmarshallerProvider createProvider(MarshallerFactory factory, MarshallingConfiguration config) {
|
||||
|
|
|
@ -36,7 +36,7 @@ public abstract class AbstractMarshallingEncoderTest {
|
|||
final MarshallerFactory marshallerFactory = createMarshallerFactory();
|
||||
final MarshallingConfiguration configuration = createMarshallingConfig();
|
||||
|
||||
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(new MarshallingEncoder(createProvider()));
|
||||
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(new CompatibleMarshallingEncoder(createProvider()));
|
||||
|
||||
encoder.offer(testObject);
|
||||
Assert.assertTrue(encoder.finish());
|
||||
|
|
Loading…
Reference in New Issue
Block a user