diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index f2cba30a35..bf02e64712 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -522,7 +522,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // the first FileRegion to flush so trigger it now! doFlushFileRegion(region, promise); } catch (Throwable cause) { - region.close(); + region.release(); promise.setFailure(cause); } return; diff --git a/transport/src/main/java/io/netty/channel/DefaultFileRegion.java b/transport/src/main/java/io/netty/channel/DefaultFileRegion.java index 6a1895666f..cb7c9756d5 100644 --- a/transport/src/main/java/io/netty/channel/DefaultFileRegion.java +++ b/transport/src/main/java/io/netty/channel/DefaultFileRegion.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.AbstractReferenceCounted; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -25,7 +26,7 @@ import java.nio.channels.WritableByteChannel; /** * Default {@link FileRegion} implementation which transfer data from a {@link FileChannel}. */ -public class DefaultFileRegion implements FileRegion { +public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class); @@ -81,7 +82,7 @@ public class DefaultFileRegion implements FileRegion { } @Override - public void close() { + protected void deallocate() { try { file.close(); } catch (IOException e) { diff --git a/transport/src/main/java/io/netty/channel/FileRegion.java b/transport/src/main/java/io/netty/channel/FileRegion.java index f07536fcbe..5201939cbe 100644 --- a/transport/src/main/java/io/netty/channel/FileRegion.java +++ b/transport/src/main/java/io/netty/channel/FileRegion.java @@ -15,6 +15,8 @@ */ package io.netty.channel; +import io.netty.buffer.ReferenceCounted; + import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; @@ -50,7 +52,7 @@ import java.nio.channels.WritableByteChannel; * * Currently, the NIO transport is the only transport that supports {@link FileRegion}. */ -public interface FileRegion { +public interface FileRegion extends ReferenceCounted { /** * Returns the offset in the file where the transfer began. @@ -73,9 +75,4 @@ public interface FileRegion { * byte of the region transferred. */ long transferTo(WritableByteChannel target, long position) throws IOException; - - /** - * Close the {@link FileRegion}. After calling this method accessing the {@link FileRegion} may fail. - */ - void close(); } diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java index fc8502a034..aaf7eb4d2c 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -21,6 +21,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelStateHandlerAdapter; +import io.netty.channel.FileRegion; import io.netty.channel.ServerChannel; import io.netty.util.CharsetUtil; @@ -116,6 +117,25 @@ public interface ChannelGroup extends Set, Comparable { */ ChannelGroupFuture write(Object message); + /** + * Writes the specified {@link FileRegion} to all {@link Channel}s in this + * group. Please note that this operation is asynchronous as + * {@link Channel#sendFile(FileRegion)} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ + ChannelGroupFuture sendFile(FileRegion region); + + /** + * Flush all {@link Channel} in this group. Please note that this operation + * is asynchronous as {@link Channel#flush()} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ + ChannelGroupFuture flush(); + /** * Disconnects all {@link Channel}s in this group from their remote peers. * diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index c21b77a85a..92816b356d 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2013 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -19,6 +19,7 @@ import io.netty.buffer.BufUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.FileRegion; import io.netty.channel.ServerChannel; import java.util.AbstractSet; @@ -221,6 +222,32 @@ public class DefaultChannelGroup extends AbstractSet implements Channel return new DefaultChannelGroupFuture(this, futures); } + @Override + public ChannelGroupFuture sendFile(FileRegion region) { + if (region == null) { + throw new NullPointerException("region"); + } + + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels.values()) { + BufUtil.retain(region); + futures.put(c.id(), c.sendFile(region)); + } + + BufUtil.release(region); + return new DefaultChannelGroupFuture(this, futures); + } + + @Override + public ChannelGroupFuture flush() { + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels.values()) { + futures.put(c.id(), c.flush()); + } + + return new DefaultChannelGroupFuture(this, futures); + } + @Override public int hashCode() { return System.identityHashCode(this); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index a9041d9f00..9ce6701d3c 100755 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -192,14 +192,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } else { writtenBytes += localWrittenBytes; if (writtenBytes >= region.count()) { - region.close(); + region.release(); promise.setSuccess(); return; } } } } catch (Throwable cause) { - region.close(); + region.release(); promise.setFailure(cause); } } @@ -217,7 +217,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } if (writtenBytes < region.count()) { - region.close(); + region.release(); if (!isOpen()) { promise.setFailure(new ClosedChannelException()); } else { diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java index 88bf8abf54..a16fbe1ce5 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java @@ -117,7 +117,7 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { long localWritten = region.transferTo(outChannel, written); if (localWritten == -1) { checkEOF(region, written); - region.close(); + region.release(); promise.setSuccess(); return; } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index a706190423..0071d5643a 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -552,7 +552,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne written += result; if (written >= region.count()) { - region.close(); + region.release(); promise.setSuccess(); return; } @@ -562,14 +562,14 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne region.transferTo(WritableByteChannelAdapter.this, written); } } catch (Throwable cause) { - region.close(); + region.release(); promise.setFailure(cause); } } @Override public void failed(Throwable exc, Object attachment) { - region.close(); + region.release(); promise.setFailure(exc); } });