Let FileRegion extend ReferenceCounted and add ChannelGroup.flush() , ChannelGroup.sendFile(..)

This commit is contained in:
Norman Maurer 2013-02-10 14:25:53 +01:00
parent 4f6d05365a
commit 33c94a98a3
8 changed files with 63 additions and 18 deletions

View File

@ -522,7 +522,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// the first FileRegion to flush so trigger it now! // the first FileRegion to flush so trigger it now!
doFlushFileRegion(region, promise); doFlushFileRegion(region, promise);
} catch (Throwable cause) { } catch (Throwable cause) {
region.close(); region.release();
promise.setFailure(cause); promise.setFailure(cause);
} }
return; return;

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.AbstractReferenceCounted;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -25,7 +26,7 @@ import java.nio.channels.WritableByteChannel;
/** /**
* Default {@link FileRegion} implementation which transfer data from a {@link FileChannel}. * Default {@link FileRegion} implementation which transfer data from a {@link FileChannel}.
*/ */
public class DefaultFileRegion implements FileRegion { public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class);
@ -81,7 +82,7 @@ public class DefaultFileRegion implements FileRegion {
} }
@Override @Override
public void close() { protected void deallocate() {
try { try {
file.close(); file.close();
} catch (IOException e) { } catch (IOException e) {

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ReferenceCounted;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
@ -50,7 +52,7 @@ import java.nio.channels.WritableByteChannel;
* *
* Currently, the NIO transport is the only transport that supports {@link FileRegion}. * Currently, the NIO transport is the only transport that supports {@link FileRegion}.
*/ */
public interface FileRegion { public interface FileRegion extends ReferenceCounted {
/** /**
* Returns the offset in the file where the transfer began. * Returns the offset in the file where the transfer began.
@ -73,9 +75,4 @@ public interface FileRegion {
* byte of the region transferred. * byte of the region transferred.
*/ */
long transferTo(WritableByteChannel target, long position) throws IOException; long transferTo(WritableByteChannel target, long position) throws IOException;
/**
* Close the {@link FileRegion}. After calling this method accessing the {@link FileRegion} may fail.
*/
void close();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012 The Netty Project * Copyright 2013 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, * The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance * version 2.0 (the "License"); you may not use this file except in compliance
@ -21,6 +21,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateHandlerAdapter; import io.netty.channel.ChannelStateHandlerAdapter;
import io.netty.channel.FileRegion;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
@ -116,6 +117,25 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/ */
ChannelGroupFuture write(Object message); ChannelGroupFuture write(Object message);
/**
* Writes the specified {@link FileRegion} to all {@link Channel}s in this
* group. Please note that this operation is asynchronous as
* {@link Channel#sendFile(FileRegion)} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture sendFile(FileRegion region);
/**
* Flush all {@link Channel} in this group. Please note that this operation
* is asynchronous as {@link Channel#flush()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture flush();
/** /**
* Disconnects all {@link Channel}s in this group from their remote peers. * Disconnects all {@link Channel}s in this group from their remote peers.
* *

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012 The Netty Project * Copyright 2013 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, * The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance * version 2.0 (the "License"); you may not use this file except in compliance
@ -19,6 +19,7 @@ import io.netty.buffer.BufUtil;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import java.util.AbstractSet; import java.util.AbstractSet;
@ -221,6 +222,32 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures);
} }
@Override
public ChannelGroupFuture sendFile(FileRegion region) {
if (region == null) {
throw new NullPointerException("region");
}
Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
BufUtil.retain(region);
futures.put(c.id(), c.sendFile(region));
}
BufUtil.release(region);
return new DefaultChannelGroupFuture(this, futures);
}
@Override
public ChannelGroupFuture flush() {
Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
futures.put(c.id(), c.flush());
}
return new DefaultChannelGroupFuture(this, futures);
}
@Override @Override
public int hashCode() { public int hashCode() {
return System.identityHashCode(this); return System.identityHashCode(this);

View File

@ -192,14 +192,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} else { } else {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
if (writtenBytes >= region.count()) { if (writtenBytes >= region.count()) {
region.close(); region.release();
promise.setSuccess(); promise.setSuccess();
return; return;
} }
} }
} }
} catch (Throwable cause) { } catch (Throwable cause) {
region.close(); region.release();
promise.setFailure(cause); promise.setFailure(cause);
} }
} }
@ -217,7 +217,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
if (writtenBytes < region.count()) { if (writtenBytes < region.count()) {
region.close(); region.release();
if (!isOpen()) { if (!isOpen()) {
promise.setFailure(new ClosedChannelException()); promise.setFailure(new ClosedChannelException());
} else { } else {

View File

@ -117,7 +117,7 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
long localWritten = region.transferTo(outChannel, written); long localWritten = region.transferTo(outChannel, written);
if (localWritten == -1) { if (localWritten == -1) {
checkEOF(region, written); checkEOF(region, written);
region.close(); region.release();
promise.setSuccess(); promise.setSuccess();
return; return;
} }

View File

@ -552,7 +552,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
written += result; written += result;
if (written >= region.count()) { if (written >= region.count()) {
region.close(); region.release();
promise.setSuccess(); promise.setSuccess();
return; return;
} }
@ -562,14 +562,14 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
region.transferTo(WritableByteChannelAdapter.this, written); region.transferTo(WritableByteChannelAdapter.this, written);
} }
} catch (Throwable cause) { } catch (Throwable cause) {
region.close(); region.release();
promise.setFailure(cause); promise.setFailure(cause);
} }
} }
@Override @Override
public void failed(Throwable exc, Object attachment) { public void failed(Throwable exc, Object attachment) {
region.close(); region.release();
promise.setFailure(exc); promise.setFailure(exc);
} }
}); });