diff --git a/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java b/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java index 871d28a49f..1be9f6cedf 100644 --- a/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java +++ b/src/main/java/org/jboss/netty/buffer/ChannelBuffers.java @@ -185,6 +185,14 @@ public class ChannelBuffers { return dynamicBuffer(BIG_ENDIAN, 256); } + public static ChannelBuffer dynamicBuffer(ChannelBufferFactory factory) { + if (factory == null) { + throw new NullPointerException("factory"); + } + + return new DynamicChannelBuffer(factory.getDefaultOrder(), 256, factory); + } + /** * Creates a new big-endian dynamic buffer with the specified estimated * data length. More accurate estimation yields less unexpected diff --git a/src/main/java/org/jboss/netty/buffer/DirectChannelBufferFactory.java b/src/main/java/org/jboss/netty/buffer/DirectChannelBufferFactory.java index c7767b22ce..c4f880a368 100644 --- a/src/main/java/org/jboss/netty/buffer/DirectChannelBufferFactory.java +++ b/src/main/java/org/jboss/netty/buffer/DirectChannelBufferFactory.java @@ -109,6 +109,7 @@ public class DirectChannelBufferFactory extends AbstractChannelBufferFactory { } else { slice = allocateLittleEndianBuffer(capacity); } + slice.clear(); return slice; } diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index 947c046858..83e9e8e062 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -25,10 +25,12 @@ package org.jboss.netty.handler.codec.frame; import static org.jboss.netty.channel.Channels.*; import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicReference; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelStateEvent; @@ -147,8 +149,8 @@ import org.jboss.netty.channel.SimpleChannelHandler; @ChannelPipelineCoverage("one") public abstract class FrameDecoder extends SimpleChannelHandler { - // TODO Respect ChannelBufferFactory - private final ChannelBuffer cumulation = ChannelBuffers.dynamicBuffer(); + private final AtomicReference cumulation = + new AtomicReference(); @Override public void messageReceived( @@ -165,6 +167,7 @@ public abstract class FrameDecoder extends SimpleChannelHandler { return; } + ChannelBuffer cumulation = cumulation(e); if (cumulation.readable()) { cumulation.discardReadBytes(); cumulation.writeBytes(input); @@ -253,6 +256,7 @@ public abstract class FrameDecoder extends SimpleChannelHandler { private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + ChannelBuffer cumulation = cumulation(e); try { if (cumulation.readable()) { // Make sure all frames are read before notifying a closed channel. @@ -269,4 +273,16 @@ public abstract class FrameDecoder extends SimpleChannelHandler { ctx.sendUpstream(e); } } + + private ChannelBuffer cumulation(ChannelEvent e) { + ChannelBuffer buf = cumulation.get(); + if (buf == null) { + buf = ChannelBuffers.dynamicBuffer( + e.getChannel().getConfig().getBufferFactory()); + if (!cumulation.compareAndSet(null, buf)) { + buf = cumulation.get(); + } + } + return buf; + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java index 6ff898dd0c..c25c491a5a 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java @@ -89,9 +89,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder> extends SimpleChannelHandler { - private final ChannelBuffer cumulation = new UnsafeDynamicChannelBuffer(256); - private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(cumulation); + private final AtomicReference cumulation = + new AtomicReference(); + private volatile ReplayingDecoderBuffer replayable; private volatile T state; private volatile int checkpoint; @@ -235,7 +238,7 @@ public abstract class ReplayingDecoder> extends SimpleChannelH * Stores the internal cumulative buffer's reader position. */ protected void checkpoint() { - checkpoint = cumulation.readerIndex(); + checkpoint = cumulation().readerIndex(); } /** @@ -243,8 +246,8 @@ public abstract class ReplayingDecoder> extends SimpleChannelH * the current decoder state. */ protected void checkpoint(T state) { + checkpoint = cumulation().readerIndex(); this.state = state; - checkpoint = cumulation.readerIndex(); } /** @@ -309,9 +312,10 @@ public abstract class ReplayingDecoder> extends SimpleChannelH return; } + ChannelBuffer cumulation = cumulation(ctx); cumulation.discardReadBytes(); cumulation.writeBytes(input); - callDecode(ctx, e.getChannel(), e.getRemoteAddress()); + callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); } @Override @@ -332,7 +336,7 @@ public abstract class ReplayingDecoder> extends SimpleChannelH ctx.sendUpstream(e); } - private void callDecode(ChannelHandlerContext context, Channel channel, SocketAddress remoteAddress) throws Exception { + private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { while (cumulation.readable()) { int oldReaderIndex = checkpoint = cumulation.readerIndex(); Object result = null; @@ -373,10 +377,11 @@ public abstract class ReplayingDecoder> extends SimpleChannelH private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + ChannelBuffer cumulation = cumulation(ctx); try { if (cumulation.readable()) { // Make sure all data was read before notifying a closed channel. - callDecode(ctx, e.getChannel(), null); + callDecode(ctx, e.getChannel(), cumulation, null); if (cumulation.readable()) { // and send the remainders too if necessary. Object partiallyDecoded = decodeLast(ctx, e.getChannel(), cumulation, state); @@ -391,4 +396,27 @@ public abstract class ReplayingDecoder> extends SimpleChannelH ctx.sendUpstream(e); } } + + + private ChannelBuffer cumulation(ChannelHandlerContext ctx) { + ChannelBuffer buf = cumulation.get(); + if (buf == null) { + ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); + buf = new UnsafeDynamicChannelBuffer(factory); + if (cumulation.compareAndSet(null, buf)) { + replayable = new ReplayingDecoderBuffer(buf); + } else { + buf = cumulation.get(); + } + } + return buf; + } + + private ChannelBuffer cumulation() { + ChannelBuffer cumulation = this.cumulation.get(); + if (cumulation == null) { + throw new IllegalStateException("Should be called in decode() only"); + } + return cumulation; + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java b/src/main/java/org/jboss/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java index ab9f2420d1..f4a928851b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java +++ b/src/main/java/org/jboss/netty/handler/codec/replay/UnsafeDynamicChannelBuffer.java @@ -22,8 +22,7 @@ */ package org.jboss.netty.handler.codec.replay; -import java.nio.ByteOrder; - +import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.DynamicChannelBuffer; /** @@ -35,12 +34,8 @@ import org.jboss.netty.buffer.DynamicChannelBuffer; */ class UnsafeDynamicChannelBuffer extends DynamicChannelBuffer { - UnsafeDynamicChannelBuffer(int estimatedLength) { - super(estimatedLength); - } - - UnsafeDynamicChannelBuffer(ByteOrder endianness, int estimatedLength) { - super(endianness, estimatedLength); + UnsafeDynamicChannelBuffer(ChannelBufferFactory factory) { + super(factory.getDefaultOrder(), 256, factory); } @Override diff --git a/src/main/java/org/jboss/netty/handler/codec/serialization/CompatibleObjectEncoder.java b/src/main/java/org/jboss/netty/handler/codec/serialization/CompatibleObjectEncoder.java index e875d9efe7..7f22b7d6d6 100644 --- a/src/main/java/org/jboss/netty/handler/codec/serialization/CompatibleObjectEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/serialization/CompatibleObjectEncoder.java @@ -22,15 +22,17 @@ */ package org.jboss.netty.handler.codec.serialization; -import static org.jboss.netty.buffer.ChannelBuffers.*; import static org.jboss.netty.channel.Channels.*; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicReference; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBufferOutputStream; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; @@ -49,11 +51,11 @@ import org.jboss.netty.channel.MessageEvent; * * @version $Rev:231 $, $Date:2008-06-12 16:44:50 +0900 (목, 12 6월 2008) $ */ -@ChannelPipelineCoverage("all") +@ChannelPipelineCoverage("one") public class CompatibleObjectEncoder implements ChannelDownstreamHandler { - // TODO Respect ChannelBufferFactory - private final ChannelBuffer buffer = dynamicBuffer(); + private final AtomicReference buffer = + new AtomicReference(); private final int resetInterval; private volatile ObjectOutputStream oout; private int writtenObjects; @@ -99,12 +101,8 @@ public class CompatibleObjectEncoder implements ChannelDownstreamHandler { } MessageEvent e = (MessageEvent) evt; - - buffer.clear(); - if (oout == null) { - oout = newObjectOutputStream(new ChannelBufferOutputStream(buffer)); - } - + ChannelBuffer buffer = buffer(context); + ObjectOutputStream oout = this.oout; if (resetInterval != 0) { // Resetting will prevent OOM on the receiving side. writtenObjects ++; @@ -116,6 +114,29 @@ public class CompatibleObjectEncoder implements ChannelDownstreamHandler { oout.flush(); ChannelBuffer encoded = buffer.readBytes(buffer.readableBytes()); + buffer.discardReadBytes(); write(context, e.getChannel(), e.getFuture(), encoded, e.getRemoteAddress()); } + + private ChannelBuffer buffer(ChannelHandlerContext ctx) throws Exception { + ChannelBuffer buf = buffer.get(); + if (buf == null) { + ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); + buf = ChannelBuffers.dynamicBuffer(factory); + if (buffer.compareAndSet(null, buf)) { + boolean success = false; + try { + oout = newObjectOutputStream(new ChannelBufferOutputStream(buf)); + success = true; + } finally { + if (!success) { + oout = null; + } + } + } else { + buf = buffer.get(); + } + } + return buf; + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoder.java b/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoder.java index 2e9be8cbf3..b0daed16b9 100644 --- a/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoder.java @@ -96,9 +96,9 @@ public class ObjectEncoder implements ChannelDownstreamHandler { } MessageEvent e = (MessageEvent) evt; - // TODO Respect ChannelBufferFactory ChannelBufferOutputStream bout = - new ChannelBufferOutputStream(dynamicBuffer(estimatedLength)); + new ChannelBufferOutputStream(dynamicBuffer( + estimatedLength, e.getChannel().getConfig().getBufferFactory())); bout.write(LENGTH_PLACEHOLDER); ObjectOutputStream oout = new CompactObjectOutputStream(bout); oout.writeObject(e.getMessage()); diff --git a/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java b/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java index 6e47bea5df..319af9ef43 100644 --- a/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java +++ b/src/main/java/org/jboss/netty/handler/codec/serialization/ObjectEncoderOutputStream.java @@ -92,8 +92,8 @@ public class ObjectEncoderOutputStream extends OutputStream implements } public void writeObject(Object obj) throws IOException { - // TODO Respect ChannelBufferFactory - ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(estimatedLength)); + ChannelBufferOutputStream bout = new ChannelBufferOutputStream( + ChannelBuffers.dynamicBuffer(estimatedLength)); ObjectOutputStream oout = new CompactObjectOutputStream(bout); oout.writeObject(obj); oout.flush();