diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index 1f234f6db9..0f24e085e8 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -1037,32 +1037,66 @@ public abstract class AbstractByteBuf implements ByteBuf { @Override public int forEachByte(ByteBufProcessor processor) { - int index = readerIndex; - int length = writerIndex - index; - return forEach0(index, length, processor); + return forEachByteAsc0(readerIndex, writerIndex, processor); } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { - checkIndex(index, length); - return forEach0(index, length, processor); + public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (fromIndex < toIndex) { + return forEachByteAsc(fromIndex, toIndex, processor); + } else if (fromIndex > toIndex) { + return forEachByteDesc(fromIndex, toIndex, processor); + } else { + checkIndex(fromIndex); + return -1; + } } - private int forEach0(int index, int length, ByteBufProcessor processor) { + private int forEachByteAsc(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (fromIndex < 0 || toIndex > capacity()) { + throw new IndexOutOfBoundsException(String.format( + "fromIndex: %d, toIndex: %d (expected: 0 <= fromIndex < toIndex <= capacity(%d))", + fromIndex, toIndex, capacity())); + } + return forEachByteAsc0(fromIndex, toIndex, processor); + } + + private int forEachByteAsc0(int fromIndex, int toIndex, ByteBufProcessor processor) { if (processor == null) { throw new NullPointerException("processor"); } - if (length == 0) { - return -1; - } - - final int end = index + length; - int i = index; + int i = fromIndex; try { do { i += processor.process(this, i, _getByte(i)); - } while (i < end); + } while (i < toIndex); + } catch (Signal signal) { + signal.expect(ByteBufProcessor.ABORT); + return i; + } catch (Exception e) { + PlatformDependent.throwException(e); + } + + return -1; + } + + private int forEachByteDesc(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (toIndex < -1 || fromIndex >= capacity()) { + throw new IndexOutOfBoundsException(String.format( + "fromIndex: %d, toIndex: %d (expected: -1 <= toIndex < fromIndex < capacity(%d))", + fromIndex, toIndex, capacity())); + } + + if (processor == null) { + throw new NullPointerException("processor"); + } + + int i = fromIndex; + try { + do { + i -= processor.process(this, i, _getByte(i)); + } while (i > toIndex); } catch (Signal signal) { signal.expect(ByteBufProcessor.ABORT); return i; diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index 93b1b32223..d220661cd3 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1709,11 +1709,20 @@ public interface ByteBuf extends ReferenceCounted, Comparable { /** * Iterates over the specified area of this buffer with the specified {@code processor}. + * * * @return {@code -1} if the processor iterated to or beyond the end of the specified area. * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. */ - int forEachByte(int index, int length, ByteBufProcessor processor); + int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor); /** * Returns a copy of this buffer's readable bytes. Modifying the content diff --git a/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java b/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java index 7a10fbf393..2ab14154ca 100644 --- a/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java @@ -718,8 +718,10 @@ public final class EmptyByteBuf implements ByteBuf { } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { - checkIndex(index, length); + public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (fromIndex != 0 && fromIndex != toIndex) { + throw new IndexOutOfBoundsException(); + } return -1; } diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index 319a65eaf7..29e368fd94 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -725,8 +725,8 @@ public final class SwappedByteBuf implements ByteBuf { } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { - return buf.forEachByte(index, length, new SwappedByteBufProcessor(processor)); + public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) { + return buf.forEachByte(fromIndex, toIndex, new SwappedByteBufProcessor(processor)); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java index 5a9b40e8f9..0020967043 100644 --- a/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java @@ -731,8 +731,8 @@ final class UnreleasableByteBuf implements ByteBuf { } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { - return buf.forEachByte(index, length, processor); + public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) { + return buf.forEachByte(fromIndex, toIndex, processor); } @Override diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java index 0e9e433d5f..324eb513a1 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.Queue; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static io.netty.buffer.Unpooled.*; import static io.netty.util.internal.EmptyArrays.*; @@ -1655,6 +1656,7 @@ public abstract class AbstractByteBufTest { buffer.writeByte(i + 1); } + final AtomicInteger lastIndex = new AtomicInteger(); buffer.setIndex(CAPACITY / 4, CAPACITY * 3 / 4); assertThat(buffer.forEachByte(new ByteBufProcessor() { int i = CAPACITY / 4; @@ -1663,10 +1665,13 @@ public abstract class AbstractByteBufTest { public int process(ByteBuf buf, int index, byte value) throws Exception { assertThat(value, is((byte) (i + 1))); assertThat(index, is(i)); - i++; + i ++; + lastIndex.set(index); return 1; } }), is(-1)); + + assertThat(lastIndex.get(), is(CAPACITY * 3 / 4 - 1)); } @Test @@ -1677,14 +1682,14 @@ public abstract class AbstractByteBufTest { } final int stop = CAPACITY / 2; - assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY / 3, new ByteBufProcessor() { + assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY * 2 / 3, new ByteBufProcessor() { int i = CAPACITY / 3; @Override public int process(ByteBuf buf, int index, byte value) throws Exception { assertThat(value, is((byte) (i + 1))); assertThat(index, is(i)); - i++; + i ++; if (index == stop) { throw ABORT; @@ -1693,4 +1698,29 @@ public abstract class AbstractByteBufTest { } }), is(stop)); } + + @Test + public void testForEachByteDesc() { + buffer.clear(); + for (int i = 0; i < CAPACITY; i ++) { + buffer.writeByte(i + 1); + } + + final AtomicInteger lastIndex = new AtomicInteger(); + buffer.setIndex(CAPACITY / 4, CAPACITY * 3 / 4); + assertThat(buffer.forEachByte(CAPACITY * 3 / 4, CAPACITY / 4, new ByteBufProcessor() { + int i = CAPACITY * 3 / 4; + + @Override + public int process(ByteBuf buf, int index, byte value) throws Exception { + assertThat(value, is((byte) (i + 1))); + assertThat(index, is(i)); + i --; + lastIndex.set(index); + return 1; + } + }), is(-1)); + + assertThat(lastIndex.get(), is(CAPACITY / 4 + 1)); + } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index 6d85ad13fc..6c17df011e 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -378,7 +378,7 @@ final class ReplayingDecoderBuffer implements ByteBuf { @Override public int forEachByte(ByteBufProcessor processor) { int ret = buffer.forEachByte(processor); - if (ret < 0 && !terminated) { + if (ret < 0) { throw REPLAY; } else { return ret; @@ -386,18 +386,32 @@ final class ReplayingDecoderBuffer implements ByteBuf { } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { - int writerIndex = buffer.writerIndex(); + public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (fromIndex < toIndex) { + return forEachByteAsc(fromIndex, toIndex, processor); + } else if (fromIndex > toIndex) { + return forEachByteDesc(fromIndex, toIndex, processor); + } else { + checkIndex(fromIndex, 0); + return -1; + } + } - if (index >= writerIndex) { + private int forEachByteAsc(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (fromIndex < 0) { + throw new IndexOutOfBoundsException("fromIndex: " + fromIndex); + } + + final int writerIndex = buffer.writerIndex(); + if (fromIndex >= writerIndex) { throw REPLAY; } - if (terminated || index + length <= writerIndex) { - return buffer.forEachByte(index, length, processor); + if (toIndex <= writerIndex) { + return buffer.forEachByte(fromIndex, toIndex, processor); } - int ret = buffer.forEachByte(index, writerIndex - index, processor); + int ret = buffer.forEachByte(fromIndex, writerIndex, processor); if (ret < 0) { throw REPLAY; } else { @@ -405,6 +419,18 @@ final class ReplayingDecoderBuffer implements ByteBuf { } } + private int forEachByteDesc(int fromIndex, int toIndex, ByteBufProcessor processor) { + if (toIndex < -1) { + throw new IndexOutOfBoundsException("toIndex: " + toIndex); + } + + if (fromIndex >= buffer.writerIndex()) { + throw REPLAY; + } + + return buffer.forEachByte(fromIndex, toIndex, processor); + } + @Override public ByteBuf markReaderIndex() { buffer.markReaderIndex();