Vastly simplified ByteBufProcessor and MessageListProcessor

- Related: #1378
- They now accept only one argument.
- A user who wants to use a buffer for more complex use cases, he or she can always access the buffer directly via memoryAddress() and array()
This commit is contained in:
Trustin Lee 2013-06-28 20:29:00 +09:00
parent 094d01873b
commit 0dcf352f4c
15 changed files with 107 additions and 166 deletions

View File

@ -1037,34 +1037,32 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public int forEachByte(ByteBufProcessor processor) {
return forEachByteAsc0(readerIndex, writerIndex, processor);
int index = readerIndex;
int length = writerIndex - index;
return forEachByteAsc0(index, length, processor);
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
if (fromIndex < 0 || fromIndex > toIndex || toIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"fromIndex: %d, toIndex: %d (expected: 0 <= fromIndex <= toIndex <= capacity(%d))",
fromIndex, toIndex, capacity()));
}
return forEachByteAsc0(fromIndex, toIndex, processor);
public int forEachByte(int index, int length, ByteBufProcessor processor) {
checkIndex(index, length);
return forEachByteAsc0(index, length, processor);
}
private int forEachByteAsc0(int fromIndex, int toIndex, ByteBufProcessor processor) {
private int forEachByteAsc0(int index, int length, ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
if (fromIndex == toIndex) {
if (length == 0) {
return -1;
}
int i = fromIndex;
final int endIndex = index + length;
int i = index;
try {
do {
i += processor.process(this, i, _getByte(i));
} while (i < toIndex);
i += processor.process(_getByte(i));
} while (i < endIndex);
} catch (Signal signal) {
signal.expect(ByteBufProcessor.ABORT);
return i;
@ -1077,30 +1075,31 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public int forEachByteDesc(ByteBufProcessor processor) {
return forEachByteDesc0(readerIndex, writerIndex, processor);
int index = readerIndex;
int length = writerIndex - index;
return forEachByteDesc0(index, length, processor);
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
if (toIndex < 0 || toIndex > fromIndex || fromIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"toIndex: %d, fromIndex: %d (expected: 0 <= toIndex <= fromIndex <= capacity(%d))",
toIndex, fromIndex, capacity()));
}
return forEachByteDesc0(toIndex, fromIndex, processor);
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
checkIndex(index, length);
return forEachByteDesc0(index, length, processor);
}
private int forEachByteDesc0(int toIndex, int fromIndex, ByteBufProcessor processor) {
private int forEachByteDesc0(int index, int length, ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
int i = fromIndex - 1;
if (length == 0) {
return -1;
}
int i = index + length - 1;
try {
do {
i -= processor.process(this, i, _getByte(i));
} while (i >= toIndex);
i -= processor.process(_getByte(i));
} while (i >= index);
} catch (Signal signal) {
signal.expect(ByteBufProcessor.ABORT);
return i;

View File

@ -1709,12 +1709,12 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
/**
* Iterates over the specified area of this buffer with the specified {@code processor} in ascending order.
* ({@code [fromIndex, fromIndex&#41} i.e. {@code fromIndex}, {@code (fromIndex + 1)}, .. {@code (toIndex - 1)})
* (i.e. {@code index}, {@code (index + 1)}, .. {@code (index + length - 1)})
*
* @return {@code -1} if the processor iterated to or beyond the end of the specified area.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
*/
int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor);
int forEachByte(int index, int length, ByteBufProcessor processor);
/**
* Iterates over the readable bytes of this buffer with the specified {@code processor} in descending order.
@ -1726,12 +1726,12 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
/**
* Iterates over the specified area of this buffer with the specified {@code processor} in descending order.
* ({@code [toIndex, fromIndex&#41} i.e. {@code (fromIndex - 1)}, {@code (fromIndex - 2)}, ... {@code toIndex})
* (i.e. {@code (index + length - 1)}, {@code (index + length - 2)}, ... {@code index})
*
* @return {@code -1} if the processor iterated to or beyond the beginning of the specified area.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
*/
int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor);
int forEachByteDesc(int index, int length, ByteBufProcessor processor);
/**
* Returns a copy of this buffer's readable bytes. Modifying the content

View File

@ -27,7 +27,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NUL = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value == 0) {
throw ABORT;
} else {
@ -41,7 +41,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_NUL = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value != 0) {
throw ABORT;
} else {
@ -55,7 +55,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_CR = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value == '\r') {
throw ABORT;
} else {
@ -69,7 +69,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_CR = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value != '\r') {
throw ABORT;
} else {
@ -83,7 +83,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_LF = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value == '\n') {
throw ABORT;
} else {
@ -97,7 +97,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_LF = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value != '\n') {
throw ABORT;
} else {
@ -111,7 +111,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_CRLF = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value == '\r' || value == '\n') {
throw ABORT;
} else {
@ -125,7 +125,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_CRLF = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value != '\r' && value != '\n') {
throw ABORT;
} else {
@ -139,7 +139,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_LINEAR_WHITESPACE = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value == ' ' || value == '\t') {
throw ABORT;
} else {
@ -153,7 +153,7 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_LINEAR_WHITESPACE = new ByteBufProcessor() {
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
public int process(byte value) throws Exception {
if (value != ' ' && value != '\t') {
throw ABORT;
} else {
@ -169,5 +169,5 @@ public interface ByteBufProcessor {
* allowed where a negative value advances the index in the opposite direction and zero leaves the index
* as-is.
*/
int process(ByteBuf buf, int index, byte value) throws Exception;
int process(byte value) throws Exception;
}

View File

@ -293,13 +293,13 @@ public class DuplicatedByteBuf extends AbstractDerivedByteBuf {
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
return buffer.forEachByte(fromIndex, toIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByte(int index, int length, ByteBufProcessor processor) {
return buffer.forEachByte(index, length, processor);
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
return buffer.forEachByteDesc(toIndex, fromIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
return buffer.forEachByteDesc(index, length, processor);
}
}

View File

@ -718,10 +718,8 @@ public final class EmptyByteBuf implements ByteBuf {
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
if (fromIndex != 0 || fromIndex != toIndex) {
throw new IndexOutOfBoundsException();
}
public int forEachByte(int index, int length, ByteBufProcessor processor) {
checkIndex(index, length);
return -1;
}
@ -731,10 +729,8 @@ public final class EmptyByteBuf implements ByteBuf {
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
if (toIndex != 0 || toIndex != fromIndex) {
throw new IndexOutOfBoundsException();
}
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
checkIndex(index, length);
return -1;
}

View File

@ -296,13 +296,13 @@ public class ReadOnlyByteBuf extends AbstractDerivedByteBuf {
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
return buffer.forEachByte(fromIndex, toIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByte(int index, int length, ByteBufProcessor processor) {
return buffer.forEachByte(index, length, processor);
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
return buffer.forEachByteDesc(toIndex, fromIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
return buffer.forEachByteDesc(index, length, processor);
}
@Override

View File

@ -275,9 +275,8 @@ public class SlicedByteBuf extends AbstractDerivedByteBuf {
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
int ret = buffer.forEachByte(
fromIndex + adjustment, toIndex + adjustment, new SlicedByteBufProcessor(processor));
public int forEachByte(int index, int length, ByteBufProcessor processor) {
int ret = buffer.forEachByte(index + adjustment, length, processor);
if (ret >= adjustment) {
return ret - adjustment;
} else {
@ -286,29 +285,12 @@ public class SlicedByteBuf extends AbstractDerivedByteBuf {
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
int ret = buffer.forEachByteDesc(
toIndex + adjustment, fromIndex + adjustment, new SlicedByteBufProcessor(processor));
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
int ret = buffer.forEachByteDesc(index + adjustment, length, processor);
if (ret >= adjustment) {
return ret - adjustment;
} else {
return -1;
}
}
private final class SlicedByteBufProcessor implements ByteBufProcessor {
private final ByteBufProcessor processor;
SlicedByteBufProcessor(ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
this.processor = processor;
}
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
return processor.process(buf, index - adjustment, value);
}
}
}

View File

@ -721,22 +721,22 @@ public final class SwappedByteBuf implements ByteBuf {
@Override
public int forEachByte(ByteBufProcessor processor) {
return buf.forEachByte(new WrappedByteBufProcessor(this, processor));
return buf.forEachByte(processor);
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
return buf.forEachByte(fromIndex, toIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByte(int index, int length, ByteBufProcessor processor) {
return buf.forEachByte(index, length, processor);
}
@Override
public int forEachByteDesc(ByteBufProcessor processor) {
return buf.forEachByteDesc(new WrappedByteBufProcessor(this, processor));
return buf.forEachByteDesc(processor);
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
return buf.forEachByteDesc(toIndex, fromIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
return buf.forEachByteDesc(index, length, processor);
}
@Override

View File

@ -727,22 +727,22 @@ final class UnreleasableByteBuf implements ByteBuf {
@Override
public int forEachByte(ByteBufProcessor processor) {
return buf.forEachByte(new WrappedByteBufProcessor(this, processor));
return buf.forEachByte(processor);
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
return buf.forEachByte(fromIndex, toIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByte(int index, int length, ByteBufProcessor processor) {
return buf.forEachByte(index, length, processor);
}
@Override
public int forEachByteDesc(ByteBufProcessor processor) {
return buf.forEachByteDesc(new WrappedByteBufProcessor(this, processor));
return buf.forEachByteDesc(processor);
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
return buf.forEachByteDesc(toIndex, fromIndex, new WrappedByteBufProcessor(this, processor));
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
return buf.forEachByteDesc(index, length, processor);
}
@Override

View File

@ -1,35 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
final class WrappedByteBufProcessor implements ByteBufProcessor {
private final ByteBuf buffer;
private final ByteBufProcessor processor;
WrappedByteBufProcessor(ByteBuf buffer, ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
this.buffer = buffer;
this.processor = processor;
}
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
return processor.process(buffer, index, value);
}
}

View File

@ -1662,11 +1662,10 @@ public abstract class AbstractByteBufTest {
int i = CAPACITY / 4;
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
assertThat(index, is(i));
public int process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
lastIndex.set(i);
i ++;
lastIndex.set(index);
return 1;
}
}), is(-1));
@ -1682,18 +1681,17 @@ public abstract class AbstractByteBufTest {
}
final int stop = CAPACITY / 2;
assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY * 2 / 3, new ByteBufProcessor() {
assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY / 3, new ByteBufProcessor() {
int i = CAPACITY / 3;
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
assertThat(index, is(i));
public int process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
i++;
if (index == stop) {
if (i == stop) {
throw ABORT;
}
i ++;
return 1;
}
}), is(stop));
@ -1707,15 +1705,14 @@ public abstract class AbstractByteBufTest {
}
final AtomicInteger lastIndex = new AtomicInteger();
assertThat(buffer.forEachByteDesc(CAPACITY / 4, CAPACITY * 3 / 4, new ByteBufProcessor() {
assertThat(buffer.forEachByteDesc(CAPACITY / 4, CAPACITY * 2 / 4, new ByteBufProcessor() {
int i = CAPACITY * 3 / 4 - 1;
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
assertThat(index, is(i));
public int process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
i--;
lastIndex.set(index);
lastIndex.set(i);
i --;
return 1;
}
}), is(-1));

View File

@ -24,26 +24,28 @@ import static org.junit.Assert.*;
public class ByteBufProcessorTest {
@Test
public void testForward() {
ByteBuf buf = Unpooled.copiedBuffer("abc\r\n\ndef\r\rghi\n\njkl\0\0mno \t\tx", CharsetUtil.ISO_8859_1);
final ByteBuf buf = Unpooled.copiedBuffer("abc\r\n\ndef\r\rghi\n\njkl\0\0mno \t\tx", CharsetUtil.ISO_8859_1);
final int length = buf.readableBytes();
assertEquals(3, buf.forEachByte(0, buf.capacity(), ByteBufProcessor.FIND_CRLF));
assertEquals(6, buf.forEachByte(3, buf.capacity(), ByteBufProcessor.FIND_NON_CRLF));
assertEquals(9, buf.forEachByte(6, buf.capacity(), ByteBufProcessor.FIND_CR));
assertEquals(11, buf.forEachByte(9, buf.capacity(), ByteBufProcessor.FIND_NON_CR));
assertEquals(14, buf.forEachByte(11, buf.capacity(), ByteBufProcessor.FIND_LF));
assertEquals(16, buf.forEachByte(14, buf.capacity(), ByteBufProcessor.FIND_NON_LF));
assertEquals(19, buf.forEachByte(16, buf.capacity(), ByteBufProcessor.FIND_NUL));
assertEquals(21, buf.forEachByte(19, buf.capacity(), ByteBufProcessor.FIND_NON_NUL));
assertEquals(24, buf.forEachByte(21, buf.capacity(), ByteBufProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(28, buf.forEachByte(24, buf.capacity(), ByteBufProcessor.FIND_NON_LINEAR_WHITESPACE));
assertEquals(-1, buf.forEachByte(28, buf.capacity(), ByteBufProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(3, buf.forEachByte(0, length, ByteBufProcessor.FIND_CRLF));
assertEquals(6, buf.forEachByte(3, length - 3, ByteBufProcessor.FIND_NON_CRLF));
assertEquals(9, buf.forEachByte(6, length - 6, ByteBufProcessor.FIND_CR));
assertEquals(11, buf.forEachByte(9, length - 9, ByteBufProcessor.FIND_NON_CR));
assertEquals(14, buf.forEachByte(11, length - 11, ByteBufProcessor.FIND_LF));
assertEquals(16, buf.forEachByte(14, length - 14, ByteBufProcessor.FIND_NON_LF));
assertEquals(19, buf.forEachByte(16, length - 16, ByteBufProcessor.FIND_NUL));
assertEquals(21, buf.forEachByte(19, length - 19, ByteBufProcessor.FIND_NON_NUL));
assertEquals(24, buf.forEachByte(21, length - 21, ByteBufProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(28, buf.forEachByte(24, length - 24, ByteBufProcessor.FIND_NON_LINEAR_WHITESPACE));
assertEquals(-1, buf.forEachByte(28, length - 28, ByteBufProcessor.FIND_LINEAR_WHITESPACE));
}
@Test
public void testBackward() {
ByteBuf buf = Unpooled.copiedBuffer("abc\r\n\ndef\r\rghi\n\njkl\0\0mno \t\tx", CharsetUtil.ISO_8859_1);
final ByteBuf buf = Unpooled.copiedBuffer("abc\r\n\ndef\r\rghi\n\njkl\0\0mno \t\tx", CharsetUtil.ISO_8859_1);
final int length = buf.readableBytes();
assertEquals(27, buf.forEachByteDesc(0, buf.writerIndex(), ByteBufProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(27, buf.forEachByteDesc(0, length, ByteBufProcessor.FIND_LINEAR_WHITESPACE));
assertEquals(23, buf.forEachByteDesc(0, 28, ByteBufProcessor.FIND_NON_LINEAR_WHITESPACE));
assertEquals(20, buf.forEachByteDesc(0, 24, ByteBufProcessor.FIND_NUL));
assertEquals(18, buf.forEachByteDesc(0, 21, ByteBufProcessor.FIND_NON_NUL));

View File

@ -386,17 +386,17 @@ final class ReplayingDecoderBuffer implements ByteBuf {
}
@Override
public int forEachByte(int fromIndex, int toIndex, ByteBufProcessor processor) {
public int forEachByte(int index, int length, ByteBufProcessor processor) {
final int writerIndex = buffer.writerIndex();
if (fromIndex >= writerIndex) {
if (index >= writerIndex) {
throw REPLAY;
}
if (toIndex <= writerIndex) {
return buffer.forEachByte(fromIndex, toIndex, processor);
if (index + length <= writerIndex) {
return buffer.forEachByte(index, length, processor);
}
int ret = buffer.forEachByte(fromIndex, writerIndex, processor);
int ret = buffer.forEachByte(index, writerIndex - index, processor);
if (ret < 0) {
throw REPLAY;
} else {
@ -414,12 +414,12 @@ final class ReplayingDecoderBuffer implements ByteBuf {
}
@Override
public int forEachByteDesc(int toIndex, int fromIndex, ByteBufProcessor processor) {
if (fromIndex > buffer.writerIndex()) {
public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
if (index + length > buffer.writerIndex()) {
throw REPLAY;
}
return buffer.forEachByteDesc(toIndex, fromIndex, processor);
return buffer.forEachByteDesc(index, length, processor);
}
@Override

View File

@ -359,7 +359,7 @@ public final class MessageList<T> implements Iterable<T> {
int i = 0;
try {
do {
i += p.process(this, i, elements[i]);
i += p.process(elements[i]);
} while (i < size);
} catch (Signal abort) {
abort.expect(MessageListProcessor.ABORT);
@ -396,7 +396,7 @@ public final class MessageList<T> implements Iterable<T> {
int i = index;
try {
do {
i += p.process(this, i, elements[i]);
i += p.process(elements[i]);
} while (i < end);
} catch (Signal abort) {
abort.expect(MessageListProcessor.ABORT);

View File

@ -29,5 +29,5 @@ public interface MessageListProcessor<T> {
* allowed where a negative value advances the index in the opposite direction and zero leaves the index
* as-is.
*/
int process(MessageList<T> messages, int index, T value) throws Exception;
int process(T value) throws Exception;
}