[#1106] Also handle FileRegion in ByteToByteEncoder and SslHandler

This commit is contained in:
Norman Maurer 2013-02-28 19:39:44 +01:00
parent 881bd8eea3
commit 42dad6d9d4
2 changed files with 125 additions and 0 deletions

View File

@ -19,8 +19,14 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundByteHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.channel.PartialFlushException;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
* {@link ChannelOutboundByteHandlerAdapter} which encodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other.
@ -78,6 +84,62 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
ctx.flush(promise);
}
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
long written = 0;
try {
for (;;) {
long localWritten = region.transferTo(new BufferChannel(ctx.outboundByteBuffer()), written);
if (localWritten == -1) {
checkEOF(region, written);
flush(ctx, promise);
break;
}
written += localWritten;
if (written >= region.count()) {
flush(ctx, promise);
break;
}
}
} catch (IOException e) {
promise.setFailure(new EncoderException(e));
} finally {
region.release();
}
}
private static void checkEOF(FileRegion region, long writtenBytes) throws IOException {
if (writtenBytes < region.count()) {
throw new EOFException("Expected to be able to write "
+ region.count() + " bytes, but only wrote "
+ writtenBytes);
}
}
private static final class BufferChannel implements WritableByteChannel {
private final ByteBuf buffer;
BufferChannel(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public int write(ByteBuffer src) {
int bytes = src.remaining();
buffer.writeBytes(src);
return bytes;
}
@Override
public boolean isOpen() {
return buffer.refCnt() > 0;
}
@Override
public void close() {
// NOOP
}
}
/**
* Encodes the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read anymore or till nothing was read from the input {@link ByteBuf}.

View File

@ -28,6 +28,7 @@ import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -36,11 +37,13 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
@ -159,6 +162,7 @@ public class SslHandler
private final boolean startTls;
private boolean sentFirstMessage;
private WritableByteChannel bufferChannel;
private final Queue<ChannelPromise> handshakePromises = new ArrayDeque<ChannelPromise>();
private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture();
@ -421,6 +425,65 @@ public class SslHandler
ctx.read();
}
@Override
public final void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise) throws Exception {
if (bufferChannel == null) {
bufferChannel = new BufferChannel(ctx.outboundByteBuffer());
}
long written = 0;
try {
for (;;) {
long localWritten = region.transferTo(bufferChannel, written);
if (localWritten == -1) {
checkEOF(region, written);
flush(ctx, promise);
break;
}
written += localWritten;
if (written >= region.count()) {
flush(ctx, promise);
break;
}
}
} catch (IOException e) {
promise.setFailure(e);
} finally {
region.release();
}
}
private static void checkEOF(FileRegion region, long writtenBytes) throws IOException {
if (writtenBytes < region.count()) {
throw new EOFException("Expected to be able to write "
+ region.count() + " bytes, but only wrote "
+ writtenBytes);
}
}
private static final class BufferChannel implements WritableByteChannel {
private final ByteBuf buffer;
BufferChannel(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public int write(ByteBuffer src) {
int bytes = src.remaining();
buffer.writeBytes(src);
return bytes;
}
@Override
public boolean isOpen() {
return buffer.refCnt() > 0;
}
@Override
public void close() {
// NOOP
}
}
@Override
public void flush(final ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
flush0(ctx, promise, false);