From 0b9235f072f8bdac8bfa2f3e365a9cbcdbb0fe81 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 5 Jul 2013 14:00:05 +0900 Subject: [PATCH] Simplify ByteBufProcessor and MessageListProcessor and Add internal component accessors to CompositeByteBuf - Fixes #1528 It's not really easy to provide a general-purpose abstraction for fast-yet-safe iteration. Instead of making forEachByte() less optimal, let's make it do what it does really well, and allow a user to implement potentially unsafe-yet-fast loop using unsafe operations. --- .../java/io/netty/buffer/AbstractByteBuf.java | 19 ++-- .../main/java/io/netty/buffer/ByteBuf.java | 9 +- .../io/netty/buffer/ByteBufProcessor.java | 93 +++++-------------- .../io/netty/buffer/CompositeByteBuf.java | 16 ++++ .../netty/buffer/DefaultCompositeByteBuf.java | 12 ++- .../io/netty/buffer/AbstractByteBufTest.java | 14 +-- .../java/io/netty/channel/MessageList.java | 28 +++--- .../netty/channel/MessageListProcessor.java | 13 +-- 8 files changed, 89 insertions(+), 115 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index f41ca9b209..5875a21727 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -17,7 +17,6 @@ package io.netty.buffer; import io.netty.util.IllegalReferenceCountException; import io.netty.util.ResourceLeakDetector; -import io.netty.util.Signal; import io.netty.util.internal.PlatformDependent; import java.io.IOException; @@ -1043,11 +1042,12 @@ public abstract class AbstractByteBuf implements ByteBuf { int i = index; try { do { - i += processor.process(_getByte(i)); + if (processor.process(_getByte(i))) { + i ++; + } else { + return i; + } } while (i < endIndex); - } catch (Signal signal) { - signal.expect(ByteBufProcessor.ABORT); - return i; } catch (Exception e) { PlatformDependent.throwException(e); } @@ -1080,11 +1080,12 @@ public abstract class AbstractByteBuf implements ByteBuf { int i = index + length - 1; try { do { - i -= processor.process(_getByte(i)); + if (processor.process(_getByte(i))) { + i --; + } else { + return i; + } } while (i >= index); - } catch (Signal signal) { - signal.expect(ByteBufProcessor.ABORT); - return i; } catch (Exception e) { PlatformDependent.throwException(e); } diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index 84f9d86184..33d69952ca 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1685,7 +1685,7 @@ public interface ByteBuf extends ReferenceCounted, Comparable { * Iterates over the readable bytes of this buffer with the specified {@code processor} in ascending order. * * @return {@code -1} if the processor iterated to or beyond the end of the readable bytes. - * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. + * The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}. */ int forEachByte(ByteBufProcessor processor); @@ -1694,7 +1694,7 @@ public interface ByteBuf extends ReferenceCounted, Comparable { * (i.e. {@code index}, {@code (index + 1)}, .. {@code (index + length - 1)}) * * @return {@code -1} if the processor iterated to or beyond the end of the specified area. - * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. + * The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}. */ int forEachByte(int index, int length, ByteBufProcessor processor); @@ -1702,7 +1702,7 @@ public interface ByteBuf extends ReferenceCounted, Comparable { * Iterates over the readable bytes of this buffer with the specified {@code processor} in descending order. * * @return {@code -1} if the processor iterated to or beyond the beginning of the readable bytes. - * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. + * The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}. */ int forEachByteDesc(ByteBufProcessor processor); @@ -1710,8 +1710,9 @@ public interface ByteBuf extends ReferenceCounted, Comparable { * Iterates over the specified area of this buffer with the specified {@code processor} in descending order. * (i.e. {@code (index + length - 1)}, {@code (index + length - 2)}, ... {@code index}) * + * * @return {@code -1} if the processor iterated to or beyond the beginning of the specified area. - * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. + * The last-visited index If the {@link ByteBufProcessor#process(byte)} returned {@code false}. */ int forEachByteDesc(int index, int length, ByteBufProcessor processor); diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java b/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java index e6a428aa27..a1323a0ce6 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java @@ -16,23 +16,15 @@ package io.netty.buffer; -import io.netty.util.Signal; - public interface ByteBufProcessor { - Signal ABORT = new Signal(ByteBufProcessor.class.getName() + ".ABORT"); - /** * Aborts on a {@code NUL (0x00)}. */ ByteBufProcessor FIND_NUL = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value == 0) { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value != 0; } }; @@ -41,12 +33,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_NON_NUL = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value != 0) { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value == 0; } }; @@ -55,12 +43,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_CR = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value == '\r') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value != '\r'; } }; @@ -69,12 +53,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_NON_CR = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value != '\r') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value == '\r'; } }; @@ -83,12 +63,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_LF = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value == '\n') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value != '\n'; } }; @@ -97,12 +73,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_NON_LF = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value != '\n') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value == '\n'; } }; @@ -111,12 +83,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_CRLF = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value == '\r' || value == '\n') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value != '\r' && value != '\n'; } }; @@ -125,12 +93,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_NON_CRLF = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value != '\r' && value != '\n') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value == '\r' || value == '\n'; } }; @@ -139,12 +103,8 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_LINEAR_WHITESPACE = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value == ' ' || value == '\t') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value != ' ' && value != '\t'; } }; @@ -153,21 +113,14 @@ public interface ByteBufProcessor { */ ByteBufProcessor FIND_NON_LINEAR_WHITESPACE = new ByteBufProcessor() { @Override - public int process(byte value) throws Exception { - if (value != ' ' && value != '\t') { - throw ABORT; - } else { - return 1; - } + public boolean process(byte value) throws Exception { + return value == ' ' || value == '\t'; } }; /** - * @return the number of elements processed. {@link ByteBuf#forEachByte(ByteBufProcessor)} will determine - * the index of the next byte to be processed based on this value. Usually, an implementation will - * return {@code 1} to advance the index by {@code 1}. Note that returning a non-positive value is - * allowed where a negative value advances the index in the opposite direction and zero leaves the index - * as-is. + * @return {@code true} if the processor wants to continue the loop and handle the next byte in the buffer. + * {@code false} if the processor wants to stop handling bytes and abort the loop. */ - int process(byte value) throws Exception; + boolean process(byte value) throws Exception; } diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java index f39a50608f..9dad4ffabd 100644 --- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java @@ -170,6 +170,22 @@ public interface CompositeByteBuf extends ByteBuf, Iterable { */ ByteBuf componentAtOffset(int offset); + /** + * Return the internal {@link ByteBuf} on the specified index. Note that updating the indexes of the returned + * buffer will lead to an undefined behavior of this buffer. + * + * @param cIndex the index for which the {@link ByteBuf} should be returned + */ + ByteBuf internalComponent(int cIndex); + + /** + * Return the internal {@link ByteBuf} on the specified offset. Note that updating the indexes of the returned + * buffer will lead to an undefined behavior of this buffer. + * + * @param offset the offset for which the {@link ByteBuf} should be returned + */ + ByteBuf internalComponentAtOffset(int offset); + /** * Discard all {@link ByteBuf}s which are read. * diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index 05fcd86636..93474a8caa 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -947,12 +947,22 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp @Override public ByteBuf component(int cIndex) { + return internalComponent(cIndex).duplicate(); + } + + @Override + public ByteBuf componentAtOffset(int offset) { + return internalComponentAtOffset(offset).duplicate(); + } + + @Override + public ByteBuf internalComponent(int cIndex) { checkComponentIndex(cIndex); return components.get(cIndex).buf; } @Override - public ByteBuf componentAtOffset(int offset) { + public ByteBuf internalComponentAtOffset(int offset) { return findComponent(offset).buf; } diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java index 76ea1d5293..ab5c5032b1 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java @@ -1662,11 +1662,11 @@ public abstract class AbstractByteBufTest { int i = CAPACITY / 4; @Override - public int process(byte value) throws Exception { + public boolean process(byte value) throws Exception { assertThat(value, is((byte) (i + 1))); lastIndex.set(i); i ++; - return 1; + return true; } }), is(-1)); @@ -1685,14 +1685,14 @@ public abstract class AbstractByteBufTest { int i = CAPACITY / 3; @Override - public int process(byte value) throws Exception { + public boolean process(byte value) throws Exception { assertThat(value, is((byte) (i + 1))); if (i == stop) { - throw ABORT; + return false; } i ++; - return 1; + return true; } }), is(stop)); } @@ -1709,11 +1709,11 @@ public abstract class AbstractByteBufTest { int i = CAPACITY * 3 / 4 - 1; @Override - public int process(byte value) throws Exception { + public boolean process(byte value) throws Exception { assertThat(value, is((byte) (i + 1))); lastIndex.set(i); i --; - return 1; + return true; } }), is(-1)); diff --git a/transport/src/main/java/io/netty/channel/MessageList.java b/transport/src/main/java/io/netty/channel/MessageList.java index 1b53fbb7e4..35f0cd8593 100644 --- a/transport/src/main/java/io/netty/channel/MessageList.java +++ b/transport/src/main/java/io/netty/channel/MessageList.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; -import io.netty.util.Signal; import io.netty.util.internal.PlatformDependent; import java.util.Arrays; @@ -390,8 +389,7 @@ public final class MessageList implements Iterable { * Iterates over the messages in this list with the specified {@code processor}. * * @return {@code -1} if the processor iterated to or beyond the end of the readable bytes. - * If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be - * returned. + * The last-visited index If the {@link MessageListProcessor#process(Object)} returned {@code false}. */ public int forEach(MessageListProcessor proc) { if (proc == null) { @@ -409,11 +407,12 @@ public final class MessageList implements Iterable { int i = 0; try { do { - i += p.process(elements[i]); + if (p.process(elements[i])) { + i ++; + } else { + return i; + } } while (i < size); - } catch (Signal abort) { - abort.expect(MessageListProcessor.ABORT); - return i; } catch (Exception e) { PlatformDependent.throwException(e); } @@ -425,8 +424,7 @@ public final class MessageList implements Iterable { * Iterates over the messages in this list with the specified {@code processor}. * * @return {@code -1} if the processor iterated to or beyond the end of the specified area. - * If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be - * returned. + * The last-visited index If the {@link MessageListProcessor#process(Object)} returned {@code false}. */ public int forEach(int index, int length, MessageListProcessor proc) { checkRange(index, length); @@ -446,11 +444,12 @@ public final class MessageList implements Iterable { int i = index; try { do { - i += p.process(elements[i]); + if (p.process(elements[i])) { + i ++; + } else { + return i; + } } while (i < end); - } catch (Signal abort) { - abort.expect(MessageListProcessor.ABORT); - return i; } catch (Exception e) { PlatformDependent.throwException(e); } @@ -542,8 +541,9 @@ public final class MessageList implements Iterable { } private final class MessageListIterator implements Iterator { + + private final int expectedModifications = modifications; private int index; - private int expectedModifications = modifications; private void checkConcurrentModifications() { if (expectedModifications != modifications) { diff --git a/transport/src/main/java/io/netty/channel/MessageListProcessor.java b/transport/src/main/java/io/netty/channel/MessageListProcessor.java index 6cd5f53b0d..bfcdb91f7b 100644 --- a/transport/src/main/java/io/netty/channel/MessageListProcessor.java +++ b/transport/src/main/java/io/netty/channel/MessageListProcessor.java @@ -16,18 +16,11 @@ package io.netty.channel; -import io.netty.util.Signal; - public interface MessageListProcessor { - Signal ABORT = new Signal(MessageListProcessor.class.getName() + ".ABORT"); - /** - * @return the number of elements processed. {@link MessageList#forEach(MessageListProcessor)} will determine - * the index of the next element to be processed based on this value. Usually, an implementation will - * return {@code 1} to advance the index by {@code 1}. Note that returning a non-positive value is - * allowed where a negative value advances the index in the opposite direction and zero leaves the index - * as-is. + * @return {@code true} if the processor wants to continue the loop and handle the next message in the list. + * {@code false} if the processor wants to stop handling messages and abort the loop. */ - int process(T value) throws Exception; + boolean process(T value) throws Exception; }