Made sure all out-of-the-box encoders and decoders respect the ChannelBufferFactory configuration

This commit is contained in:
Trustin Lee 2008-12-08 09:02:33 +00:00
parent 22b3885fe5
commit 1fa791c4a4
10 changed files with 107 additions and 39 deletions

View File

@ -185,6 +185,14 @@ public class ChannelBuffers {
return dynamicBuffer(BIG_ENDIAN, 256);
}
public static ChannelBuffer dynamicBuffer(ChannelBufferFactory factory) {
if (factory == null) {
throw new NullPointerException("factory");
}
return new DynamicChannelBuffer(factory.getDefaultOrder(), 256, factory);
}
/**
* Creates a new big-endian dynamic buffer with the specified estimated
* data length. More accurate estimation yields less unexpected

View File

@ -109,6 +109,7 @@ public class DirectChannelBufferFactory extends AbstractChannelBufferFactory {
} else {
slice = allocateLittleEndianBuffer(capacity);
}
slice.clear();
return slice;
}

View File

@ -25,10 +25,12 @@ package org.jboss.netty.handler.codec.frame;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
@ -147,8 +149,8 @@ import org.jboss.netty.channel.SimpleChannelHandler;
@ChannelPipelineCoverage("one")
public abstract class FrameDecoder extends SimpleChannelHandler {
// TODO Respect ChannelBufferFactory
private final ChannelBuffer cumulation = ChannelBuffers.dynamicBuffer();
private final AtomicReference<ChannelBuffer> cumulation =
new AtomicReference<ChannelBuffer>();
@Override
public void messageReceived(
@ -165,6 +167,7 @@ public abstract class FrameDecoder extends SimpleChannelHandler {
return;
}
ChannelBuffer cumulation = cumulation(e);
if (cumulation.readable()) {
cumulation.discardReadBytes();
cumulation.writeBytes(input);
@ -253,6 +256,7 @@ public abstract class FrameDecoder extends SimpleChannelHandler {
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
ChannelBuffer cumulation = cumulation(e);
try {
if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel.
@ -269,4 +273,16 @@ public abstract class FrameDecoder extends SimpleChannelHandler {
ctx.sendUpstream(e);
}
}
private ChannelBuffer cumulation(ChannelEvent e) {
ChannelBuffer buf = cumulation.get();
if (buf == null) {
buf = ChannelBuffers.dynamicBuffer(
e.getChannel().getConfig().getBufferFactory());
if (!cumulation.compareAndSet(null, buf)) {
buf = cumulation.get();
}
}
return buf;
}
}

View File

@ -89,9 +89,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return null;
}
case READ_CONTENT: {
// TODO Respect ChannelBufferFactory
if (content == null) {
content = ChannelBuffers.dynamicBuffer();
content = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
}
//this will cause a replay error until the channel is closed where this will read whats left in the buffer
content.writeBytes(buffer.readBytes(buffer.readableBytes()));
@ -122,7 +121,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
}
}
case READ_CHUNKED_CONTENT: {
readChunkedContent(buffer);
readChunkedContent(channel, buffer);
}
case READ_CRLF: {
byte next = buffer.readByte();
@ -147,10 +146,10 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return message;
}
private void readChunkedContent(ChannelBuffer buffer) {
private void readChunkedContent(Channel channel, ChannelBuffer buffer) {
if (content == null) {
// TODO Respect ChannelBufferFactory
content = ChannelBuffers.dynamicBuffer(chunkSize);
content = ChannelBuffers.dynamicBuffer(
chunkSize, channel.getConfig().getBufferFactory());
}
content.writeBytes(buffer, chunkSize);
nextState = State.READ_CHUNK_SIZE;

View File

@ -51,8 +51,8 @@ public abstract class HttpMessageEncoder extends SimpleChannelHandler {
return;
}
HttpMessage request = (HttpMessage) e.getMessage();
// TODO Respect ChannelBufferFactory
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(
e.getChannel().getConfig().getBufferFactory());
encodeInitialLine(buf, request);
encodeHeaders(buf, request);
buf.writeBytes(CRLF);

View File

@ -25,8 +25,10 @@ package org.jboss.netty.handler.codec.replay;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
@ -212,8 +214,9 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
@ChannelPipelineCoverage("one")
public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelHandler {
private final ChannelBuffer cumulation = new UnsafeDynamicChannelBuffer(256);
private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(cumulation);
private final AtomicReference<ChannelBuffer> cumulation =
new AtomicReference<ChannelBuffer>();
private volatile ReplayingDecoderBuffer replayable;
private volatile T state;
private volatile int checkpoint;
@ -235,7 +238,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
* Stores the internal cumulative buffer's reader position.
*/
protected void checkpoint() {
checkpoint = cumulation.readerIndex();
checkpoint = cumulation().readerIndex();
}
/**
@ -243,8 +246,8 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
* the current decoder state.
*/
protected void checkpoint(T state) {
checkpoint = cumulation().readerIndex();
this.state = state;
checkpoint = cumulation.readerIndex();
}
/**
@ -309,9 +312,10 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
return;
}
ChannelBuffer cumulation = cumulation(ctx);
cumulation.discardReadBytes();
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), e.getRemoteAddress());
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
}
@Override
@ -332,7 +336,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
ctx.sendUpstream(e);
}
private void callDecode(ChannelHandlerContext context, Channel channel, SocketAddress remoteAddress) throws Exception {
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
while (cumulation.readable()) {
int oldReaderIndex = checkpoint = cumulation.readerIndex();
Object result = null;
@ -373,10 +377,11 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
ChannelBuffer cumulation = cumulation(ctx);
try {
if (cumulation.readable()) {
// Make sure all data was read before notifying a closed channel.
callDecode(ctx, e.getChannel(), null);
callDecode(ctx, e.getChannel(), cumulation, null);
if (cumulation.readable()) {
// and send the remainders too if necessary.
Object partiallyDecoded = decodeLast(ctx, e.getChannel(), cumulation, state);
@ -391,4 +396,27 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
ctx.sendUpstream(e);
}
}
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
ChannelBuffer buf = cumulation.get();
if (buf == null) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
buf = new UnsafeDynamicChannelBuffer(factory);
if (cumulation.compareAndSet(null, buf)) {
replayable = new ReplayingDecoderBuffer(buf);
} else {
buf = cumulation.get();
}
}
return buf;
}
private ChannelBuffer cumulation() {
ChannelBuffer cumulation = this.cumulation.get();
if (cumulation == null) {
throw new IllegalStateException("Should be called in decode() only");
}
return cumulation;
}
}

View File

@ -22,8 +22,7 @@
*/
package org.jboss.netty.handler.codec.replay;
import java.nio.ByteOrder;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.DynamicChannelBuffer;
/**
@ -35,12 +34,8 @@ import org.jboss.netty.buffer.DynamicChannelBuffer;
*/
class UnsafeDynamicChannelBuffer extends DynamicChannelBuffer {
UnsafeDynamicChannelBuffer(int estimatedLength) {
super(estimatedLength);
}
UnsafeDynamicChannelBuffer(ByteOrder endianness, int estimatedLength) {
super(endianness, estimatedLength);
UnsafeDynamicChannelBuffer(ChannelBufferFactory factory) {
super(factory.getDefaultOrder(), 256, factory);
}
@Override

View File

@ -22,15 +22,17 @@
*/
package org.jboss.netty.handler.codec.serialization;
import static org.jboss.netty.buffer.ChannelBuffers.*;
import static org.jboss.netty.channel.Channels.*;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -49,11 +51,11 @@ import org.jboss.netty.channel.MessageEvent;
*
* @version $Rev:231 $, $Date:2008-06-12 16:44:50 +0900 (, 12 6월 2008) $
*/
@ChannelPipelineCoverage("all")
@ChannelPipelineCoverage("one")
public class CompatibleObjectEncoder implements ChannelDownstreamHandler {
// TODO Respect ChannelBufferFactory
private final ChannelBuffer buffer = dynamicBuffer();
private final AtomicReference<ChannelBuffer> buffer =
new AtomicReference<ChannelBuffer>();
private final int resetInterval;
private volatile ObjectOutputStream oout;
private int writtenObjects;
@ -99,12 +101,8 @@ public class CompatibleObjectEncoder implements ChannelDownstreamHandler {
}
MessageEvent e = (MessageEvent) evt;
buffer.clear();
if (oout == null) {
oout = newObjectOutputStream(new ChannelBufferOutputStream(buffer));
}
ChannelBuffer buffer = buffer(context);
ObjectOutputStream oout = this.oout;
if (resetInterval != 0) {
// Resetting will prevent OOM on the receiving side.
writtenObjects ++;
@ -116,6 +114,29 @@ public class CompatibleObjectEncoder implements ChannelDownstreamHandler {
oout.flush();
ChannelBuffer encoded = buffer.readBytes(buffer.readableBytes());
buffer.discardReadBytes();
write(context, e.getChannel(), e.getFuture(), encoded, e.getRemoteAddress());
}
private ChannelBuffer buffer(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer buf = buffer.get();
if (buf == null) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
buf = ChannelBuffers.dynamicBuffer(factory);
if (buffer.compareAndSet(null, buf)) {
boolean success = false;
try {
oout = newObjectOutputStream(new ChannelBufferOutputStream(buf));
success = true;
} finally {
if (!success) {
oout = null;
}
}
} else {
buf = buffer.get();
}
}
return buf;
}
}

View File

@ -96,9 +96,9 @@ public class ObjectEncoder implements ChannelDownstreamHandler {
}
MessageEvent e = (MessageEvent) evt;
// TODO Respect ChannelBufferFactory
ChannelBufferOutputStream bout =
new ChannelBufferOutputStream(dynamicBuffer(estimatedLength));
new ChannelBufferOutputStream(dynamicBuffer(
estimatedLength, e.getChannel().getConfig().getBufferFactory()));
bout.write(LENGTH_PLACEHOLDER);
ObjectOutputStream oout = new CompactObjectOutputStream(bout);
oout.writeObject(e.getMessage());

View File

@ -92,8 +92,8 @@ public class ObjectEncoderOutputStream extends OutputStream implements
}
public void writeObject(Object obj) throws IOException {
// TODO Respect ChannelBufferFactory
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(estimatedLength));
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
ChannelBuffers.dynamicBuffer(estimatedLength));
ObjectOutputStream oout = new CompactObjectOutputStream(bout);
oout.writeObject(obj);
oout.flush();