Simplify ByteBufProcessor and MessageListProcessor and Add internal component accessors to CompositeByteBuf

- Fixes #1528

It's not really easy to provide a general-purpose abstraction for fast-yet-safe iteration. Instead of making forEachByte() less optimal, let's make it do what it does really well, and allow a user to implement potentially unsafe-yet-fast loop using unsafe operations.
This commit is contained in:
Trustin Lee 2013-07-05 14:00:05 +09:00
parent ea85054eab
commit 0b9235f072
8 changed files with 89 additions and 115 deletions

View File

@ -17,7 +17,6 @@ package io.netty.buffer;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.Signal;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
@ -1043,11 +1042,12 @@ public abstract class AbstractByteBuf implements ByteBuf {
int i = index;
try {
do {
i += processor.process(_getByte(i));
if (processor.process(_getByte(i))) {
i ++;
} else {
return i;
}
} while (i < endIndex);
} catch (Signal signal) {
signal.expect(ByteBufProcessor.ABORT);
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}
@ -1080,11 +1080,12 @@ public abstract class AbstractByteBuf implements ByteBuf {
int i = index + length - 1;
try {
do {
i -= processor.process(_getByte(i));
if (processor.process(_getByte(i))) {
i --;
} else {
return i;
}
} while (i >= index);
} catch (Signal signal) {
signal.expect(ByteBufProcessor.ABORT);
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}

View File

@ -1685,7 +1685,7 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
* Iterates over the readable bytes of this buffer with the specified {@code processor} in ascending order.
*
* @return {@code -1} if the processor iterated to or beyond the end of the readable bytes.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
* The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}.
*/
int forEachByte(ByteBufProcessor processor);
@ -1694,7 +1694,7 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
* (i.e. {@code index}, {@code (index + 1)}, .. {@code (index + length - 1)})
*
* @return {@code -1} if the processor iterated to or beyond the end of the specified area.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
* The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}.
*/
int forEachByte(int index, int length, ByteBufProcessor processor);
@ -1702,7 +1702,7 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
* Iterates over the readable bytes of this buffer with the specified {@code processor} in descending order.
*
* @return {@code -1} if the processor iterated to or beyond the beginning of the readable bytes.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
* The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}.
*/
int forEachByteDesc(ByteBufProcessor processor);
@ -1710,8 +1710,9 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
* Iterates over the specified area of this buffer with the specified {@code processor} in descending order.
* (i.e. {@code (index + length - 1)}, {@code (index + length - 2)}, ... {@code index})
*
*
* @return {@code -1} if the processor iterated to or beyond the beginning of the specified area.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
* The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}.
*/
int forEachByteDesc(int index, int length, ByteBufProcessor processor);

View File

@ -16,23 +16,15 @@
package io.netty.buffer;
import io.netty.util.Signal;
public interface ByteBufProcessor {
Signal ABORT = new Signal(ByteBufProcessor.class.getName() + ".ABORT");
/**
* Aborts on a {@code NUL (0x00)}.
*/
ByteBufProcessor FIND_NUL = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value == 0) {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value != 0;
}
};
@ -41,12 +33,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_NUL = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value != 0) {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value == 0;
}
};
@ -55,12 +43,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_CR = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value == '\r') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value != '\r';
}
};
@ -69,12 +53,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_CR = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value != '\r') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value == '\r';
}
};
@ -83,12 +63,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_LF = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value == '\n') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value != '\n';
}
};
@ -97,12 +73,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_LF = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value != '\n') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value == '\n';
}
};
@ -111,12 +83,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_CRLF = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value == '\r' || value == '\n') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value != '\r' && value != '\n';
}
};
@ -125,12 +93,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_CRLF = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value != '\r' && value != '\n') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value == '\r' || value == '\n';
}
};
@ -139,12 +103,8 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_LINEAR_WHITESPACE = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value == ' ' || value == '\t') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value != ' ' && value != '\t';
}
};
@ -153,21 +113,14 @@ public interface ByteBufProcessor {
*/
ByteBufProcessor FIND_NON_LINEAR_WHITESPACE = new ByteBufProcessor() {
@Override
public int process(byte value) throws Exception {
if (value != ' ' && value != '\t') {
throw ABORT;
} else {
return 1;
}
public boolean process(byte value) throws Exception {
return value == ' ' || value == '\t';
}
};
/**
* @return the number of elements processed. {@link ByteBuf#forEachByte(ByteBufProcessor)} will determine
* the index of the next byte to be processed based on this value. Usually, an implementation will
* return {@code 1} to advance the index by {@code 1}. Note that returning a non-positive value is
* allowed where a negative value advances the index in the opposite direction and zero leaves the index
* as-is.
* @return {@code true} if the processor wants to continue the loop and handle the next byte in the buffer.
* {@code false} if the processor wants to stop handling bytes and abort the loop.
*/
int process(byte value) throws Exception;
boolean process(byte value) throws Exception;
}

View File

@ -170,6 +170,22 @@ public interface CompositeByteBuf extends ByteBuf, Iterable<ByteBuf> {
*/
ByteBuf componentAtOffset(int offset);
/**
* Return the internal {@link ByteBuf} on the specified index. Note that updating the indexes of the returned
* buffer will lead to an undefined behavior of this buffer.
*
* @param cIndex the index for which the {@link ByteBuf} should be returned
*/
ByteBuf internalComponent(int cIndex);
/**
* Return the internal {@link ByteBuf} on the specified offset. Note that updating the indexes of the returned
* buffer will lead to an undefined behavior of this buffer.
*
* @param offset the offset for which the {@link ByteBuf} should be returned
*/
ByteBuf internalComponentAtOffset(int offset);
/**
* Discard all {@link ByteBuf}s which are read.
*

View File

@ -947,12 +947,22 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
@Override
public ByteBuf component(int cIndex) {
return internalComponent(cIndex).duplicate();
}
@Override
public ByteBuf componentAtOffset(int offset) {
return internalComponentAtOffset(offset).duplicate();
}
@Override
public ByteBuf internalComponent(int cIndex) {
checkComponentIndex(cIndex);
return components.get(cIndex).buf;
}
@Override
public ByteBuf componentAtOffset(int offset) {
public ByteBuf internalComponentAtOffset(int offset) {
return findComponent(offset).buf;
}

View File

@ -1662,11 +1662,11 @@ public abstract class AbstractByteBufTest {
int i = CAPACITY / 4;
@Override
public int process(byte value) throws Exception {
public boolean process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
lastIndex.set(i);
i ++;
return 1;
return true;
}
}), is(-1));
@ -1685,14 +1685,14 @@ public abstract class AbstractByteBufTest {
int i = CAPACITY / 3;
@Override
public int process(byte value) throws Exception {
public boolean process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
if (i == stop) {
throw ABORT;
return false;
}
i ++;
return 1;
return true;
}
}), is(stop));
}
@ -1709,11 +1709,11 @@ public abstract class AbstractByteBufTest {
int i = CAPACITY * 3 / 4 - 1;
@Override
public int process(byte value) throws Exception {
public boolean process(byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
lastIndex.set(i);
i --;
return 1;
return true;
}
}), is(-1));

View File

@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Signal;
import io.netty.util.internal.PlatformDependent;
import java.util.Arrays;
@ -390,8 +389,7 @@ public final class MessageList<T> implements Iterable<T> {
* Iterates over the messages in this list with the specified {@code processor}.
*
* @return {@code -1} if the processor iterated to or beyond the end of the readable bytes.
* If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be
* returned.
* The last-visited index If the {@link MessageListProcessor#process(Object)} returned {@code false}.
*/
public int forEach(MessageListProcessor<? super T> proc) {
if (proc == null) {
@ -409,11 +407,12 @@ public final class MessageList<T> implements Iterable<T> {
int i = 0;
try {
do {
i += p.process(elements[i]);
if (p.process(elements[i])) {
i ++;
} else {
return i;
}
} while (i < size);
} catch (Signal abort) {
abort.expect(MessageListProcessor.ABORT);
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}
@ -425,8 +424,7 @@ public final class MessageList<T> implements Iterable<T> {
* Iterates over the messages in this list with the specified {@code processor}.
*
* @return {@code -1} if the processor iterated to or beyond the end of the specified area.
* If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be
* returned.
* The last-visited index If the {@link MessageListProcessor#process(Object)} returned {@code false}.
*/
public int forEach(int index, int length, MessageListProcessor<? super T> proc) {
checkRange(index, length);
@ -446,11 +444,12 @@ public final class MessageList<T> implements Iterable<T> {
int i = index;
try {
do {
i += p.process(elements[i]);
if (p.process(elements[i])) {
i ++;
} else {
return i;
}
} while (i < end);
} catch (Signal abort) {
abort.expect(MessageListProcessor.ABORT);
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}
@ -542,8 +541,9 @@ public final class MessageList<T> implements Iterable<T> {
}
private final class MessageListIterator implements Iterator<T> {
private final int expectedModifications = modifications;
private int index;
private int expectedModifications = modifications;
private void checkConcurrentModifications() {
if (expectedModifications != modifications) {

View File

@ -16,18 +16,11 @@
package io.netty.channel;
import io.netty.util.Signal;
public interface MessageListProcessor<T> {
Signal ABORT = new Signal(MessageListProcessor.class.getName() + ".ABORT");
/**
* @return the number of elements processed. {@link MessageList#forEach(MessageListProcessor)} will determine
* the index of the next element to be processed based on this value. Usually, an implementation will
* return {@code 1} to advance the index by {@code 1}. Note that returning a non-positive value is
* allowed where a negative value advances the index in the opposite direction and zero leaves the index
* as-is.
* @return {@code true} if the processor wants to continue the loop and handle the next message in the list.
* {@code false} if the processor wants to stop handling messages and abort the loop.
*/
int process(T value) throws Exception;
boolean process(T value) throws Exception;
}