diff --git a/NOTICE.txt b/NOTICE.txt index cda0d4ce33..717f96d66c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12,7 +12,7 @@ The Netty Project licenses this file to you under the Apache License, version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: -http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT @@ -74,6 +74,12 @@ facade for Java, which can be obtained at: * HOMEPAGE: * http://www.slf4j.org/ +This product contains a modified portion of 'ArrayDeque', written by Josh +Bloch of Google, Inc: + + * LICENSE: + * license/LICENSE.deque.txt (Public Domain) + This product optionally depends on 'Metrics', Yammer's JVM- and application- level metrics library, which can be obtained at: diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index 0ccc8727b7..5ff1e499b1 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -110,13 +110,35 @@ public abstract class AbstractByteBuf implements ByteBuf { } @Override - public boolean readable() { + public boolean isReadable() { return writerIndex > readerIndex; } @Override - public boolean writable() { - return writableBytes() > 0; + @Deprecated + public final boolean readable() { + return isReadable(); + } + + @Override + public boolean isReadable(int numBytes) { + return writerIndex - readerIndex >= numBytes; + } + + @Override + public boolean isWritable() { + return capacity() > writerIndex; + } + + @Override + @Deprecated + public final boolean writable() { + return isWritable(); + } + + @Override + public boolean isWritable(int numBytes) { + return capacity() - writerIndex >= numBytes; } @Override @@ -216,7 +238,7 @@ public abstract class AbstractByteBuf implements ByteBuf { } @Override - public ByteBuf ensureWritableBytes(int minWritableBytes) { + public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); @@ -241,7 +263,13 @@ public abstract class AbstractByteBuf implements ByteBuf { } @Override - public int ensureWritableBytes(int minWritableBytes, boolean force) { + @Deprecated + public final ByteBuf ensureWritableBytes(int minWritableBytes) { + return ensureWritable(minWritableBytes); + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); @@ -660,14 +688,14 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeByte(int value) { - ensureWritableBytes(1); + ensureWritable(1); setByte(writerIndex ++, value); return this; } @Override public ByteBuf writeShort(int value) { - ensureWritableBytes(2); + ensureWritable(2); setShort(writerIndex, value); writerIndex += 2; return this; @@ -675,7 +703,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeMedium(int value) { - ensureWritableBytes(3); + ensureWritable(3); setMedium(writerIndex, value); writerIndex += 3; return this; @@ -683,7 +711,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeInt(int value) { - ensureWritableBytes(4); + ensureWritable(4); setInt(writerIndex, value); writerIndex += 4; return this; @@ -691,7 +719,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeLong(long value) { - ensureWritableBytes(8); + ensureWritable(8); setLong(writerIndex, value); writerIndex += 8; return this; @@ -717,7 +745,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { - ensureWritableBytes(length); + ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; @@ -748,7 +776,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { - ensureWritableBytes(length); + ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; @@ -757,7 +785,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public ByteBuf writeBytes(ByteBuffer src) { int length = src.remaining(); - ensureWritableBytes(length); + ensureWritable(length); setBytes(writerIndex, src); writerIndex += length; return this; @@ -766,7 +794,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public int writeBytes(InputStream in, int length) throws IOException { - ensureWritableBytes(length); + ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; @@ -777,7 +805,7 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public int writeBytes(ScatteringByteChannel in, int length) throws IOException { - ensureWritableBytes(length); + ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; @@ -956,6 +984,10 @@ public abstract class AbstractByteBuf implements ByteBuf { buf.append(writerIndex); buf.append(", cap: "); buf.append(capacity()); + if (maxCapacity != Integer.MAX_VALUE) { + buf.append('/'); + buf.append(maxCapacity); + } ByteBuf unwrapped = unwrap(); if (unwrapped != null) { diff --git a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java new file mode 100644 index 0000000000..274c8b437a --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java @@ -0,0 +1,159 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import java.util.AbstractQueue; +import java.util.Collection; + +public abstract class AbstractMessageBuf extends AbstractQueue implements MessageBuf { + + private final int maxCapacity; + private boolean freed; + + protected AbstractMessageBuf(int maxCapacity) { + if (maxCapacity < 0) { + throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)"); + } + this.maxCapacity = maxCapacity; + } + + @Override + public final BufType type() { + return BufType.MESSAGE; + } + + @Override + public final boolean isFreed() { + return freed; + } + + @Override + public final void free() { + if (freed) { + return; + } + + try { + doFree(); + } finally { + freed = true; + } + } + + protected abstract void doFree(); + + @Override + public final int maxCapacity() { + return maxCapacity; + } + + @Override + public final boolean isReadable() { + return !isEmpty(); + } + + @Override + public final boolean isReadable(int size) { + if (size < 0) { + throw new IllegalArgumentException("size: " + size + " (expected: >= 0)"); + } + return size() >= size; + } + + @Override + public final boolean isWritable() { + return size() < maxCapacity; + } + + @Override + public final boolean isWritable(int size) { + if (size < 0) { + throw new IllegalArgumentException("size: " + size + " (expected: >= 0)"); + } + return size() <= maxCapacity - size; + } + + protected final void checkUnfreed() { + if (isFreed()) { + throw new IllegalBufferAccessException(); + } + } + + @Override + public final boolean add(T t) { + return super.add(t); + } + + @Override + public final T remove() { + return super.remove(); + } + + @Override + public final T element() { + return super.element(); + } + + @Override + public int drainTo(Collection c) { + checkUnfreed(); + int cnt = 0; + for (;;) { + T o = poll(); + if (o == null) { + break; + } + c.add(o); + cnt ++; + } + return cnt; + } + + @Override + public int drainTo(Collection c, int maxElements) { + checkUnfreed(); + int cnt = 0; + while (cnt < maxElements) { + T o = poll(); + if (o == null) { + break; + } + c.add(o); + cnt ++; + } + return cnt; + } + + @Override + public String toString() { + if (isFreed()) { + return getClass().getSimpleName() + "(freed)"; + } + + StringBuilder buf = new StringBuilder(); + buf.append(getClass().getSimpleName()); + buf.append("(size: "); + buf.append(size()); + if (maxCapacity != Integer.MAX_VALUE) { + buf.append('/'); + buf.append(maxCapacity); + } + buf.append(')'); + + return buf.toString(); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/Buf.java b/buffer/src/main/java/io/netty/buffer/Buf.java index 7e806d07d5..9766cd61e2 100644 --- a/buffer/src/main/java/io/netty/buffer/Buf.java +++ b/buffer/src/main/java/io/netty/buffer/Buf.java @@ -23,4 +23,30 @@ public interface Buf extends Freeable { * The BufType which will be handled by the Buf implementation */ BufType type(); + + /** + * Returns the maximum allowed capacity of this buffer. + */ + int maxCapacity(); + + /** + * Returns {@code true} if and only if this buffer contains at least one readable element. + */ + boolean isReadable(); + + /** + * Returns {@code true} if and only if this buffer contains equal to or more than the specified number of elements. + */ + boolean isReadable(int size); + + /** + * Returns {@code true} if and only if this buffer has enough room to allow writing one element. + */ + boolean isWritable(); + + /** + * Returns {@code true} if and only if this buffer has enough room to allow writing the specified number of + * elements. + */ + boolean isWritable(int size); } diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index e56a880045..2a5a8e3f39 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -246,9 +246,10 @@ public interface ByteBuf extends Buf, Comparable { /** * Returns the maximum allowed capacity of this buffer. If a user attempts to increase the * capacity of this buffer beyond the maximum capacity using {@link #capacity(int)} or - * {@link #ensureWritableBytes(int)}, those methods will raise an + * {@link #ensureWritable(int)}, those methods will raise an * {@link IllegalArgumentException}. */ + @Override int maxCapacity(); /** @@ -391,6 +392,13 @@ public interface ByteBuf extends Buf, Comparable { * if and only if {@code (this.writerIndex - this.readerIndex)} is greater * than {@code 0}. */ + @Override + boolean isReadable(); + + /** + * @deprecated Use {@link #isReadable()} or {@link #isReadable(int)} instead. + */ + @Deprecated boolean readable(); /** @@ -398,6 +406,13 @@ public interface ByteBuf extends Buf, Comparable { * if and only if {@code (this.capacity - this.writerIndex)} is greater * than {@code 0}. */ + @Override + boolean isWritable(); + + /** + * @deprecated Use {@link #isWritable()} or {@link #isWritable(int)} instead. + */ + @Deprecated boolean writable(); /** @@ -476,11 +491,17 @@ public interface ByteBuf extends Buf, Comparable { * @throws IndexOutOfBoundsException * if {@link #writerIndex()} + {@code minWritableBytes} > {@link #maxCapacity()} */ + ByteBuf ensureWritable(int minWritableBytes); + + /** + * @deprecated Use {@link #ensureWritable(int)} instead. + */ + @Deprecated ByteBuf ensureWritableBytes(int minWritableBytes); /** * Tries to make sure the number of {@linkplain #writableBytes() the writable bytes} - * is equal to or greater than the specified value. Unlike {@link #ensureWritableBytes(int)}, + * is equal to or greater than the specified value. Unlike {@link #ensureWritable(int)}, * this method does not raise an exception but returns a code. * * @param minWritableBytes @@ -497,7 +518,7 @@ public interface ByteBuf extends Buf, Comparable { * {@code 3} if the buffer does not have enough bytes, but its capacity has been * increased to its maximum. */ - int ensureWritableBytes(int minWritableBytes, boolean force); + int ensureWritable(int minWritableBytes, boolean force); /** * Gets a boolean at the specified absolute (@code index) in this buffer. diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java b/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java index ef1bbcd31f..5b04f348b4 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java @@ -103,7 +103,7 @@ public class ByteBufInputStream extends InputStream implements DataInput { @Override public int read() throws IOException { - if (!buffer.readable()) { + if (!buffer.isReadable()) { return -1; } return buffer.readByte() & 0xff; @@ -143,7 +143,7 @@ public class ByteBufInputStream extends InputStream implements DataInput { @Override public byte readByte() throws IOException { - if (!buffer.readable()) { + if (!buffer.isReadable()) { throw new EOFException(); } return buffer.readByte(); diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java index 8c4614a9a8..86a04df097 100644 --- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java @@ -42,7 +42,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the {@link ByteBuf} to add * @return self * this instance - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the index is invalid */ CompositeByteBuf addComponent(int cIndex, ByteBuf buffer); @@ -72,7 +72,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the {@link ByteBuf}s to add * @return self * this instance - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the index is invalid * */ @@ -87,7 +87,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the {@link ByteBuf}s to add * @return self * this instance - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the index is invalid */ CompositeByteBuf addComponents(int cIndex, Iterable buffers); @@ -99,7 +99,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the index on from which the {@link ByteBuf} will be remove * @return self * this instance - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the index is invalid */ CompositeByteBuf removeComponent(int cIndex); @@ -113,7 +113,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the number of components to remove * @return self * this instance - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the index is invalid */ CompositeByteBuf removeComponents(int cIndex, int numComponents); @@ -135,7 +135,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the index for which the {@link ByteBuf} should be returned * @return buf * the {@link ByteBuf} on the specified index - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the index is invalid */ ByteBuf component(int cIndex); @@ -147,7 +147,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the offset for which the {@link ByteBuf} should be returned * @return buf * the {@link ByteBuf} on the specified index - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the offset is invalid */ ByteBuf componentAtOffset(int offset); @@ -175,7 +175,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { * the number of components to compose * @return self * this instance - * @throws {@link IndexOutOfBoundsException} + * @throws IndexOutOfBoundsException * if the offset is invalid */ CompositeByteBuf consolidate(int cIndex, int numComponents); @@ -226,7 +226,7 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { CompositeByteBuf discardSomeReadBytes(); @Override - CompositeByteBuf ensureWritableBytes(int minWritableBytes); + CompositeByteBuf ensureWritable(int minWritableBytes); @Override CompositeByteBuf getBytes(int index, ByteBuf dst); diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index de1bec8d3e..e52642d9cf 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -197,7 +197,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit if (b == null) { break; } - if (b.readable()) { + if (b.isReadable()) { cIndex = addComponent0(cIndex, b, false) + 1; int size = components.size(); if (cIndex > size) { @@ -1341,8 +1341,8 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit } @Override - public CompositeByteBuf ensureWritableBytes(int minWritableBytes) { - return (CompositeByteBuf) super.ensureWritableBytes(minWritableBytes); + public CompositeByteBuf ensureWritable(int minWritableBytes) { + return (CompositeByteBuf) super.ensureWritable(minWritableBytes); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java index 78db721758..94c2de855f 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultMessageBuf.java @@ -13,179 +13,308 @@ * License for the specific language governing permissions and limitations * under the License. */ +/* + * Written by Josh Bloch of Google Inc. and released to the public domain, + * as explained at http://creativecommons.org/publicdomain/zero/1.0/. + */ package io.netty.buffer; -import java.util.ArrayDeque; -import java.util.Collection; +import java.lang.reflect.Array; +import java.util.ConcurrentModificationException; import java.util.Iterator; +import java.util.NoSuchElementException; /** * Default {@link MessageBuf} implementation * */ -final class DefaultMessageBuf extends ArrayDeque implements MessageBuf { +final class DefaultMessageBuf extends AbstractMessageBuf { - private static final long serialVersionUID = 1229808623624907552L; + private static final int MIN_INITIAL_CAPACITY = 8; - private boolean freed; + private T[] elements; + private int head; + private int tail; - DefaultMessageBuf() { } + DefaultMessageBuf() { + this(MIN_INITIAL_CAPACITY << 1); + } DefaultMessageBuf(int initialCapacity) { - super(initialCapacity); + this(initialCapacity, Integer.MAX_VALUE); + } + + DefaultMessageBuf(int initialCapacity, int maxCapacity) { + super(maxCapacity); + + if (initialCapacity < 0) { + throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: >= 0)"); + } + if (maxCapacity < initialCapacity) { + throw new IllegalArgumentException( + "maxCapacity: " + maxCapacity + " (expected: >= initialCapacity(" + initialCapacity + ')'); + } + + // Find the best power of two to hold elements. + // Tests "<=" because arrays aren't kept full. + if (initialCapacity >= MIN_INITIAL_CAPACITY) { + initialCapacity |= initialCapacity >>> 1; + initialCapacity |= initialCapacity >>> 2; + initialCapacity |= initialCapacity >>> 4; + initialCapacity |= initialCapacity >>> 8; + initialCapacity |= initialCapacity >>> 16; + initialCapacity ++; + + if (initialCapacity < 0) { // Too many elements, must back off + initialCapacity >>>= 1; // Good luck allocating 2 ^ 30 elements + } + } else { + initialCapacity = MIN_INITIAL_CAPACITY; + } + + elements = cast(new Object[initialCapacity]); } @Override - public BufType type() { - return BufType.MESSAGE; + protected void doFree() { + elements = null; + head = 0; + tail = 0; } @Override - public void addFirst(T t) { - ensureValid(); - super.addFirst(t); + public boolean offer(T e) { + if (e == null) { + throw new NullPointerException(); + } + if (!isWritable()) { + return false; + } + + elements[tail] = e; + if ((tail = tail + 1 & elements.length - 1) == head) { + doubleCapacity(); + } + + return true; + } + + private void doubleCapacity() { + assert head == tail; + + int p = head; + int n = elements.length; + int r = n - p; // number of elements to the right of p + int newCapacity = n << 1; + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + Object[] a = new Object[newCapacity]; + System.arraycopy(elements, p, a, 0, r); + System.arraycopy(elements, 0, a, r, p); + elements = cast(a); + head = 0; + tail = n; } @Override - public void addLast(T t) { - ensureValid(); - super.addLast(t); + public T poll() { + int h = head; + T result = elements[h]; // Element is null if deque empty + if (result == null) { + return null; + } + elements[h] = null; // Must null out slot + head = h + 1 & elements.length - 1; + return result; } @Override - public T pollFirst() { - ensureValid(); - return super.pollFirst(); + public T peek() { + return elements[head]; // elements[head] is null if deque empty } @Override - public T pollLast() { - ensureValid(); - return super.pollLast(); + public boolean remove(Object o) { + if (o == null) { + return false; + } + int mask = elements.length - 1; + int i = head; + T x; + while ((x = elements[i]) != null) { + if (o.equals(x)) { + delete(i); + return true; + } + i = i + 1 & mask; + } + return false; + } + + private boolean delete(int i) { + assert elements[tail] == null; + assert head == tail ? elements[head] == null + : elements[head] != null && elements[tail - 1 & elements.length - 1] != null; + assert elements[head - 1 & elements.length - 1] == null; + + final T[] elements = this.elements; + final int mask = elements.length - 1; + final int h = head; + final int t = tail; + final int front = i - h & mask; + final int back = t - i & mask; + + // Invariant: head <= i < tail mod circularity + if (front >= (t - h & mask)) { + throw new ConcurrentModificationException(); + } + + // Optimize for least element motion + if (front < back) { + if (h <= i) { + System.arraycopy(elements, h, elements, h + 1, front); + } else { // Wrap around + System.arraycopy(elements, 0, elements, 1, i); + elements[0] = elements[mask]; + System.arraycopy(elements, h, elements, h + 1, mask - h); + } + elements[h] = null; + head = h + 1 & mask; + return false; + } else { + if (i < t) { // Copy the null tail as well + System.arraycopy(elements, i + 1, elements, i, back); + tail = t - 1; + } else { // Wrap around + System.arraycopy(elements, i + 1, elements, i, mask - i); + elements[mask] = elements[0]; + System.arraycopy(elements, 1, elements, 0, t); + tail = t - 1 & mask; + } + return true; + } } @Override - public T getFirst() { - ensureValid(); - return super.getFirst(); + public int size() { + return tail - head & elements.length - 1; } @Override - public T getLast() { - ensureValid(); - return super.getLast(); - } - - @Override - public T peekFirst() { - ensureValid(); - return super.peekFirst(); - } - - @Override - public T peekLast() { - ensureValid(); - return super.peekLast(); - } - - @Override - public boolean removeFirstOccurrence(Object o) { - ensureValid(); - return super.removeFirstOccurrence(o); - } - - @Override - public boolean removeLastOccurrence(Object o) { - ensureValid(); - return super.removeLastOccurrence(o); + public boolean isEmpty() { + return head == tail; } @Override public Iterator iterator() { - ensureValid(); - return super.iterator(); - } - - @Override - public Iterator descendingIterator() { - ensureValid(); - return super.descendingIterator(); + return new Itr(); } @Override public boolean contains(Object o) { - ensureValid(); - return super.contains(o); + if (o == null) { + return false; + } + + final int mask = elements.length - 1; + int i = head; + Object e; + while ((e = elements[i]) != null) { + if (o.equals(e)) { + return true; + } + i = i + 1 & mask; + } + + return false; } @Override public void clear() { - ensureValid(); - super.clear(); + int head = this.head; + int tail = this.tail; + if (head != tail) { + this.head = this.tail = 0; + final int mask = elements.length - 1; + int i = head; + do { + elements[i] = null; + i = i + 1 & mask; + } while (i != tail); + } } @Override public Object[] toArray() { - ensureValid(); - return super.toArray(); + return copyElements(new Object[size()]); } @Override - public T1[] toArray(T1[] a) { - ensureValid(); - return super.toArray(a); - } - - @Override - public ArrayDeque clone() { - ensureValid(); - return super.clone(); - } - - @Override - public int drainTo(Collection c) { - ensureValid(); - int cnt = 0; - for (;;) { - T o = poll(); - if (o == null) { - break; - } - c.add(o); - cnt ++; + public T[] toArray(T[] a) { + int size = size(); + if (a.length < size) { + a = cast(Array.newInstance(a.getClass().getComponentType(), size)); } - return cnt; - } - - @Override - public int drainTo(Collection c, int maxElements) { - ensureValid(); - int cnt = 0; - while (cnt < maxElements) { - T o = poll(); - if (o == null) { - break; - } - c.add(o); - cnt ++; + copyElements(a); + if (a.length > size) { + a[size] = null; } - return cnt; + return a; } - @Override - public boolean isFreed() { - return freed; + private U[] copyElements(U[] a) { + if (head < tail) { + System.arraycopy(elements, head, cast(a), 0, size()); + } else if (head > tail) { + int headPortionLen = elements.length - head; + System.arraycopy(elements, head, cast(a), 0, headPortionLen); + System.arraycopy(elements, 0, cast(a), headPortionLen, tail); + } + return a; } - @Override - public void free() { - freed = true; - super.clear(); + @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" }) + private static T[] cast(Object a) { + return (T[]) a; } - private void ensureValid() { - if (freed) { - throw new IllegalBufferAccessException(); + private class Itr implements Iterator { + private int cursor = head; + private int fence = tail; + private int lastRet = -1; + + @Override + public boolean hasNext() { + return cursor != fence; + } + + @Override + public T next() { + if (cursor == fence) { + throw new NoSuchElementException(); + } + T result = elements[cursor]; + // This check doesn't catch all possible comodifications, + // but does catch the ones that corrupt traversal + if (tail != fence || result == null) { + throw new ConcurrentModificationException(); + } + lastRet = cursor; + cursor = cursor + 1 & elements.length - 1; + return result; + } + + @Override + public void remove() { + if (lastRet < 0) { + throw new IllegalStateException(); + } + if (delete(lastRet)) { // if left-shifted, undo increment in next() + cursor = cursor - 1 & elements.length - 1; + fence = tail; + } + lastRet = -1; } } } diff --git a/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java b/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java index f74f2b1920..a72c468d32 100644 --- a/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/QueueBackedMessageBuf.java @@ -19,180 +19,108 @@ import java.util.Collection; import java.util.Iterator; import java.util.Queue; -final class QueueBackedMessageBuf implements MessageBuf { +final class QueueBackedMessageBuf extends AbstractMessageBuf { private final Queue queue; - private boolean freed; QueueBackedMessageBuf(Queue queue) { + super(Integer.MAX_VALUE); if (queue == null) { throw new NullPointerException("queue"); } this.queue = queue; } - @Override - public BufType type() { - return BufType.MESSAGE; - } - - @Override - public boolean add(T e) { - ensureValid(); - return queue.add(e); - } - @Override public boolean offer(T e) { - ensureValid(); - return queue.offer(e); - } - - @Override - public T remove() { - ensureValid(); - return queue.remove(); + checkUnfreed(); + return isWritable() && queue.offer(e); } @Override public T poll() { - ensureValid(); + checkUnfreed(); return queue.poll(); } - @Override - public T element() { - ensureValid(); - return queue.element(); - } - @Override public T peek() { - ensureValid(); + checkUnfreed(); return queue.peek(); } @Override public int size() { - ensureValid(); return queue.size(); } @Override public boolean isEmpty() { - ensureValid(); return queue.isEmpty(); } @Override public boolean contains(Object o) { - ensureValid(); + checkUnfreed(); return queue.contains(o); } @Override public Iterator iterator() { - ensureValid(); + checkUnfreed(); return queue.iterator(); } @Override public Object[] toArray() { - ensureValid(); + checkUnfreed(); return queue.toArray(); } @Override public E[] toArray(E[] a) { - ensureValid(); + checkUnfreed(); return queue.toArray(a); } @Override public boolean remove(Object o) { - ensureValid(); + checkUnfreed(); return queue.remove(o); } @Override public boolean containsAll(Collection c) { - ensureValid(); + checkUnfreed(); return queue.containsAll(c); } @Override public boolean addAll(Collection c) { - ensureValid(); - return queue.addAll(c); + checkUnfreed(); + return isWritable(c.size()) && queue.addAll(c); } @Override public boolean removeAll(Collection c) { - ensureValid(); + checkUnfreed(); return queue.removeAll(c); } @Override public boolean retainAll(Collection c) { - ensureValid(); + checkUnfreed(); return queue.retainAll(c); } @Override public void clear() { - ensureValid(); + checkUnfreed(); queue.clear(); } @Override - public int drainTo(Collection c) { - ensureValid(); - int cnt = 0; - for (;;) { - T o = poll(); - if (o == null) { - break; - } - c.add(o); - cnt ++; - } - return cnt; - } - - @Override - public int drainTo(Collection c, int maxElements) { - ensureValid(); - int cnt = 0; - while (cnt < maxElements) { - T o = poll(); - if (o == null) { - break; - } - c.add(o); - cnt ++; - } - return cnt; - } - - @Override - public boolean isFreed() { - return freed; - } - - @Override - public void free() { - freed = true; - queue.clear(); - } - - @Override - public String toString() { - return queue.toString(); - } - - private void ensureValid() { - if (freed) { - throw new IllegalBufferAccessException(); - } + protected void doFree() { + clear(); } } diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index 4539c9b413..ee18ca0fd3 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -137,13 +137,35 @@ public final class SwappedByteBuf implements ByteBuf { } @Override - public boolean readable() { - return buf.readable(); + public boolean isReadable() { + return buf.isReadable(); } @Override + @Deprecated + public boolean readable() { + return buf.isReadable(); + } + + @Override + public boolean isReadable(int size) { + return buf.isReadable(size); + } + + @Override + public boolean isWritable() { + return buf.isWritable(); + } + + @Override + @Deprecated public boolean writable() { - return buf.writable(); + return buf.isWritable(); + } + + @Override + public boolean isWritable(int size) { + return buf.isWritable(size); } @Override @@ -189,14 +211,21 @@ public final class SwappedByteBuf implements ByteBuf { } @Override - public ByteBuf ensureWritableBytes(int writableBytes) { - buf.ensureWritableBytes(writableBytes); + public ByteBuf ensureWritable(int writableBytes) { + buf.ensureWritable(writableBytes); return this; } @Override - public int ensureWritableBytes(int minWritableBytes, boolean force) { - return buf.ensureWritableBytes(minWritableBytes, force); + @Deprecated + public ByteBuf ensureWritableBytes(int minWritableBytes) { + buf.ensureWritable(minWritableBytes); + return this; + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + return buf.ensureWritable(minWritableBytes, force); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/Unpooled.java b/buffer/src/main/java/io/netty/buffer/Unpooled.java index 8261600966..12a5a5efa4 100644 --- a/buffer/src/main/java/io/netty/buffer/Unpooled.java +++ b/buffer/src/main/java/io/netty/buffer/Unpooled.java @@ -292,6 +292,10 @@ public final class Unpooled { return new DefaultMessageBuf(initialCapacity); } + public static MessageBuf messageBuffer(int initialCapacity, int maxCapacity) { + return new DefaultMessageBuf(initialCapacity, maxCapacity); + } + public static MessageBuf wrappedBuffer(Queue queue) { if (queue instanceof MessageBuf) { return (MessageBuf) queue; @@ -405,7 +409,7 @@ public final class Unpooled { * returned buffer. */ public static ByteBuf wrappedBuffer(ByteBuf buffer) { - if (buffer.readable()) { + if (buffer.isReadable()) { return buffer.slice(); } else { return EMPTY_BUFFER; @@ -483,13 +487,13 @@ public final class Unpooled { case 0: break; case 1: - if (buffers[0].readable()) { + if (buffers[0].isReadable()) { return wrappedBuffer(buffers[0].order(BIG_ENDIAN)); } break; default: for (ByteBuf b: buffers) { - if (b.readable()) { + if (b.isReadable()) { return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, buffers); } } @@ -600,7 +604,7 @@ public final class Unpooled { * respectively. */ public static ByteBuf copiedBuffer(ByteBuf buffer) { - if (buffer.readable()) { + if (buffer.isReadable()) { return buffer.copy(); } else { return EMPTY_BUFFER; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java index 4012478103..033f1dbae5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocket00FrameEncoder.java @@ -54,7 +54,7 @@ public class WebSocket00FrameEncoder extends MessageToByteEncoder>> 8 & 0xFF); out.writeByte(length & 0xFF); } else { - out.ensureWritableBytes(10 + maskLength + length); + out.ensureWritable(10 + maskLength + length); out.writeByte(b0); out.writeByte(maskPayload ? 0xFF : 127); out.writeLong(length); diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 67b98589ef..344ff694a2 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -84,7 +84,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; ByteBuf data = spdyDataFrame.data(); byte flags = spdyDataFrame.isLast() ? SPDY_DATA_FLAG_FIN : 0; - out.ensureWritableBytes(SPDY_HEADER_SIZE + data.readableBytes()); + out.ensureWritable(SPDY_HEADER_SIZE + data.readableBytes()); out.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF); out.writeByte(flags); out.writeMedium(data.readableBytes()); @@ -106,7 +106,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else { length = 10 + headerBlockLength; } - out.ensureWritableBytes(SPDY_HEADER_SIZE + length); + out.ensureWritable(SPDY_HEADER_SIZE + length); out.writeShort(version | 0x8000); out.writeShort(SPDY_SYN_STREAM_FRAME); out.writeByte(flags); @@ -141,7 +141,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else { length = 4 + headerBlockLength; } - out.ensureWritableBytes(SPDY_HEADER_SIZE + length); + out.ensureWritable(SPDY_HEADER_SIZE + length); out.writeShort(version | 0x8000); out.writeShort(SPDY_SYN_REPLY_FRAME); out.writeByte(flags); @@ -159,7 +159,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else if (msg instanceof SpdyRstStreamFrame) { SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg; - out.ensureWritableBytes(SPDY_HEADER_SIZE + 8); + out.ensureWritable(SPDY_HEADER_SIZE + 8); out.writeShort(version | 0x8000); out.writeShort(SPDY_RST_STREAM_FRAME); out.writeInt(8); @@ -174,7 +174,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { Set IDs = spdySettingsFrame.getIds(); int numEntries = IDs.size(); int length = 4 + numEntries * 8; - out.ensureWritableBytes(SPDY_HEADER_SIZE + length); + out.ensureWritable(SPDY_HEADER_SIZE + length); out.writeShort(version | 0x8000); out.writeShort(SPDY_SETTINGS_FRAME); out.writeByte(flags); @@ -206,7 +206,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else if (msg instanceof SpdyNoOpFrame) { - out.ensureWritableBytes(SPDY_HEADER_SIZE); + out.ensureWritable(SPDY_HEADER_SIZE); out.writeShort(version | 0x8000); out.writeShort(SPDY_NOOP_FRAME); out.writeInt(0); @@ -214,7 +214,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else if (msg instanceof SpdyPingFrame) { SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; - out.ensureWritableBytes(SPDY_HEADER_SIZE + 4); + out.ensureWritable(SPDY_HEADER_SIZE + 4); out.writeShort(version | 0x8000); out.writeShort(SPDY_PING_FRAME); out.writeInt(4); @@ -224,7 +224,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { SpdyGoAwayFrame spdyGoAwayFrame = (SpdyGoAwayFrame) msg; int length = version < 3 ? 4 : 8; - out.ensureWritableBytes(SPDY_HEADER_SIZE + length); + out.ensureWritable(SPDY_HEADER_SIZE + length); out.writeShort(version | 0x8000); out.writeShort(SPDY_GOAWAY_FRAME); out.writeInt(length); @@ -246,7 +246,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else { length = 4 + headerBlockLength; } - out.ensureWritableBytes(SPDY_HEADER_SIZE + length); + out.ensureWritable(SPDY_HEADER_SIZE + length); out.writeShort(version | 0x8000); out.writeShort(SPDY_HEADERS_FRAME); out.writeByte(flags); @@ -260,7 +260,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder { } else if (msg instanceof SpdyWindowUpdateFrame) { SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg; - out.ensureWritableBytes(SPDY_HEADER_SIZE + 8); + out.ensureWritable(SPDY_HEADER_SIZE + 8); out.writeShort(version | 0x8000); out.writeShort(SPDY_WINDOW_UPDATE_FRAME); out.writeInt(8); diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index c3e7f71544..7a1cf22f0f 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -137,13 +137,18 @@ final class ReplayingDecoderBuffer implements ByteBuf { throw new UnreplayableOperationException(); } + @Override + public ByteBuf ensureWritable(int writableBytes) { + throw new UnreplayableOperationException(); + } + @Override public ByteBuf ensureWritableBytes(int writableBytes) { throw new UnreplayableOperationException(); } @Override - public int ensureWritableBytes(int minWritableBytes, boolean force) { + public int ensureWritable(int minWritableBytes, boolean force) { throw new UnreplayableOperationException(); } @@ -384,9 +389,19 @@ final class ReplayingDecoderBuffer implements ByteBuf { return swapped; } + @Override + public boolean isReadable() { + return terminated? buffer.isReadable() : true; + } + @Override public boolean readable() { - return terminated? buffer.readable() : true; + return isReadable(); + } + + @Override + public boolean isReadable(int size) { + return terminated? buffer.isReadable(size) : true; } @Override @@ -718,11 +733,21 @@ final class ReplayingDecoderBuffer implements ByteBuf { ')'; } + @Override + public boolean isWritable() { + return false; + } + @Override public boolean writable() { return false; } + @Override + public boolean isWritable(int size) { + return false; + } + @Override public int writableBytes() { return 0; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java index 0e606a9d52..78c3fd60cc 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java @@ -85,7 +85,7 @@ public class JZlibDecoder extends ZlibDecoder { ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { - if (!in.readable()) { + if (!in.isReadable()) { return; } @@ -116,7 +116,7 @@ public class JZlibDecoder extends ZlibDecoder { loop: for (;;) { z.avail_out = maxOutputLength; if (outHasArray) { - out.ensureWritableBytes(maxOutputLength); + out.ensureWritable(maxOutputLength); z.next_out = out.array(); z.next_out_index = out.arrayOffset() + out.writerIndex(); } else { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 159edfe482..7395b911a7 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -294,7 +294,7 @@ public class JZlibEncoder extends ZlibEncoder { boolean outHasArray = out.hasArray(); z.avail_out = maxOutputLength; if (outHasArray) { - out.ensureWritableBytes(maxOutputLength); + out.ensureWritable(maxOutputLength); z.next_out = out.array(); z.next_out_index = out.arrayOffset() + out.writerIndex(); } else { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index aface2d2d2..445f2dda1f 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -189,7 +189,7 @@ public class JdkZlibEncoder extends ZlibEncoder { uncompressed.readBytes(inAry); int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12; - out.ensureWritableBytes(sizeEstimate); + out.ensureWritable(sizeEstimate); synchronized (deflater) { if (gzip) { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java b/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java index 97045769ce..e91a35241c 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Snappy.java @@ -268,9 +268,9 @@ public class Snappy { return; } - out.ensureWritableBytes(inputLength); + out.ensureWritable(inputLength); - while (in.readable() && in.readerIndex() - inIndex < maxLength) { + while (in.isReadable() && in.readerIndex() - inIndex < maxLength) { byte tag = in.readByte(); switch (tag & 0x03) { case LITERAL: diff --git a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java index be984d02fc..b150d00d0a 100644 --- a/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java +++ b/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java @@ -15,14 +15,13 @@ */ package io.netty.handler.codec.protobuf; +import com.google.protobuf.CodedOutputStream; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import com.google.protobuf.CodedOutputStream; - /** * An encoder that prepends the the Google Protocol Buffers * Base @@ -52,7 +51,7 @@ public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder maxCapacity) { byteBuf.capacity(maxCapacity); } else { - byteBuf.ensureWritableBytes(available); + byteBuf.ensureWritable(available); } } } @@ -151,7 +151,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { @Override protected void doFlushByteBuffer(ByteBuf buf) throws Exception { - while (buf.readable()) { + while (buf.isReadable()) { doWriteBytes(buf); } buf.clear();