Issue NETTY-175 - Large data transfer

* Added ChunkStream and FileChunkStream
* Added ChunkStreamWriteHandler
This commit is contained in:
Trustin Lee 2009-06-16 07:04:20 +00:00
parent ee8ffec265
commit c08e7dd397
3 changed files with 307 additions and 0 deletions

View File

@ -0,0 +1,36 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.stream;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public interface ChunkStream {
boolean available() throws Exception;
Object readChunk() throws Exception;
void close() throws Exception;
}

View File

@ -0,0 +1,160 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.stream;
import static org.jboss.netty.channel.Channels.*;
import java.util.Queue;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.LinkedTransferQueue;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
@ChannelPipelineCoverage("one")
public class ChunkStreamWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkStreamWriteHandler.class);
private final Queue<MessageEvent> queue =
new LinkedTransferQueue<MessageEvent>();
private MessageEvent currentEvent;
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (!(e instanceof MessageEvent)) {
ctx.sendDownstream(e);
return;
}
queue.offer((MessageEvent) e);
if (ctx.getChannel().isWritable()) {
flushWriteEventQueue(ctx);
}
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (!(e instanceof ChannelStateEvent)) {
ctx.sendUpstream(e);
return;
}
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() != ChannelState.INTEREST_OPS) {
ctx.sendUpstream(e);
return;
}
if (ctx.getChannel().isWritable()) {
flushWriteEventQueue(ctx);
}
}
private synchronized void flushWriteEventQueue(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.getChannel();
do {
if (currentEvent == null) {
currentEvent = queue.poll();
}
if (currentEvent == null) {
break;
}
Object m = currentEvent.getMessage();
if (m instanceof ChunkStream) {
ChunkStream stream = (ChunkStream) m;
Object chunk;
boolean last;
try {
chunk = stream.readChunk();
last = !stream.available();
} catch (Throwable t) {
currentEvent.getFuture().setFailure(t);
fireExceptionCaught(ctx, t);
try {
stream.close();
} catch (Throwable t2) {
logger.warn("Failed to close a stream.", t2);
}
break;
}
if (chunk != null) {
ChannelFuture writeFuture;
final MessageEvent currentEvent = this.currentEvent;
if (last) {
this.currentEvent = null;
writeFuture = currentEvent.getFuture();
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
((ChunkStream) currentEvent.getMessage()).close();
}
});
} else {
writeFuture = future(channel);
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
currentEvent.getFuture().setFailure(future.getCause());
((ChunkStream) currentEvent.getMessage()).close();
}
}
});
}
Channels.write(
ctx, writeFuture, chunk,
currentEvent.getRemoteAddress());
} else {
currentEvent = null;
}
} else {
ctx.sendDownstream(currentEvent);
currentEvent = null;
}
} while (channel.isWritable());
}
}

View File

@ -0,0 +1,111 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.stream;
import static org.jboss.netty.buffer.ChannelBuffers.*;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class FileChunkStream implements ChunkStream {
private static final int DEFAULT_CHUNK_SIZE = 8192;
private final RandomAccessFile file;
private final long startOffset;
private final long endOffset;
private final int chunkSize;
private volatile long offset;
public FileChunkStream(File file) throws IOException {
this(file, DEFAULT_CHUNK_SIZE);
}
public FileChunkStream(File file, int chunkSize) throws IOException {
this(new RandomAccessFile(file, "r"), chunkSize);
}
public FileChunkStream(RandomAccessFile file, int chunkSize) throws IOException {
this(file, 0, file.length(), chunkSize);
}
public FileChunkStream(RandomAccessFile file, long offset, long length, int chunkSize) throws IOException {
if (file == null) {
throw new NullPointerException("file");
}
if (offset < 0) {
throw new IllegalArgumentException(
"offset: " + offset + " (expected: 0 or greater)");
}
if (length < 0) {
throw new IllegalArgumentException(
"length: " + length + " (expected: 0 or greater)");
}
this.file = file;
this.offset = startOffset = offset;
endOffset = offset + length;
this.chunkSize = chunkSize;
file.seek(offset);
}
public long getStartOffset() {
return startOffset;
}
public long getEndOffset() {
return endOffset;
}
public long getCurrentOffset() {
return offset;
}
public boolean available() throws Exception {
return offset < endOffset && file.getChannel().isOpen();
}
public void close() throws Exception {
file.close();
}
public Object readChunk() throws Exception {
long offset = this.offset;
if (offset >= endOffset) {
return null;
}
int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
byte[] chunk = new byte[chunkSize];
file.readFully(chunk);
this.offset = offset + chunkSize;
return wrappedBuffer(chunk);
}
}