Implement reference counting

- Related: #1029
- Replace Freeable with ReferenceCounted
- Add AbstractReferenceCounted
- Add AbstractReferenceCountedByteBuf
- Add AbstractDerivedByteBuf
- Add EmptyByteBuf
This commit is contained in:
Trustin Lee 2013-02-10 13:10:09 +09:00
parent 8f895a7e9a
commit b9996908b1
71 changed files with 1710 additions and 610 deletions

View File

@ -182,7 +182,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf discardReadBytes() {
checkUnfreed();
ensureAccessible();
if (readerIndex == 0) {
return this;
}
@ -201,7 +201,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf discardSomeReadBytes() {
checkUnfreed();
ensureAccessible();
if (readerIndex == 0) {
return this;
}
@ -972,7 +972,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public String toString() {
if (isFreed()) {
if (refCnt() == 0) {
return getClass().getSimpleName() + "(freed)";
}
@ -999,7 +999,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
}
protected final void checkIndex(int index) {
checkUnfreed();
ensureAccessible();
if (index < 0 || index >= capacity()) {
throw new IndexOutOfBoundsException(String.format(
"index: %d (expected: range(0, %d))", index, capacity()));
@ -1007,7 +1007,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
}
protected final void checkIndex(int index, int fieldLength) {
checkUnfreed();
ensureAccessible();
if (fieldLength < 0) {
throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
}
@ -1023,7 +1023,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
* than the specified value.
*/
protected final void checkReadableBytes(int minimumReadableBytes) {
checkUnfreed();
ensureAccessible();
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
@ -1031,8 +1031,8 @@ public abstract class AbstractByteBuf implements ByteBuf {
}
}
protected final void checkUnfreed() {
if (isFreed()) {
protected final void ensureAccessible() {
if (refCnt() <= 0) {
throw new IllegalBufferAccessException();
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
public abstract class AbstractDerivedByteBuf extends AbstractByteBuf {
protected AbstractDerivedByteBuf(int maxCapacity) {
super(maxCapacity);
}
@Override
public final int refCnt() {
return unwrap().refCnt();
}
@Override
public final void retain() { }
@Override
public final void retain(int increment) { }
@Override
public final boolean release() {
return false;
}
@Override
public final boolean release(int decrement) {
return false;
}
@Override
public final ByteBuf suspendIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
@Override
public final ByteBuf resumeIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
}

View File

@ -22,7 +22,7 @@ import java.util.Collection;
public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements MessageBuf<T> {
private final int maxCapacity;
private boolean freed;
private int refCnt = 1;
protected AbstractMessageBuf(int maxCapacity) {
if (maxCapacity < 0) {
@ -37,24 +37,79 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
}
@Override
public final boolean isFreed() {
return freed;
public final int refCnt() {
return refCnt;
}
@Override
public final void free() {
if (freed) {
return;
public final void retain() {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
try {
doFree();
} finally {
freed = true;
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalBufferAccessException("refCnt overflow");
}
this.refCnt = refCnt + 1;
}
protected abstract void doFree();
@Override
public final void retain(int increment) {
if (increment <= 0) {
throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
}
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCnt > Integer.MAX_VALUE - increment) {
throw new IllegalBufferAccessException("refCnt overflow");
}
this.refCnt = refCnt + 1;
}
@Override
public final boolean release() {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
this.refCnt = refCnt --;
if (refCnt == 0) {
deallocate();
return true;
}
return false;
}
@Override
public final boolean release(int decrement) {
if (decrement <= 0) {
throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
}
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalBufferAccessException();
}
this.refCnt = refCnt -= decrement;
if (refCnt == 0) {
deallocate();
return true;
}
return false;
}
protected abstract void deallocate();
@Override
public final int maxCapacity() {
@ -87,8 +142,8 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
return size() <= maxCapacity - size;
}
protected final void checkUnfreed() {
if (isFreed()) {
protected final void ensureAccessible() {
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
}
@ -133,7 +188,7 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
@Override
public int drainTo(Collection<? super T> c) {
checkUnfreed();
ensureAccessible();
int cnt = 0;
for (;;) {
T o = poll();
@ -148,7 +203,7 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
checkUnfreed();
ensureAccessible();
int cnt = 0;
while (cnt < maxElements) {
T o = poll();
@ -163,7 +218,7 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
@Override
public String toString() {
if (isFreed()) {
if (refCnt <= 0) {
return getClass().getSimpleName() + "(freed)";
}

View File

@ -0,0 +1,104 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public abstract class AbstractReferenceCounted implements ReferenceCounted {
private static final AtomicIntegerFieldUpdater<AbstractReferenceCounted> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCounted.class, "refCnt");
@SuppressWarnings("FieldMayBeFinal")
private volatile int refCnt = 1;
@Override
public int refCnt() {
return refCnt;
}
@Override
public final void retain() {
do {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalBufferAccessException("refCnt overflow");
}
} while (!refCntUpdater.compareAndSet(this, refCnt, refCnt + 1));
}
@Override
public final void retain(int increment) {
if (increment <= 0) {
throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
}
do {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCnt > Integer.MAX_VALUE - increment) {
throw new IllegalBufferAccessException("refCnt overflow");
}
} while (!refCntUpdater.compareAndSet(this, refCnt, refCnt + increment));
}
@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
@Override
public final boolean release(int decrement) {
if (decrement <= 0) {
throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
}
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalBufferAccessException();
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}
protected abstract void deallocate();
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
@SuppressWarnings("FieldMayBeFinal")
private volatile int refCnt = 1;
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
}
@Override
public int refCnt() {
return refCnt;
}
@Override
public final void retain() {
do {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalBufferAccessException("refCnt overflow");
}
} while (!refCntUpdater.compareAndSet(this, refCnt, refCnt + 1));
}
@Override
public final void retain(int increment) {
if (increment <= 0) {
throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
}
do {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCnt > Integer.MAX_VALUE - increment) {
throw new IllegalBufferAccessException("refCnt overflow");
}
} while (!refCntUpdater.compareAndSet(this, refCnt, refCnt + increment));
}
@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt <= 0) {
throw new IllegalBufferAccessException();
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
@Override
public final boolean release(int decrement) {
if (decrement <= 0) {
throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
}
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalBufferAccessException();
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}
protected abstract void deallocate();
}

View File

@ -18,7 +18,7 @@ package io.netty.buffer;
/**
* A buffer to operate on
*/
public interface Buf extends Freeable {
public interface Buf extends ReferenceCounted {
/**
* The BufType which will be handled by the Buf implementation
*/

View File

@ -43,20 +43,47 @@ public final class BufUtil {
}
/**
* Try to call {@link Freeable#free()} if the specified message implements {@link Freeable}. If the specified
* message doesn't implement {@link Freeable}, this method does nothing.
* Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static void free(Object msg) {
if (msg instanceof Freeable) {
try {
((Freeable) msg).free();
} catch (UnsupportedOperationException e) {
// This can happen for derived buffers
// TODO: Think about this
}
public static void retain(Object msg) {
if (msg instanceof ReferenceCounted) {
((ReferenceCounted) msg).retain();
}
}
/**
* Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static void retain(Object msg, int increment) {
if (msg instanceof ReferenceCounted) {
((ReferenceCounted) msg).retain(increment);
}
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
/**
* Try to call {@link ReferenceCounted#release()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
*/
public static boolean release(Object msg, int decrement) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release(decrement);
}
return false;
}
/**
* Returns a <a href="http://en.wikipedia.org/wiki/Hex_dump">hex dump</a>
* of the specified buffer's readable bytes.

View File

@ -1914,19 +1914,4 @@ public interface ByteBuf extends Buf, Comparable<ByteBuf> {
*/
@Override
String toString();
/**
* Deallocates the internal memory block of this buffer or returns it to the allocator or pool it came from.
* The result of accessing a released buffer is unspecified and can even cause JVM crash.
*
* @throws UnsupportedOperationException if this buffer is derived
*/
@Override
void free();
/**
* Returns {@code true} if and only if this buffer has been deallocated by {@link #free()}.
*/
@Override
boolean isFreed();
}

View File

@ -16,15 +16,9 @@
package io.netty.buffer;
/**
* A packet which is send or receive. The contract for a {@link ByteBufHolder} is the
* following:
*
* When send a {@link ByteBufHolder} the {@link ByteBufHolder} will be freed by calling {@link #free()}
* in the actual transport implementation. When receive a {@link ByteBufHolder} the {@link #free()}
* must be called once is is processed.
*
* A packet which is send or receive.
*/
public interface ByteBufHolder extends Freeable {
public interface ByteBufHolder extends ReferenceCounted {
/**
* Return the data which is held by this {@link ByteBufHolder}.
@ -33,20 +27,7 @@ public interface ByteBufHolder extends Freeable {
ByteBuf data();
/**
* Create a copy of this {@link ByteBufHolder} which can be used even after {@link #free()}
* is called.
* Create a deep copy of this {@link ByteBufHolder}.
*/
ByteBufHolder copy();
/**
* Free of the resources that are hold by this instance. This includes the {@link ByteBuf}.
*/
@Override
void free();
/**
* Returns {@code true} if and only if this instances was freed.
*/
@Override
boolean isFreed();
}

View File

@ -20,7 +20,9 @@ package io.netty.buffer;
*
*/
public class DefaultByteBufHolder implements ByteBufHolder {
private final ByteBuf data;
public DefaultByteBufHolder(ByteBuf data) {
if (data == null) {
throw new NullPointerException("data");
@ -34,25 +36,40 @@ public class DefaultByteBufHolder implements ByteBufHolder {
@Override
public ByteBuf data() {
if (data.isFreed()) {
if (data.refCnt() <= 0) {
throw new IllegalBufferAccessException();
}
return data;
}
@Override
public void free() {
data.free();
}
@Override
public boolean isFreed() {
return data.isFreed();
}
@Override
public ByteBufHolder copy() {
return new DefaultByteBufHolder(data().copy());
return new DefaultByteBufHolder(data.copy());
}
@Override
public int refCnt() {
return data.refCnt();
}
@Override
public void retain() {
data.retain();
}
@Override
public void retain(int increment) {
data.retain(increment);
}
@Override
public boolean release() {
return data.release();
}
@Override
public boolean release(int decrement) {
return data.release(decrement);
}
@Override

View File

@ -40,7 +40,7 @@ import java.util.Queue;
* is recommended to use {@link Unpooled#wrappedBuffer(ByteBuf...)}
* instead of calling the constructor explicitly.
*/
public class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf {
public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf implements CompositeByteBuf {
private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0];
@ -1293,7 +1293,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
if (suspendedDeallocations == null) {
buf.free(); // We should not get a NPE here. If so, it must be a bug.
buf.release(); // We should not get a NPE here. If so, it must be a bug.
} else {
suspendedDeallocations.add(buf);
}
@ -1531,12 +1531,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
@Override
public boolean isFreed() {
return freed;
}
@Override
public void free() {
protected void deallocate() {
if (freed) {
return;
}
@ -1552,6 +1547,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
@Override
public CompositeByteBuf suspendIntermediaryDeallocations() {
ensureAccessible();
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<ByteBuf>(2);
}
@ -1568,7 +1564,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
this.suspendedDeallocations = null;
for (ByteBuf buf: suspendedDeallocations) {
buf.free();
buf.release();
}
return this;
}

View File

@ -76,7 +76,7 @@ final class DefaultMessageBuf<T> extends AbstractMessageBuf<T> {
}
@Override
protected void doFree() {
protected void deallocate() {
elements = null;
head = 0;
tail = 0;

View File

@ -29,7 +29,7 @@ import java.nio.channels.ScatteringByteChannel;
* parent. It is recommended to use {@link ByteBuf#duplicate()} instead
* of calling the constructor explicitly.
*/
public class DuplicatedByteBuf extends AbstractByteBuf {
public class DuplicatedByteBuf extends AbstractDerivedByteBuf {
private final ByteBuf buffer;
@ -231,25 +231,5 @@ public class DuplicatedByteBuf extends AbstractByteBuf {
public ByteBuffer[] nioBuffers(int index, int length) {
return buffer.nioBuffers(index, length);
}
@Override
public boolean isFreed() {
return buffer.isFreed();
}
@Override
public void free() {
throw new UnsupportedOperationException("derived");
}
@Override
public ByteBuf suspendIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
@Override
public ByteBuf resumeIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
}

View File

@ -0,0 +1,850 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
public final class EmptyByteBuf implements ByteBuf {
private static final byte[] EMPTY_ARRAY = new byte[0];
public static final EmptyByteBuf INSTANCE_BE = new EmptyByteBuf(ByteOrder.BIG_ENDIAN);
public static final EmptyByteBuf INSTANCE_LE = new EmptyByteBuf(ByteOrder.LITTLE_ENDIAN);
private final ByteOrder order;
private final ByteBuffer nioBuf = ByteBuffer.allocateDirect(0);
private final byte[] array = EMPTY_ARRAY;
private final String str;
private EmptyByteBuf(ByteOrder order) {
this.order = order;
nioBuf.order(order);
str = getClass().getSimpleName() + (order == ByteOrder.BIG_ENDIAN? "BE" : "LE");
}
@Override
public int capacity() {
return 0;
}
@Override
public ByteBuf capacity(int newCapacity) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBufAllocator alloc() {
return UnpooledByteBufAllocator.HEAP_BY_DEFAULT;
}
@Override
public ByteOrder order() {
return order;
}
@Override
public ByteBuf unwrap() {
return null;
}
@Override
public boolean isDirect() {
return false;
}
@Override
public int maxCapacity() {
return 0;
}
@Override
public ByteBuf order(ByteOrder endianness) {
if (endianness == null) {
throw new NullPointerException("endianness");
}
if (endianness == ByteOrder.BIG_ENDIAN) {
return INSTANCE_BE;
}
return INSTANCE_LE;
}
@Override
public int readerIndex() {
return 0;
}
@Override
public ByteBuf readerIndex(int readerIndex) {
return checkIndex(readerIndex);
}
@Override
public int writerIndex() {
return 0;
}
@Override
public ByteBuf writerIndex(int writerIndex) {
return checkIndex(writerIndex);
}
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
checkIndex(readerIndex);
checkIndex(writerIndex);
return this;
}
@Override
public int readableBytes() {
return 0;
}
@Override
public int writableBytes() {
return 0;
}
@Override
public int maxWritableBytes() {
return 0;
}
@Override
public boolean isReadable() {
return false;
}
@Override
public boolean readable() {
return false;
}
@Override
public boolean isWritable() {
return false;
}
@Override
public boolean writable() {
return false;
}
@Override
public ByteBuf clear() {
return this;
}
@Override
public ByteBuf markReaderIndex() {
return this;
}
@Override
public ByteBuf resetReaderIndex() {
return this;
}
@Override
public ByteBuf markWriterIndex() {
return this;
}
@Override
public ByteBuf resetWriterIndex() {
return this;
}
@Override
public ByteBuf discardReadBytes() {
return this;
}
@Override
public ByteBuf discardSomeReadBytes() {
return this;
}
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException("minWritableBytes: " + minWritableBytes + " (expected: >= 0)");
}
if (minWritableBytes != 0) {
throw new IndexOutOfBoundsException();
}
return this;
}
@Override
public ByteBuf ensureWritableBytes(int minWritableBytes) {
return ensureWritable(minWritableBytes);
}
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException("minWritableBytes: " + minWritableBytes + " (expected: >= 0)");
}
if (minWritableBytes == 0) {
return 0;
}
return 1;
}
@Override
public boolean getBoolean(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public byte getByte(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public short getUnsignedByte(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public short getShort(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public int getUnsignedShort(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public int getMedium(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public int getUnsignedMedium(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public int getInt(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public long getUnsignedInt(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public long getLong(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public char getChar(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public float getFloat(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public double getDouble(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst) {
return checkIndex(index, dst.writableBytes());
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf getBytes(int index, byte[] dst) {
return checkIndex(index, dst.length);
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
return checkIndex(index, dst.remaining());
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) {
return checkIndex(index, length);
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) {
checkIndex(index, length);
return 0;
}
@Override
public ByteBuf setBoolean(int index, boolean value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setByte(int index, int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setShort(int index, int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setMedium(int index, int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setInt(int index, int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setLong(int index, long value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setChar(int index, int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setFloat(int index, float value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setDouble(int index, double value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setBytes(int index, ByteBuf src) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf setBytes(int index, byte[] src) {
return checkIndex(index, src.length);
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
return checkIndex(index, src.remaining());
}
@Override
public int setBytes(int index, InputStream in, int length) {
checkIndex(index, length);
return 0;
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) {
checkIndex(index, length);
return 0;
}
@Override
public ByteBuf setZero(int index, int length) {
return checkIndex(index, length);
}
@Override
public boolean readBoolean() {
throw new IndexOutOfBoundsException();
}
@Override
public byte readByte() {
throw new IndexOutOfBoundsException();
}
@Override
public short readUnsignedByte() {
throw new IndexOutOfBoundsException();
}
@Override
public short readShort() {
throw new IndexOutOfBoundsException();
}
@Override
public int readUnsignedShort() {
throw new IndexOutOfBoundsException();
}
@Override
public int readMedium() {
throw new IndexOutOfBoundsException();
}
@Override
public int readUnsignedMedium() {
throw new IndexOutOfBoundsException();
}
@Override
public int readInt() {
throw new IndexOutOfBoundsException();
}
@Override
public long readUnsignedInt() {
throw new IndexOutOfBoundsException();
}
@Override
public long readLong() {
throw new IndexOutOfBoundsException();
}
@Override
public char readChar() {
throw new IndexOutOfBoundsException();
}
@Override
public float readFloat() {
throw new IndexOutOfBoundsException();
}
@Override
public double readDouble() {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf readBytes(int length) {
return checkLength(length);
}
@Override
public ByteBuf readSlice(int length) {
return checkLength(length);
}
@Override
public ByteBuf readBytes(ByteBuf dst) {
return checkLength(dst.writableBytes());
}
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
return checkLength(length);
}
@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
return checkLength(length);
}
@Override
public ByteBuf readBytes(byte[] dst) {
return checkLength(dst.length);
}
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
return checkLength(length);
}
@Override
public ByteBuf readBytes(ByteBuffer dst) {
return checkLength(dst.remaining());
}
@Override
public ByteBuf readBytes(OutputStream out, int length) {
return checkLength(length);
}
@Override
public int readBytes(GatheringByteChannel out, int length) {
checkLength(length);
return 0;
}
@Override
public ByteBuf skipBytes(int length) {
return checkLength(length);
}
@Override
public ByteBuf writeBoolean(boolean value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeByte(int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeShort(int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeMedium(int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeInt(int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeLong(long value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeChar(int value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeFloat(float value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeDouble(double value) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeBytes(ByteBuf src) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
return checkLength(length);
}
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
return checkLength(length);
}
@Override
public ByteBuf writeBytes(byte[] src) {
return checkLength(src.length);
}
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
return checkLength(length);
}
@Override
public ByteBuf writeBytes(ByteBuffer src) {
return checkLength(src.remaining());
}
@Override
public int writeBytes(InputStream in, int length) {
checkLength(length);
return 0;
}
@Override
public int writeBytes(ScatteringByteChannel in, int length) {
checkLength(length);
return 0;
}
@Override
public ByteBuf writeZero(int length) {
return checkLength(length);
}
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
checkIndex(fromIndex);
checkIndex(toIndex);
return -1;
}
@Override
public int indexOf(int fromIndex, int toIndex, ByteBufIndexFinder indexFinder) {
checkIndex(fromIndex);
checkIndex(toIndex);
return -1;
}
@Override
public int bytesBefore(byte value) {
return -1;
}
@Override
public int bytesBefore(ByteBufIndexFinder indexFinder) {
return -1;
}
@Override
public int bytesBefore(int length, byte value) {
checkLength(length);
return -1;
}
@Override
public int bytesBefore(int length, ByteBufIndexFinder indexFinder) {
checkLength(length);
return -1;
}
@Override
public int bytesBefore(int index, int length, byte value) {
checkIndex(index, length);
return -1;
}
@Override
public int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder) {
checkIndex(index, length);
return -1;
}
@Override
public ByteBuf copy() {
return this;
}
@Override
public ByteBuf copy(int index, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf slice() {
return this;
}
@Override
public ByteBuf slice(int index, int length) {
return checkIndex(index, length);
}
@Override
public ByteBuf duplicate() {
return this;
}
@Override
public int nioBufferCount() {
return 1;
}
@Override
public ByteBuffer nioBuffer() {
return nioBuf;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
return nioBuf;
}
@Override
public ByteBuffer[] nioBuffers() {
return new ByteBuffer[] { nioBuf };
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
checkIndex(index, length);
return new ByteBuffer[] { nioBuf };
}
@Override
public boolean hasArray() {
return true;
}
@Override
public byte[] array() {
return array;
}
@Override
public int arrayOffset() {
return 0;
}
@Override
public String toString(Charset charset) {
return "";
}
@Override
public String toString(int index, int length, Charset charset) {
checkIndex(index, length);
return "";
}
@Override
public ByteBuf suspendIntermediaryDeallocations() {
return this;
}
@Override
public ByteBuf resumeIntermediaryDeallocations() {
return this;
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean equals(Object obj) {
return obj instanceof ByteBuf && !((ByteBuf) obj).isReadable();
}
@Override
public int compareTo(ByteBuf buffer) {
return buffer.isReadable()? -1 : 0;
}
@Override
public String toString() {
return str;
}
@Override
public BufType type() {
return BufType.BYTE;
}
@Override
public boolean isReadable(int size) {
checkLength(size);
return false;
}
@Override
public boolean isWritable(int size) {
checkLength(size);
return false;
}
@Override
public int refCnt() {
return 1;
}
@Override
public void retain() { }
@Override
public void retain(int increment) { }
@Override
public boolean release() {
return false;
}
@Override
public boolean release(int decrement) {
return false;
}
private ByteBuf checkIndex(int index) {
if (index != 0) {
throw new IndexOutOfBoundsException();
}
return this;
}
private ByteBuf checkIndex(int index, int length) {
if (length < 0) {
throw new IllegalArgumentException("length: " + length);
}
if (index != 0 || length != 0) {
throw new IndexOutOfBoundsException();
}
return this;
}
private ByteBuf checkLength(int length) {
if (length < 0) {
throw new IllegalArgumentException("length: " + length + " (expected: >= 0)");
}
if (length != 0) {
throw new IndexOutOfBoundsException();
}
return this;
}
}

View File

@ -195,13 +195,33 @@ public abstract class FilteredMessageBuf implements MessageBuf<Object> {
}
@Override
public boolean isFreed() {
return buf.isFreed();
public boolean unfoldAndAdd(Object o) {
return buf.unfoldAndAdd(o);
}
@Override
public void free() {
buf.free();
public int refCnt() {
return buf.refCnt();
}
@Override
public void retain() {
buf.retain();
}
@Override
public void retain(int increment) {
buf.retain(increment);
}
@Override
public boolean release() {
return buf.release();
}
@Override
public boolean release(int decrement) {
return buf.release(decrement);
}
@Override

View File

@ -1,32 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
public interface Freeable {
/**
* Returns {@code true} if and only if this resource has been deallocated by {@link #free()}.
*/
boolean isFreed();
/**
* Deallocates the resources.
*
* The result of accessing a freed resource is unspecified and can even cause JVM crash.
*
*/
void free();
}

View File

@ -23,7 +23,7 @@ import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.Queue;
abstract class PooledByteBuf<T> extends AbstractByteBuf {
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
private final ResourceLeak leak = leakDetector.open(this);
@ -74,7 +74,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
@Override
public final ByteBuf capacity(int newCapacity) {
checkUnfreed();
ensureAccessible();
// If the request capacity does not require reallocation, just update the length of the memory.
if (chunk.unpooled) {
@ -144,7 +144,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
@Override
public final ByteBuf suspendIntermediaryDeallocations() {
checkUnfreed();
ensureAccessible();
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<Allocation<T>>(2);
}
@ -153,7 +153,6 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
@Override
public final ByteBuf resumeIntermediaryDeallocations() {
checkUnfreed();
if (suspendedDeallocations == null) {
return this;
}
@ -172,12 +171,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
}
@Override
public final boolean isFreed() {
return memory == null;
}
@Override
public final void free() {
protected final void deallocate() {
if (handle >= 0) {
resumeIntermediaryDeallocations();
final long handle = this.handle;

View File

@ -21,7 +21,7 @@ import java.util.Queue;
final class QueueBackedMessageBuf<T> extends AbstractMessageBuf<T> {
private final Queue<T> queue;
private Queue<T> queue;
QueueBackedMessageBuf(Queue<T> queue) {
super(Integer.MAX_VALUE);
@ -36,19 +36,19 @@ final class QueueBackedMessageBuf<T> extends AbstractMessageBuf<T> {
if (e == null) {
throw new NullPointerException("e");
}
checkUnfreed();
ensureAccessible();
return isWritable() && queue.offer(e);
}
@Override
public T poll() {
checkUnfreed();
ensureAccessible();
return queue.poll();
}
@Override
public T peek() {
checkUnfreed();
ensureAccessible();
return queue.peek();
}
@ -64,66 +64,66 @@ final class QueueBackedMessageBuf<T> extends AbstractMessageBuf<T> {
@Override
public boolean contains(Object o) {
checkUnfreed();
ensureAccessible();
return queue.contains(o);
}
@Override
public Iterator<T> iterator() {
checkUnfreed();
ensureAccessible();
return queue.iterator();
}
@Override
public Object[] toArray() {
checkUnfreed();
ensureAccessible();
return queue.toArray();
}
@Override
public <E> E[] toArray(E[] a) {
checkUnfreed();
ensureAccessible();
return queue.toArray(a);
}
@Override
public boolean remove(Object o) {
checkUnfreed();
ensureAccessible();
return queue.remove(o);
}
@Override
public boolean containsAll(Collection<?> c) {
checkUnfreed();
ensureAccessible();
return queue.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends T> c) {
checkUnfreed();
ensureAccessible();
return isWritable(c.size()) && queue.addAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
checkUnfreed();
ensureAccessible();
return queue.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
checkUnfreed();
ensureAccessible();
return queue.retainAll(c);
}
@Override
public void clear() {
checkUnfreed();
ensureAccessible();
queue.clear();
}
@Override
protected void doFree() {
clear();
protected void deallocate() {
queue = null;
}
}

View File

@ -29,7 +29,7 @@ import java.nio.channels.ScatteringByteChannel;
* recommended to use {@link Unpooled#unmodifiableBuffer(ByteBuf)}
* instead of calling the constructor explicitly.
*/
public class ReadOnlyByteBuf extends AbstractByteBuf {
public class ReadOnlyByteBuf extends AbstractDerivedByteBuf {
private final ByteBuf buffer;
@ -229,24 +229,4 @@ public class ReadOnlyByteBuf extends AbstractByteBuf {
public ByteBuf capacity(int newCapacity) {
throw new ReadOnlyBufferException();
}
@Override
public boolean isFreed() {
return buffer.isFreed();
}
@Override
public void free() {
throw new UnsupportedOperationException("derived");
}
@Override
public ByteBuf suspendIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
@Override
public ByteBuf resumeIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
/**
* A reference-counted object that requires explicit deallocation.
* <p>
* When a new {@link ReferenceCounted} is instantiated, it starts with the reference count of {@code 1}.
* {@link #retain()} increases the reference count, and {@link #release()} decreases the reference count.
* If the reference count is decreased to {@code 0}, the object will be deallocated explicitly, and accessing
* the deallocated object will usually result in an access violation.
*/
public interface ReferenceCounted {
/**
* Returns the reference count of this object. If {@code 0}, it means this object has been deallocated.
*/
int refCnt();
/**
* Increases the reference count by {@code 1}.
*/
void retain();
/**
* Increases the reference count by the specified {@code increment}.
*/
void retain(int increment);
/**
* Decreases the reference count by {@code 1}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release();
/**
* Decreases the reference count by the specified {@code decrement}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release(int decrement);
}

View File

@ -30,7 +30,7 @@ import java.nio.channels.ScatteringByteChannel;
* {@link ByteBuf#slice(int, int)} instead of calling the constructor
* explicitly.
*/
public class SlicedByteBuf extends AbstractByteBuf {
public class SlicedByteBuf extends AbstractDerivedByteBuf {
private final ByteBuf buffer;
private final int adjustment;
@ -282,24 +282,4 @@ public class SlicedByteBuf extends AbstractByteBuf {
checkIndex(index, length);
return buffer.nioBuffers(index, length);
}
@Override
public boolean isFreed() {
return buffer.isFreed();
}
@Override
public void free() {
throw new UnsupportedOperationException("derived");
}
@Override
public ByteBuf suspendIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
@Override
public ByteBuf resumeIntermediaryDeallocations() {
throw new UnsupportedOperationException("derived");
}
}

View File

@ -816,13 +816,28 @@ public final class SwappedByteBuf implements ByteBuf {
}
@Override
public boolean isFreed() {
return buf.isFreed();
public int refCnt() {
return buf.refCnt();
}
@Override
public void free() {
buf.free();
public void retain() {
buf.retain();
}
@Override
public void retain(int increment) {
buf.retain(increment);
}
@Override
public boolean release() {
return buf.release();
}
@Override
public boolean release(int decrement) {
return buf.release(decrement);
}
@Override

View File

@ -15,14 +15,9 @@
*/
package io.netty.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.ReadOnlyBufferException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@ -97,192 +92,7 @@ public final class Unpooled {
/**
* A buffer whose capacity is {@code 0}.
*/
public static final ByteBuf EMPTY_BUFFER = new AbstractByteBuf(0) {
@Override
public int capacity() {
return 0;
}
@Override
public ByteBuf capacity(int newCapacity) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBufAllocator alloc() {
return ALLOC;
}
@Override
public ByteOrder order() {
return BIG_ENDIAN;
}
@Override
public ByteBuf unwrap() {
return null;
}
@Override
public boolean isDirect() {
return false;
}
@Override
public byte getByte(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public short getShort(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public int getUnsignedMedium(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public int getInt(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public long getLong(int index) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) {
throw new IndexOutOfBoundsException();
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) {
throw new IndexOutOfBoundsException();
}
@Override
public ByteBuf setByte(int index, int value) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setShort(int index, int value) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setMedium(int index, int value) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setInt(int index, int value) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setLong(int index, long value) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
throw new ReadOnlyBufferException();
}
@Override
public int setBytes(int index, InputStream in, int length) {
throw new ReadOnlyBufferException();
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) {
throw new ReadOnlyBufferException();
}
@Override
public ByteBuf copy(int index, int length) {
throw new IndexOutOfBoundsException();
}
@Override
public int nioBufferCount() {
return 0;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
return ByteBuffer.allocate(0);
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[0];
}
@Override
public boolean hasArray() {
return true;
}
@Override
public byte[] array() {
return new byte[0];
}
@Override
public int arrayOffset() {
return 0;
}
@Override
public ByteBuf suspendIntermediaryDeallocations() {
return this;
}
@Override
public ByteBuf resumeIntermediaryDeallocations() {
return this;
}
@Override
public void free() {
// do nothing
}
@Override
public boolean isFreed() {
return false;
}
};
public static final ByteBuf EMPTY_BUFFER = EmptyByteBuf.INSTANCE_BE;
public static <T> MessageBuf<T> messageBuffer() {
return new DefaultMessageBuf<T>();

View File

@ -34,7 +34,7 @@ import java.util.Queue;
* and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
* constructor explicitly.
*/
final class UnpooledDirectByteBuf extends AbstractByteBuf {
final class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
private final ResourceLeak leak = leakDetector.open(this);
private final ByteBufAllocator alloc;
@ -134,7 +134,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf capacity(int newCapacity) {
checkUnfreed();
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
@ -197,37 +197,37 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public byte getByte(int index) {
checkUnfreed();
ensureAccessible();
return buffer.get(index);
}
@Override
public short getShort(int index) {
checkUnfreed();
ensureAccessible();
return buffer.getShort(index);
}
@Override
public int getUnsignedMedium(int index) {
checkUnfreed();
ensureAccessible();
return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff;
}
@Override
public int getInt(int index) {
checkUnfreed();
ensureAccessible();
return buffer.getInt(index);
}
@Override
public long getLong(int index) {
checkUnfreed();
ensureAccessible();
return buffer.getLong(index);
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkUnfreed();
ensureAccessible();
if (dst instanceof UnpooledDirectByteBuf) {
UnpooledDirectByteBuf bbdst = (UnpooledDirectByteBuf) dst;
ByteBuffer data = bbdst.internalNioBuffer();
@ -274,21 +274,21 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setByte(int index, int value) {
checkUnfreed();
ensureAccessible();
buffer.put(index, (byte) value);
return this;
}
@Override
public ByteBuf setShort(int index, int value) {
checkUnfreed();
ensureAccessible();
buffer.putShort(index, (short) value);
return this;
}
@Override
public ByteBuf setMedium(int index, int value) {
checkUnfreed();
ensureAccessible();
setByte(index, (byte) (value >>> 16));
setByte(index + 1, (byte) (value >>> 8));
setByte(index + 2, (byte) value);
@ -297,21 +297,21 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setInt(int index, int value) {
checkUnfreed();
ensureAccessible();
buffer.putInt(index, value);
return this;
}
@Override
public ByteBuf setLong(int index, long value) {
checkUnfreed();
ensureAccessible();
buffer.putLong(index, value);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
checkUnfreed();
ensureAccessible();
if (src instanceof UnpooledDirectByteBuf) {
UnpooledDirectByteBuf bbsrc = (UnpooledDirectByteBuf) src;
ByteBuffer data = bbsrc.internalNioBuffer();
@ -328,7 +328,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkUnfreed();
ensureAccessible();
ByteBuffer tmpBuf = internalNioBuffer();
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.put(src, srcIndex, length);
@ -337,7 +337,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
checkUnfreed();
ensureAccessible();
ByteBuffer tmpBuf = internalNioBuffer();
if (src == tmpBuf) {
src = src.duplicate();
@ -350,7 +350,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
checkUnfreed();
ensureAccessible();
if (length == 0) {
return this;
}
@ -369,7 +369,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
checkUnfreed();
ensureAccessible();
if (length == 0) {
return 0;
}
@ -381,7 +381,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
checkUnfreed();
ensureAccessible();
if (buffer.hasArray()) {
return in.read(buffer.array(), buffer.arrayOffset() + index, length);
} else {
@ -399,7 +399,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkUnfreed();
ensureAccessible();
ByteBuffer tmpNioBuf = internalNioBuffer();
tmpNioBuf.clear().position(index).limit(index + length);
try {
@ -416,7 +416,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkUnfreed();
ensureAccessible();
if (index == 0 && length == capacity()) {
return buffer.duplicate();
} else {
@ -431,7 +431,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf copy(int index, int length) {
checkUnfreed();
ensureAccessible();
ByteBuffer src;
try {
src = (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
@ -456,12 +456,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
}
@Override
public boolean isFreed() {
return buffer == null;
}
@Override
public void free() {
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
@ -478,6 +473,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
@Override
public ByteBuf suspendIntermediaryDeallocations() {
ensureAccessible();
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<ByteBuffer>(2);
}

View File

@ -27,7 +27,7 @@ import java.nio.channels.ScatteringByteChannel;
/**
* Big endian Java heap buffer implementation.
*/
final class UnpooledHeapByteBuf extends AbstractByteBuf {
final class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
private byte[] array;
@ -96,13 +96,13 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public int capacity() {
checkUnfreed();
ensureAccessible();
return array.length;
}
@Override
public ByteBuf capacity(int newCapacity) {
checkUnfreed();
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
@ -136,7 +136,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public byte[] array() {
checkUnfreed();
ensureAccessible();
return array;
}
@ -147,13 +147,13 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public byte getByte(int index) {
checkUnfreed();
ensureAccessible();
return array[index];
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkUnfreed();
ensureAccessible();
if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
@ -164,41 +164,41 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkUnfreed();
ensureAccessible();
System.arraycopy(array, index, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
checkUnfreed();
ensureAccessible();
dst.put(array, index, Math.min(capacity() - index, dst.remaining()));
return this;
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
checkUnfreed();
ensureAccessible();
out.write(array, index, length);
return this;
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
checkUnfreed();
ensureAccessible();
return out.write((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
}
@Override
public ByteBuf setByte(int index, int value) {
checkUnfreed();
ensureAccessible();
array[index] = (byte) value;
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
checkUnfreed();
ensureAccessible();
if (src.hasArray()) {
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
} else {
@ -209,27 +209,27 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkUnfreed();
ensureAccessible();
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
checkUnfreed();
ensureAccessible();
src.get(array, index, src.remaining());
return this;
}
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
checkUnfreed();
ensureAccessible();
return in.read(array, index, length);
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkUnfreed();
ensureAccessible();
try {
return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
} catch (ClosedChannelException e) {
@ -244,7 +244,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkUnfreed();
ensureAccessible();
return ByteBuffer.wrap(array, index, length);
}
@ -255,13 +255,13 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public short getShort(int index) {
checkUnfreed();
ensureAccessible();
return (short) (array[index] << 8 | array[index + 1] & 0xFF);
}
@Override
public int getUnsignedMedium(int index) {
checkUnfreed();
ensureAccessible();
return (array[index] & 0xff) << 16 |
(array[index + 1] & 0xff) << 8 |
array[index + 2] & 0xff;
@ -269,7 +269,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public int getInt(int index) {
checkUnfreed();
ensureAccessible();
return (array[index] & 0xff) << 24 |
(array[index + 1] & 0xff) << 16 |
(array[index + 2] & 0xff) << 8 |
@ -278,7 +278,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public long getLong(int index) {
checkUnfreed();
ensureAccessible();
return ((long) array[index] & 0xff) << 56 |
((long) array[index + 1] & 0xff) << 48 |
((long) array[index + 2] & 0xff) << 40 |
@ -291,7 +291,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setShort(int index, int value) {
checkUnfreed();
ensureAccessible();
array[index] = (byte) (value >>> 8);
array[index + 1] = (byte) value;
return this;
@ -299,7 +299,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setMedium(int index, int value) {
checkUnfreed();
ensureAccessible();
array[index] = (byte) (value >>> 16);
array[index + 1] = (byte) (value >>> 8);
array[index + 2] = (byte) value;
@ -308,7 +308,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setInt(int index, int value) {
checkUnfreed();
ensureAccessible();
array[index] = (byte) (value >>> 24);
array[index + 1] = (byte) (value >>> 16);
array[index + 2] = (byte) (value >>> 8);
@ -318,7 +318,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
@Override
public ByteBuf setLong(int index, long value) {
checkUnfreed();
ensureAccessible();
array[index] = (byte) (value >>> 56);
array[index + 1] = (byte) (value >>> 48);
array[index + 2] = (byte) (value >>> 40);
@ -347,12 +347,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf {
}
@Override
public boolean isFreed() {
return array == null;
}
@Override
public void free() {
protected void deallocate() {
array = null;
}

View File

@ -30,6 +30,7 @@ import java.util.Random;
import java.util.Set;
import static io.netty.buffer.Unpooled.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
/**
@ -62,8 +63,16 @@ public abstract class AbstractByteBufTest {
@After
public void dispose() {
if (buffer != null) {
if (buffer.unwrap() == null) {
assertThat(buffer.release(), is(true));
assertThat(buffer.refCnt(), is(0));
} else {
assertThat(buffer.release(), is(false));
assertThat(buffer.refCnt(), is(1));
}
try {
buffer.free();
buffer.release();
} catch (Exception e) {
// Ignore.
}
@ -872,8 +881,8 @@ public abstract class AbstractByteBufTest {
}
}
value.free();
expectedValue.free();
value.release();
expectedValue.release();
}
@Test
@ -1049,7 +1058,7 @@ public abstract class AbstractByteBufTest {
assertEquals(0, value.writerIndex());
}
value.free();
value.release();
}
@Test
@ -1090,7 +1099,7 @@ public abstract class AbstractByteBufTest {
assertEquals(valueOffset + BLOCK_SIZE, value.writerIndex());
}
value.free();
value.release();
}
@Test
@ -1623,8 +1632,8 @@ public abstract class AbstractByteBufTest {
assertFalse(set.contains(elemB));
assertEquals(0, set.size());
elemB.free();
elemBCopy.free();
elemB.release();
elemBCopy.release();
}
// Test case for https://github.com/netty/netty/issues/325

View File

@ -52,7 +52,7 @@ public class UnpooledTest {
assertEquals(12 + 512, buffer.readableBytes());
assertEquals(2, buffer.nioBufferCount());
buffer.free();
buffer.release();
}
@Test
@ -160,11 +160,8 @@ public class UnpooledTest {
@Test
public void shouldReturnEmptyBufferWhenLengthIsZero() {
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[0]));
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[0]).order(LITTLE_ENDIAN));
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[8], 0, 0));
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[8], 0, 0).order(LITTLE_ENDIAN));
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[8], 8, 0));
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[8], 8, 0).order(LITTLE_ENDIAN));
assertSame(EMPTY_BUFFER, wrappedBuffer(ByteBuffer.allocateDirect(0)));
assertSame(EMPTY_BUFFER, wrappedBuffer(EMPTY_BUFFER));
assertSame(EMPTY_BUFFER, wrappedBuffer(new byte[0][]));
@ -177,11 +174,8 @@ public class UnpooledTest {
assertSame(EMPTY_BUFFER, wrappedBuffer(buffer(0), buffer(0)));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[0]));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[0]).order(LITTLE_ENDIAN));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[8], 0, 0));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[8], 0, 0).order(LITTLE_ENDIAN));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[8], 8, 0));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[8], 8, 0).order(LITTLE_ENDIAN));
assertSame(EMPTY_BUFFER, copiedBuffer(ByteBuffer.allocateDirect(0)));
assertSame(EMPTY_BUFFER, copiedBuffer(EMPTY_BUFFER));
assertSame(EMPTY_BUFFER, copiedBuffer(new byte[0][]));

View File

@ -48,13 +48,28 @@ public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHt
}
@Override
public boolean isFreed() {
return content.isFreed();
public int refCnt() {
return content.refCnt();
}
@Override
public void free() {
content.free();
public void retain() {
content.retain();
}
@Override
public void retain(int increment) {
content.retain(increment);
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override

View File

@ -50,13 +50,28 @@ public class DefaultFullHttpResponse extends DefaultHttpResponse implements Full
}
@Override
public boolean isFreed() {
return content.isFreed();
public int refCnt() {
return content.refCnt();
}
@Override
public void free() {
content.free();
public void retain() {
content.retain();
}
@Override
public void retain(int increment) {
content.retain(increment);
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override

View File

@ -41,17 +41,32 @@ public class DefaultHttpContent extends DefaultHttpObject implements HttpContent
@Override
public HttpContent copy() {
return new DefaultHttpContent(data().copy());
return new DefaultHttpContent(content.copy());
}
@Override
public boolean isFreed() {
return content.isFreed();
public int refCnt() {
return content.refCnt();
}
@Override
public void free() {
content.free();
public void retain() {
content.retain();
}
@Override
public void retain(int increment) {
content.retain(increment);
}
@Override
public boolean release() {
return content.release();
}
@Override
public boolean release(int decrement) {
return content.release(decrement);
}
@Override

View File

@ -156,7 +156,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
CompositeByteBuf content = (CompositeByteBuf) currentMessage.data();
if (content.readableBytes() > maxContentLength - chunk.data().readableBytes()) {
chunk.free();
chunk.release();
// TODO: Respond with 413 Request Entity Too Large
// and discard the traffic or close the connection.
// No need to notify the upstream handlers - just log.
@ -171,7 +171,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
content.addComponent(chunk.data());
content.writerIndex(content.writerIndex() + chunk.data().readableBytes());
} else {
chunk.free();
chunk.release();
}
final boolean last;

View File

@ -55,13 +55,24 @@ public interface LastHttpContent extends HttpContent {
}
@Override
public boolean isFreed() {
public int refCnt() {
return 1;
}
@Override
public void retain() { }
@Override
public void retain(int increment) { }
@Override
public boolean release() {
return false;
}
@Override
public void free() {
// NOOP
public boolean release(int decrement) {
return false;
}
};

View File

@ -352,13 +352,4 @@ public abstract class AbstractDiskHttpData extends AbstractHttpData {
public File getFile() throws IOException {
return file;
}
@Override
public boolean isFreed() {
if (file == null || !file.exists()) {
return true;
}
return false;
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.AbstractReferenceCounted;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelException;
import io.netty.handler.codec.http.HttpConstants;
@ -25,7 +26,7 @@ import java.nio.charset.Charset;
/**
* Abstract HttpData implementation
*/
public abstract class AbstractHttpData implements HttpData {
public abstract class AbstractHttpData extends AbstractReferenceCounted implements HttpData {
protected final String name;
protected long definedSize;
@ -110,8 +111,7 @@ public abstract class AbstractHttpData implements HttpData {
}
@Override
public void free() {
protected void deallocate() {
delete();
}
}

View File

@ -138,7 +138,7 @@ public abstract class AbstractMemoryHttpData extends AbstractHttpData {
@Override
public byte[] get() {
if (byteBuf == null) {
return new byte[0];
return EMPTY_BUFFER.array();
}
byte[] array = new byte[byteBuf.readableBytes()];
byteBuf.getBytes(byteBuf.readerIndex(), array);
@ -226,9 +226,4 @@ public abstract class AbstractMemoryHttpData extends AbstractHttpData {
public File getFile() throws IOException {
throw new IOException("Not represented by a file");
}
@Override
public boolean isFreed() {
return data().isFreed();
}
}

View File

@ -210,12 +210,27 @@ public class MixedAttribute implements Attribute {
}
@Override
public void free() {
attribute.free();
public int refCnt() {
return attribute.refCnt();
}
@Override
public boolean isFreed() {
return attribute.isFreed();
public void retain() {
attribute.retain();
}
@Override
public void retain(int increment) {
attribute.retain(increment);
}
@Override
public boolean release() {
return attribute.release();
}
@Override
public boolean release(int decrement) {
return attribute.release(decrement);
}
}

View File

@ -26,6 +26,7 @@ import java.nio.charset.Charset;
* Mixed implementation using both in Memory and in File with a limit of size
*/
public class MixedFileUpload implements FileUpload {
private FileUpload fileUpload;
private final long limitSize;
@ -235,12 +236,27 @@ public class MixedFileUpload implements FileUpload {
}
@Override
public void free() {
fileUpload.free();
public int refCnt() {
return fileUpload.refCnt();
}
@Override
public boolean isFreed() {
return fileUpload.isFreed();
public void retain() {
fileUpload.retain();
}
@Override
public void retain(int increment) {
fileUpload.retain(increment);
}
@Override
public boolean release() {
return fileUpload.release();
}
@Override
public boolean release(int decrement) {
return fileUpload.release(decrement);
}
}

View File

@ -101,7 +101,7 @@ public class DefaultSpdyDataFrame extends DefaultByteBufHolder implements SpdyDa
buf.append(streamId);
buf.append(StringUtil.NEWLINE);
buf.append("--> Size = ");
if (isFreed()) {
if (refCnt() <= 0) {
buf.append("(freed)");
} else {
buf.append(data().readableBytes());

View File

@ -72,7 +72,7 @@ public abstract class SpdyOrHttpChooser extends ChannelDuplexHandler implements
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
ctx.inboundByteBuffer().release();
}
@Override

View File

@ -92,7 +92,7 @@ public class SpdySessionHandler
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
ctx.inboundByteBuffer().release();
}
@Override
@ -102,7 +102,7 @@ public class SpdySessionHandler
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
ctx.outboundMessageBuffer().release();
}
@Override

View File

@ -150,7 +150,7 @@ public class WebSocketServerProtocolHandlerTest {
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
ctx.outboundMessageBuffer().release();
}
@Override

View File

@ -174,10 +174,10 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
protected abstract Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception;
protected void freeInboundMessage(INBOUND_IN msg) throws Exception {
BufUtil.free(msg);
BufUtil.release(msg);
}
protected void freeOutboundMessage(OUTBOUND_IN msg) throws Exception {
BufUtil.free(msg);
BufUtil.release(msg);
}
}

View File

@ -879,12 +879,27 @@ final class ReplayingDecoderBuffer implements ByteBuf {
}
@Override
public boolean isFreed() {
return buffer.isFreed();
public int refCnt() {
return buffer.refCnt();
}
@Override
public void free() {
public void retain() {
throw new UnreplayableOperationException();
}
@Override
public void retain(int increment) {
throw new UnreplayableOperationException();
}
@Override
public boolean release() {
throw new UnreplayableOperationException();
}
@Override
public boolean release(int decrement) {
throw new UnreplayableOperationException();
}

View File

@ -47,9 +47,8 @@ public class ByteArrayDecoderTest {
@Test
public void testDecodeEmpty() {
byte[] b = new byte[0];
ch.writeInbound(wrappedBuffer(b));
assertThat((byte[]) ch.readInbound(), is(b));
ch.writeInbound(EMPTY_BUFFER);
assertThat((byte[]) ch.readInbound(), is(new byte[0]));
}
@Test

View File

@ -32,7 +32,8 @@ public class DiscardClientHandler extends ChannelInboundByteHandlerAdapter {
private static final Logger logger = Logger.getLogger(
DiscardClientHandler.class.getName());
private final byte[] content;
private final int messageSize;
private ByteBuf content;
private ChannelHandlerContext ctx;
public DiscardClientHandler(int messageSize) {
@ -40,13 +41,17 @@ public class DiscardClientHandler extends ChannelInboundByteHandlerAdapter {
throw new IllegalArgumentException(
"messageSize: " + messageSize);
}
content = new byte[messageSize];
this.messageSize = messageSize;
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
this.ctx = ctx;
// Initialize the message.
content = ctx.alloc().directBuffer(messageSize).writeZero(messageSize);
// Send the initial messages.
generateTraffic();
}
@ -75,7 +80,7 @@ public class DiscardClientHandler extends ChannelInboundByteHandlerAdapter {
// Fill the outbound buffer up to 64KiB
ByteBuf out = ctx.nextOutboundByteBuffer();
while (out.readableBytes() < 65536) {
out.writeBytes(content);
out.writeBytes(content, 0, content.readableBytes());
}
// Flush the outbound buffer to the socket.

View File

@ -30,6 +30,13 @@ public class DiscardServerHandler extends ChannelInboundByteHandlerAdapter {
private static final Logger logger = Logger.getLogger(
DiscardServerHandler.class.getName());
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Use direct buffer if possible.
// If you are going to use a heap buffer, you don't need to override this method.
return ctx.alloc().ioBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in)
throws Exception {

View File

@ -48,6 +48,13 @@ public class EchoClientHandler extends ChannelInboundByteHandlerAdapter {
}
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Use direct buffer if possible.
// If you are going to use a heap buffer, you don't need to override this method.
return ctx.alloc().ioBuffer();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.write(firstMessage);

View File

@ -32,6 +32,13 @@ public class EchoServerHandler extends ChannelInboundByteHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoServerHandler.class.getName());
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
// Use direct buffer if possible.
// If you are going to use a heap buffer, you don't need to override this method.
return ctx.alloc().ioBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf out = ctx.nextOutboundByteBuffer();

View File

@ -118,7 +118,7 @@ public class ByteLoggingHandler
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
ctx.inboundByteBuffer().release();
}
@Override
@ -133,7 +133,7 @@ public class ByteLoggingHandler
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free();
ctx.outboundByteBuffer().release();
}
@Override

View File

@ -388,7 +388,7 @@ public class SslHandler
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
ctx.inboundByteBuffer().release();
}
@Override
@ -403,7 +403,7 @@ public class SslHandler
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free();
ctx.outboundByteBuffer().release();
}
@Override

View File

@ -98,7 +98,7 @@ public class ChunkedWriteHandler
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
queue.free();
queue.release();
}
private boolean isWritable() {

View File

@ -51,7 +51,7 @@ public class ByteBufAllocatorBenchmark extends DefaultBenchmark {
@Override
protected void tearDown() throws Exception {
for (ByteBuf b: queue) {
b.free();
b.release();
}
queue.clear();
}
@ -63,7 +63,7 @@ public class ByteBufAllocatorBenchmark extends DefaultBenchmark {
for (int i = 0; i < reps; i ++) {
queue.add(alloc.buffer(size));
queue.removeFirst().free();
queue.removeFirst().release();
}
}

View File

@ -17,6 +17,7 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@ -234,6 +235,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest {
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
BufUtil.retain(msg);
ctx.write(msg);
}

View File

@ -137,7 +137,7 @@ public final class SctpMessage extends DefaultByteBufHolder {
@Override
public String toString() {
if (isFreed()) {
if (refCnt() <= 0) {
return "SctpFrame{" +
"streamIdentifier=" + streamIdentifier + ", protocolIdentifier=" + protocolIdentifier +
", data=(FREED)}";

View File

@ -27,12 +27,12 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPromise;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.sctp.DefaultSctpChannelConfig;
import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -288,7 +288,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
throw new ChannelException(cause);
} finally {
if (free) {
buffer.free();
buffer.release();
}
}
}
@ -333,7 +333,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
buf.remove();
// packet was written free up buffer
packet.free();
packet.release();
if (buf.isEmpty()) {
// Wrote the outbound buffer completely - clear OP_WRITE.

View File

@ -27,12 +27,12 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPromise;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.sctp.DefaultSctpChannelConfig;
import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -209,7 +209,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
throw new ChannelException(cause);
} finally {
if (free) {
buffer.free();
buffer.release();
}
}
}
@ -252,7 +252,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
ch.send(nioData, mi);
} finally {
packet.free();
packet.release();
}
}
writableKeys.clear();

View File

@ -154,7 +154,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel
maximumMessageSize);
if (receivedMessageSize <= 0) {
byteBuf.free();
byteBuf.release();
return 0;
}
@ -216,7 +216,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel
messageQueue.remove();
message.free();
message.release();
return 1;
}

View File

@ -55,7 +55,6 @@ public class EchoMessageHandler extends
}
message = new UdtMessage(byteBuf);
}
@Override
@ -67,24 +66,17 @@ public class EchoMessageHandler extends
final MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
out.add(message);
ctx.flush();
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx,
final Throwable e) {
log.error("exception : {}", e.getMessage());
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable e) {
log.error("exception", e);
ctx.close();
}
@Override
protected void messageReceived(final ChannelHandlerContext ctx,
final UdtMessage message) throws Exception {
protected void messageReceived(final ChannelHandlerContext ctx, final UdtMessage message) throws Exception {
final ByteBuf byteBuf = message.data();
@ -94,10 +86,8 @@ public class EchoMessageHandler extends
final MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
message.retain();
out.add(message);
ctx.flush();
}
}

View File

@ -285,7 +285,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundMessageBuffer().free();
ctx.inboundMessageBuffer().release();
}
}

View File

@ -43,7 +43,7 @@ public abstract class ChannelInboundByteHandlerAdapter
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
ctx.inboundByteBuffer().release();
}
@Override

View File

@ -71,7 +71,7 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundMessageBuffer().free();
ctx.inboundMessageBuffer().release();
}
@Override
@ -178,6 +178,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
* just pass-through the input message or need it for later usage.
*/
protected void freeInboundMessage(I msg) throws Exception {
BufUtil.free(msg);
BufUtil.release(msg);
}
}

View File

@ -34,7 +34,7 @@ public abstract class ChannelOutboundByteHandlerAdapter
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundByteBuffer().free();
ctx.outboundByteBuffer().release();
}
/**

View File

@ -64,7 +64,7 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
ctx.outboundMessageBuffer().release();
}
/**
@ -179,6 +179,6 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
* just pass-through the input message or need it for later usage.
*/
protected void freeOutboundMessage(I msg) throws Exception {
BufUtil.free(msg);
BufUtil.release(msg);
}
}

View File

@ -1695,7 +1695,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
out.writeBytes(data);
} finally {
data.free();
data.release();
}
}
}

View File

@ -17,8 +17,8 @@ package io.netty.channel;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Freeable;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.ReferenceCounted;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel.Unsafe;
import io.netty.logging.InternalLogger;
@ -1125,8 +1125,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
byteSink.free();
msgSink.free();
byteSink.release();
msgSink.release();
}
@Override
@ -1148,8 +1148,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
break;
}
if (m instanceof Freeable) {
((Freeable) m).free();
if (m instanceof ReferenceCounted) {
((ReferenceCounted) m).release();
}
}
logger.warn(
@ -1262,8 +1262,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
msgSink.free();
byteSink.free();
msgSink.release();
byteSink.release();
}
}
@ -1290,8 +1290,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
discardedMessages ++;
}
if (m instanceof Freeable) {
((Freeable) m).free();
if (m instanceof ReferenceCounted) {
((ReferenceCounted) m).release();
}
}

View File

@ -57,7 +57,7 @@ public final class DatagramPacket extends DefaultByteBufHolder {
@Override
public String toString() {
if (isFreed()) {
if (refCnt() <= 0) {
return "DatagramPacket{remoteAddress=" + remoteAddress().toString() +
", data=(FREED)}";
}

View File

@ -254,7 +254,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
try {
if (buf.isReadable()) {
for (;;) {
if (buf.isFreed()) {
if (buf.refCnt() <= 0) {
break;
}
// Ensure the readerIndex of the buffer is 0 before beginning an async write.
@ -370,7 +370,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.writeInProgress = false;
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
if (buf.isFreed()) {
if (buf.refCnt() <= 0) {
return;
}

View File

@ -217,7 +217,7 @@ public final class NioDatagramChannel
throw new ChannelException(cause);
} finally {
if (free) {
buffer.free();
buffer.release();
}
}
}
@ -258,7 +258,7 @@ public final class NioDatagramChannel
buf.remove();
// packet was written free up buffer
packet.free();
packet.release();
if (buf.isEmpty()) {
// Wrote the outbound buffer completely - clear OP_WRITE.

View File

@ -22,11 +22,11 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPromise;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.DefaultDatagramChannelConfig;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -230,7 +230,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
throw new ChannelException(cause);
} finally {
if (free) {
buffer.free();
buffer.release();
}
}
}
@ -255,7 +255,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
}
socket.send(tmpPacket);
} finally {
p.free();
p.release();
}
}

View File

@ -17,7 +17,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Freeable;
import io.netty.buffer.ReferenceCounted;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
@ -121,15 +121,35 @@ public class DefaultChannelPipelineTest {
public void testFreeCalled() throws InterruptedException{
final CountDownLatch free = new CountDownLatch(1);
final Freeable holder = new Freeable() {
final ReferenceCounted holder = new ReferenceCounted() {
@Override
public void free() {
free.countDown();
public int refCnt() {
return (int) free.getCount();
}
@Override
public boolean isFreed() {
return free.getCount() == 0;
public void retain() {
fail();
}
@Override
public void retain(int increment) {
fail();
}
@Override
public boolean release() {
assertEquals(1, refCnt());
free.countDown();
return true;
}
@Override
public boolean release(int decrement) {
for (int i = 0; i < decrement; i ++) {
release();
}
return true;
}
};
LocalChannel channel = new LocalChannel();

View File

@ -424,7 +424,7 @@ public class LocalTransportThreadModelTest {
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) {
ctx.outboundByteBuffer().free();
ctx.outboundByteBuffer().release();
}
@Override
@ -523,7 +523,7 @@ public class LocalTransportThreadModelTest {
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.inboundByteBuffer().free();
ctx.inboundByteBuffer().release();
}
@Override