Add ByteBufProcessor and ByteBuf.forEach(...)

- Fixes #1378
- Needs to provide optimized forEach implementations though.
This commit is contained in:
Trustin Lee 2013-06-27 13:55:42 +09:00
parent 734ec51ac9
commit 4dd9b6ef2e
13 changed files with 279 additions and 29 deletions

View File

@ -17,6 +17,8 @@ package io.netty.buffer;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.Signal;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.io.InputStream;
@ -1030,6 +1032,44 @@ public abstract class AbstractByteBuf implements ByteBuf {
return endIndex - index;
}
@Override
public int forEachByte(ByteBufProcessor processor) {
int index = readerIndex;
int length = writerIndex - index;
return forEach0(index, length, processor);
}
@Override
public int forEachByte(int index, int length, ByteBufProcessor processor) {
checkIndex(index, length);
return forEach0(index, length, processor);
}
private int forEach0(int index, int length, ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
if (length == 0) {
return -1;
}
final int end = index + length;
int i = index;
try {
do {
i += processor.process(this, i, _getByte(i));
} while (i < end);
} catch (Signal signal) {
signal.expect(ByteBufProcessor.ABORT);
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}
return -1;
}
@Override
public int hashCode() {
return ByteBufUtil.hashCode(this);

View File

@ -1690,6 +1690,22 @@ public interface ByteBuf extends ReferenceCounted, Comparable<ByteBuf> {
*/
int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder);
/**
* Iterates over the readable bytes of this buffer with the specified {@code processor}.
*
* @return {@code -1} if the processor iterated to or beyond the end of the readable bytes.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
*/
int forEachByte(ByteBufProcessor processor);
/**
* Iterates over the specified area of this buffer with the specified {@code processor}.
*
* @return {@code -1} if the processor iterated to or beyond the end of the specified area.
* If the {@code processor} raised {@link ByteBufProcessor#ABORT}, the last-visited index will be returned.
*/
int forEachByte(int index, int length, ByteBufProcessor processor);
/**
* Returns a copy of this buffer's readable bytes. Modifying the content
* of the returned buffer or this buffer does not affect each other at all.

View File

@ -0,0 +1,30 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.Signal;
public interface ByteBufProcessor {
Signal ABORT = new Signal(ByteBufProcessor.class.getName() + ".ABORT");
/**
* @return the number of elements processed. {@link ByteBuf#forEachByte(ByteBufProcessor)} will determine
* the index of the next byte to be processed based on this value. Usually, an implementation will
* return {@code 1} to advance the index by {@code 1}.
*/
int process(ByteBuf buf, int index, byte value) throws Exception;
}

View File

@ -708,6 +708,17 @@ public final class EmptyByteBuf implements ByteBuf {
return -1;
}
@Override
public int forEachByte(ByteBufProcessor processor) {
return -1;
}
@Override
public int forEachByte(int index, int length, ByteBufProcessor processor) {
checkIndex(index, length);
return -1;
}
@Override
public ByteBuf copy() {
return this;

View File

@ -692,7 +692,7 @@ public final class SwappedByteBuf implements ByteBuf {
@Override
public int bytesBefore(ByteBufIndexFinder indexFinder) {
return buf.bytesBefore(indexFinder);
return buf.bytesBefore(new SwappedByteBufIndexFinder(indexFinder));
}
@Override
@ -702,7 +702,7 @@ public final class SwappedByteBuf implements ByteBuf {
@Override
public int bytesBefore(int length, ByteBufIndexFinder indexFinder) {
return buf.bytesBefore(length, indexFinder);
return buf.bytesBefore(length, new SwappedByteBufIndexFinder(indexFinder));
}
@Override
@ -712,7 +712,17 @@ public final class SwappedByteBuf implements ByteBuf {
@Override
public int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder) {
return buf.bytesBefore(index, length, indexFinder);
return buf.bytesBefore(index, length, new SwappedByteBufIndexFinder(indexFinder));
}
@Override
public int forEachByte(ByteBufProcessor processor) {
return buf.forEachByte(new SwappedByteBufProcessor(processor));
}
@Override
public int forEachByte(int index, int length, ByteBufProcessor processor) {
return buf.forEachByte(index, length, new SwappedByteBufProcessor(processor));
}
@Override
@ -866,4 +876,37 @@ public final class SwappedByteBuf implements ByteBuf {
public String toString() {
return "Swapped(" + buf.toString() + ')';
}
private final class SwappedByteBufIndexFinder implements ByteBufIndexFinder {
private final ByteBufIndexFinder indexFinder;
SwappedByteBufIndexFinder(ByteBufIndexFinder indexFinder) {
if (indexFinder == null) {
throw new NullPointerException("indexFinder");
}
this.indexFinder = indexFinder;
}
@Override
public boolean find(ByteBuf buffer, int guessedIndex) {
return indexFinder.find(SwappedByteBuf.this, guessedIndex);
}
}
private final class SwappedByteBufProcessor implements ByteBufProcessor {
private final ByteBufProcessor processor;
SwappedByteBufProcessor(ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
this.processor = processor;
}
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
return processor.process(SwappedByteBuf.this, index, value);
}
}
}

View File

@ -721,6 +721,16 @@ final class UnreleasableByteBuf implements ByteBuf {
return buf.bytesBefore(index, length, indexFinder);
}
@Override
public int forEachByte(ByteBufProcessor processor) {
return buf.forEachByte(processor);
}
@Override
public int forEachByte(int index, int length, ByteBufProcessor processor) {
return buf.forEachByte(index, length, processor);
}
@Override
public ByteBuf copy() {
return buf.copy();

View File

@ -1647,4 +1647,50 @@ public abstract class AbstractByteBufTest {
buffer.readerIndex(buffer.writerIndex());
buffer.discardReadBytes();
}
@Test
public void testForEachByte() {
buffer.clear();
for (int i = 0; i < CAPACITY; i ++) {
buffer.writeByte(i + 1);
}
buffer.setIndex(CAPACITY / 4, CAPACITY * 3 / 4);
assertThat(buffer.forEachByte(new ByteBufProcessor() {
int i = CAPACITY / 4;
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
assertThat(index, is(i));
i++;
return 1;
}
}), is(-1));
}
@Test
public void testForEachByteAbort() {
buffer.clear();
for (int i = 0; i < CAPACITY; i ++) {
buffer.writeByte(i + 1);
}
final int stop = CAPACITY / 2;
assertThat(buffer.forEachByte(CAPACITY / 3, CAPACITY / 3, new ByteBufProcessor() {
int i = CAPACITY / 3;
@Override
public int process(ByteBuf buf, int index, byte value) throws Exception {
assertThat(value, is((byte) (i + 1)));
assertThat(index, is(i));
i++;
if (index == stop) {
throw ABORT;
}
return 1;
}
}), is(stop));
}
}

View File

@ -15,14 +15,14 @@
*/
package io.netty.handler.codec.spdy;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
private final int version;
@ -43,7 +43,7 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
int numBytes;
do {
numBytes = decompress(frame);
} while (!decompressed.readable() && numBytes > 0);
} while (!decompressed.isReadable() && numBytes > 0);
}
private void setInput(ByteBuf compressed) {
@ -54,7 +54,7 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
private int decompress(SpdyHeadersFrame frame) throws Exception {
if (decompressed == null) {
decompressed = decompressed = Unpooled.buffer(8192);
decompressed = Unpooled.buffer(8192);
}
try {
int numBytes = decompressor.inflate(out);

View File

@ -16,11 +16,11 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
@ -99,7 +99,7 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
try {
encode(ctx, cast, buf);
} finally {
ByteBufUtil.release(cast);
ReferenceCountUtil.release(cast);
}
} else {
if (buf != null && buf.isReadable()) {

View File

@ -15,10 +15,10 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.TypeParameterMatcher;
@ -81,7 +81,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAd
try {
decode(ctx, cast, out);
} finally {
ByteBufUtil.release(cast);
ReferenceCountUtil.release(cast);
}
} else {
out.add(m);

View File

@ -15,11 +15,11 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.MessageList;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.TypeParameterMatcher;
@ -79,7 +79,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
try {
encode(ctx, cast, out);
} finally {
ByteBufUtil.release(cast);
ReferenceCountUtil.release(cast);
}
} else {
out.add(m);

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufIndexFinder;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.SwappedByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Signal;
@ -306,8 +307,7 @@ final class ReplayingDecoderBuffer implements ByteBuf {
}
@Override
public int indexOf(int fromIndex, int toIndex,
ByteBufIndexFinder indexFinder) {
public int indexOf(int fromIndex, int toIndex, ByteBufIndexFinder indexFinder) {
int endIndex = buffer.indexOf(fromIndex, toIndex, indexFinder);
if (endIndex < 0) {
throw REPLAY;
@ -363,8 +363,7 @@ final class ReplayingDecoderBuffer implements ByteBuf {
}
@Override
public int bytesBefore(int index, int length,
ByteBufIndexFinder indexFinder) {
public int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder) {
int bytes = buffer.bytesBefore(index, length, indexFinder);
if (bytes < 0) {
throw REPLAY;
@ -372,6 +371,36 @@ final class ReplayingDecoderBuffer implements ByteBuf {
return bytes;
}
@Override
public int forEachByte(ByteBufProcessor processor) {
int ret = buffer.forEachByte(processor);
if (ret < 0 && !terminated) {
throw REPLAY;
} else {
return ret;
}
}
@Override
public int forEachByte(int index, int length, ByteBufProcessor processor) {
int writerIndex = buffer.writerIndex();
if (index >= writerIndex) {
throw REPLAY;
}
if (terminated || index + length <= writerIndex) {
return buffer.forEachByte(index, length, processor);
}
int ret = buffer.forEachByte(index, writerIndex - index, processor);
if (ret < 0) {
throw REPLAY;
} else {
return ret;
}
}
@Override
public ByteBuf markReaderIndex() {
buffer.markReaderIndex();

View File

@ -336,51 +336,76 @@ public final class MessageList<T> implements Iterable<T> {
return (MessageList<U>) this;
}
public boolean forEach(MessageListProcessor<? super T> proc) {
/**
* Iterates over the messages in this list with the specified {@code processor}.
*
* @return {@code -1} if the processor iterated to or beyond the end of the readable bytes.
* If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be
* returned.
*/
public int forEach(MessageListProcessor<? super T> proc) {
if (proc == null) {
throw new NullPointerException("proc");
}
final int size = this.size;
if (size == 0) {
return -1;
}
@SuppressWarnings("unchecked")
MessageListProcessor<T> p = (MessageListProcessor<T>) proc;
int size = this.size;
int i = 0;
try {
for (int i = 0; i < size; i ++) {
do {
i += p.process(this, i, elements[i]);
}
} while (i < size);
} catch (Signal abort) {
abort.expect(MessageListProcessor.ABORT);
return false;
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}
return true;
return -1;
}
public boolean forEach(int index, int length, MessageListProcessor<? super T> proc) {
/**
* Iterates over the messages in this list with the specified {@code processor}.
*
* @return {@code -1} if the processor iterated to or beyond the end of the specified area.
* If the {@code processor} raised {@link MessageListProcessor#ABORT}, the last-visited index will be
* returned.
*/
public int forEach(int index, int length, MessageListProcessor<? super T> proc) {
checkRange(index, length);
if (proc == null) {
throw new NullPointerException("proc");
}
if (size == 0) {
return -1;
}
@SuppressWarnings("unchecked")
MessageListProcessor<T> p = (MessageListProcessor<T>) proc;
int end = index + length;
final int end = index + length;
int i = index;
try {
for (int i = index; i < end;) {
do {
i += p.process(this, i, elements[i]);
}
} while (i < end);
} catch (Signal abort) {
abort.expect(MessageListProcessor.ABORT);
return false;
return i;
} catch (Exception e) {
PlatformDependent.throwException(e);
}
return true;
return -1;
}
@Override