Update HttpStaticFileServer example / Fix bugs related with progress notification
- Fix a bug in DefaultProgressivePromise.tryProgress() where the notification is dropped - Fix a bug in AbstractChannel.calculateMessageSize() where FileRegion is not counted - HttpStaticFileServer example now uses zero copy file transfer if possible.
This commit is contained in:
parent
f96a8e5951
commit
762adfcb69
|
@ -55,7 +55,12 @@ public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements P
|
|||
|
||||
@Override
|
||||
public boolean tryProgress(long progress, long total) {
|
||||
if (progress < 0 || progress > total || isDone()) {
|
||||
if (total < 0) {
|
||||
total = -1;
|
||||
if (progress < 0 || isDone()) {
|
||||
return false;
|
||||
}
|
||||
} else if (progress < 0 || progress > total || isDone()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import io.netty.channel.ChannelFutureListener;
|
|||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelProgressiveFuture;
|
||||
import io.netty.channel.ChannelProgressiveFutureListener;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||
|
@ -106,6 +107,12 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
|
|||
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
|
||||
public static final int HTTP_CACHE_SECONDS = 60;
|
||||
|
||||
private final boolean useSendFile;
|
||||
|
||||
public HttpStaticFileServerHandler(boolean useSendFile) {
|
||||
this.useSendFile = useSendFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(
|
||||
ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
|
||||
|
@ -181,11 +188,25 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
|
|||
|
||||
// Write the initial line and the header.
|
||||
ctx.write(response);
|
||||
|
||||
// Write the content.
|
||||
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192), ctx.newProgressivePromise()).addListener(new ChannelProgressiveFutureListener() {
|
||||
ChannelFuture sendFileFuture;
|
||||
if (useSendFile) {
|
||||
sendFileFuture =
|
||||
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
|
||||
} else {
|
||||
sendFileFuture =
|
||||
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192), ctx.newProgressivePromise());
|
||||
}
|
||||
|
||||
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
|
||||
@Override
|
||||
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
|
||||
System.err.println("Transfer progress: " + progress);
|
||||
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
|
||||
if (total < 0) { // total unknown
|
||||
System.err.println("Transfer progress: " + progress);
|
||||
} else {
|
||||
System.err.println("Transfer progress: " + progress + " / " + total);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -193,13 +214,14 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
|
|||
System.err.println("Transfer complete.");
|
||||
}
|
||||
});
|
||||
|
||||
// Write the end marker
|
||||
ChannelFuture writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
|
||||
// Decide whether to close the connection or not.
|
||||
if (!isKeepAlive(request)) {
|
||||
// Close the connection when the whole content is written out.
|
||||
writeFuture.addListener(ChannelFutureListener.CLOSE);
|
||||
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,6 +39,6 @@ public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketCh
|
|||
pipeline.addLast("encoder", new HttpResponseEncoder());
|
||||
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
|
||||
|
||||
pipeline.addLast("handler", new HttpStaticFileServerHandler());
|
||||
pipeline.addLast("handler", new HttpStaticFileServerHandler(true)); // Specify false if SSL.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -296,11 +296,11 @@ public class ChunkedWriteHandler
|
|||
if (!future.isSuccess()) {
|
||||
closeInput((ChunkedInput<?>) pendingMessage);
|
||||
currentWrite.fail(future.cause());
|
||||
} else if (isWritable()) {
|
||||
currentWrite.progress();
|
||||
resumeTransfer();
|
||||
} else {
|
||||
currentWrite.progress();
|
||||
if (isWritable()) {
|
||||
resumeTransfer();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -794,13 +794,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||
* Calculate the number of bytes a message takes up in memory. Sub-classes may override this if they use different
|
||||
* messages then {@link ByteBuf} or {@link ByteBufHolder}. If the size can not be calculated 0 should be returned.
|
||||
*/
|
||||
protected int calculateMessageSize(Object message) {
|
||||
protected long calculateMessageSize(Object message) {
|
||||
if (message instanceof ByteBuf) {
|
||||
return ((ByteBuf) message).readableBytes();
|
||||
}
|
||||
if (message instanceof ByteBufHolder) {
|
||||
return ((ByteBufHolder) message).content().readableBytes();
|
||||
}
|
||||
if (message instanceof FileRegion) {
|
||||
return ((FileRegion) message).count();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ public final class ChannelOutboundBuffer {
|
|||
unflushed = this.unflushed;
|
||||
}
|
||||
|
||||
final int size = channel.calculateMessageSize(msg);
|
||||
final long size = channel.calculateMessageSize(msg);
|
||||
incrementPendingOutboundBytes(size);
|
||||
|
||||
unflushed[unflushedCount] = msg;
|
||||
|
@ -249,7 +249,7 @@ public final class ChannelOutboundBuffer {
|
|||
tail = n;
|
||||
}
|
||||
|
||||
private void incrementPendingOutboundBytes(int size) {
|
||||
private void incrementPendingOutboundBytes(long size) {
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user