* Added ChannelBufferFactory and its implementations

* Made sure ChannelBuffer implementations respect the associated ChannelBufferFactory
This commit is contained in:
Trustin Lee 2008-12-08 08:20:34 +00:00
parent 43de025c7c
commit fe98713cba
33 changed files with 572 additions and 93 deletions

View File

@ -267,8 +267,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer {
if (length == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
// TODO: Allow a user to specify the buffer factory as an overloaded method.
ChannelBuffer buf = ChannelBuffers.buffer(order(), length);
ChannelBuffer buf = factory().getBuffer(order(), length);
buf.writeBytes(this, readerIndex, length);
readerIndex += length;
return buf;

View File

@ -0,0 +1,66 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.buffer;
import java.nio.ByteOrder;
/**
* A skeletal implementation of {@link ChannelBufferFactory}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*/
public abstract class AbstractChannelBufferFactory implements ChannelBufferFactory {
private final ByteOrder defaultOrder;
/**
* Creates a new factory whose default {@link ByteOrder} is
* {@link ByteOrder#BIG_ENDIAN}.
*/
protected AbstractChannelBufferFactory() {
this(ByteOrder.BIG_ENDIAN);
}
/**
* Creates a new factory with the specified default {@link ByteOrder}.
*
* @param defaultOrder the default {@link ByteOrder} of this factory
*/
protected AbstractChannelBufferFactory(ByteOrder defaultOrder) {
if (defaultOrder == null) {
throw new NullPointerException("defaultOrder");
}
this.defaultOrder = defaultOrder;
}
public ChannelBuffer getBuffer(int capacity) {
return getBuffer(getDefaultOrder(), capacity);
}
public ByteOrder getDefaultOrder() {
return defaultOrder;
}
}

View File

@ -59,6 +59,10 @@ public class BigEndianHeapChannelBuffer extends HeapChannelBuffer {
super(array, readerIndex, writerIndex);
}
public ChannelBufferFactory factory() {
return HeapChannelBufferFactory.getInstance(ByteOrder.BIG_ENDIAN);
}
public ByteOrder order() {
return ByteOrder.BIG_ENDIAN;
}

View File

@ -68,6 +68,14 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
setIndex(buffer.readerIndex(), buffer.writerIndex());
}
public ChannelBufferFactory factory() {
if (buffer.isDirect()) {
return DirectChannelBufferFactory.getInstance(order());
} else {
return HeapChannelBufferFactory.getInstance(order());
}
}
public ByteOrder order() {
return buffer.order();
}

View File

@ -232,6 +232,12 @@ import java.util.NoSuchElementException;
*/
public interface ChannelBuffer extends Comparable<ChannelBuffer> {
/**
* Returns the factory which creates a {@link ChannelBuffer} whose
* type and default {@link ByteOrder} are same with this buffer.
*/
ChannelBufferFactory factory();
/**
* Returns the number of bytes (octets) this buffer can contain.
*/

View File

@ -0,0 +1,65 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.buffer;
import java.nio.ByteOrder;
/**
* Manages a set of reusable {@link ChannelBuffer}s.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*/
public interface ChannelBufferFactory {
/**
* Returns a {@link ChannelBuffer} with the specified {@code capacity}.
* This method is identical to {@code getBuffer(getDefaultOrder(), capacity)}.
*
* @param capacity the capacity of the returned {@link ChannelBuffer}
* @return a {@link ChannelBuffer} with the specified {@code capacity},
* whose {@code readerIndex} and {@code writerIndex} are {@code 0}
*/
ChannelBuffer getBuffer(int capacity);
/**
* Returns a {@link ChannelBuffer} with the specified {@code endianness}
* and {@code capacity}.
*
* @param endianness the endianness of the returned {@link ChannelBuffer}
* @param capacity the capacity of the returned {@link ChannelBuffer}
* @return a {@link ChannelBuffer} with the specified {@code endianness} and
* {@code capacity}, whose {@code readerIndex} and {@code writerIndex}
* are {@code 0}
*/
ChannelBuffer getBuffer(ByteOrder endianness, int capacity);
/**
* Returns the default endianness of the {@link ChannelBuffer} which is
* returned by {@link #getBuffer(int)}.
*
* @return the default endianness of the {@link ChannelBuffer} which is
* returned by {@link #getBuffer(int)}
*/
ByteOrder getDefaultOrder();
}

View File

@ -205,6 +205,18 @@ public class ChannelBuffers {
return new DynamicChannelBuffer(endianness, estimatedLength);
}
public static ChannelBuffer dynamicBuffer(int estimatedLength, ChannelBufferFactory factory) {
if (factory == null) {
throw new NullPointerException("factory");
}
return new DynamicChannelBuffer(factory.getDefaultOrder(), estimatedLength, factory);
}
public static ChannelBuffer dynamicBuffer(ByteOrder endianness, int estimatedLength, ChannelBufferFactory factory) {
return new DynamicChannelBuffer(endianness, estimatedLength, factory);
}
/**
* Creates a new big-endian buffer which wraps the specified {@code array}.
* A modification on the specified array's content will be visible to the

View File

@ -92,6 +92,10 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
setIndex(buffer.readerIndex(), buffer.writerIndex());
}
public ChannelBufferFactory factory() {
return HeapChannelBufferFactory.getInstance(order());
}
public ByteOrder order() {
return order;
}
@ -438,7 +442,12 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
throw new IndexOutOfBoundsException();
}
ChannelBuffer dst = ChannelBuffers.buffer(order(), length);
ChannelBuffer dst = factory().getBuffer(order(), length);
copyTo(index, length, sliceId, dst);
return dst;
}
private void copyTo(int index, int length, int sliceId, ChannelBuffer dst) {
int dstIndex = 0;
int i = sliceId;
@ -454,7 +463,6 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
}
dst.writerIndex(dst.capacity());
return dst;
}
public ChannelBuffer slice(int index, int length) {

View File

@ -0,0 +1,152 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.buffer;
import java.lang.ref.ReferenceQueue;
import java.nio.ByteOrder;
/**
* A {@link ChannelBufferFactory} which pre-allocates a large chunk of direct
* buffer and returns its slice on demand. Direct buffers are reclaimed via
* {@link ReferenceQueue} in most JDK implementations, and therefore they are
* deallocated less efficiently than an ordinary heap buffer. Consequently,
* a user will get {@link OutOfMemoryError} when one tries to allocate small
* direct buffers more often than the GC throughput of direct buffers, which
* is much lower than the GC throughput of heap buffers. This factory avoids
* this problem by allocating a large chunk of pre-allocated direct buffer and
* reducing the number of the garbage collected internal direct buffer objects.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class DirectChannelBufferFactory extends AbstractChannelBufferFactory {
private static final DirectChannelBufferFactory INSTANCE_BE =
new DirectChannelBufferFactory(ByteOrder.BIG_ENDIAN);
private static final DirectChannelBufferFactory INSTANCE_LE =
new DirectChannelBufferFactory(ByteOrder.LITTLE_ENDIAN);
public static ChannelBufferFactory getInstance() {
return INSTANCE_BE;
}
public static ChannelBufferFactory getInstance(ByteOrder endianness) {
if (endianness == ByteOrder.BIG_ENDIAN) {
return INSTANCE_BE;
} else if (endianness == ByteOrder.LITTLE_ENDIAN) {
return INSTANCE_LE;
} else if (endianness == null) {
throw new NullPointerException("endianness");
} else {
throw new IllegalStateException("Should not reach here");
}
}
private final Object bigEndianLock = new Object();
private final Object littleEndianLock = new Object();
private final int preallocatedBufferCapacity = 1048576;
private ChannelBuffer preallocatedBigEndianBuffer = null;
private int preallocatedBigEndianBufferPosition;
private ChannelBuffer preallocatedLittleEndianBuffer = null;
private int preallocatedLittleEndianBufferPosition;
/**
* Creates a new factory whose default {@link ByteOrder} is
* {@link ByteOrder#BIG_ENDIAN}.
*/
public DirectChannelBufferFactory() {
super();
}
/**
* Creates a new factory with the specified default {@link ByteOrder}.
*
* @param defaultOrder the default {@link ByteOrder} of this factory
*/
public DirectChannelBufferFactory(ByteOrder defaultOrder) {
super(defaultOrder);
}
public ChannelBuffer getBuffer(ByteOrder order, int capacity) {
if (order == null) {
throw new NullPointerException("order");
}
if (capacity < 0) {
throw new IllegalArgumentException("capacity: " + capacity);
}
if (capacity == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
if (capacity >= preallocatedBufferCapacity) {
return ChannelBuffers.directBuffer(order, capacity);
}
ChannelBuffer slice;
if (order == ByteOrder.BIG_ENDIAN) {
slice = allocateBigEndianBuffer(capacity);
} else {
slice = allocateLittleEndianBuffer(capacity);
}
return slice;
}
private ChannelBuffer allocateBigEndianBuffer(int capacity) {
ChannelBuffer slice;
synchronized (bigEndianLock) {
if (preallocatedBigEndianBuffer == null) {
preallocatedBigEndianBuffer = ChannelBuffers.directBuffer(ByteOrder.BIG_ENDIAN, preallocatedBufferCapacity);
slice = preallocatedBigEndianBuffer.slice(0, capacity);
preallocatedBigEndianBufferPosition = capacity;
} else if (preallocatedBigEndianBuffer.capacity() - preallocatedBigEndianBufferPosition >= capacity) {
slice = preallocatedBigEndianBuffer.slice(preallocatedBigEndianBufferPosition, capacity);
preallocatedBigEndianBufferPosition += capacity;
} else {
preallocatedBigEndianBuffer = ChannelBuffers.directBuffer(ByteOrder.BIG_ENDIAN, preallocatedBufferCapacity);
slice = preallocatedBigEndianBuffer.slice(0, capacity);
preallocatedBigEndianBufferPosition = capacity;
}
}
return slice;
}
private synchronized ChannelBuffer allocateLittleEndianBuffer(int capacity) {
ChannelBuffer slice;
synchronized (littleEndianLock) {
if (preallocatedLittleEndianBuffer == null) {
preallocatedLittleEndianBuffer = ChannelBuffers.directBuffer(ByteOrder.LITTLE_ENDIAN, preallocatedBufferCapacity);
slice = preallocatedLittleEndianBuffer.slice(0, capacity);
preallocatedLittleEndianBufferPosition = capacity;
} else if (preallocatedLittleEndianBuffer.capacity() - preallocatedLittleEndianBufferPosition >= capacity) {
slice = preallocatedLittleEndianBuffer.slice(preallocatedLittleEndianBufferPosition, capacity);
preallocatedLittleEndianBufferPosition += capacity;
} else {
preallocatedLittleEndianBuffer = ChannelBuffers.directBuffer(ByteOrder.LITTLE_ENDIAN, preallocatedBufferCapacity);
slice = preallocatedLittleEndianBuffer.slice(0, capacity);
preallocatedLittleEndianBufferPosition = capacity;
}
}
return slice;
}
}

View File

@ -63,6 +63,10 @@ public class DuplicatedChannelBuffer extends AbstractChannelBuffer implements Wr
return buffer;
}
public ChannelBufferFactory factory() {
return buffer.factory();
}
public ByteOrder order() {
return buffer.order();
}

View File

@ -44,6 +44,7 @@ import java.nio.channels.ScatteringByteChannel;
*/
public class DynamicChannelBuffer extends AbstractChannelBuffer {
private final ChannelBufferFactory factory;
private final int initialCapacity;
private final ByteOrder endianness;
private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
@ -53,17 +54,28 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
}
public DynamicChannelBuffer(ByteOrder endianness, int estimatedLength) {
this(endianness, estimatedLength, HeapChannelBufferFactory.getInstance(endianness));
}
public DynamicChannelBuffer(ByteOrder endianness, int estimatedLength, ChannelBufferFactory factory) {
if (estimatedLength < 0) {
throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
}
if (endianness == null) {
throw new NullPointerException("endianness");
}
if (factory == null) {
throw new NullPointerException("factory");
}
this.factory = factory;
initialCapacity = estimatedLength;
this.endianness = endianness;
}
public ChannelBufferFactory factory() {
return factory;
}
public ByteOrder order() {
return endianness;
}
@ -215,7 +227,7 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
}
public ChannelBuffer copy(int index, int length) {
DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(endianness, Math.max(length, 64));
DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(order(), Math.max(length, 64), factory());
copiedBuffer.buffer = buffer.copy(index, length);
copiedBuffer.setIndex(0, length);
return copiedBuffer;
@ -239,10 +251,10 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
return buffer.toByteBuffer(index, length);
}
public String toString(int index, int length, String charsetName) {
return buffer.toString(index, length, charsetName);
}
private void ensureWritableBytes(int requestedBytes) {
if (requestedBytes <= writableBytes()) {
return;
@ -262,7 +274,7 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
newCapacity <<= 1;
}
ChannelBuffer newBuffer = ChannelBuffers.buffer(endianness, newCapacity);
ChannelBuffer newBuffer = factory().getBuffer(order(), newCapacity);
newBuffer.writeBytes(buffer, readerIndex(), readableBytes());
buffer = newBuffer;
}

View File

@ -0,0 +1,81 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.buffer;
import java.nio.ByteOrder;
/**
* A {@link ChannelBufferFactory} which merely allocates a heap buffer with
* the specified capacity. {@link HeapChannelBufferFactory} should perform
* very well in most situations because it relies on the JVM garbage collector,
* which is highly optimized for heap allocation.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class HeapChannelBufferFactory extends AbstractChannelBufferFactory {
private static final HeapChannelBufferFactory INSTANCE_BE =
new HeapChannelBufferFactory(ByteOrder.BIG_ENDIAN);
private static final HeapChannelBufferFactory INSTANCE_LE =
new HeapChannelBufferFactory(ByteOrder.LITTLE_ENDIAN);
public static ChannelBufferFactory getInstance() {
return INSTANCE_BE;
}
public static ChannelBufferFactory getInstance(ByteOrder endianness) {
if (endianness == ByteOrder.BIG_ENDIAN) {
return INSTANCE_BE;
} else if (endianness == ByteOrder.LITTLE_ENDIAN) {
return INSTANCE_LE;
} else if (endianness == null) {
throw new NullPointerException("endianness");
} else {
throw new IllegalStateException("Should not reach here");
}
}
/**
* Creates a new factory whose default {@link ByteOrder} is
* {@link ByteOrder#BIG_ENDIAN}.
*/
public HeapChannelBufferFactory() {
super();
}
/**
* Creates a new factory with the specified default {@link ByteOrder}.
*
* @param defaultOrder the default {@link ByteOrder} of this factory
*/
public HeapChannelBufferFactory(ByteOrder defaultOrder) {
super(defaultOrder);
}
public ChannelBuffer getBuffer(ByteOrder order, int capacity) {
return ChannelBuffers.buffer(order, capacity);
}
}

View File

@ -59,6 +59,10 @@ public class LittleEndianHeapChannelBuffer extends HeapChannelBuffer {
super(array, readerIndex, writerIndex);
}
public ChannelBufferFactory factory() {
return HeapChannelBufferFactory.getInstance(ByteOrder.LITTLE_ENDIAN);
}
public ByteOrder order() {
return ByteOrder.LITTLE_ENDIAN;
}

View File

@ -62,6 +62,10 @@ public class ReadOnlyChannelBuffer extends AbstractChannelBuffer implements Wrap
return buffer;
}
public ChannelBufferFactory factory() {
return buffer.factory();
}
public ByteOrder order() {
return buffer.order();
}

View File

@ -68,6 +68,10 @@ public class SlicedChannelBuffer extends AbstractChannelBuffer implements Wrappe
return buffer;
}
public ChannelBufferFactory factory() {
return buffer.factory();
}
public ByteOrder order() {
return buffer.order();
}

View File

@ -62,6 +62,10 @@ public class TruncatedChannelBuffer extends AbstractChannelBuffer implements Wra
return buffer;
}
public ChannelBufferFactory factory() {
return buffer.factory();
}
public ByteOrder order() {
return buffer.order();
}

View File

@ -25,10 +25,10 @@ package org.jboss.netty.channel;
import java.io.IOException;
import java.util.Map;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
/**
* A set of configuration properties of a {@link Channel}.
* <p>
@ -79,6 +79,10 @@ public interface ChannelConfig {
*/
void setOptions(Map<String, Object> options);
ChannelBufferFactory getBufferFactory();
void setBufferFactory(ChannelBufferFactory bufferFactory);
/**
* Returns the {@link ChannelPipelineFactory} which will be used when
* a child channel is created. If the {@link Channel} does not create

View File

@ -27,6 +27,8 @@ import java.net.SocketException;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.util.ConversionUtil;
@ -44,6 +46,7 @@ public class DefaultServerSocketChannelConfig implements ServerSocketChannelConf
private final ServerSocket socket;
private volatile int backlog;
private volatile ChannelPipelineFactory pipelineFactory;
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
/**
* Creates a new instance.
@ -72,6 +75,10 @@ public class DefaultServerSocketChannelConfig implements ServerSocketChannelConf
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
} else if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
} else if (key.equals("bufferFactory")) {
setBufferFactory((ChannelBufferFactory) value);
} else {
return false;
}
@ -125,6 +132,18 @@ public class DefaultServerSocketChannelConfig implements ServerSocketChannelConf
this.pipelineFactory = pipelineFactory;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
public int getBacklog() {
return backlog;
}

View File

@ -27,6 +27,8 @@ import java.net.SocketException;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.util.ConversionUtil;
@ -43,6 +45,7 @@ import org.jboss.netty.util.ConversionUtil;
public class DefaultSocketChannelConfig implements SocketChannelConfig {
private final Socket socket;
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile int connectTimeoutMillis = 10000; // 10 seconds
/**
@ -86,6 +89,8 @@ public class DefaultSocketChannelConfig implements SocketChannelConfig {
setConnectTimeoutMillis(ConversionUtil.toInt(value));
} else if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
} else if (key.equals("bufferFactory")) {
setBufferFactory((ChannelBufferFactory) value);
} else {
return false;
}
@ -217,6 +222,17 @@ public class DefaultSocketChannelConfig implements SocketChannelConfig {
return connectTimeoutMillis;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
public ChannelPipelineFactory getPipelineFactory() {
return null;
}

View File

@ -86,6 +86,7 @@ public interface NioSocketChannelConfig extends SocketChannelConfig {
*/
void setWriteSpinCount(int writeSpinCount);
// TODO Deprecate receiveBufferSizePredictor
/**
* Returns the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default

View File

@ -41,7 +41,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
@ -65,7 +64,6 @@ class NioWorker implements Runnable {
InternalLoggerFactory.getInstance(NioWorker.class);
private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
private static final boolean USE_DIRECT_BUFFER = false; // Hard-coded for now
private final int bossId;
private final int id;
@ -243,16 +241,7 @@ class NioWorker implements Runnable {
}
if (k.isReadable()) {
// TODO Replace ReceiveBufferSizePredictor with
// ChannelBufferAllocator and let user specify it per
// Channel. (Netty 3.1)
if (USE_DIRECT_BUFFER) {
readIntoDirectBuffer(k);
} else {
readIntoHeapBuffer(k);
}
read(k);
}
if (!k.isValid()) {
@ -266,69 +255,28 @@ class NioWorker implements Runnable {
}
}
private static void readIntoHeapBuffer(SelectionKey k) {
private ChannelBuffer preallocatedBuffer;
private static void read(SelectionKey k) {
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
NioSocketChannel channel = (NioSocketChannel) k.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
ChannelBuffer buf = ChannelBuffers.buffer(predictor.nextReceiveBufferSize());
int ret = 0;
int readBytes = 0;
boolean failure = true;
try {
while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) {
readBytes += ret;
if (!buf.writable()) {
break;
}
}
failure = false;
} catch (AsynchronousCloseException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buf);
}
if (ret < 0 || failure) {
close(k);
}
}
private ChannelBuffer preallocatedDirectBuffer;
private static void readIntoDirectBuffer(SelectionKey k) {
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
NioSocketChannel channel = (NioSocketChannel) k.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
ChannelBuffer preallocatedDirectBuffer = channel.getWorker().preallocatedDirectBuffer;
ChannelBuffer preallocatedBuffer = channel.getWorker().preallocatedBuffer;
NioWorker worker = channel.getWorker();
worker.preallocatedDirectBuffer = null;
worker.preallocatedBuffer = null;
if (preallocatedDirectBuffer == null) {
preallocatedDirectBuffer = ChannelBuffers.directBuffer(1048576);
if (preallocatedBuffer == null) {
// TODO Magic number
preallocatedBuffer = channel.getConfig().getBufferFactory().getBuffer(1048576);
}
int ret = 0;
int readBytes = 0;
boolean failure = true;
try {
while ((ret = preallocatedDirectBuffer.writeBytes(ch, preallocatedDirectBuffer.writableBytes())) > 0) {
while ((ret = preallocatedBuffer.writeBytes(ch, preallocatedBuffer.writableBytes())) > 0) {
readBytes += ret;
if (!preallocatedDirectBuffer.writable()) {
if (!preallocatedBuffer.writable()) {
break;
}
}
@ -340,20 +288,17 @@ class NioWorker implements Runnable {
}
if (readBytes > 0) {
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
ChannelBuffer slice = preallocatedDirectBuffer.slice(
preallocatedDirectBuffer.readerIndex(),
preallocatedDirectBuffer.readableBytes());
preallocatedDirectBuffer.readerIndex(preallocatedDirectBuffer.writerIndex());
if (preallocatedDirectBuffer.writable()) {
worker.preallocatedDirectBuffer = preallocatedDirectBuffer;
ChannelBuffer slice = preallocatedBuffer.slice(
preallocatedBuffer.readerIndex(),
preallocatedBuffer.readableBytes());
preallocatedBuffer.readerIndex(preallocatedBuffer.writerIndex());
if (preallocatedBuffer.writable()) {
worker.preallocatedBuffer = preallocatedBuffer;
}
fireMessageReceived(channel, slice);
} else if (readBytes == 0) {
worker.preallocatedDirectBuffer = preallocatedDirectBuffer;
worker.preallocatedBuffer = preallocatedBuffer;
}
if (ret < 0 || failure) {

View File

@ -27,6 +27,7 @@ import static org.jboss.netty.channel.Channels.*;
import java.util.LinkedList;
import java.util.Queue;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandler;
@ -54,6 +55,27 @@ public abstract class AbstractCodecEmbedder<T> implements CodecEmbedder<T> {
final Queue<Object> productQueue = new LinkedList<Object>();
protected AbstractCodecEmbedder(ChannelHandler... handlers) {
pipeline = Channels.pipeline();
configurePipeline(handlers);
channel = new EmbeddedChannel(pipeline, sink);
fireInitialEvents();
}
protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
pipeline = Channels.pipeline();
configurePipeline(handlers);
channel = new EmbeddedChannel(bufferFactory, pipeline, sink);
fireInitialEvents();
}
private void fireInitialEvents() {
// Fire the typical initial events.
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
fireChannelConnected(channel, channel.getRemoteAddress());
}
private void configurePipeline(ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -64,7 +86,6 @@ public abstract class AbstractCodecEmbedder<T> implements CodecEmbedder<T> {
ChannelHandler.class.getSimpleName() + '.');
}
pipeline = Channels.pipeline();
for (int i = 0; i < handlers.length; i ++) {
ChannelHandler h = handlers[i];
if (h == null) {
@ -73,12 +94,6 @@ public abstract class AbstractCodecEmbedder<T> implements CodecEmbedder<T> {
pipeline.addLast(String.valueOf(i), handlers[i]);
}
pipeline.addLast("SINK", sink);
channel = new EmbeddedChannel(pipeline, sink);
// Fire the typical initial events.
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
fireChannelConnected(channel, channel.getRemoteAddress());
}
public boolean finish() {

View File

@ -24,6 +24,7 @@ package org.jboss.netty.handler.codec.embedder;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelUpstreamHandler;
/**
@ -39,6 +40,10 @@ public class DecoderEmbedder<T> extends AbstractCodecEmbedder<T> {
super(handlers);
}
public DecoderEmbedder(ChannelBufferFactory bufferFactory, ChannelUpstreamHandler... handlers) {
super(bufferFactory, handlers);
}
public boolean offer(Object input) {
fireMessageReceived(getChannel(), input);
return !super.isEmpty();

View File

@ -24,6 +24,7 @@ package org.jboss.netty.handler.codec.embedder;
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelPipeline;
@ -36,15 +37,22 @@ import org.jboss.netty.channel.ChannelSink;
*/
class EmbeddedChannel extends AbstractChannel {
private final ChannelConfig config;
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
EmbeddedChannel(ChannelPipeline pipeline, ChannelSink sink) {
super(null, EmbeddedChannelFactory.INSTANCE, pipeline, sink);
config = EmbeddedChannelConfig.DEFAULT_INSTANCE;
}
EmbeddedChannel(ChannelBufferFactory bufferFactory, ChannelPipeline pipeline, ChannelSink sink) {
super(null, EmbeddedChannelFactory.INSTANCE, pipeline, sink);
config = new EmbeddedChannelConfig(bufferFactory);
}
public ChannelConfig getConfig() {
return EmbeddedChannelConfig.INSTANCE;
return config;
}
public SocketAddress getLocalAddress() {

View File

@ -24,6 +24,8 @@ package org.jboss.netty.handler.codec.embedder;
import java.util.Map;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -34,10 +36,16 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
*/
class EmbeddedChannelConfig implements ChannelConfig {
static final ChannelConfig INSTANCE = new EmbeddedChannelConfig();
static final ChannelConfig DEFAULT_INSTANCE =
new EmbeddedChannelConfig(HeapChannelBufferFactory.getInstance());
private EmbeddedChannelConfig() {
super();
private final ChannelBufferFactory bufferFactory;
EmbeddedChannelConfig(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
public int getConnectTimeoutMillis() {
@ -48,6 +56,10 @@ class EmbeddedChannelConfig implements ChannelConfig {
return null;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public int getWriteTimeoutMillis() {
return 0;
}
@ -64,6 +76,10 @@ class EmbeddedChannelConfig implements ChannelConfig {
// Unused
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
// Unused
}
public void setWriteTimeoutMillis(int writeTimeoutMillis) {
// Unused
}

View File

@ -24,6 +24,7 @@ package org.jboss.netty.handler.codec.embedder;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelDownstreamHandler;
/**
@ -39,6 +40,10 @@ public class EncoderEmbedder<T> extends AbstractCodecEmbedder<T> {
super(handlers);
}
public EncoderEmbedder(ChannelBufferFactory bufferFactory, ChannelDownstreamHandler... handlers) {
super(bufferFactory, handlers);
}
public boolean offer(Object input) {
write(getChannel(), input).setSuccess();
return !isEmpty();

View File

@ -147,6 +147,7 @@ import org.jboss.netty.channel.SimpleChannelHandler;
@ChannelPipelineCoverage("one")
public abstract class FrameDecoder extends SimpleChannelHandler {
// TODO Respect ChannelBufferFactory
private final ChannelBuffer cumulation = ChannelBuffers.dynamicBuffer();
@Override

View File

@ -89,6 +89,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
return null;
}
case READ_CONTENT: {
// TODO Respect ChannelBufferFactory
if (content == null) {
content = ChannelBuffers.dynamicBuffer();
}
@ -148,6 +149,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
private void readChunkedContent(ChannelBuffer buffer) {
if (content == null) {
// TODO Respect ChannelBufferFactory
content = ChannelBuffers.dynamicBuffer(chunkSize);
}
content.writeBytes(buffer, chunkSize);

View File

@ -51,6 +51,7 @@ public abstract class HttpMessageEncoder extends SimpleChannelHandler {
return;
}
HttpMessage request = (HttpMessage) e.getMessage();
// TODO Respect ChannelBufferFactory
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
encodeInitialLine(buf, request);
encodeHeaders(buf, request);

View File

@ -31,6 +31,7 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
/**
@ -201,6 +202,10 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
reject();
}
public ChannelBufferFactory factory() {
return buffer.factory();
}
public ByteOrder order() {
return buffer.order();
}

View File

@ -52,6 +52,7 @@ import org.jboss.netty.channel.MessageEvent;
@ChannelPipelineCoverage("all")
public class CompatibleObjectEncoder implements ChannelDownstreamHandler {
// TODO Respect ChannelBufferFactory
private final ChannelBuffer buffer = dynamicBuffer();
private final int resetInterval;
private volatile ObjectOutputStream oout;

View File

@ -96,6 +96,7 @@ public class ObjectEncoder implements ChannelDownstreamHandler {
}
MessageEvent e = (MessageEvent) evt;
// TODO Respect ChannelBufferFactory
ChannelBufferOutputStream bout =
new ChannelBufferOutputStream(dynamicBuffer(estimatedLength));
bout.write(LENGTH_PLACEHOLDER);

View File

@ -92,6 +92,7 @@ public class ObjectEncoderOutputStream extends OutputStream implements
}
public void writeObject(Object obj) throws IOException {
// TODO Respect ChannelBufferFactory
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(estimatedLength));
ObjectOutputStream oout = new CompactObjectOutputStream(bout);
oout.writeObject(obj);