diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index 5ad40037ed..3e2e4c1603 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -17,6 +17,8 @@ package io.netty.buffer; import io.netty.util.IllegalReferenceCountException; import io.netty.util.ResourceLeakDetector; +import io.netty.util.Signal; +import io.netty.util.internal.PlatformDependent; import java.io.IOException; import java.io.InputStream; @@ -1030,6 +1032,44 @@ public abstract class AbstractByteBuf implements ByteBuf { return endIndex - index; } + @Override + public int forEachByte(ByteBufProcessor processor) { + int index = readerIndex; + int length = writerIndex - index; + return forEach0(index, length, processor); + } + + @Override + public int forEachByte(int index, int length, ByteBufProcessor processor) { + checkIndex(index, length); + return forEach0(index, length, processor); + } + + private int forEach0(int index, int length, ByteBufProcessor processor) { + if (processor == null) { + throw new NullPointerException("processor"); + } + + if (length == 0) { + return -1; + } + + final int end = index + length; + int i = index; + try { + do { + i += processor.process(this, i, _getByte(i)); + } while (i < end); + } catch (Signal signal) { + signal.expect(ByteBufProcessor.ABORT); + return i; + } catch (Exception e) { + PlatformDependent.throwException(e); + } + + return -1; + } + @Override public int hashCode() { return ByteBufUtil.hashCode(this); diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index 9062c82e2d..397f3db3d2 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1690,6 +1690,22 @@ public interface ByteBuf extends ReferenceCounted, Comparable { */ int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder); + /** + * Iterates over the readable bytes of this buffer with the specified {@code processor}. + * + * @return {@code -1} if the processor iterated to or beyond the end of the readable bytes. + * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. + */ + int forEachByte(ByteBufProcessor processor); + + /** + * Iterates over the specified area of this buffer with the specified {@code processor}. + * + * @return {@code -1} if the processor iterated to or beyond the end of the specified area. + * If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned. + */ + int forEachByte(int index, int length, ByteBufProcessor processor); + /** * Returns a copy of this buffer's readable bytes. Modifying the content * of the returned buffer or this buffer does not affect each other at all. diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java b/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java new file mode 100644 index 0000000000..adcfe6c7df --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/ByteBufProcessor.java @@ -0,0 +1,30 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import io.netty.util.Signal; + +public interface ByteBufProcessor { + Signal ABORT = new Signal(ByteBufProcessor.class.getName() + ".ABORT"); + + /** + * @return the number of elements processed. {@link ByteBuf#forEachByte(ByteBufProcessor)} will determine + * the index of the next byte to be processed based on this value. Usually, an implementation will + * return {@code 1} to advance the index by {@code 1}. + */ + int process(ByteBuf buf, int index, byte value) throws Exception; +} diff --git a/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java b/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java index e8924ab0c7..23343443bd 100644 --- a/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/EmptyByteBuf.java @@ -708,6 +708,17 @@ public final class EmptyByteBuf implements ByteBuf { return -1; } + @Override + public int forEachByte(ByteBufProcessor processor) { + return -1; + } + + @Override + public int forEachByte(int index, int length, ByteBufProcessor processor) { + checkIndex(index, length); + return -1; + } + @Override public ByteBuf copy() { return this; diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index a6624f9949..52b6d8888d 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -692,7 +692,7 @@ public final class SwappedByteBuf implements ByteBuf { @Override public int bytesBefore(ByteBufIndexFinder indexFinder) { - return buf.bytesBefore(indexFinder); + return buf.bytesBefore(new SwappedByteBufIndexFinder(indexFinder)); } @Override @@ -702,7 +702,7 @@ public final class SwappedByteBuf implements ByteBuf { @Override public int bytesBefore(int length, ByteBufIndexFinder indexFinder) { - return buf.bytesBefore(length, indexFinder); + return buf.bytesBefore(length, new SwappedByteBufIndexFinder(indexFinder)); } @Override @@ -712,7 +712,17 @@ public final class SwappedByteBuf implements ByteBuf { @Override public int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder) { - return buf.bytesBefore(index, length, indexFinder); + return buf.bytesBefore(index, length, new SwappedByteBufIndexFinder(indexFinder)); + } + + @Override + public int forEachByte(ByteBufProcessor processor) { + return buf.forEachByte(new SwappedByteBufProcessor(processor)); + } + + @Override + public int forEachByte(int index, int length, ByteBufProcessor processor) { + return buf.forEachByte(index, length, new SwappedByteBufProcessor(processor)); } @Override @@ -866,4 +876,37 @@ public final class SwappedByteBuf implements ByteBuf { public String toString() { return "Swapped(" + buf.toString() + ')'; } + + private final class SwappedByteBufIndexFinder implements ByteBufIndexFinder { + private final ByteBufIndexFinder indexFinder; + + SwappedByteBufIndexFinder(ByteBufIndexFinder indexFinder) { + if (indexFinder == null) { + throw new NullPointerException("indexFinder"); + } + this.indexFinder = indexFinder; + } + + @Override + public boolean find(ByteBuf buffer, int guessedIndex) { + return indexFinder.find(SwappedByteBuf.this, guessedIndex); + } + } + + private final class SwappedByteBufProcessor implements ByteBufProcessor { + + private final ByteBufProcessor processor; + + SwappedByteBufProcessor(ByteBufProcessor processor) { + if (processor == null) { + throw new NullPointerException("processor"); + } + this.processor = processor; + } + + @Override + public int process(ByteBuf buf, int index, byte value) throws Exception { + return processor.process(SwappedByteBuf.this, index, value); + } + } } diff --git a/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java index 3dd629525e..9b970da43b 100644 --- a/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnreleasableByteBuf.java @@ -721,6 +721,16 @@ final class UnreleasableByteBuf implements ByteBuf { return buf.bytesBefore(index, length, indexFinder); } + @Override + public int forEachByte(ByteBufProcessor processor) { + return buf.forEachByte(processor); + } + + @Override + public int forEachByte(int index, int length, ByteBufProcessor processor) { + return buf.forEachByte(index, length, processor); + } + @Override public ByteBuf copy() { return buf.copy(); diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java index fdcbd02cde..0e9e433d5f 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java @@ -1647,4 +1647,50 @@ public abstract class AbstractByteBufTest { buffer.readerIndex(buffer.writerIndex()); buffer.discardReadBytes(); } + + @Test + public void testForEachByte() { + buffer.clear(); + for (int i = 0; i < CAPACITY; i ++) { + buffer.writeByte(i + 1); + } + + buffer.setIndex(CAPACITY / 4, CAPACITY * 3 / 4); + assertThat(buffer.forEachByte(new ByteBufProcessor() { + int i = CAPACITY / 4; + + @Override + public int process(ByteBuf buf, int index, byte value) throws Exception { + assertThat(value, is((byte) (i + 1))); + assertThat(index, is(i)); + i++; + return 1; + } + }), is(-1)); + } + + @Test + public void testForEachByteAbort() { + buffer.clear(); + for (int i = 0; i < CAPACITY; i ++) { + buffer.writeByte(i + 1); + } + + final int stop = CAPACITY / 2; + assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY / 3, new ByteBufProcessor() { + int i = CAPACITY / 3; + + @Override + public int process(ByteBuf buf, int index, byte value) throws Exception { + assertThat(value, is((byte) (i + 1))); + assertThat(index, is(i)); + i++; + + if (index == stop) { + throw ABORT; + } + return 1; + } + }), is(stop)); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java index b3ae441263..62d4853143 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHeaderBlockZlibDecoder.java @@ -15,14 +15,14 @@ */ package io.netty.handler.codec.spdy; -import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; + class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder { private final int version; @@ -43,7 +43,7 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder { int numBytes; do { numBytes = decompress(frame); - } while (!decompressed.readable() && numBytes > 0); + } while (!decompressed.isReadable() && numBytes > 0); } private void setInput(ByteBuf compressed) { @@ -54,7 +54,7 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder { private int decompress(SpdyHeadersFrame frame) throws Exception { if (decompressed == null) { - decompressed = decompressed = Unpooled.buffer(8192); + decompressed = Unpooled.buffer(8192); } try { int numBytes = decompressor.inflate(out); diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java index d05e1b53b6..3f49d78e63 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java @@ -16,11 +16,11 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.MessageList; +import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.TypeParameterMatcher; @@ -99,7 +99,7 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap try { encode(ctx, cast, buf); } finally { - ByteBufUtil.release(cast); + ReferenceCountUtil.release(cast); } } else { if (buf != null && buf.isReadable()) { diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index e63b506479..c6f7bef8fc 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -15,10 +15,10 @@ */ package io.netty.handler.codec; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.MessageList; +import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.internal.TypeParameterMatcher; @@ -81,7 +81,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAd try { decode(ctx, cast, out); } finally { - ByteBufUtil.release(cast); + ReferenceCountUtil.release(cast); } } else { out.add(m); diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 471d6cfd9d..3d53d72ebf 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -15,11 +15,11 @@ */ package io.netty.handler.codec; -import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.MessageList; +import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.internal.TypeParameterMatcher; @@ -79,7 +79,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandlerA try { encode(ctx, cast, out); } finally { - ByteBufUtil.release(cast); + ReferenceCountUtil.release(cast); } } else { out.add(m); diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index 9f362e3bba..e0b148c0ab 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -18,6 +18,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufIndexFinder; +import io.netty.buffer.ByteBufProcessor; import io.netty.buffer.SwappedByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Signal; @@ -306,8 +307,7 @@ final class ReplayingDecoderBuffer implements ByteBuf { } @Override - public int indexOf(int fromIndex, int toIndex, - ByteBufIndexFinder indexFinder) { + public int indexOf(int fromIndex, int toIndex, ByteBufIndexFinder indexFinder) { int endIndex = buffer.indexOf(fromIndex, toIndex, indexFinder); if (endIndex < 0) { throw REPLAY; @@ -363,8 +363,7 @@ final class ReplayingDecoderBuffer implements ByteBuf { } @Override - public int bytesBefore(int index, int length, - ByteBufIndexFinder indexFinder) { + public int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder) { int bytes = buffer.bytesBefore(index, length, indexFinder); if (bytes < 0) { throw REPLAY; @@ -372,6 +371,36 @@ final class ReplayingDecoderBuffer implements ByteBuf { return bytes; } + @Override + public int forEachByte(ByteBufProcessor processor) { + int ret = buffer.forEachByte(processor); + if (ret < 0 && !terminated) { + throw REPLAY; + } else { + return ret; + } + } + + @Override + public int forEachByte(int index, int length, ByteBufProcessor processor) { + int writerIndex = buffer.writerIndex(); + + if (index >= writerIndex) { + throw REPLAY; + } + + if (terminated || index + length <= writerIndex) { + return buffer.forEachByte(index, length, processor); + } + + int ret = buffer.forEachByte(index, writerIndex - index, processor); + if (ret < 0) { + throw REPLAY; + } else { + return ret; + } + } + @Override public ByteBuf markReaderIndex() { buffer.markReaderIndex(); diff --git a/transport/src/main/java/io/netty/channel/MessageList.java b/transport/src/main/java/io/netty/channel/MessageList.java index 37dac8d9d0..97e888753f 100644 --- a/transport/src/main/java/io/netty/channel/MessageList.java +++ b/transport/src/main/java/io/netty/channel/MessageList.java @@ -336,51 +336,76 @@ public final class MessageList implements Iterable { return (MessageList) this; } - public boolean forEach(MessageListProcessor proc) { + /** + * Iterates over the messages in this list with the specified {@code processor}. + * + * @return {@code -1} if the processor iterated to or beyond the end of the readable bytes. + * If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be + * returned. + */ + public int forEach(MessageListProcessor proc) { if (proc == null) { throw new NullPointerException("proc"); } + final int size = this.size; + if (size == 0) { + return -1; + } + @SuppressWarnings("unchecked") MessageListProcessor p = (MessageListProcessor) proc; - int size = this.size; + int i = 0; try { - for (int i = 0; i < size; i ++) { + do { i += p.process(this, i, elements[i]); - } + } while (i < size); } catch (Signal abort) { abort.expect(MessageListProcessor.ABORT); - return false; + return i; } catch (Exception e) { PlatformDependent.throwException(e); } - return true; + return -1; } - public boolean forEach(int index, int length, MessageListProcessor proc) { + /** + * Iterates over the messages in this list with the specified {@code processor}. + * + * @return {@code -1} if the processor iterated to or beyond the end of the specified area. + * If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be + * returned. + */ + public int forEach(int index, int length, MessageListProcessor proc) { checkRange(index, length); if (proc == null) { throw new NullPointerException("proc"); } + if (size == 0) { + return -1; + } + @SuppressWarnings("unchecked") MessageListProcessor p = (MessageListProcessor) proc; - int end = index + length; + final int end = index + length; + + int i = index; try { - for (int i = index; i < end;) { + do { i += p.process(this, i, elements[i]); - } + } while (i < end); } catch (Signal abort) { abort.expect(MessageListProcessor.ABORT); - return false; + return i; } catch (Exception e) { PlatformDependent.throwException(e); } - return true; + return -1; } @Override