Modify ChunkedInput to not return a chunk. It now read the chunk and directly transfer it. This helps to safe a copy on most cases

This commit is contained in:
Norman Maurer 2012-06-10 20:31:14 +02:00
parent 574d84e98e
commit 12898a2ef4
9 changed files with 189 additions and 100 deletions

View File

@ -0,0 +1,27 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.stream;
import io.netty.buffer.ByteBuf;
/**
* {@link ChunkedInput} which read its chunks and transfer them to a {@link ByteBuf}
*
*/
public interface ChunkedByteInput extends ChunkedInput<ByteBuf> {
}

View File

@ -15,20 +15,20 @@
*/
package io.netty.handler.stream;
import static io.netty.buffer.ByteBufs.*;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* A {@link ChunkedInput} that fetches data from a file chunk by chunk.
* A {@link ChunkedByteInput} that fetches data from a file chunk by chunk.
* <p>
* If your operating system supports
* <a href="http://en.wikipedia.org/wiki/Zero-copy">zero-copy file transfer</a>
* such as {@code sendfile()}, you might want to use {@link FileRegion} instead.
*/
public class ChunkedFile implements ChunkedInput {
public class ChunkedFile implements ChunkedByteInput {
private final RandomAccessFile file;
private final long startOffset;
@ -136,16 +136,21 @@ public class ChunkedFile implements ChunkedInput {
}
@Override
public Object nextChunk() throws Exception {
public boolean readChunk(ByteBuf buffer) throws Exception {
long offset = this.offset;
if (offset >= endOffset) {
return null;
return false;
}
int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
// Check if the buffer is backed by an byte array. If so we can optimize it a bit an safe a copy
byte[] chunk = new byte[chunkSize];
file.readFully(chunk);
buffer.writeBytes(chunk);
this.offset = offset + chunkSize;
return wrappedBuffer(chunk);
return true;
}
}

View File

@ -15,29 +15,12 @@
*/
package io.netty.handler.stream;
import io.netty.buffer.ByteBuf;
/**
* A data stream of indefinite length which is consumed by {@link ChunkedWriteHandler}.
* @apiviz.landmark
*/
public interface ChunkedInput {
/**
* Fetches a chunked data from the stream. The returned chunk is usually
* a {@link ByteBuf}, but you could extend an existing implementation
* to convert the {@link ByteBuf} into a different type that your
* handler or encoder understands. Once this method returns the last chunk
* and thus the stream has reached at its end, any subsequent {@link #isEndOfInput()}
* call must return {@code false}.
*
* @return the fetched chunk, which is usually {@link ByteBuf}.
* {@code null} if there is no data left in the stream.
* Please note that {@code null} does not necessarily mean that the
* stream has reached at its end. In a slow stream, the next chunk
* might be unavailable just momentarily.
*/
Object nextChunk() throws Exception;
public interface ChunkedInput<B> {
/**
* Return {@code true} if and only if there is no data left in the stream
@ -49,4 +32,16 @@ public interface ChunkedInput {
* Releases the resources associated with the stream.
*/
void close() throws Exception;
/**
* Fetches a chunked data from the stream. The chunk is then
* transfered to the given buffer. Once this method returns the last chunk
* and thus the stream has reached at its end, any subsequent {@link #isEndOfInput()}
* call must return {@code false}.
*
* @param buffer read the next chunk and transfer it to the given buffer
*/
boolean readChunk(B buffer) throws Exception;
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.stream;
import java.util.Queue;
/**
* {@link ChunkedInput} which reads its chunks and transfer it to a {@link Queue}
*
*
*/
public interface ChunkedMessageInput extends ChunkedInput<Queue<Object>> {
}

View File

@ -15,7 +15,7 @@
*/
package io.netty.handler.stream;
import static io.netty.buffer.ByteBufs.*;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.FileInputStream;
@ -24,14 +24,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* A {@link ChunkedInput} that fetches data from a file chunk by chunk using
* A {@link ChunkedByteInput} that fetches data from a file chunk by chunk using
* NIO {@link FileChannel}.
* <p>
* If your operating system supports
* <a href="http://en.wikipedia.org/wiki/Zero-copy">zero-copy file transfer</a>
* such as {@code sendfile()}, you might want to use {@link FileRegion} instead.
*/
public class ChunkedNioFile implements ChunkedInput {
public class ChunkedNioFile implements ChunkedByteInput {
private final FileChannel in;
private long startOffset;
@ -141,10 +141,10 @@ public class ChunkedNioFile implements ChunkedInput {
}
@Override
public Object nextChunk() throws Exception {
public boolean readChunk(ByteBuf buffer) throws Exception {
long offset = this.offset;
if (offset >= endOffset) {
return null;
return false;
}
int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
@ -161,8 +161,10 @@ public class ChunkedNioFile implements ChunkedInput {
break;
}
}
chunk.flip();
buffer.writeBytes(chunk);
this.offset += readBytes;
return wrappedBuffer(chunkArray);
return true;
}
}

View File

@ -15,18 +15,17 @@
*/
package io.netty.handler.stream;
import static io.netty.buffer.ByteBufs.*;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
/**
* A {@link ChunkedInput} that fetches data from a {@link ReadableByteChannel}
* A {@link ChunkedByteInput} that fetches data from a {@link ReadableByteChannel}
* chunk by chunk. Please note that the {@link ReadableByteChannel} must
* operate in blocking mode. Non-blocking mode channels are not supported.
*/
public class ChunkedNioStream implements ChunkedInput {
public class ChunkedNioStream implements ChunkedByteInput {
private final ReadableByteChannel in;
@ -97,10 +96,11 @@ public class ChunkedNioStream implements ChunkedInput {
}
@Override
public Object nextChunk() throws Exception {
public boolean readChunk(ByteBuf buffer) throws Exception {
if (isEndOfInput()) {
return null;
return false;
}
// buffer cannot be not be empty from there
int readBytes = byteBuffer.position();
for (;;) {
@ -116,9 +116,9 @@ public class ChunkedNioStream implements ChunkedInput {
}
}
byteBuffer.flip();
// copy since buffer is keeped for next usage
ByteBuf buffer = copiedBuffer(byteBuffer);
buffer.writeBytes(byteBuffer);
byteBuffer.clear();
return buffer;
return true;
}
}

View File

@ -15,13 +15,13 @@
*/
package io.netty.handler.stream;
import static io.netty.buffer.ByteBufs.*;
import io.netty.buffer.ByteBuf;
import java.io.InputStream;
import java.io.PushbackInputStream;
/**
* A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
* A {@link ChunkedByteInput} that fetches data from an {@link InputStream} chunk by
* chunk.
* <p>
* Please note that the {@link InputStream} instance that feeds data into
@ -30,7 +30,7 @@ import java.io.PushbackInputStream;
* Otherwise, {@link ChunkedStream} will generate many too small chunks or
* block unnecessarily often.
*/
public class ChunkedStream implements ChunkedInput {
public class ChunkedStream implements ChunkedByteInput {
static final int DEFAULT_CHUNK_SIZE = 8192;
@ -93,9 +93,9 @@ public class ChunkedStream implements ChunkedInput {
}
@Override
public Object nextChunk() throws Exception {
public boolean readChunk(ByteBuf buffer) throws Exception {
if (isEndOfInput()) {
return null;
return false;
}
final int availableBytes = in.available();
@ -105,21 +105,9 @@ public class ChunkedStream implements ChunkedInput {
} else {
chunkSize = Math.min(this.chunkSize, in.available());
}
final byte[] chunk = new byte[chunkSize];
int readBytes = 0;
for (;;) {
int localReadBytes = in.read(chunk, readBytes, chunkSize - readBytes);
if (localReadBytes < 0) {
break;
}
readBytes += localReadBytes;
offset += localReadBytes;
if (readBytes == chunkSize) {
break;
}
}
return wrappedBuffer(chunk, 0, readBytes);
// transfer to buffer
offset =+ buffer.writeBytes(in, chunkSize);
return true;
}
}

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.stream;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.Channel;
@ -154,7 +153,7 @@ public class ChunkedWriteHandler
}
if (currentEvent instanceof ChunkedInput) {
closeInput((ChunkedInput) currentEvent);
closeInput((ChunkedInput<?>) currentEvent);
} else if (currentEvent instanceof ChannelFuture) {
fireExceptionCaught = true;
((ChannelFuture) currentEvent).setFailure(cause);
@ -186,15 +185,15 @@ public class ChunkedWriteHandler
this.currentEvent = null;
ctx.flush((ChannelFuture) currentEvent);
} else if (currentEvent instanceof ChunkedInput) {
final ChunkedInput chunks = (ChunkedInput) currentEvent;
Object chunk;
final ChunkedInput<?> chunks = (ChunkedInput<?>) currentEvent;
boolean read;
boolean endOfInput;
boolean suspend;
try {
chunk = chunks.nextChunk();
read = readChunk(ctx, chunks);
endOfInput = chunks.isEndOfInput();
if (chunk == null) {
chunk = ByteBufs.EMPTY_BUFFER;
if (!read) {
// No need to suspend when reached at the end.
suspend = !endOfInput;
} else {
@ -226,7 +225,6 @@ public class ChunkedWriteHandler
}
pendingWrites.incrementAndGet();
ctx.nextOutboundMessageBuffer().add(chunk);
ChannelFuture f = ctx.flush();
if (endOfInput) {
this.currentEvent = null;
@ -249,7 +247,7 @@ public class ChunkedWriteHandler
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput) currentEvent);
closeInput((ChunkedInput<?>) currentEvent);
}
}
});
@ -259,7 +257,7 @@ public class ChunkedWriteHandler
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput) currentEvent);
closeInput((ChunkedInput<?>) currentEvent);
} else if (isWritable()) {
resumeTransfer();
}
@ -280,7 +278,25 @@ public class ChunkedWriteHandler
}
static void closeInput(ChunkedInput chunks) {
/**
* Read the next {@link ChunkedInput} and transfer it the the outbound buffer.
*
* @param ctx the {@link ChannelHandlerContext} this handler is bound to
* @param chunks the {@link ChunkedInput} to read from
* @return read <code>true</code> if something could be transfered to the outbound buffer
* @throws Exception if something goes wrong
*/
protected boolean readChunk(ChannelHandlerContext ctx, ChunkedInput<?> chunks) throws Exception {
if (chunks instanceof ChunkedByteInput) {
return ((ChunkedByteInput) chunks).readChunk(ctx.nextOutboundByteBuffer());
} else if (chunks instanceof ChunkedMessageInput) {
return ((ChunkedMessageInput) chunks).readChunk(ctx.nextOutboundMessageBuffer());
} else {
throw new IllegalArgumentException("ChunkedInput instance " + chunks + " not supported");
}
}
static void closeInput(ChunkedInput<?> chunks) {
try {
chunks.close();
} catch (Throwable t) {

View File

@ -15,15 +15,14 @@
*/
package io.netty.handler.stream;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufs;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.MessageBufs;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.channel.embedded.EmbeddedMessageChannel;
import io.netty.util.CharsetUtil;
@ -32,6 +31,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@ -105,16 +105,10 @@ public class ChunkedWriteHandlerTest {
public void testListenerNotifiedWhenIsEnd() {
ByteBuf buffer = ByteBufs.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
ChunkedInput input = new ChunkedInput() {
ChunkedByteInput input = new ChunkedByteInput() {
private boolean done;
private final ByteBuf buffer = ByteBufs.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
@Override
public Object nextChunk() throws Exception {
done = true;
return buffer.duplicate();
}
@Override
public boolean isEndOfInput() throws Exception {
return done;
@ -124,6 +118,16 @@ public class ChunkedWriteHandlerTest {
public void close() throws Exception {
// NOOP
}
@Override
public boolean readChunk(ByteBuf buffer) throws Exception {
if (done) {
return false;
}
done = true;
buffer.writeBytes(this.buffer.duplicate());
return true;
}
};
final AtomicBoolean listenerNotified = new AtomicBoolean(false);
@ -135,20 +139,7 @@ public class ChunkedWriteHandlerTest {
}
};
ChannelOutboundHandlerAdapter testHandler = new ChannelOutboundHandlerAdapter() {
@Override
public MessageBuf<ByteBuf> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return MessageBufs.buffer();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
future.setSuccess();
}
};
EmbeddedMessageChannel ch = new EmbeddedMessageChannel(new ChunkedWriteHandler(), testHandler);
EmbeddedByteChannel ch = new EmbeddedByteChannel(new ChunkedWriteHandler());
ch.outboundMessageBuffer().add(input);
ch.flush().addListener(listener).syncUninterruptibly();
ch.checkException();
@ -162,10 +153,48 @@ public class ChunkedWriteHandlerTest {
}
private static void check(ChunkedInput... inputs) {
EmbeddedMessageChannel ch = new EmbeddedMessageChannel(new ChunkedWriteHandler());
@Test
public void testChunkedMessageInput() {
for (ChunkedInput input: inputs) {
ChunkedMessageInput input = new ChunkedMessageInput() {
private boolean done;
@Override
public boolean isEndOfInput() throws Exception {
return done;
}
@Override
public void close() throws Exception {
// NOOP
}
@Override
public boolean readChunk(Queue<Object> buffer) throws Exception {
if (done) {
return false;
}
done = true;
buffer.add(0);
return true;
}
};
EmbeddedMessageChannel ch = new EmbeddedMessageChannel(new ChunkedWriteHandler());
ch.outboundMessageBuffer().add(input);
ch.flush().syncUninterruptibly();
ch.checkException();
assertTrue(ch.finish());
assertEquals(0, ch.readOutbound());
assertNull(ch.readOutbound());
}
private static void check(ChunkedInput<?>... inputs) {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new ChunkedWriteHandler());
for (ChunkedInput<?> input: inputs) {
ch.writeOutbound(input);
}