Add back support for FileRegion. See #668

This commit is contained in:
Norman Maurer 2012-10-24 18:27:26 +02:00
parent 33c0c89fef
commit f9225df0a9
17 changed files with 899 additions and 41 deletions

View File

@ -0,0 +1,111 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.filetransfer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
/**
* Server that accept the path of a file an echo back its content.
*/
public class FileServer {
private final int port;
public FileServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
ServerBootstrap b = new ServerBootstrap();
try {
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.localAddress(new InetSocketAddress(port))
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new StringEncoder(CharsetUtil.UTF_8),
new LineBasedFrameDecoder(8192),
new StringDecoder(CharsetUtil.UTF_8),
new FileHandler());
}
});
// Start the server.
ChannelFuture f = b.bind().sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
b.shutdown();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new FileServer(port).run();
}
private static final class FileHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
File file = new File(msg);
if (file.exists()) {
ctx.write(file + " " + file.length() + "\r\n");
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()));
} else {
ctx.write("File not found: " + file + "\r\n");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SocketFileRegionTest extends AbstractSocketTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576 * 10];
static {
random.nextBytes(data);
}
@Test
public void testFileRegion() throws Throwable {
run();
}
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
File file = File.createTempFile("netty-", ".tmp");
file.deleteOnExit();
FileOutputStream out = new FileOutputStream(file);
out.write(data);
out.close();
ChannelInboundByteHandlerAdapter ch = new ChannelInboundByteHandlerAdapter() {
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
in.clear();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
};
TestHandler sh = new TestHandler();
sb.childHandler(sh);
cb.handler(ch);
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
ChannelFuture future = cc.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(),
0L, file.length())).syncUninterruptibly();
assertTrue(future.isSuccess());
while (sh.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// Ignore.
}
}
sh.channel.close().sync();
cc.close().sync();
sc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
}
private static class TestHandler extends ChannelInboundByteHandlerAdapter {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
channel = ctx.channel();
}
@Override
public void inboundBufferUpdated(
ChannelHandlerContext ctx, ByteBuf in)
throws Exception {
byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual);
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(data[i + lastIdx], actual[i]);
}
counter += actual.length;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
ctx.close();
}
}
}
}

View File

@ -23,6 +23,7 @@ import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.DetectionUtil;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -96,6 +97,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private boolean strValActive;
private String strVal;
private AbstractUnsafe.FlushTask flushTaskInProgress;
/**
* Creates a new instance.
*
@ -317,6 +320,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return unsafe;
}
@Override
public ChannelFuture sendFile(FileRegion region) {
return pipeline.sendFile(region);
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelFuture future) {
return pipeline.sendFile(region, future);
}
protected abstract Unsafe newUnsafe();
/**
@ -380,10 +393,99 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return strVal;
}
protected abstract class AbstractUnsafe implements Unsafe {
private final class FlushTask {
final FileRegion region;
final ChannelFuture future;
FlushTask next;
FlushTask(FileRegion region, ChannelFuture future) {
this.region = region;
this.future = future;
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
flushTaskInProgress = next;
if (next != null) {
try {
FileRegion region = next.region;
if (region == null) {
// no region present means the next flush task was to directly flush
// the outbound buffer
flushNotifierAndFlush(future);
} else {
// flush the region now
doFlushFileRegion(region, future);
}
} catch (Throwable cause) {
future.setFailure(cause);
}
} else {
// notify the flush futures
flushFutureNotifier.notifyFlushFutures();
}
}
});
}
}
private final Runnable flushLaterTask = new FlushLater();
@Override
public final void sendFile(final FileRegion region, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (outboundBufSize() > 0) {
flushNotifier(newFuture()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
sendFile0(region, future);
}
});
} else {
// nothing pending try to send the fileRegion now!
sendFile0(region, future);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
sendFile(region, future);
}
});
}
}
private void sendFile0(FileRegion region, ChannelFuture future) {
if (flushTaskInProgress == null) {
flushTaskInProgress = new FlushTask(region, future);
try {
// the first FileRegion to flush so trigger it now!
doFlushFileRegion(region, future);
} catch (Throwable cause) {
region.close();
future.setFailure(cause);
}
return;
}
FlushTask task = flushTaskInProgress;
for (;;) {
FlushTask next = task.next;
if (next == null) {
break;
}
task = next;
}
// there is something that needs to get flushed first so add it as next in the chain
task.next = new FlushTask(region, future);
}
@Override
public final ChannelHandlerContext directOutboundContext() {
return pipeline.head;
@ -617,43 +719,24 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public void flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
// Append flush future to the notification list.
if (future != voidFuture) {
final int bufSize;
final ChannelHandlerContext ctx = directOutboundContext();
if (ctx.hasOutboundByteBuffer()) {
bufSize = ctx.outboundByteBuffer().readableBytes();
} else {
bufSize = ctx.outboundMessageBuffer().size();
}
flushFutureNotifier.addFlushFuture(future, bufSize);
}
if (flushTaskInProgress != null) {
FlushTask task = flushTaskInProgress;
if (!inFlushNow) { // Avoid re-entrance
try {
if (!isFlushPending()) {
flushNow();
} else {
// Event loop will call flushNow() later by itself.
}
} catch (Throwable t) {
flushFutureNotifier.notifyFlushFutures(t);
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
// loop over the tasks to find the last one
for (;;) {
FlushTask t = task.next;
if (t == null) {
break;
}
task = t.next;
}
} else {
if (!flushNowPending) {
flushNowPending = true;
eventLoop().execute(flushLaterTask);
}
task.next = new FlushTask(null, future);
return;
}
flushNotifierAndFlush(future);
} else {
eventLoop().execute(new Runnable() {
@Override
@ -664,9 +747,59 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private void flushNotifierAndFlush(ChannelFuture future) {
flushNotifier(future);
flush0();
}
private int outboundBufSize() {
final int bufSize;
final ChannelHandlerContext ctx = directOutboundContext();
if (ctx.hasOutboundByteBuffer()) {
bufSize = ctx.outboundByteBuffer().readableBytes();
} else {
bufSize = ctx.outboundMessageBuffer().size();
}
return bufSize;
}
private ChannelFuture flushNotifier(ChannelFuture future) {
// Append flush future to the notification list.
if (future != voidFuture) {
flushFutureNotifier.addFlushFuture(future, outboundBufSize());
}
return future;
}
private void flush0() {
if (!inFlushNow) { // Avoid re-entrance
try {
if (!isFlushPending()) {
flushNow();
} else {
// Event loop will call flushNow() later by itself.
}
} catch (Throwable t) {
flushFutureNotifier.notifyFlushFutures(t);
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
}
}
} else {
if (!flushNowPending) {
flushNowPending = true;
eventLoop().execute(flushLaterTask);
}
}
}
@Override
public final void flushNow() {
if (inFlushNow) {
if (inFlushNow || flushTaskInProgress != null) {
return;
}
@ -765,6 +898,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
throw new UnsupportedOperationException();
}
protected void doFlushFileRegion(FileRegion region, ChannelFuture future) throws Exception {
throw new UnsupportedOperationException();
}
protected static void checkEOF(FileRegion region, long writtenBytes) throws IOException {
if (writtenBytes < region.count()) {
throw new EOFException("Expected to be able to write "
+ region.count() + " bytes, but only wrote "
+ writtenBytes);
}
}
protected abstract boolean isFlushPending();
private final class CloseFuture extends DefaultChannelFuture implements ChannelFuture.Unsafe {

View File

@ -276,5 +276,8 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* has no effect.
*/
void resumeRead();
void sendFile(FileRegion region, ChannelFuture future);
}
}

View File

@ -19,6 +19,11 @@ import java.net.SocketAddress;
public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler {
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelFuture future) throws Exception {
ctx.sendFile(region, future);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {
ctx.bind(localAddress, future);

View File

@ -26,4 +26,6 @@ public interface ChannelOperationHandler extends ChannelHandler {
void close(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception;
void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelFuture future) throws Exception;
}

View File

@ -91,4 +91,9 @@ public class ChannelOperationHandlerAdapter implements ChannelOperationHandler {
}
ctx.flush(future);
}
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelFuture future) throws Exception {
ctx.sendFile(region, future);
}
}

View File

@ -26,6 +26,7 @@ public interface ChannelOutboundInvoker {
ChannelFuture deregister();
ChannelFuture flush();
ChannelFuture write(Object message);
ChannelFuture sendFile(FileRegion region);
ChannelFuture bind(SocketAddress localAddress, ChannelFuture future);
ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future);
@ -35,4 +36,5 @@ public interface ChannelOutboundInvoker {
ChannelFuture deregister(ChannelFuture future);
ChannelFuture flush(ChannelFuture future);
ChannelFuture write(Object message, ChannelFuture future);
ChannelFuture sendFile(FileRegion region, ChannelFuture future);
}

View File

@ -1261,4 +1261,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public void readable(boolean readable) {
pipeline.readable(this, readable);
}
@Override
public ChannelFuture sendFile(FileRegion region) {
return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, newFuture());
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelFuture future) {
return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, future);
}
}

View File

@ -1024,6 +1024,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture write(Object message) {
if (message instanceof FileRegion) {
return sendFile((FileRegion) message);
}
return write(message, channel.newFuture());
}
@ -1178,6 +1181,38 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return future;
}
@Override
public ChannelFuture sendFile(FileRegion region) {
return sendFile(region, channel().newFuture());
}
@Override
public ChannelFuture sendFile(FileRegion region, ChannelFuture future) {
return sendFile(firstContext(DIR_OUTBOUND), region, future);
}
ChannelFuture sendFile(final DefaultChannelHandlerContext ctx, final FileRegion region,
final ChannelFuture future) {
validateFuture(future);
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
ctx.flushBridge();
((ChannelOperationHandler) ctx.handler()).sendFile(ctx, region, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
sendFile(ctx, region, future);
}
});
}
return future;
}
@Override
public ChannelFuture flush(ChannelFuture future) {
return flush(firstContext(DIR_OUTBOUND), future);
@ -1218,6 +1253,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture write(Object message, ChannelFuture future) {
if (message instanceof FileRegion) {
return sendFile((FileRegion) message, future);
}
return write(tail, message, future);
}
@ -1502,5 +1540,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelFuture future) throws Exception {
unsafe.sendFile(region, future);
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
public class DefaultFileRegion implements FileRegion {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class);
private final FileChannel file;
private final long position;
private final long count;
public DefaultFileRegion(FileChannel file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}
@Override
public long position() {
return position;
}
@Override
public long count() {
return count;
}
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ")");
}
if (count == 0) {
return 0L;
}
return file.transferTo(this.position + position, count, target);
}
@Override
public void close() {
try {
file.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a file.", e);
}
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
/**
* A region of a file that is sent via a {@link Channel} which supports
* <a href="http://en.wikipedia.org/wiki/Zero-copy">zero-copy file transfer</a>.
*
* <h3>Upgrade your JDK / JRE</h3>
*
* {@link FileChannel#transferTo(long, long, WritableByteChannel)} has at least
* four known bugs in the old versions of Sun JDK and perhaps its derived ones.
* Please upgrade your JDK to 1.6.0_18 or later version if you are going to use
* zero-copy file transfer.
* <ul>
* <li><a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988">5103988</a>
* - FileChannel.transferTo() should return -1 for EAGAIN instead throws IOException</li>
* <li><a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6253145">6253145</a>
* - FileChannel.transferTo() on Linux fails when going beyond 2GB boundary</li>
* <li><a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6427312">6427312</a>
* - FileChannel.transferTo() throws IOException "system call interrupted"</li>
* <li><a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6524172">6470086</a>
* - FileChannel.transferTo(2147483647, 1, channel) causes "Value too large" exception</li>
* </ul>
*
* <h3>Check your operating system and JDK / JRE</h3>
*
* If your operating system (or JDK / JRE) does not support zero-copy file
* transfer, sending a file with {@link FileRegion} might fail or yield worse
* performance. For example, sending a large file doesn't work well in Windows.
*
* <h3>Not all transports support it</h3>
*
* Currently, the NIO transport is the only transport that supports {@link FileRegion}.
*/
public interface FileRegion {
/**
* Returns the offset in the file where the transfer began.
*/
long position();
/**
* Returns the number of bytes to transfer.
*/
long count();
/**
* Transfers the content of this file region to the specified channel.
*
* @param target the destination of the transfer
* @param position the relative offset of the file where the transfer
* begins from. For example, <tt>0</tt> will make the
* transfer start from {@link #position()}th byte and
* <tt>{@link #count()} - 1</tt> will make the last
* byte of the region transferred.
*/
long transferTo(WritableByteChannel target, long position) throws IOException;
/**
* Close the {@link FileRegion}.
*/
void close();
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
@ -37,6 +38,7 @@ import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -292,6 +294,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
}
@Override
protected void doFlushFileRegion(FileRegion region, ChannelFuture future) throws Exception {
region.transferTo(new WritableByteChannelAdapter(region, future), 0);
}
private void beginRead() {
if (inBeginRead || asyncReadInProgress || readSuspended.get()) {
return;
@ -540,4 +547,70 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
}
}
private final class WritableByteChannelAdapter implements WritableByteChannel {
private final FileRegion region;
private final ChannelFuture future;
private long written;
public WritableByteChannelAdapter(FileRegion region, ChannelFuture future) {
this.region = region;
this.future = future;
}
@Override
public int write(final ByteBuffer src) {
javaChannel().write(src, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
try {
if (result == 0) {
javaChannel().write(src, null, this);
return;
}
if (result == -1) {
checkEOF(region, written);
future.setSuccess();
return;
}
written += result;
if (written >= region.count()) {
region.close();
future.setSuccess();
return;
}
if (src.hasRemaining()) {
javaChannel().write(src, null, this);
} else {
region.transferTo(WritableByteChannelAdapter.this, written);
}
} catch (Throwable cause) {
region.close();
future.setFailure(cause);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
region.close();
future.setFailure(exc);
}
});
return 0;
}
@Override
public boolean isOpen() {
return javaChannel().isOpen();
}
@Override
public void close() throws IOException {
javaChannel().close();
}
}
}

View File

@ -17,13 +17,17 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
abstract class AbstractNioByteChannel extends AbstractNioChannel {
@ -33,7 +37,7 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
@Override
protected NioByteUnsafe newUnsafe() {
protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
}
@ -105,6 +109,7 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
}
@Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
if (!buf.readable()) {
@ -126,6 +131,78 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
}
@Override
protected void doFlushFileRegion(final FileRegion region, final ChannelFuture future) throws Exception {
if (javaChannel() instanceof WritableByteChannel) {
TransferTask transferTask = new TransferTask(region, (WritableByteChannel) javaChannel(), future);
transferTask.transfer();
} else {
throw new UnsupportedOperationException("Underlying Channel is not of instance "
+ WritableByteChannel.class);
}
}
private final class TransferTask implements NioTask<SelectableChannel> {
private long writtenBytes;
private final FileRegion region;
private final WritableByteChannel wch;
private final ChannelFuture future;
TransferTask(FileRegion region, WritableByteChannel wch, ChannelFuture future) {
this.region = region;
this.wch = wch;
this.future = future;
}
public void transfer() {
try {
for (;;) {
long localWrittenBytes = region.transferTo(wch, writtenBytes);
if (localWrittenBytes == 0) {
// reschedule for write once the channel is writable again
eventLoop().executeWhenWritable(
AbstractNioByteChannel.this, this);
return;
} else if (localWrittenBytes == -1) {
checkEOF(region, writtenBytes);
future.setSuccess();
return;
} else {
writtenBytes += localWrittenBytes;
if (writtenBytes >= region.count()) {
region.close();
future.setSuccess();
return;
}
}
}
} catch (Throwable cause) {
region.close();
future.setFailure(cause);
pipeline().fireExceptionCaught(cause);
}
}
@Override
public void channelReady(SelectableChannel ch, SelectionKey key) throws Exception {
transfer();
}
@Override
public void channelUnregistered(SelectableChannel ch) throws Exception {
if (writtenBytes < region.count()) {
region.close();
if (!isOpen()) {
future.setFailure(new ClosedChannelException());
} else {
future.setFailure(new IllegalStateException(
"Channel was unregistered before the region could be fully written"));
}
}
}
}
protected abstract int doReadBytes(ByteBuf buf) throws Exception;
protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception;

View File

@ -29,6 +29,8 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -41,6 +43,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
private final int readInterestOp;
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
final Queue<NioTask<SelectableChannel>> writableTasks = new ConcurrentLinkedQueue<NioTask<SelectableChannel>>();
final Runnable suspendReadTask = new Runnable() {
@Override
@ -109,6 +112,11 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return ch;
}
@Override
public NioEventLoop eventLoop() {
return (NioEventLoop) super.eventLoop();
}
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;

View File

@ -128,6 +128,22 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
void executeWhenWritable(AbstractNioChannel channel, NioTask<? extends SelectableChannel> task) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (isShutdown()) {
throw new IllegalStateException("event loop shut down");
}
SelectionKey key = channel.selectionKey();
channel.writableTasks.offer((NioTask<SelectableChannel>) task);
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
// Create a new selector and "transfer" all channels from the old
// selector to the new one
private Selector recreateSelector() {
@ -330,8 +346,9 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
int readyOps = -1;
try {
int readyOps = k.readyOps();
readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
@ -340,16 +357,45 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flushNow();
processWritable(k, ch);
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
} catch (CancelledKeyException e) {
if (readyOps != 1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
unregisterWritableTasks(ch);
}
unsafe.close(unsafe.voidFuture());
}
}
private static void processWritable(SelectionKey k, AbstractNioChannel ch) {
if (ch.writableTasks.isEmpty()) {
ch.unsafe().flushNow();
} else {
NioTask<SelectableChannel> task = null;
for (;;) {
task = ch.writableTasks.poll();
if (task == null) { break; }
processSelectedKey(ch.selectionKey(), task);
}
k.interestOps(k.interestOps() | SelectionKey.OP_WRITE);
}
}
private static void unregisterWritableTasks(AbstractNioChannel ch) {
NioTask<SelectableChannel> task = null;
for (;;) {
task = ch.writableTasks.poll();
if (task == null) {
break;
} else {
invokeChannelUnregistered(task, ch.selectionKey());
}
}
}
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
boolean success = false;
try {
@ -371,11 +417,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private void closeAll() {
SelectorUtil.cleanupKeys(selector);
Set<SelectionKey> keys = selector.keys();
Collection<Channel> channels = new ArrayList<Channel>(keys.size());
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof Channel) {
channels.add((Channel) a);
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
@ -384,7 +430,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
for (Channel ch: channels) {
for (AbstractNioChannel ch: channels) {
unregisterWritableTasks(ch);
ch.unsafe().close(ch.unsafe().voidFuture());
}
}

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
@ -34,7 +35,9 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.Channels;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.WritableByteChannel;
public class OioSocketChannel extends AbstractOioByteChannel
implements SocketChannel {
@ -48,6 +51,7 @@ public class OioSocketChannel extends AbstractOioByteChannel
private final SocketChannelConfig config;
private InputStream is;
private OutputStream os;
private WritableByteChannel outChannel;
public OioSocketChannel() {
this(new Socket());
@ -223,4 +227,32 @@ public class OioSocketChannel extends AbstractOioByteChannel
}
buf.readBytes(os, buf.readableBytes());
}
@Override
protected void doFlushFileRegion(FileRegion region, ChannelFuture future) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
if (outChannel == null) {
outChannel = Channels.newChannel(os);
}
long written = 0;
for (;;) {
long localWritten = region.transferTo(outChannel, written);
if (localWritten == -1) {
checkEOF(region, written);
region.close();
future.setSuccess();
return;
}
written += localWritten;
if (written >= region.count()) {
future.setSuccess();
return;
}
}
}
}