Replace ChannelBuffer.toByteBuffer() with hasNioBuffer() and nioBuffer()

... just like we do with byte arrays.  toByteBuffer() and
toByteBuffers() had an indeterministic behavior and thus it could not
tell when the returned NIO buffer is shared or not.  nioBuffer() always
returns a view buffer of the Netty buffer.  The only case where
hasNioBuffer() returns false and nioBuffer() fails is the
CompositeChannelBuffer, which is not very commonly used and *slow*.
This commit is contained in:
Trustin Lee 2012-06-02 01:30:55 -07:00
parent 14e68aca57
commit cc4f705029
21 changed files with 171 additions and 159 deletions

View File

@ -600,18 +600,8 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer {
}
@Override
public ByteBuffer toByteBuffer() {
return toByteBuffer(readerIndex, readableBytes());
}
@Override
public ByteBuffer[] toByteBuffers() {
return toByteBuffers(readerIndex, readableBytes());
}
@Override
public ByteBuffer[] toByteBuffers(int index, int length) {
return new ByteBuffer[] { toByteBuffer(index, length) };
public ByteBuffer nioBuffer() {
return nioBuffer(readerIndex, readableBytes());
}
@Override
@ -625,8 +615,16 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer {
return "";
}
return ChannelBuffers.decodeString(
toByteBuffer(index, length), charset);
ByteBuffer nioBuffer;
if (hasNioBuffer()) {
nioBuffer = nioBuffer(index, length);
} else {
nioBuffer = ByteBuffer.allocate(length);
getBytes(index, nioBuffer);
nioBuffer.flip();
}
return ChannelBuffers.decodeString(nioBuffer, charset);
}
@Override

View File

@ -212,6 +212,10 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
@Override
public void setBytes(int index, ByteBuffer src) {
if (src == tmpBuf) {
src = src.duplicate();
}
tmpBuf.clear().position(index).limit(index + src.remaining());
tmpBuf.put(src);
}
@ -273,7 +277,12 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public boolean hasNioBuffer() {
return true;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
if (index == 0 && length == capacity()) {
return buffer.duplicate().order(order());
} else {

View File

@ -213,10 +213,9 @@ import java.nio.charset.UnsupportedCharsetException;
*
* <h4>NIO Buffers</h4>
*
* Various {@link #toByteBuffer()} and {@link #toByteBuffers()} methods convert
* a {@link ChannelBuffer} into one or more NIO buffers. These methods avoid
* buffer allocation and memory copy whenever possible, but there's no
* guarantee that memory copy will not be involved.
* If a {@link ChannelBuffer} can be converted into an NIO {@link ByteBuffer} which shares its
* content (i.e. view buffer), you can get it via the {@link #nioBuffer()} method. To determine
* if a buffer can be converted into an NIO buffer, use {@link #nioBuffer()}.
*
* <h4>Strings</h4>
*
@ -1651,42 +1650,37 @@ public interface ChannelBuffer extends Comparable<ChannelBuffer> {
ChannelBuffer duplicate();
/**
* Converts this buffer's readable bytes into a NIO buffer. The returned
* buffer might or might not share the content with this buffer, while
* they have separate indexes and marks. This method is identical to
* {@code buf.toByteBuffer(buf.readerIndex(), buf.readableBytes())}.
* This method does not modify {@code readerIndex} or {@code writerIndex} of
* this buffer.
* Returns {@code true} if and only if {@link #nioBuffer()} method will not fail.
*/
ByteBuffer toByteBuffer();
boolean hasNioBuffer();
/**
* Converts this buffer's sub-region into a NIO buffer. The returned
* buffer might or might not share the content with this buffer, while
* they have separate indexes and marks.
* This method does not modify {@code readerIndex} or {@code writerIndex} of
* this buffer.
* Exposes this buffer's readable bytes as an NIO {@link ByteBuffer}. The returned buffer
* shares the content with this buffer, while changing the position and limit of the returned
* NIO buffer does not affect the indexes and marks of this buffer. This method is identical
* to {@code buf.asByteBuffer(buf.readerIndex(), buf.readableBytes())}. This method does not
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer toByteBuffer(int index, int length);
ByteBuffer nioBuffer();
/**
* Converts this buffer's readable bytes into an array of NIO buffers.
* The returned buffers might or might not share the content with this
* buffer, while they have separate indexes and marks. This method is
* identical to {@code buf.toByteBuffers(buf.readerIndex(), buf.readableBytes())}.
* This method does not modify {@code readerIndex} or {@code writerIndex} of
* this buffer.
* Exposes this buffer's sub-region as an NIO {@link ByteBuffer}. The returned buffer
* shares the content with this buffer, while changing the position and limit of the returned
* NIO buffer does not affect the indexes and marks of this buffer. This method does not
* modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
* returned NIO buffer will not see the changes of this buffer if this buffer is a dynamic
* buffer and it adjusted its capacity.
*
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] toByteBuffers();
/**
* Converts this buffer's sub-region into an array of NIO buffers.
* The returned buffers might or might not share the content with this
* buffer, while they have separate indexes and marks.
* This method does not modify {@code readerIndex} or {@code writerIndex} of
* this buffer.
*/
ByteBuffer[] toByteBuffers(int index, int length);
ByteBuffer nioBuffer(int index, int length);
/**
* Returns {@code true} if and only if this buffer has a backing byte array.

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -297,10 +299,18 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
@Override
public int getBytes(int index, GatheringByteChannel out, int length)
throws IOException {
// XXX Gathering write is not supported because of a known issue.
// See http://bugs.sun.com/view_bug.do?bug_id=6210541
// This issue appeared in 2004 and is still unresolved!?
return out.write(toByteBuffer(index, length));
if (DetectionUtil.javaVersion() < 7) {
// XXX Gathering write is not supported because of a known issue.
// See http://bugs.sun.com/view_bug.do?bug_id=6210541
return out.write(copiedNioBuffer(index, length));
} else {
long writtenBytes = out.write(nioBuffers(index, length));
if (writtenBytes > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} else {
return (int) writtenBytes;
}
}
}
@Override
@ -593,12 +603,20 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public boolean hasNioBuffer() {
return false;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
throw new UnsupportedOperationException();
}
private ByteBuffer copiedNioBuffer(int index, int length) {
if (components.length == 1) {
return components[0].toByteBuffer(index, length);
return toNioBuffer(components[0], index, length);
}
ByteBuffer[] buffers = toByteBuffers(index, length);
ByteBuffer[] buffers = nioBuffers(index, length);
ByteBuffer merged = ByteBuffer.allocate(length).order(order());
for (ByteBuffer b: buffers) {
merged.put(b);
@ -607,8 +625,7 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
return merged;
}
@Override
public ByteBuffer[] toByteBuffers(int index, int length) {
private ByteBuffer[] nioBuffers(int index, int length) {
int componentId = componentId(index);
if (index + length > capacity()) {
throw new IndexOutOfBoundsException("Too many bytes to convert - Needs"
@ -619,10 +636,10 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
int i = componentId;
while (length > 0) {
ChannelBuffer s = components[i];
ChannelBuffer c = components[i];
int adjustment = indices[i];
int localLength = Math.min(length, s.capacity() - (index - adjustment));
buffers.add(s.toByteBuffer(index - adjustment, localLength));
int localLength = Math.min(length, c.capacity() - (index - adjustment));
buffers.add(toNioBuffer(c, index - adjustment, localLength));
index += localLength;
length -= localLength;
i ++;
@ -631,6 +648,14 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
return buffers.toArray(new ByteBuffer[buffers.size()]);
}
private static ByteBuffer toNioBuffer(ChannelBuffer buf, int index, int length) {
if (buf.hasNioBuffer()) {
return buf.nioBuffer(index, length);
} else {
return buf.copy(index, length).nioBuffer(0, length);
}
}
private int componentId(int index) {
int lastComponentId = lastAccessedComponentId;
if (index >= indices[lastComponentId]) {

View File

@ -206,7 +206,12 @@ public class DuplicatedChannelBuffer extends AbstractChannelBuffer implements Wr
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
return buffer.toByteBuffer(index, length);
public boolean hasNioBuffer() {
return buffer.hasNioBuffer();
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return buffer.nioBuffer(index, length);
}
}

View File

@ -322,7 +322,12 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
return buffer.toByteBuffer(index, length);
public boolean hasNioBuffer() {
return buffer.hasNioBuffer();
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return buffer.nioBuffer(index, length);
}
}

View File

@ -190,7 +190,12 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public boolean hasNioBuffer() {
return true;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return ByteBuffer.wrap(array, index, length).order(order());
}
}

View File

@ -206,17 +206,13 @@ public class ReadOnlyChannelBuffer extends AbstractChannelBuffer implements Wrap
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
return buffer.toByteBuffer(index, length).asReadOnlyBuffer();
public boolean hasNioBuffer() {
return buffer.hasNioBuffer();
}
@Override
public ByteBuffer[] toByteBuffers(int index, int length) {
ByteBuffer[] bufs = buffer.toByteBuffers(index, length);
for (int i = 0; i < bufs.length; i ++) {
bufs[i] = bufs[i].asReadOnlyBuffer();
}
return bufs;
public ByteBuffer nioBuffer(int index, int length) {
return buffer.nioBuffer(index, length).asReadOnlyBuffer();
}
@Override

View File

@ -240,9 +240,14 @@ public class SlicedChannelBuffer extends AbstractChannelBuffer implements Wrappe
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public boolean hasNioBuffer() {
return buffer.hasNioBuffer();
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
return buffer.toByteBuffer(index + adjustment, length);
return buffer.nioBuffer(index + adjustment, length);
}
private void checkIndex(int index) {

View File

@ -233,9 +233,14 @@ public class TruncatedChannelBuffer extends AbstractChannelBuffer implements Wra
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public boolean hasNioBuffer() {
return buffer.hasNioBuffer();
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
return buffer.toByteBuffer(index, length);
return buffer.nioBuffer(index, length);
}
private void checkIndex(int index) {

View File

@ -28,6 +28,7 @@ import java.util.Random;
import java.util.Set;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@ -1528,71 +1529,36 @@ public abstract class AbstractChannelBufferTest {
}
@Test
public void testToByteBuffer1() {
public void testNioBuffer1() {
Assume.assumeTrue(buffer.hasNioBuffer());
byte[] value = new byte[buffer.capacity()];
random.nextBytes(value);
buffer.clear();
buffer.writeBytes(value);
assertEquals(ByteBuffer.wrap(value), buffer.toByteBuffer());
assertEquals(ByteBuffer.wrap(value), buffer.nioBuffer());
}
@Test
public void testToByteBuffer2() {
Assume.assumeTrue(buffer.hasNioBuffer());
byte[] value = new byte[buffer.capacity()];
random.nextBytes(value);
buffer.clear();
buffer.writeBytes(value);
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
assertEquals(ByteBuffer.wrap(value, i, BLOCK_SIZE), buffer.toByteBuffer(i, BLOCK_SIZE));
assertEquals(ByteBuffer.wrap(value, i, BLOCK_SIZE), buffer.nioBuffer(i, BLOCK_SIZE));
}
}
@Test
public void testToByteBuffer3() {
assertEquals(buffer.order(), buffer.toByteBuffer().order());
}
Assume.assumeTrue(buffer.hasNioBuffer());
@Test
public void testToByteBuffers1() {
byte[] value = new byte[buffer.capacity()];
random.nextBytes(value);
buffer.clear();
buffer.writeBytes(value);
ByteBuffer[] nioBuffers = buffer.toByteBuffers();
int length = 0;
for (ByteBuffer b: nioBuffers) {
length += b.remaining();
}
ByteBuffer nioBuffer = ByteBuffer.allocate(length);
for (ByteBuffer b: nioBuffers) {
nioBuffer.put(b);
}
nioBuffer.flip();
assertEquals(ByteBuffer.wrap(value), nioBuffer);
}
@Test
public void testToByteBuffers2() {
byte[] value = new byte[buffer.capacity()];
random.nextBytes(value);
buffer.clear();
buffer.writeBytes(value);
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
ByteBuffer[] nioBuffers = buffer.toByteBuffers(i, BLOCK_SIZE);
ByteBuffer nioBuffer = ByteBuffer.allocate(BLOCK_SIZE);
for (ByteBuffer b: nioBuffers) {
nioBuffer.put(b);
}
nioBuffer.flip();
assertEquals(ByteBuffer.wrap(value, i, BLOCK_SIZE), nioBuffer);
}
assertEquals(buffer.order(), buffer.nioBuffer().order());
}
@Test

View File

@ -143,9 +143,9 @@ public abstract class AbstractCompositeChannelBufferTest extends
assertTrue(payload.readableBytes() == 512);
assertEquals(12 + 512, buffer.readableBytes());
assertEquals(12 + 512, buffer.toByteBuffer(0, 12 + 512).remaining());
assertFalse(buffer.hasNioBuffer());
}
@Test
public void testSeveralBuffersEquals() {
ChannelBuffer a, b;

View File

@ -49,8 +49,7 @@ public class ChannelBuffersTest {
assertTrue(payload.readableBytes() == 512);
assertEquals(12 + 512, buffer.readableBytes());
assertEquals(12 + 512, buffer.toByteBuffer(0, 12 + 512).remaining());
assertFalse(buffer.hasNioBuffer());
}
@Test

View File

@ -89,10 +89,8 @@ public class ReadOnlyChannelBufferTest {
expect(buf.getLong(21)).andReturn(22L);
ByteBuffer bb = ByteBuffer.allocate(100);
ByteBuffer[] bbs = { ByteBuffer.allocate(101), ByteBuffer.allocate(102) };
expect(buf.toByteBuffer(23, 24)).andReturn(bb);
expect(buf.toByteBuffers(25, 26)).andReturn(bbs);
expect(buf.nioBuffer(23, 24)).andReturn(bb);
expect(buf.capacity()).andReturn(27);
replay(buf);
@ -109,16 +107,10 @@ public class ReadOnlyChannelBufferTest {
assertEquals(20, roBuf.getInt(19));
assertEquals(22L, roBuf.getLong(21));
ByteBuffer roBB = roBuf.toByteBuffer(23, 24);
ByteBuffer roBB = roBuf.nioBuffer(23, 24);
assertEquals(100, roBB.capacity());
assertTrue(roBB.isReadOnly());
ByteBuffer[] roBBs = roBuf.toByteBuffers(25, 26);
assertEquals(2, roBBs.length);
assertEquals(101, roBBs[0].capacity());
assertTrue(roBBs[0].isReadOnly());
assertEquals(102, roBBs[1].capacity());
assertTrue(roBBs[1].isReadOnly());
assertEquals(27, roBuf.capacity());
verify(buf);

View File

@ -113,12 +113,11 @@ public abstract class AbstractDiskHttpData extends AbstractHttpData {
}
FileOutputStream outputStream = new FileOutputStream(file);
FileChannel localfileChannel = outputStream.getChannel();
ByteBuffer byteBuffer = buffer.toByteBuffer();
int written = 0;
while (written < size) {
written += localfileChannel.write(byteBuffer);
written += buffer.readBytes(
localfileChannel, (int) Math.min(size - written, Integer.MAX_VALUE));
}
buffer.readerIndex(buffer.readerIndex() + written);
localfileChannel.force(false);
localfileChannel.close();
completed = true;
@ -133,7 +132,6 @@ public abstract class AbstractDiskHttpData extends AbstractHttpData {
throw new IOException("Out of size: " + (size + localsize) +
" > " + definedSize);
}
ByteBuffer byteBuffer = buffer.toByteBuffer();
int written = 0;
if (file == null) {
file = tempFile();
@ -143,7 +141,7 @@ public abstract class AbstractDiskHttpData extends AbstractHttpData {
fileChannel = outputStream.getChannel();
}
while (written < localsize) {
written += fileChannel.write(byteBuffer);
written += buffer.readBytes(fileChannel, localsize - written);
}
size += localsize;
buffer.readerIndex(buffer.readerIndex() + written);

View File

@ -209,10 +209,9 @@ public abstract class AbstractMemoryHttpData extends AbstractHttpData {
int length = channelBuffer.readableBytes();
FileOutputStream outputStream = new FileOutputStream(dest);
FileChannel fileChannel = outputStream.getChannel();
ByteBuffer byteBuffer = channelBuffer.toByteBuffer();
int written = 0;
while (written < length) {
written += fileChannel.write(byteBuffer);
written += channelBuffer.readBytes(fileChannel, length - written);
}
fileChannel.force(false);
fileChannel.close();

View File

@ -628,25 +628,19 @@ class ReplayingDecoderBuffer implements ChannelBuffer {
}
@Override
public ByteBuffer toByteBuffer() {
public boolean hasNioBuffer() {
return buffer.hasNioBuffer();
}
@Override
public ByteBuffer nioBuffer() {
throw new UnreplayableOperationException();
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
return buffer.toByteBuffer(index, length);
}
@Override
public ByteBuffer[] toByteBuffers() {
throw new UnreplayableOperationException();
}
@Override
public ByteBuffer[] toByteBuffers(int index, int length) {
checkIndex(index, length);
return buffer.toByteBuffers(index, length);
return buffer.nioBuffer(index, length);
}
@Override

View File

@ -46,7 +46,7 @@ public abstract class AbstractCompatibleMarshallingEncoderTest {
ChannelBuffer buffer = encoder.poll();
Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
unmarshaller.start(Marshalling.createByteInput(truncate(buffer).toByteBuffer()));
unmarshaller.start(Marshalling.createByteInput(truncate(buffer).nioBuffer()));
String read = (String) unmarshaller.readObject();
Assert.assertEquals(testObject, read);

View File

@ -276,6 +276,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
}
} else {
ctx.nextOutboundMessageBuffer().add(currentEvent);
this.currentEvent = null;
}
if (!channel.isActive()) {

View File

@ -236,7 +236,12 @@ public final class ChannelBufferHolders {
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
public boolean hasNioBuffer() {
return true;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return ByteBuffer.allocate(0);
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
@ -175,7 +176,17 @@ public final class NioDatagramChannel extends AbstractNioMessageChannel implemen
@Override
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
DatagramPacket packet = (DatagramPacket) buf.peek();
final int writtenBytes = javaChannel().send(packet.data().toByteBuffer(), packet.remoteAddress());
ChannelBuffer data = packet.data();
ByteBuffer nioData;
if (data.hasNioBuffer()) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(data.readableBytes());
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
}
final int writtenBytes = javaChannel().send(nioData, packet.remoteAddress());
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();