Replace free() with reference counting / Fix SlicedByteBuf.unsafe()

- based on @normanmaurer's feed back
- Added Unpooled.compositeBuffer(int)
This commit is contained in:
Trustin Lee 2012-07-20 12:33:17 +09:00
parent 5a613f379e
commit 8d813b127c
11 changed files with 180 additions and 34 deletions

View File

@ -39,6 +39,8 @@ public abstract class AbstractByteBuf implements ByteBuf {
private int markedReaderIndex; private int markedReaderIndex;
private int markedWriterIndex; private int markedWriterIndex;
int refCnt = 1;
protected AbstractByteBuf(ByteOrder endianness, int maxCapacity) { protected AbstractByteBuf(ByteOrder endianness, int maxCapacity) {
if (endianness == null) { if (endianness == null) {
throw new NullPointerException("endianness"); throw new NullPointerException("endianness");

View File

@ -1797,7 +1797,7 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
/** /**
* Returns the internal NIO buffer that is reused for I/O. * Returns the internal NIO buffer that is reused for I/O.
* *
* @throws IllegalStateException if the buffer has no internal NIO buffer * @throws UnsupportedOperationException if the buffer has no internal NIO buffer
*/ */
ByteBuffer nioBuffer(); ByteBuffer nioBuffer();
@ -1809,9 +1809,15 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
ByteBuf newBuffer(int initialCapacity); ByteBuf newBuffer(int initialCapacity);
/** /**
* Deallocates the internal memory block of the buffer explicitly. The result of accessing * Increases the reference count of the buffer.
* a freed buffer is unspecified and can even cause JVM crash.
*/ */
void free(); void acquire();
/**
* Decreases the reference count of the buffer. If decreased to 0, the internal memory
* block of the buffer will be deallocated. The result of accessing a freed buffer is
* unspecified and can even cause JVM crash.
*/
void release();
} }
} }

View File

@ -17,7 +17,7 @@ package io.netty.buffer;
import java.util.List; import java.util.List;
public interface CompositeByteBuf extends ByteBuf { public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
void addComponent(ByteBuf buffer); void addComponent(ByteBuf buffer);
void addComponent(int cIndex, ByteBuf buffer); void addComponent(int cIndex, ByteBuf buffer);

View File

@ -26,6 +26,7 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel; import java.nio.channels.ScatteringByteChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
@ -44,6 +45,11 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
private Component lastAccessed; private Component lastAccessed;
private int lastAccessedId; private int lastAccessedId;
public DefaultCompositeByteBuf(int maxNumComponents) {
super(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE);
this.maxNumComponents = maxNumComponents;
}
public DefaultCompositeByteBuf(int maxNumComponents, ByteBuf... buffers) { public DefaultCompositeByteBuf(int maxNumComponents, ByteBuf... buffers) {
super(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE); super(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE);
@ -58,7 +64,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
this.maxNumComponents = maxNumComponents; this.maxNumComponents = maxNumComponents;
// TODO: Handle the case where the numer of specified buffers already exceeds maxNumComponents. // TODO: Handle the case where the number of specified buffers already exceeds maxNumComponents.
for (ByteBuf b: buffers) { for (ByteBuf b: buffers) {
if (b == null) { if (b == null) {
break; break;
@ -124,7 +130,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
} }
consolidated.writeBytes(buffer, buffer.readerIndex(), readableBytes); consolidated.writeBytes(buffer, buffer.readerIndex(), readableBytes);
Component c = new Component(consolidated.slice()); Component c = new Component(consolidated);
c.endOffset = c.length; c.endOffset = c.length;
components.clear(); components.clear();
components.add(c); components.add(c);
@ -201,6 +207,15 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
updateComponentOffsets(cIndex); updateComponentOffsets(cIndex);
} }
@Override
public Iterator<ByteBuf> iterator() {
List<ByteBuf> list = new ArrayList<ByteBuf>(components.size());
for (Component c: components) {
list.add(c.buf);
}
return list.iterator();
}
@Override @Override
public List<ByteBuf> decompose(int offset, int length) { public List<ByteBuf> decompose(int offset, int length) {
if (length == 0) { if (length == 0) {
@ -305,7 +320,6 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
padding = last.buf.unsafe().newBuffer(paddingLength); padding = last.buf.unsafe().newBuffer(paddingLength);
} }
padding.setIndex(0, paddingLength); padding.setIndex(0, paddingLength);
padding = padding.slice();
addComponent(padding); addComponent(padding);
} else if (newCapacity < oldCapacity) { } else if (newCapacity < oldCapacity) {
int bytesToTrim = oldCapacity - newCapacity; int bytesToTrim = oldCapacity - newCapacity;
@ -321,6 +335,8 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
Component newC = new Component(c.buf.slice(0, c.length - bytesToTrim)); Component newC = new Component(c.buf.slice(0, c.length - bytesToTrim));
newC.offset = c.offset; newC.offset = c.offset;
newC.endOffset = newC.offset + newC.length; newC.endOffset = newC.offset + newC.length;
c.buf.unsafe().release();
i.set(newC);
break; break;
} }
} }
@ -916,14 +932,16 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
final ByteBuf consolidated = last.buf.unsafe().newBuffer(capacity); final ByteBuf consolidated = last.buf.unsafe().newBuffer(capacity);
for (int i = cIndex; i < endCIndex; i ++) { for (int i = cIndex; i < endCIndex; i ++) {
consolidated.writeBytes(components.get(i).buf); ByteBuf b = components.get(i).buf;
consolidated.writeBytes(b);
b.unsafe().release();
} }
for (int i = numComponents - 1; i > 0; i --) { for (int i = numComponents - 1; i > 0; i --) {
components.remove(cIndex); components.remove(cIndex);
} }
components.set(cIndex, new Component(consolidated.slice())); components.set(cIndex, new Component(consolidated));
updateComponentOffsets(cIndex); updateComponentOffsets(cIndex);
} }
@ -937,6 +955,9 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
// Discard everything if (readerIndex = writerIndex = capacity). // Discard everything if (readerIndex = writerIndex = capacity).
int writerIndex = writerIndex(); int writerIndex = writerIndex();
if (readerIndex == writerIndex && writerIndex == capacity()) { if (readerIndex == writerIndex && writerIndex == capacity()) {
for (Component c: components) {
c.buf.unsafe().release();
}
components.clear(); components.clear();
setIndex(0, 0); setIndex(0, 0);
adjustMarkers(readerIndex); adjustMarkers(readerIndex);
@ -946,7 +967,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
// Remove read components. // Remove read components.
int firstComponentId = toComponentIndex(readerIndex); int firstComponentId = toComponentIndex(readerIndex);
for (int i = 0; i < firstComponentId; i ++) { for (int i = 0; i < firstComponentId; i ++) {
components.remove(0); components.remove(0).buf.unsafe().release();
} }
// Update indexes and markers. // Update indexes and markers.
@ -966,6 +987,9 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
// Discard everything if (readerIndex = writerIndex = capacity). // Discard everything if (readerIndex = writerIndex = capacity).
int writerIndex = writerIndex(); int writerIndex = writerIndex();
if (readerIndex == writerIndex && writerIndex == capacity()) { if (readerIndex == writerIndex && writerIndex == capacity()) {
for (Component c: components) {
c.buf.unsafe().release();
}
components.clear(); components.clear();
setIndex(0, 0); setIndex(0, 0);
adjustMarkers(readerIndex); adjustMarkers(readerIndex);
@ -975,14 +999,15 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
// Remove read components. // Remove read components.
int firstComponentId = toComponentIndex(readerIndex); int firstComponentId = toComponentIndex(readerIndex);
for (int i = 0; i < firstComponentId; i ++) { for (int i = 0; i < firstComponentId; i ++) {
components.remove(0); components.remove(0).buf.unsafe().release();
} }
// Replace the first readable component with a new slice. // Replace the first readable component with a new slice.
Component c = components.get(0); Component c = components.get(0);
int adjustment = readerIndex - c.offset; int adjustment = readerIndex - c.offset;
c = new Component(c.buf.slice(adjustment, c.length - adjustment)); Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
components.set(0, c); c.buf.unsafe().release();
components.set(0, newC);
// Update indexes and markers. // Update indexes and markers.
updateComponentOffsets(0); updateComponentOffsets(0);
@ -1017,7 +1042,10 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
private final class CompositeUnsafe implements Unsafe { private final class CompositeUnsafe implements Unsafe {
@Override @Override
public ByteBuffer nioBuffer() { public ByteBuffer nioBuffer() {
return null; if (components.size() == 1) {
return components.get(0).buf.unsafe().nioBuffer();
}
throw new UnsupportedOperationException();
} }
@Override @Override
@ -1028,8 +1056,27 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
} }
@Override @Override
public void free() { public void acquire() {
// NOOP if (refCnt <= 0) {
throw new IllegalStateException();
}
refCnt ++;
}
@Override
public void release() {
if (refCnt <= 0) {
throw new IllegalStateException();
}
refCnt --;
if (refCnt == 0) {
for (Component c: components) {
c.buf.unsafe().release();
}
components.clear();
lastAccessed = null;
}
} }
} }
} }

View File

@ -412,19 +412,29 @@ public class DirectByteBuf extends AbstractByteBuf {
} }
@Override @Override
public void free() { public void acquire() {
if (buffer == null) { if (refCnt <= 0) {
return; throw new IllegalStateException();
} }
refCnt ++;
}
@Override
public void release() {
if (refCnt <= 0) {
throw new IllegalStateException();
}
refCnt --;
if (refCnt == 0) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(buffer);
}
if (doNotFree) {
buffer = null; buffer = null;
doNotFree = false; tmpBuf = null;
return;
} }
freeDirect(buffer);
buffer = null;
} }
} }
} }

View File

@ -30,6 +30,7 @@ import java.nio.channels.ScatteringByteChannel;
*/ */
public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf { public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
private final Unsafe unsafe = new DuplicatedUnsafe();
final ByteBuf buffer; final ByteBuf buffer;
public DuplicatedByteBuf(ByteBuf buffer) { public DuplicatedByteBuf(ByteBuf buffer) {
@ -42,6 +43,8 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
} }
setIndex(buffer.readerIndex(), buffer.writerIndex()); setIndex(buffer.readerIndex(), buffer.writerIndex());
buffer.unsafe().acquire();
} }
@Override @Override
@ -210,6 +213,29 @@ public class DuplicatedByteBuf extends AbstractByteBuf implements WrappedByteBuf
@Override @Override
public Unsafe unsafe() { public Unsafe unsafe() {
return buffer.unsafe(); return unsafe;
}
private final class DuplicatedUnsafe implements Unsafe {
@Override
public ByteBuffer nioBuffer() {
return buffer.unsafe().nioBuffer();
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
return buffer.unsafe().newBuffer(initialCapacity);
}
@Override
public void acquire() {
buffer.unsafe().acquire();
}
@Override
public void release() {
buffer.unsafe().release();
}
} }
} }

View File

@ -301,9 +301,23 @@ public class HeapByteBuf extends AbstractByteBuf {
} }
@Override @Override
public void free() { public void acquire() {
array = null; if (refCnt <= 0) {
nioBuf = null; throw new IllegalStateException();
}
refCnt ++;
}
@Override
public void release() {
if (refCnt <= 0) {
throw new IllegalStateException();
}
refCnt --;
if (refCnt == 0) {
array = null;
nioBuf = null;
}
} }
} }
} }

View File

@ -31,6 +31,7 @@ import java.nio.channels.ScatteringByteChannel;
*/ */
public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf { public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
private final Unsafe unsafe = new SlicedUnsafe();
private final ByteBuf buffer; private final ByteBuf buffer;
private final int adjustment; private final int adjustment;
private final int length; private final int length;
@ -60,6 +61,8 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
this.length = length; this.length = length;
writerIndex(length); writerIndex(length);
buffer.unsafe().acquire();
} }
@Override @Override
@ -277,6 +280,29 @@ public class SlicedByteBuf extends AbstractByteBuf implements WrappedByteBuf {
@Override @Override
public Unsafe unsafe() { public Unsafe unsafe() {
return buffer.unsafe(); return unsafe;
}
private final class SlicedUnsafe implements Unsafe {
@Override
public ByteBuffer nioBuffer() {
return buffer.nioBuffer(adjustment, length);
}
@Override
public ByteBuf newBuffer(int initialCapacity) {
return buffer.unsafe().newBuffer(initialCapacity);
}
@Override
public void acquire() {
buffer.unsafe().acquire();
}
@Override
public void release() {
buffer.unsafe().release();
}
} }
} }

View File

@ -353,6 +353,20 @@ public final class Unpooled {
return EMPTY_BUFFER; return EMPTY_BUFFER;
} }
/**
* Returns a new big-endian composite buffer with no components.
*/
public static CompositeByteBuf compositeBuffer() {
return compositeBuffer(16);
}
/**
* Returns a new big-endian composite buffer with no components.
*/
public static CompositeByteBuf compositeBuffer(int maxNumComponents) {
return new DefaultCompositeByteBuf(maxNumComponents);
}
/** /**
* Creates a new big-endian buffer whose content is a copy of the * Creates a new big-endian buffer whose content is a copy of the
* specified {@code array}. The new buffer's {@code readerIndex} and * specified {@code array}. The new buffer's {@code readerIndex} and

View File

@ -52,7 +52,9 @@ public abstract class AbstractCompositeChannelBufferTest extends
buffers.add(Unpooled.EMPTY_BUFFER); buffers.add(Unpooled.EMPTY_BUFFER);
} }
buffer = Unpooled.wrappedBuffer(buffers.toArray(new ByteBuf[buffers.size()])).order(order); buffer = Unpooled.wrappedBuffer(
Integer.MAX_VALUE, buffers.toArray(new ByteBuf[buffers.size()])).order(order);
assertEquals(length, buffer.capacity()); assertEquals(length, buffer.capacity());
assertEquals(length, buffer.readableBytes()); assertEquals(length, buffer.readableBytes());
assertFalse(buffer.writable()); assertFalse(buffer.writable());

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaders.*; import static io.netty.handler.codec.http.HttpHeaders.*;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.DefaultCompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -138,7 +137,7 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING); m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
} }
m.setChunked(false); m.setChunked(false);
m.setContent(new DefaultCompositeByteBuf(maxCumulationBufferComponents)); m.setContent(Unpooled.compositeBuffer(maxCumulationBufferComponents));
this.currentMessage = m; this.currentMessage = m;
return null; return null;
} else { } else {