Make MessageBuf bounded

- Move common methods from ByteBuf to Buf
- Rename ensureWritableBytes() to ensureWritable()
- Rename readable() to isReadable()
- Rename writable() to isWritable()
- Add isReadable(int) and isWritable(int)
- Add AbstractMessageBuf
- Rewrite DefaultMessageBuf and QueueBackedMessageBuf
  - based on Josh Bloch's public domain ArrayDeque impl
This commit is contained in:
Trustin Lee 2013-01-31 23:28:08 +09:00 committed by Norman Maurer
parent ec013bf2d3
commit 42c65cca3a
26 changed files with 666 additions and 282 deletions

View File

@ -12,7 +12,7 @@ The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@ -74,6 +74,12 @@ facade for Java, which can be obtained at:
* HOMEPAGE:
* http://www.slf4j.org/
This product contains a modified portion of 'ArrayDeque', written by Josh
Bloch of Google, Inc:
* LICENSE:
* license/LICENSE.deque.txt (Public Domain)
This product optionally depends on 'Metrics', Yammer's JVM- and application-
level metrics library, which can be obtained at:

View File

@ -110,13 +110,35 @@ public abstract class AbstractByteBuf implements ByteBuf {
}
@Override
public boolean readable() {
public boolean isReadable() {
return writerIndex > readerIndex;
}
@Override
public boolean writable() {
return writableBytes() > 0;
@Deprecated
public final boolean readable() {
return isReadable();
}
@Override
public boolean isReadable(int numBytes) {
return writerIndex - readerIndex >= numBytes;
}
@Override
public boolean isWritable() {
return capacity() > writerIndex;
}
@Override
@Deprecated
public final boolean writable() {
return isWritable();
}
@Override
public boolean isWritable(int numBytes) {
return capacity() - writerIndex >= numBytes;
}
@Override
@ -216,7 +238,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
}
@Override
public ByteBuf ensureWritableBytes(int minWritableBytes) {
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
@ -241,7 +263,13 @@ public abstract class AbstractByteBuf implements ByteBuf {
}
@Override
public int ensureWritableBytes(int minWritableBytes, boolean force) {
@Deprecated
public final ByteBuf ensureWritableBytes(int minWritableBytes) {
return ensureWritable(minWritableBytes);
}
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
@ -660,14 +688,14 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeByte(int value) {
ensureWritableBytes(1);
ensureWritable(1);
setByte(writerIndex ++, value);
return this;
}
@Override
public ByteBuf writeShort(int value) {
ensureWritableBytes(2);
ensureWritable(2);
setShort(writerIndex, value);
writerIndex += 2;
return this;
@ -675,7 +703,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeMedium(int value) {
ensureWritableBytes(3);
ensureWritable(3);
setMedium(writerIndex, value);
writerIndex += 3;
return this;
@ -683,7 +711,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeInt(int value) {
ensureWritableBytes(4);
ensureWritable(4);
setInt(writerIndex, value);
writerIndex += 4;
return this;
@ -691,7 +719,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeLong(long value) {
ensureWritableBytes(8);
ensureWritable(8);
setLong(writerIndex, value);
writerIndex += 8;
return this;
@ -717,7 +745,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritableBytes(length);
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
@ -748,7 +776,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureWritableBytes(length);
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
@ -757,7 +785,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public ByteBuf writeBytes(ByteBuffer src) {
int length = src.remaining();
ensureWritableBytes(length);
ensureWritable(length);
setBytes(writerIndex, src);
writerIndex += length;
return this;
@ -766,7 +794,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public int writeBytes(InputStream in, int length)
throws IOException {
ensureWritableBytes(length);
ensureWritable(length);
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
@ -777,7 +805,7 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public int writeBytes(ScatteringByteChannel in, int length)
throws IOException {
ensureWritableBytes(length);
ensureWritable(length);
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
@ -956,6 +984,10 @@ public abstract class AbstractByteBuf implements ByteBuf {
buf.append(writerIndex);
buf.append(", cap: ");
buf.append(capacity());
if (maxCapacity != Integer.MAX_VALUE) {
buf.append('/');
buf.append(maxCapacity);
}
ByteBuf unwrapped = unwrap();
if (unwrapped != null) {

View File

@ -0,0 +1,159 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.util.AbstractQueue;
import java.util.Collection;
public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements MessageBuf<T> {
private final int maxCapacity;
private boolean freed;
protected AbstractMessageBuf(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
}
this.maxCapacity = maxCapacity;
}
@Override
public final BufType type() {
return BufType.MESSAGE;
}
@Override
public final boolean isFreed() {
return freed;
}
@Override
public final void free() {
if (freed) {
return;
}
try {
doFree();
} finally {
freed = true;
}
}
protected abstract void doFree();
@Override
public final int maxCapacity() {
return maxCapacity;
}
@Override
public final boolean isReadable() {
return !isEmpty();
}
@Override
public final boolean isReadable(int size) {
if (size < 0) {
throw new IllegalArgumentException("size: " + size + " (expected: >= 0)");
}
return size() >= size;
}
@Override
public final boolean isWritable() {
return size() < maxCapacity;
}
@Override
public final boolean isWritable(int size) {
if (size < 0) {
throw new IllegalArgumentException("size: " + size + " (expected: >= 0)");
}
return size() <= maxCapacity - size;
}
protected final void checkUnfreed() {
if (isFreed()) {
throw new IllegalBufferAccessException();
}
}
@Override
public final boolean add(T t) {
return super.add(t);
}
@Override
public final T remove() {
return super.remove();
}
@Override
public final T element() {
return super.element();
}
@Override
public int drainTo(Collection<? super T> c) {
checkUnfreed();
int cnt = 0;
for (;;) {
T o = poll();
if (o == null) {
break;
}
c.add(o);
cnt ++;
}
return cnt;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
checkUnfreed();
int cnt = 0;
while (cnt < maxElements) {
T o = poll();
if (o == null) {
break;
}
c.add(o);
cnt ++;
}
return cnt;
}
@Override
public String toString() {
if (isFreed()) {
return getClass().getSimpleName() + "(freed)";
}
StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
buf.append("(size: ");
buf.append(size());
if (maxCapacity != Integer.MAX_VALUE) {
buf.append('/');
buf.append(maxCapacity);
}
buf.append(')');
return buf.toString();
}
}

View File

@ -23,4 +23,30 @@ public interface Buf extends Freeable {
* The BufType which will be handled by the Buf implementation
*/
BufType type();
/**
* Returns the maximum allowed capacity of this buffer.
*/
int maxCapacity();
/**
* Returns {@code true} if and only if this buffer contains at least one readable element.
*/
boolean isReadable();
/**
* Returns {@code true} if and only if this buffer contains equal to or more than the specified number of elements.
*/
boolean isReadable(int size);
/**
* Returns {@code true} if and only if this buffer has enough room to allow writing one element.
*/
boolean isWritable();
/**
* Returns {@code true} if and only if this buffer has enough room to allow writing the specified number of
* elements.
*/
boolean isWritable(int size);
}

View File

@ -246,9 +246,10 @@ public interface ByteBuf extends Buf, Comparable<ByteBuf> {
/**
* Returns the maximum allowed capacity of this buffer. If a user attempts to increase the
* capacity of this buffer beyond the maximum capacity using {@link #capacity(int)} or
* {@link #ensureWritableBytes(int)}, those methods will raise an
* {@link #ensureWritable(int)}, those methods will raise an
* {@link IllegalArgumentException}.
*/
@Override
int maxCapacity();
/**
@ -391,6 +392,13 @@ public interface ByteBuf extends Buf, Comparable<ByteBuf> {
* if and only if {@code (this.writerIndex - this.readerIndex)} is greater
* than {@code 0}.
*/
@Override
boolean isReadable();
/**
* @deprecated Use {@link #isReadable()} or {@link #isReadable(int)} instead.
*/
@Deprecated
boolean readable();
/**
@ -398,6 +406,13 @@ public interface ByteBuf extends Buf, Comparable<ByteBuf> {
* if and only if {@code (this.capacity - this.writerIndex)} is greater
* than {@code 0}.
*/
@Override
boolean isWritable();
/**
* @deprecated Use {@link #isWritable()} or {@link #isWritable(int)} instead.
*/
@Deprecated
boolean writable();
/**
@ -476,11 +491,17 @@ public interface ByteBuf extends Buf, Comparable<ByteBuf> {
* @throws IndexOutOfBoundsException
* if {@link #writerIndex()} + {@code minWritableBytes} > {@link #maxCapacity()}
*/
ByteBuf ensureWritable(int minWritableBytes);
/**
* @deprecated Use {@link #ensureWritable(int)} instead.
*/
@Deprecated
ByteBuf ensureWritableBytes(int minWritableBytes);
/**
* Tries to make sure the number of {@linkplain #writableBytes() the writable bytes}
* is equal to or greater than the specified value. Unlike {@link #ensureWritableBytes(int)},
* is equal to or greater than the specified value. Unlike {@link #ensureWritable(int)},
* this method does not raise an exception but returns a code.
*
* @param minWritableBytes
@ -497,7 +518,7 @@ public interface ByteBuf extends Buf, Comparable<ByteBuf> {
* {@code 3} if the buffer does not have enough bytes, but its capacity has been
* increased to its maximum.
*/
int ensureWritableBytes(int minWritableBytes, boolean force);
int ensureWritable(int minWritableBytes, boolean force);
/**
* Gets a boolean at the specified absolute (@code index) in this buffer.

View File

@ -103,7 +103,7 @@ public class ByteBufInputStream extends InputStream implements DataInput {
@Override
public int read() throws IOException {
if (!buffer.readable()) {
if (!buffer.isReadable()) {
return -1;
}
return buffer.readByte() & 0xff;
@ -143,7 +143,7 @@ public class ByteBufInputStream extends InputStream implements DataInput {
@Override
public byte readByte() throws IOException {
if (!buffer.readable()) {
if (!buffer.isReadable()) {
throw new EOFException();
}
return buffer.readByte();

View File

@ -42,7 +42,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the {@link ByteBuf} to add
* @return self
* this instance
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the index is invalid
*/
CompositeByteBuf addComponent(int cIndex, ByteBuf buffer);
@ -72,7 +72,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the {@link ByteBuf}s to add
* @return self
* this instance
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the index is invalid
*
*/
@ -87,7 +87,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the {@link ByteBuf}s to add
* @return self
* this instance
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the index is invalid
*/
CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers);
@ -99,7 +99,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the index on from which the {@link ByteBuf} will be remove
* @return self
* this instance
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the index is invalid
*/
CompositeByteBuf removeComponent(int cIndex);
@ -113,7 +113,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the number of components to remove
* @return self
* this instance
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the index is invalid
*/
CompositeByteBuf removeComponents(int cIndex, int numComponents);
@ -135,7 +135,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the index for which the {@link ByteBuf} should be returned
* @return buf
* the {@link ByteBuf} on the specified index
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the index is invalid
*/
ByteBuf component(int cIndex);
@ -147,7 +147,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the offset for which the {@link ByteBuf} should be returned
* @return buf
* the {@link ByteBuf} on the specified index
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the offset is invalid
*/
ByteBuf componentAtOffset(int offset);
@ -175,7 +175,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
* the number of components to compose
* @return self
* this instance
* @throws {@link IndexOutOfBoundsException}
* @throws IndexOutOfBoundsException
* if the offset is invalid
*/
CompositeByteBuf consolidate(int cIndex, int numComponents);
@ -226,7 +226,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
CompositeByteBuf discardSomeReadBytes();
@Override
CompositeByteBuf ensureWritableBytes(int minWritableBytes);
CompositeByteBuf ensureWritable(int minWritableBytes);
@Override
CompositeByteBuf getBytes(int index, ByteBuf dst);

View File

@ -197,7 +197,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
if (b == null) {
break;
}
if (b.readable()) {
if (b.isReadable()) {
cIndex = addComponent0(cIndex, b, false) + 1;
int size = components.size();
if (cIndex > size) {
@ -1341,8 +1341,8 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
@Override
public CompositeByteBuf ensureWritableBytes(int minWritableBytes) {
return (CompositeByteBuf) super.ensureWritableBytes(minWritableBytes);
public CompositeByteBuf ensureWritable(int minWritableBytes) {
return (CompositeByteBuf) super.ensureWritable(minWritableBytes);
}
@Override

View File

@ -13,179 +13,308 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Josh Bloch of Google Inc. and released to the public domain,
* as explained at http://creativecommons.org/publicdomain/zero/1.0/.
*/
package io.netty.buffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.lang.reflect.Array;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Default {@link MessageBuf} implementation
*
*/
final class DefaultMessageBuf<T> extends ArrayDeque<T> implements MessageBuf<T> {
final class DefaultMessageBuf<T> extends AbstractMessageBuf<T> {
private static final long serialVersionUID = 1229808623624907552L;
private static final int MIN_INITIAL_CAPACITY = 8;
private boolean freed;
private T[] elements;
private int head;
private int tail;
DefaultMessageBuf() { }
DefaultMessageBuf() {
this(MIN_INITIAL_CAPACITY << 1);
}
DefaultMessageBuf(int initialCapacity) {
super(initialCapacity);
this(initialCapacity, Integer.MAX_VALUE);
}
DefaultMessageBuf(int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: >= 0)");
}
if (maxCapacity < initialCapacity) {
throw new IllegalArgumentException(
"maxCapacity: " + maxCapacity + " (expected: >= initialCapacity(" + initialCapacity + ')');
}
// Find the best power of two to hold elements.
// Tests "<=" because arrays aren't kept full.
if (initialCapacity >= MIN_INITIAL_CAPACITY) {
initialCapacity |= initialCapacity >>> 1;
initialCapacity |= initialCapacity >>> 2;
initialCapacity |= initialCapacity >>> 4;
initialCapacity |= initialCapacity >>> 8;
initialCapacity |= initialCapacity >>> 16;
initialCapacity ++;
if (initialCapacity < 0) { // Too many elements, must back off
initialCapacity >>>= 1; // Good luck allocating 2 ^ 30 elements
}
} else {
initialCapacity = MIN_INITIAL_CAPACITY;
}
elements = cast(new Object[initialCapacity]);
}
@Override
public BufType type() {
return BufType.MESSAGE;
protected void doFree() {
elements = null;
head = 0;
tail = 0;
}
@Override
public void addFirst(T t) {
ensureValid();
super.addFirst(t);
public boolean offer(T e) {
if (e == null) {
throw new NullPointerException();
}
if (!isWritable()) {
return false;
}
elements[tail] = e;
if ((tail = tail + 1 & elements.length - 1) == head) {
doubleCapacity();
}
return true;
}
private void doubleCapacity() {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0) {
throw new IllegalStateException("Sorry, deque too big");
}
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = cast(a);
head = 0;
tail = n;
}
@Override
public void addLast(T t) {
ensureValid();
super.addLast(t);
public T poll() {
int h = head;
T result = elements[h]; // Element is null if deque empty
if (result == null) {
return null;
}
elements[h] = null; // Must null out slot
head = h + 1 & elements.length - 1;
return result;
}
@Override
public T pollFirst() {
ensureValid();
return super.pollFirst();
public T peek() {
return elements[head]; // elements[head] is null if deque empty
}
@Override
public T pollLast() {
ensureValid();
return super.pollLast();
public boolean remove(Object o) {
if (o == null) {
return false;
}
int mask = elements.length - 1;
int i = head;
T x;
while ((x = elements[i]) != null) {
if (o.equals(x)) {
delete(i);
return true;
}
i = i + 1 & mask;
}
return false;
}
private boolean delete(int i) {
assert elements[tail] == null;
assert head == tail ? elements[head] == null
: elements[head] != null && elements[tail - 1 & elements.length - 1] != null;
assert elements[head - 1 & elements.length - 1] == null;
final T[] elements = this.elements;
final int mask = elements.length - 1;
final int h = head;
final int t = tail;
final int front = i - h & mask;
final int back = t - i & mask;
// Invariant: head <= i < tail mod circularity
if (front >= (t - h & mask)) {
throw new ConcurrentModificationException();
}
// Optimize for least element motion
if (front < back) {
if (h <= i) {
System.arraycopy(elements, h, elements, h + 1, front);
} else { // Wrap around
System.arraycopy(elements, 0, elements, 1, i);
elements[0] = elements[mask];
System.arraycopy(elements, h, elements, h + 1, mask - h);
}
elements[h] = null;
head = h + 1 & mask;
return false;
} else {
if (i < t) { // Copy the null tail as well
System.arraycopy(elements, i + 1, elements, i, back);
tail = t - 1;
} else { // Wrap around
System.arraycopy(elements, i + 1, elements, i, mask - i);
elements[mask] = elements[0];
System.arraycopy(elements, 1, elements, 0, t);
tail = t - 1 & mask;
}
return true;
}
}
@Override
public T getFirst() {
ensureValid();
return super.getFirst();
public int size() {
return tail - head & elements.length - 1;
}
@Override
public T getLast() {
ensureValid();
return super.getLast();
}
@Override
public T peekFirst() {
ensureValid();
return super.peekFirst();
}
@Override
public T peekLast() {
ensureValid();
return super.peekLast();
}
@Override
public boolean removeFirstOccurrence(Object o) {
ensureValid();
return super.removeFirstOccurrence(o);
}
@Override
public boolean removeLastOccurrence(Object o) {
ensureValid();
return super.removeLastOccurrence(o);
public boolean isEmpty() {
return head == tail;
}
@Override
public Iterator<T> iterator() {
ensureValid();
return super.iterator();
}
@Override
public Iterator<T> descendingIterator() {
ensureValid();
return super.descendingIterator();
return new Itr();
}
@Override
public boolean contains(Object o) {
ensureValid();
return super.contains(o);
if (o == null) {
return false;
}
final int mask = elements.length - 1;
int i = head;
Object e;
while ((e = elements[i]) != null) {
if (o.equals(e)) {
return true;
}
i = i + 1 & mask;
}
return false;
}
@Override
public void clear() {
ensureValid();
super.clear();
int head = this.head;
int tail = this.tail;
if (head != tail) {
this.head = this.tail = 0;
final int mask = elements.length - 1;
int i = head;
do {
elements[i] = null;
i = i + 1 & mask;
} while (i != tail);
}
}
@Override
public Object[] toArray() {
ensureValid();
return super.toArray();
return copyElements(new Object[size()]);
}
@Override
public <T1> T1[] toArray(T1[] a) {
ensureValid();
return super.toArray(a);
}
@Override
public ArrayDeque<T> clone() {
ensureValid();
return super.clone();
}
@Override
public int drainTo(Collection<? super T> c) {
ensureValid();
int cnt = 0;
for (;;) {
T o = poll();
if (o == null) {
break;
}
c.add(o);
cnt ++;
public <T> T[] toArray(T[] a) {
int size = size();
if (a.length < size) {
a = cast(Array.newInstance(a.getClass().getComponentType(), size));
}
return cnt;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
ensureValid();
int cnt = 0;
while (cnt < maxElements) {
T o = poll();
if (o == null) {
break;
}
c.add(o);
cnt ++;
copyElements(a);
if (a.length > size) {
a[size] = null;
}
return cnt;
return a;
}
@Override
public boolean isFreed() {
return freed;
private <U> U[] copyElements(U[] a) {
if (head < tail) {
System.arraycopy(elements, head, cast(a), 0, size());
} else if (head > tail) {
int headPortionLen = elements.length - head;
System.arraycopy(elements, head, cast(a), 0, headPortionLen);
System.arraycopy(elements, 0, cast(a), headPortionLen, tail);
}
return a;
}
@Override
public void free() {
freed = true;
super.clear();
@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
private static <T> T[] cast(Object a) {
return (T[]) a;
}
private void ensureValid() {
if (freed) {
throw new IllegalBufferAccessException();
private class Itr implements Iterator<T> {
private int cursor = head;
private int fence = tail;
private int lastRet = -1;
@Override
public boolean hasNext() {
return cursor != fence;
}
@Override
public T next() {
if (cursor == fence) {
throw new NoSuchElementException();
}
T result = elements[cursor];
// This check doesn't catch all possible comodifications,
// but does catch the ones that corrupt traversal
if (tail != fence || result == null) {
throw new ConcurrentModificationException();
}
lastRet = cursor;
cursor = cursor + 1 & elements.length - 1;
return result;
}
@Override
public void remove() {
if (lastRet < 0) {
throw new IllegalStateException();
}
if (delete(lastRet)) { // if left-shifted, undo increment in next()
cursor = cursor - 1 & elements.length - 1;
fence = tail;
}
lastRet = -1;
}
}
}

View File

@ -19,180 +19,108 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
final class QueueBackedMessageBuf<T> implements MessageBuf<T> {
final class QueueBackedMessageBuf<T> extends AbstractMessageBuf<T> {
private final Queue<T> queue;
private boolean freed;
QueueBackedMessageBuf(Queue<T> queue) {
super(Integer.MAX_VALUE);
if (queue == null) {
throw new NullPointerException("queue");
}
this.queue = queue;
}
@Override
public BufType type() {
return BufType.MESSAGE;
}
@Override
public boolean add(T e) {
ensureValid();
return queue.add(e);
}
@Override
public boolean offer(T e) {
ensureValid();
return queue.offer(e);
}
@Override
public T remove() {
ensureValid();
return queue.remove();
checkUnfreed();
return isWritable() && queue.offer(e);
}
@Override
public T poll() {
ensureValid();
checkUnfreed();
return queue.poll();
}
@Override
public T element() {
ensureValid();
return queue.element();
}
@Override
public T peek() {
ensureValid();
checkUnfreed();
return queue.peek();
}
@Override
public int size() {
ensureValid();
return queue.size();
}
@Override
public boolean isEmpty() {
ensureValid();
return queue.isEmpty();
}
@Override
public boolean contains(Object o) {
ensureValid();
checkUnfreed();
return queue.contains(o);
}
@Override
public Iterator<T> iterator() {
ensureValid();
checkUnfreed();
return queue.iterator();
}
@Override
public Object[] toArray() {
ensureValid();
checkUnfreed();
return queue.toArray();
}
@Override
public <E> E[] toArray(E[] a) {
ensureValid();
checkUnfreed();
return queue.toArray(a);
}
@Override
public boolean remove(Object o) {
ensureValid();
checkUnfreed();
return queue.remove(o);
}
@Override
public boolean containsAll(Collection<?> c) {
ensureValid();
checkUnfreed();
return queue.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends T> c) {
ensureValid();
return queue.addAll(c);
checkUnfreed();
return isWritable(c.size()) && queue.addAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
ensureValid();
checkUnfreed();
return queue.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
ensureValid();
checkUnfreed();
return queue.retainAll(c);
}
@Override
public void clear() {
ensureValid();
checkUnfreed();
queue.clear();
}
@Override
public int drainTo(Collection<? super T> c) {
ensureValid();
int cnt = 0;
for (;;) {
T o = poll();
if (o == null) {
break;
}
c.add(o);
cnt ++;
}
return cnt;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
ensureValid();
int cnt = 0;
while (cnt < maxElements) {
T o = poll();
if (o == null) {
break;
}
c.add(o);
cnt ++;
}
return cnt;
}
@Override
public boolean isFreed() {
return freed;
}
@Override
public void free() {
freed = true;
queue.clear();
}
@Override
public String toString() {
return queue.toString();
}
private void ensureValid() {
if (freed) {
throw new IllegalBufferAccessException();
}
protected void doFree() {
clear();
}
}

View File

@ -137,13 +137,35 @@ public final class SwappedByteBuf implements ByteBuf {
}
@Override
public boolean readable() {
return buf.readable();
public boolean isReadable() {
return buf.isReadable();
}
@Override
@Deprecated
public boolean readable() {
return buf.isReadable();
}
@Override
public boolean isReadable(int size) {
return buf.isReadable(size);
}
@Override
public boolean isWritable() {
return buf.isWritable();
}
@Override
@Deprecated
public boolean writable() {
return buf.writable();
return buf.isWritable();
}
@Override
public boolean isWritable(int size) {
return buf.isWritable(size);
}
@Override
@ -189,14 +211,21 @@ public final class SwappedByteBuf implements ByteBuf {
}
@Override
public ByteBuf ensureWritableBytes(int writableBytes) {
buf.ensureWritableBytes(writableBytes);
public ByteBuf ensureWritable(int writableBytes) {
buf.ensureWritable(writableBytes);
return this;
}
@Override
public int ensureWritableBytes(int minWritableBytes, boolean force) {
return buf.ensureWritableBytes(minWritableBytes, force);
@Deprecated
public ByteBuf ensureWritableBytes(int minWritableBytes) {
buf.ensureWritable(minWritableBytes);
return this;
}
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
return buf.ensureWritable(minWritableBytes, force);
}
@Override

View File

@ -292,6 +292,10 @@ public final class Unpooled {
return new DefaultMessageBuf<T>(initialCapacity);
}
public static <T> MessageBuf<T> messageBuffer(int initialCapacity, int maxCapacity) {
return new DefaultMessageBuf<T>(initialCapacity, maxCapacity);
}
public static <T> MessageBuf<T> wrappedBuffer(Queue<T> queue) {
if (queue instanceof MessageBuf) {
return (MessageBuf<T>) queue;
@ -405,7 +409,7 @@ public final class Unpooled {
* returned buffer.
*/
public static ByteBuf wrappedBuffer(ByteBuf buffer) {
if (buffer.readable()) {
if (buffer.isReadable()) {
return buffer.slice();
} else {
return EMPTY_BUFFER;
@ -483,13 +487,13 @@ public final class Unpooled {
case 0:
break;
case 1:
if (buffers[0].readable()) {
if (buffers[0].isReadable()) {
return wrappedBuffer(buffers[0].order(BIG_ENDIAN));
}
break;
default:
for (ByteBuf b: buffers) {
if (b.readable()) {
if (b.isReadable()) {
return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, buffers);
}
}
@ -600,7 +604,7 @@ public final class Unpooled {
* respectively.
*/
public static ByteBuf copiedBuffer(ByteBuf buffer) {
if (buffer.readable()) {
if (buffer.isReadable()) {
return buffer.copy();
} else {
return EMPTY_BUFFER;

View File

@ -54,7 +54,7 @@ public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame
// Binary frame
ByteBuf data = msg.data();
int dataLen = data.readableBytes();
out.ensureWritableBytes(dataLen + 5);
out.ensureWritable(dataLen + 5);
// Encode type.
out.writeByte((byte) 0x80);

View File

@ -142,18 +142,18 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame
int maskLength = maskPayload ? 4 : 0;
if (length <= 125) {
out.ensureWritableBytes(2 + maskLength + length);
out.ensureWritable(2 + maskLength + length);
out.writeByte(b0);
byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
out.writeByte(b);
} else if (length <= 0xFFFF) {
out.ensureWritableBytes(4 + maskLength + length);
out.ensureWritable(4 + maskLength + length);
out.writeByte(b0);
out.writeByte(maskPayload ? 0xFE : 126);
out.writeByte(length >>> 8 & 0xFF);
out.writeByte(length & 0xFF);
} else {
out.ensureWritableBytes(10 + maskLength + length);
out.ensureWritable(10 + maskLength + length);
out.writeByte(b0);
out.writeByte(maskPayload ? 0xFF : 127);
out.writeLong(length);

View File

@ -84,7 +84,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
ByteBuf data = spdyDataFrame.data();
byte flags = spdyDataFrame.isLast() ? SPDY_DATA_FLAG_FIN : 0;
out.ensureWritableBytes(SPDY_HEADER_SIZE + data.readableBytes());
out.ensureWritable(SPDY_HEADER_SIZE + data.readableBytes());
out.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF);
out.writeByte(flags);
out.writeMedium(data.readableBytes());
@ -106,7 +106,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else {
length = 10 + headerBlockLength;
}
out.ensureWritableBytes(SPDY_HEADER_SIZE + length);
out.ensureWritable(SPDY_HEADER_SIZE + length);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_SYN_STREAM_FRAME);
out.writeByte(flags);
@ -141,7 +141,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else {
length = 4 + headerBlockLength;
}
out.ensureWritableBytes(SPDY_HEADER_SIZE + length);
out.ensureWritable(SPDY_HEADER_SIZE + length);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_SYN_REPLY_FRAME);
out.writeByte(flags);
@ -159,7 +159,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else if (msg instanceof SpdyRstStreamFrame) {
SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
out.ensureWritableBytes(SPDY_HEADER_SIZE + 8);
out.ensureWritable(SPDY_HEADER_SIZE + 8);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_RST_STREAM_FRAME);
out.writeInt(8);
@ -174,7 +174,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
Set<Integer> IDs = spdySettingsFrame.getIds();
int numEntries = IDs.size();
int length = 4 + numEntries * 8;
out.ensureWritableBytes(SPDY_HEADER_SIZE + length);
out.ensureWritable(SPDY_HEADER_SIZE + length);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_SETTINGS_FRAME);
out.writeByte(flags);
@ -206,7 +206,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else if (msg instanceof SpdyNoOpFrame) {
out.ensureWritableBytes(SPDY_HEADER_SIZE);
out.ensureWritable(SPDY_HEADER_SIZE);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_NOOP_FRAME);
out.writeInt(0);
@ -214,7 +214,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else if (msg instanceof SpdyPingFrame) {
SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
out.ensureWritableBytes(SPDY_HEADER_SIZE + 4);
out.ensureWritable(SPDY_HEADER_SIZE + 4);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_PING_FRAME);
out.writeInt(4);
@ -224,7 +224,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg;
int length = version < 3 ? 4 : 8;
out.ensureWritableBytes(SPDY_HEADER_SIZE + length);
out.ensureWritable(SPDY_HEADER_SIZE + length);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_GOAWAY_FRAME);
out.writeInt(length);
@ -246,7 +246,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else {
length = 4 + headerBlockLength;
}
out.ensureWritableBytes(SPDY_HEADER_SIZE + length);
out.ensureWritable(SPDY_HEADER_SIZE + length);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_HEADERS_FRAME);
out.writeByte(flags);
@ -260,7 +260,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
} else if (msg instanceof SpdyWindowUpdateFrame) {
SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
out.ensureWritableBytes(SPDY_HEADER_SIZE + 8);
out.ensureWritable(SPDY_HEADER_SIZE + 8);
out.writeShort(version | 0x8000);
out.writeShort(SPDY_WINDOW_UPDATE_FRAME);
out.writeInt(8);

View File

@ -137,13 +137,18 @@ final class ReplayingDecoderBuffer implements ByteBuf {
throw new UnreplayableOperationException();
}
@Override
public ByteBuf ensureWritable(int writableBytes) {
throw new UnreplayableOperationException();
}
@Override
public ByteBuf ensureWritableBytes(int writableBytes) {
throw new UnreplayableOperationException();
}
@Override
public int ensureWritableBytes(int minWritableBytes, boolean force) {
public int ensureWritable(int minWritableBytes, boolean force) {
throw new UnreplayableOperationException();
}
@ -384,9 +389,19 @@ final class ReplayingDecoderBuffer implements ByteBuf {
return swapped;
}
@Override
public boolean isReadable() {
return terminated? buffer.isReadable() : true;
}
@Override
public boolean readable() {
return terminated? buffer.readable() : true;
return isReadable();
}
@Override
public boolean isReadable(int size) {
return terminated? buffer.isReadable(size) : true;
}
@Override
@ -718,11 +733,21 @@ final class ReplayingDecoderBuffer implements ByteBuf {
')';
}
@Override
public boolean isWritable() {
return false;
}
@Override
public boolean writable() {
return false;
}
@Override
public boolean isWritable(int size) {
return false;
}
@Override
public int writableBytes() {
return 0;

View File

@ -85,7 +85,7 @@ public class JZlibDecoder extends ZlibDecoder {
ChannelHandlerContext ctx,
ByteBuf in, ByteBuf out) throws Exception {
if (!in.readable()) {
if (!in.isReadable()) {
return;
}
@ -116,7 +116,7 @@ public class JZlibDecoder extends ZlibDecoder {
loop: for (;;) {
z.avail_out = maxOutputLength;
if (outHasArray) {
out.ensureWritableBytes(maxOutputLength);
out.ensureWritable(maxOutputLength);
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();
} else {

View File

@ -294,7 +294,7 @@ public class JZlibEncoder extends ZlibEncoder {
boolean outHasArray = out.hasArray();
z.avail_out = maxOutputLength;
if (outHasArray) {
out.ensureWritableBytes(maxOutputLength);
out.ensureWritable(maxOutputLength);
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();
} else {

View File

@ -189,7 +189,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
uncompressed.readBytes(inAry);
int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12;
out.ensureWritableBytes(sizeEstimate);
out.ensureWritable(sizeEstimate);
synchronized (deflater) {
if (gzip) {

View File

@ -268,9 +268,9 @@ public class Snappy {
return;
}
out.ensureWritableBytes(inputLength);
out.ensureWritable(inputLength);
while (in.readable() && in.readerIndex() - inIndex < maxLength) {
while (in.isReadable() && in.readerIndex() - inIndex < maxLength) {
byte tag = in.readByte();
switch (tag & 0x03) {
case LITERAL:

View File

@ -15,14 +15,13 @@
*/
package io.netty.handler.codec.protobuf;
import com.google.protobuf.CodedOutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import com.google.protobuf.CodedOutputStream;
/**
* An encoder that prepends the the Google Protocol Buffers
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html#varints">Base
@ -52,7 +51,7 @@ public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<B
ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
int bodyLen = msg.readableBytes();
int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen);
out.ensureWritableBytes(headerLen + bodyLen);
out.ensureWritable(headerLen + bodyLen);
CodedOutputStream headerOut =
CodedOutputStream.newInstance(new ByteBufOutputStream(out));

View File

@ -100,7 +100,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.data();
if (content.readable()) {
if (content.isReadable()) {
buf.append("CONTENT: ");
buf.append(content.toString(CharsetUtil.UTF_8));
buf.append("\r\n");

View File

@ -450,7 +450,7 @@ public class SslHandler
if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already.
// Any further write attempts should be denied.
if (in.readable()) {
if (in.isReadable()) {
in.clear();
SSLException e = new SSLException("SSLEngine already closed");
promise.setFailure(e);
@ -465,7 +465,7 @@ public class SslHandler
ctx.flush();
continue;
case NEED_UNWRAP:
if (ctx.inboundByteBuffer().readable()) {
if (ctx.inboundByteBuffer().isReadable()) {
unwrapLater = true;
}
break;
@ -556,7 +556,7 @@ public class SslHandler
in.skipBytes(result.bytesConsumed());
out.writerIndex(out.writerIndex() + result.bytesProduced());
if (result.getStatus() == Status.BUFFER_OVERFLOW) {
out.ensureWritableBytes(engine.getSession().getPacketBufferSize());
out.ensureWritable(engine.getSession().getPacketBufferSize());
} else {
return result;
}
@ -862,7 +862,7 @@ public class SslHandler
out.writerIndex(out.writerIndex() + result.bytesProduced());
switch (result.getStatus()) {
case BUFFER_OVERFLOW:
out.ensureWritableBytes(engine.getSession().getApplicationBufferSize());
out.ensureWritable(engine.getSession().getApplicationBufferSize());
break;
default:
return result;

26
license/LICENSE.deque.txt Normal file
View File

@ -0,0 +1,26 @@
The person or persons who have associated work with this document (the
"Dedicator" or "Certifier") hereby either (a) certifies that, to the best of
his knowledge, the work of authorship identified is in the public domain of
the country from which the work is published, or (b) hereby dedicates whatever
copyright the dedicators holds in the work of authorship identified below (the
"Work") to the public domain. A certifier, moreover, dedicates any copyright
interest he may have in the associated work, and for these purposes, is
described as a "dedicator" below.
A certifier has taken reasonable steps to verify the copyright status of this
work. Certifier recognizes that his good faith efforts may not shield him from
liability if in fact the work certified is not in the public domain.
Dedicator makes this dedication for the benefit of the public at large and to
the detriment of the Dedicator's heirs and successors. Dedicator intends this
dedication to be an overt act of relinquishment in perpetuate of all present
and future rights under copyright law, whether vested or contingent, in the
Work. Dedicator understands that such relinquishment of all rights includes
the relinquishment of all rights to enforce (by lawsuit or otherwise) those
copyrights in the Work.
Dedicator recognizes that, once placed in the public domain, the Work may be
freely reproduced, distributed, transmitted, used, modified, built upon, or
otherwise exploited by anyone for any purpose, commercial or non-commercial,
and in any way, including by methods that have not yet been invented or
conceived.

View File

@ -361,7 +361,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
byteBuf.capacity(maxCapacity);
} else {
// Expand by the increment.
byteBuf.ensureWritableBytes(increment);
byteBuf.ensureWritable(increment);
}
return 1;

View File

@ -90,7 +90,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
break;
}
if (byteBuf.writable()) {
if (byteBuf.isWritable()) {
continue;
}
@ -100,7 +100,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
if (!byteBuf.writable()) {
if (!byteBuf.isWritable()) {
throw new IllegalStateException(
"an inbound handler whose buffer is full must consume at " +
"least one byte.");
@ -111,7 +111,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritableBytes(available);
byteBuf.ensureWritable(available);
}
}
}
@ -151,7 +151,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
while (buf.readable()) {
while (buf.isReadable()) {
doWriteBytes(buf);
}
buf.clear();