Reduce GC in ChannelBuffer bulk operations
- Pre-create an NIO ByteBuffer for reuse instead of creating a new one every time
This commit is contained in:
parent
25599018f2
commit
02cb7adf03
|
@ -32,6 +32,7 @@ import java.nio.channels.ScatteringByteChannel;
|
||||||
public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
|
|
||||||
private final ByteBuffer buffer;
|
private final ByteBuffer buffer;
|
||||||
|
private final ByteBuffer tmpBuf;
|
||||||
private final ByteOrder order;
|
private final ByteOrder order;
|
||||||
private final int capacity;
|
private final int capacity;
|
||||||
|
|
||||||
|
@ -45,12 +46,14 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
|
|
||||||
order = buffer.order();
|
order = buffer.order();
|
||||||
this.buffer = buffer.slice().order(order);
|
this.buffer = buffer.slice().order(order);
|
||||||
|
tmpBuf = this.buffer.duplicate();
|
||||||
capacity = buffer.remaining();
|
capacity = buffer.remaining();
|
||||||
writerIndex(capacity);
|
writerIndex(capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {
|
private ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {
|
||||||
this.buffer = buffer.buffer;
|
this.buffer = buffer.buffer;
|
||||||
|
tmpBuf = this.buffer.duplicate();
|
||||||
order = buffer.order;
|
order = buffer.order;
|
||||||
capacity = buffer.capacity;
|
capacity = buffer.capacity;
|
||||||
setIndex(buffer.readerIndex(), buffer.writerIndex());
|
setIndex(buffer.readerIndex(), buffer.writerIndex());
|
||||||
|
@ -126,9 +129,8 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
|
public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
|
||||||
if (dst instanceof ByteBufferBackedChannelBuffer) {
|
if (dst instanceof ByteBufferBackedChannelBuffer) {
|
||||||
ByteBufferBackedChannelBuffer bbdst = (ByteBufferBackedChannelBuffer) dst;
|
ByteBufferBackedChannelBuffer bbdst = (ByteBufferBackedChannelBuffer) dst;
|
||||||
ByteBuffer data = bbdst.buffer.duplicate();
|
ByteBuffer data = bbdst.tmpBuf;
|
||||||
|
data.clear().position(dstIndex).limit(dstIndex + length);
|
||||||
data.limit(dstIndex + length).position(dstIndex);
|
|
||||||
getBytes(index, data);
|
getBytes(index, data);
|
||||||
} else if (buffer.hasArray()) {
|
} else if (buffer.hasArray()) {
|
||||||
dst.setBytes(dstIndex, buffer.array(), index + buffer.arrayOffset(), length);
|
dst.setBytes(dstIndex, buffer.array(), index + buffer.arrayOffset(), length);
|
||||||
|
@ -139,25 +141,23 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
|
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
|
||||||
ByteBuffer data = buffer.duplicate();
|
|
||||||
try {
|
try {
|
||||||
data.limit(index + length).position(index);
|
tmpBuf.clear().position(index).limit(index + length);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new IndexOutOfBoundsException();
|
throw new IndexOutOfBoundsException();
|
||||||
}
|
}
|
||||||
data.get(dst, dstIndex, length);
|
tmpBuf.get(dst, dstIndex, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getBytes(int index, ByteBuffer dst) {
|
public void getBytes(int index, ByteBuffer dst) {
|
||||||
ByteBuffer data = buffer.duplicate();
|
|
||||||
int bytesToCopy = Math.min(capacity() - index, dst.remaining());
|
int bytesToCopy = Math.min(capacity() - index, dst.remaining());
|
||||||
try {
|
try {
|
||||||
data.limit(index + bytesToCopy).position(index);
|
tmpBuf.clear().position(index).limit(index + bytesToCopy);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new IndexOutOfBoundsException();
|
throw new IndexOutOfBoundsException();
|
||||||
}
|
}
|
||||||
dst.put(data);
|
dst.put(tmpBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -191,9 +191,9 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
public void setBytes(int index, ChannelBuffer src, int srcIndex, int length) {
|
public void setBytes(int index, ChannelBuffer src, int srcIndex, int length) {
|
||||||
if (src instanceof ByteBufferBackedChannelBuffer) {
|
if (src instanceof ByteBufferBackedChannelBuffer) {
|
||||||
ByteBufferBackedChannelBuffer bbsrc = (ByteBufferBackedChannelBuffer) src;
|
ByteBufferBackedChannelBuffer bbsrc = (ByteBufferBackedChannelBuffer) src;
|
||||||
ByteBuffer data = bbsrc.buffer.duplicate();
|
ByteBuffer data = bbsrc.tmpBuf;
|
||||||
|
|
||||||
data.limit(srcIndex + length).position(srcIndex);
|
data.clear().position(srcIndex).limit(srcIndex + length);
|
||||||
setBytes(index, data);
|
setBytes(index, data);
|
||||||
} else if (buffer.hasArray()) {
|
} else if (buffer.hasArray()) {
|
||||||
src.getBytes(srcIndex, buffer.array(), index + buffer.arrayOffset(), length);
|
src.getBytes(srcIndex, buffer.array(), index + buffer.arrayOffset(), length);
|
||||||
|
@ -204,16 +204,14 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBytes(int index, byte[] src, int srcIndex, int length) {
|
public void setBytes(int index, byte[] src, int srcIndex, int length) {
|
||||||
ByteBuffer data = buffer.duplicate();
|
tmpBuf.clear().position(index).limit(index + length);
|
||||||
data.limit(index + length).position(index);
|
tmpBuf.put(src, srcIndex, length);
|
||||||
data.put(src, srcIndex, length);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBytes(int index, ByteBuffer src) {
|
public void setBytes(int index, ByteBuffer src) {
|
||||||
ByteBuffer data = buffer.duplicate();
|
tmpBuf.clear().position(index).limit(index + src.remaining());
|
||||||
data.limit(index + src.remaining()).position(index);
|
tmpBuf.put(src);
|
||||||
data.put(src);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -229,7 +227,8 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
length);
|
length);
|
||||||
} else {
|
} else {
|
||||||
byte[] tmp = new byte[length];
|
byte[] tmp = new byte[length];
|
||||||
((ByteBuffer) buffer.duplicate().position(index)).get(tmp);
|
tmpBuf.clear().position(index);
|
||||||
|
tmpBuf.get(tmp);
|
||||||
out.write(tmp);
|
out.write(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -240,7 +239,8 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return out.write((ByteBuffer) buffer.duplicate().position(index).limit(index + length));
|
tmpBuf.clear().position(index).limit(index + length);
|
||||||
|
return out.write(tmpBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -279,7 +279,8 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
readBytes += localReadBytes;
|
readBytes += localReadBytes;
|
||||||
i += readBytes;
|
i += readBytes;
|
||||||
} while (i < tmp.length);
|
} while (i < tmp.length);
|
||||||
((ByteBuffer) buffer.duplicate().position(index)).put(tmp);
|
tmpBuf.clear().position(index);
|
||||||
|
tmpBuf.put(tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
return readBytes;
|
return readBytes;
|
||||||
|
@ -289,29 +290,12 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
public int setBytes(int index, ScatteringByteChannel in, int length)
|
public int setBytes(int index, ScatteringByteChannel in, int length)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
ByteBuffer slice = (ByteBuffer) buffer.duplicate().limit(index + length).position(index);
|
tmpBuf.clear().position(index).limit(index + length);
|
||||||
int readBytes = 0;
|
try {
|
||||||
|
return in.read(tmpBuf);
|
||||||
while (readBytes < length) {
|
} catch (ClosedChannelException e) {
|
||||||
int localReadBytes;
|
return -1;
|
||||||
try {
|
|
||||||
localReadBytes = in.read(slice);
|
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
localReadBytes = -1;
|
|
||||||
}
|
|
||||||
if (localReadBytes < 0) {
|
|
||||||
if (readBytes == 0) {
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
return readBytes;
|
|
||||||
}
|
|
||||||
} else if (localReadBytes == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
readBytes += localReadBytes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return readBytes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -319,7 +303,7 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
if (index == 0 && length == capacity()) {
|
if (index == 0 && length == capacity()) {
|
||||||
return buffer.duplicate().order(order());
|
return buffer.duplicate().order(order());
|
||||||
} else {
|
} else {
|
||||||
return ((ByteBuffer) buffer.duplicate().position(
|
return ((ByteBuffer) tmpBuf.clear().position(
|
||||||
index).limit(index + length)).slice().order(order());
|
index).limit(index + length)).slice().order(order());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -335,7 +319,7 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
return ChannelBuffers.EMPTY_BUFFER;
|
return ChannelBuffers.EMPTY_BUFFER;
|
||||||
}
|
}
|
||||||
return new ByteBufferBackedChannelBuffer(
|
return new ByteBufferBackedChannelBuffer(
|
||||||
((ByteBuffer) buffer.duplicate().position(
|
((ByteBuffer) tmpBuf.clear().position(
|
||||||
index).limit(index + length)).order(order()));
|
index).limit(index + length)).order(order()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -349,12 +333,12 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||||
public ChannelBuffer copy(int index, int length) {
|
public ChannelBuffer copy(int index, int length) {
|
||||||
ByteBuffer src;
|
ByteBuffer src;
|
||||||
try {
|
try {
|
||||||
src = (ByteBuffer) buffer.duplicate().position(index).limit(index + length);
|
src = (ByteBuffer) tmpBuf.clear().position(index).limit(index + length);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new IndexOutOfBoundsException();
|
throw new IndexOutOfBoundsException();
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuffer dst = buffer.isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
|
ByteBuffer dst = src.isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
|
||||||
dst.put(src);
|
dst.put(src);
|
||||||
dst.order(order());
|
dst.order(order());
|
||||||
dst.clear();
|
dst.clear();
|
||||||
|
|
|
@ -33,6 +33,8 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
|
||||||
*/
|
*/
|
||||||
protected final byte[] array;
|
protected final byte[] array;
|
||||||
|
|
||||||
|
protected final ByteBuffer nioBuf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new heap buffer with a newly allocated byte array.
|
* Creates a new heap buffer with a newly allocated byte array.
|
||||||
*
|
*
|
||||||
|
@ -64,6 +66,7 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
|
||||||
}
|
}
|
||||||
this.array = array;
|
this.array = array;
|
||||||
setIndex(readerIndex, writerIndex);
|
setIndex(readerIndex, writerIndex);
|
||||||
|
nioBuf = ByteBuffer.wrap(array);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -124,7 +127,7 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
|
||||||
@Override
|
@Override
|
||||||
public int getBytes(int index, GatheringByteChannel out, int length)
|
public int getBytes(int index, GatheringByteChannel out, int length)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return out.write(ByteBuffer.wrap(array, index, length));
|
return out.write((ByteBuffer) nioBuf.clear().position(index).limit(index + length));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -173,29 +176,11 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
|
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
|
||||||
ByteBuffer buf = ByteBuffer.wrap(array, index, length);
|
try {
|
||||||
int readBytes = 0;
|
return in.read((ByteBuffer) nioBuf.clear().position(index).limit(index + length));
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
do {
|
return -1;
|
||||||
int localReadBytes;
|
}
|
||||||
try {
|
|
||||||
localReadBytes = in.read(buf);
|
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
localReadBytes = -1;
|
|
||||||
}
|
|
||||||
if (localReadBytes < 0) {
|
|
||||||
if (readBytes == 0) {
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (localReadBytes == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
readBytes += localReadBytes;
|
|
||||||
} while (readBytes < length);
|
|
||||||
|
|
||||||
return readBytes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue
Block a user