Fixed NETTY-184 Provide NIO equivalent for ChunkedFile and ChunkedStream

* Added ChunkedNioFile and ChunkedNioStream
* Added another convenience constructor to ChunkedFile
This commit is contained in:
Trustin Lee 2009-06-25 07:45:15 +00:00
parent 3a2350158d
commit cfa4e5e4ee
3 changed files with 321 additions and 0 deletions

View File

@ -60,6 +60,13 @@ public class ChunkedFile implements ChunkedInput {
this(new RandomAccessFile(file, "r"), chunkSize);
}
/**
* Creates a new instance that fetches data from the specified file.
*/
public ChunkedFile(RandomAccessFile file) throws IOException {
this(file, ChunkedStream.DEFAULT_CHUNK_SIZE);
}
/**
* Creates a new instance that fetches data from the specified file.
*

View File

@ -0,0 +1,173 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.stream;
import static org.jboss.netty.buffer.ChannelBuffers.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* A {@link ChunkedInput} that fetches data from a file chunk by chunk using
* NIO {@link FileChannel}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @author Frederic Bregier
* @version $Rev$, $Date$
*/
public class ChunkedNioFile implements ChunkedInput {
private final FileChannel in;
private long startOffset;
private final long endOffset;
private final int chunkSize;
private volatile long offset;
/**
* Creates a new instance that fetches data from the specified file.
*/
public ChunkedNioFile(File in) throws IOException {
this(new FileInputStream(in).getChannel());
}
/**
* Creates a new instance that fetches data from the specified file.
*
* @param chunkSize the number of bytes to fetch on each
* {@link #nextChunk()} call
*/
public ChunkedNioFile(File in, int chunkSize) throws IOException {
this(new FileInputStream(in).getChannel(), chunkSize);
}
/**
* Creates a new instance that fetches data from the specified file.
*/
public ChunkedNioFile(FileChannel in) throws IOException {
this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
}
/**
* Creates a new instance that fetches data from the specified file.
*
* @param chunkSize the number of bytes to fetch on each
* {@link #nextChunk()} call
*/
public ChunkedNioFile(FileChannel in, int chunkSize) throws IOException {
this(in, 0, in.size(), chunkSize);
}
/**
* Creates a new instance that fetches data from the specified file.
*
* @param offset the offset of the file where the transfer begins
* @param length the number of bytes to transfer
* @param chunkSize the number of bytes to fetch on each
* {@link #nextChunk()} call
*/
public ChunkedNioFile(FileChannel in, long offset, long length, int chunkSize)
throws IOException {
if (in == null) {
throw new NullPointerException("in");
}
if (offset < 0) {
throw new IllegalArgumentException(
"offset: " + offset + " (expected: 0 or greater)");
}
if (length < 0) {
throw new IllegalArgumentException(
"length: " + length + " (expected: 0 or greater)");
}
if (chunkSize <= 0) {
throw new IllegalArgumentException(
"chunkSize: " + chunkSize +
" (expected: a positive integer)");
}
if (offset != 0) {
in.position(offset);
}
this.in = in;
this.chunkSize = chunkSize;
this.offset = startOffset = offset;
endOffset = offset + length;
}
/**
* Returns the offset in the file where the transfer began.
*/
public long getStartOffset() {
return startOffset;
}
/**
* Returns the offset in the file where the transfer will end.
*/
public long getEndOffset() {
return endOffset;
}
/**
* Returns the offset in the file where the transfer is happening currently.
*/
public long getCurrentOffset() {
return offset;
}
public boolean hasNextChunk() throws Exception {
return offset < endOffset && in.isOpen();
}
public void close() throws Exception {
in.close();
}
public Object nextChunk() throws Exception {
long offset = this.offset;
if (offset >= endOffset) {
return null;
}
int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
byte[] chunkArray = new byte[chunkSize];
ByteBuffer chunk = ByteBuffer.wrap(chunkArray);
int readBytes = 0;
for (;;) {
int localReadBytes = in.read(chunk);
if (localReadBytes < 0) {
break;
}
readBytes += localReadBytes;
if (readBytes == chunkSize) {
break;
}
}
this.offset += readBytes;
return wrappedBuffer(chunkArray);
}
}

View File

@ -0,0 +1,141 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.stream;
import static org.jboss.netty.buffer.ChannelBuffers.*;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.jboss.netty.buffer.ChannelBuffer;
/**
* A {@link ChunkedInput} that fetches data from a {@link ReadableByteChannel}
* chunk by chunk. Please note that the {@link ReadableByteChannel} must
* operate in blocking mode. Non-blocking mode channels are not supported.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @author Frederic Bregier
* @version $Rev$, $Date$
*/
public class ChunkedNioStream implements ChunkedInput {
private final ReadableByteChannel in;
private final int chunkSize;
private volatile long offset;
/**
* Associated ByteBuffer
*/
private ByteBuffer byteBuffer = null;
/**
* Creates a new instance that fetches data from the specified channel.
*/
public ChunkedNioStream(ReadableByteChannel in) {
this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
}
/**
* Creates a new instance that fetches data from the specified channel.
*
* @param chunkSize the number of bytes to fetch on each
* {@link #nextChunk()} call
*/
public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
if (in == null) {
throw new NullPointerException("in");
}
if (chunkSize <= 0) {
throw new IllegalArgumentException("chunkSize: " + chunkSize +
" (expected: a positive integer)");
}
this.in = in;
offset = 0;
this.chunkSize = chunkSize;
if (byteBuffer != null) {
if (byteBuffer.capacity() != chunkSize) {
byteBuffer = null;
byteBuffer = ByteBuffer.allocate(chunkSize);
}
} else {
byteBuffer = ByteBuffer.allocate(chunkSize);
}
}
/**
* Returns the number of transferred bytes.
*/
public long getTransferredBytes() {
return offset;
}
public boolean hasNextChunk() throws Exception {
if (byteBuffer.position() > 0) {
// A previous read was not over, so there is a next chunk in the buffer at least
return true;
}
if (in.isOpen()) {
// Try to read a new part, and keep this part (no rewind)
int b = in.read(byteBuffer);
if (b < 0) {
return false;
} else {
offset += b;
return true;
}
}
return false;
}
public void close() throws Exception {
in.close();
}
public Object nextChunk() throws Exception {
if (!hasNextChunk()) {
return null;
}
// buffer cannot be not be empty from there
int readBytes = byteBuffer.position();
for (;;) {
int localReadBytes = in.read(byteBuffer);
if (localReadBytes < 0) {
break;
}
readBytes += localReadBytes;
offset += localReadBytes;
if (readBytes == chunkSize) {
break;
}
}
byteBuffer.flip();
// copy since buffer is keeped for next usage
ChannelBuffer buffer = copiedBuffer(byteBuffer);
byteBuffer.clear();
return buffer;
}
}