Related issue: NETTY-293 sendfile() support for NIO TCP transport
Support for progress notification of time-consuming I/O operations * Added FileRegion * Added ChannelFutureProgressListener * Added ChannelFuture.setProgress() * Performance seems to stay same as before fortunately
This commit is contained in:
parent
b97648f95e
commit
59052be709
@ -239,6 +239,16 @@ public interface ChannelFuture {
|
|||||||
*/
|
*/
|
||||||
boolean setFailure(Throwable cause);
|
boolean setFailure(Throwable cause);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notifies the progress of the operation to the listeners that implements
|
||||||
|
* {@link ChannelFutureProgressListener}. Please note that this method will
|
||||||
|
* not do anything and return {@code false} if this future is complete
|
||||||
|
* already.
|
||||||
|
*
|
||||||
|
* @return {@code true} if and only if notification was made.
|
||||||
|
*/
|
||||||
|
boolean setProgress(long amount, long current, long total);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds the specified listener to this future. The
|
* Adds the specified listener to this future. The
|
||||||
* specified listener is notified when this future is
|
* specified listener is notified when this future is
|
||||||
|
@ -0,0 +1,54 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2009 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listens to the progress of a time-consuming I/O operation such as
|
||||||
|
* {@link FileRegion} transfer. If this listener is added to a
|
||||||
|
* {@link ChannelFuture} of an I/O operation that supports progress
|
||||||
|
* notification, the listener's {@link #operationProgressed(ChannelFuture, long, long, long)}
|
||||||
|
* method will be called back by an I/O thread. If the operation does not
|
||||||
|
* support progress notification, {@link #operationProgressed(ChannelFuture, long, long, long)}
|
||||||
|
* will not be invoked. Like a usual {@link ChannelFutureListener} that this
|
||||||
|
* interface extends, {@link #operationComplete(ChannelFuture)} will be called
|
||||||
|
* when the future is marked as complete.
|
||||||
|
*
|
||||||
|
* <h3>Return the control to the caller quickly</h3>
|
||||||
|
*
|
||||||
|
* {@link #operationProgressed(ChannelFuture, long, long, long)} and
|
||||||
|
* {@link #operationComplete(ChannelFuture)} is directly called by an I/O
|
||||||
|
* thread. Therefore, performing a time consuming task or a blocking operation
|
||||||
|
* in the handler method can cause an unexpected pause during I/O. If you need
|
||||||
|
* to perform a blocking operation on I/O completion, try to execute the
|
||||||
|
* operation in a different thread using a thread pool.
|
||||||
|
*
|
||||||
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||||
|
*
|
||||||
|
* @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
|
||||||
|
*/
|
||||||
|
public interface ChannelFutureProgressListener extends ChannelFutureListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked when the I/O operation associated with the {@link ChannelFuture}
|
||||||
|
* has been progressed.
|
||||||
|
*
|
||||||
|
* @param future the source {@link ChannelFuture} which called this
|
||||||
|
* callback
|
||||||
|
*/
|
||||||
|
void operationProgressed(ChannelFuture future, long amount, long current, long total) throws Exception;
|
||||||
|
}
|
@ -103,6 +103,10 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean setProgress(long amount, long current, long total) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean setFailure(Throwable cause) {
|
public boolean setFailure(Throwable cause) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package org.jboss.netty.channel;
|
|||||||
import static java.util.concurrent.TimeUnit.*;
|
import static java.util.concurrent.TimeUnit.*;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -73,6 +74,7 @@ public class DefaultChannelFuture implements ChannelFuture {
|
|||||||
|
|
||||||
private ChannelFutureListener firstListener;
|
private ChannelFutureListener firstListener;
|
||||||
private List<ChannelFutureListener> otherListeners;
|
private List<ChannelFutureListener> otherListeners;
|
||||||
|
private Collection<ChannelFutureProgressListener> progressListeners;
|
||||||
private boolean done;
|
private boolean done;
|
||||||
private Throwable cause;
|
private Throwable cause;
|
||||||
private int waiters;
|
private int waiters;
|
||||||
@ -132,6 +134,13 @@ public class DefaultChannelFuture implements ChannelFuture {
|
|||||||
}
|
}
|
||||||
otherListeners.add(listener);
|
otherListeners.add(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (listener instanceof ChannelFutureProgressListener) {
|
||||||
|
if (progressListeners == null) {
|
||||||
|
progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
|
||||||
|
}
|
||||||
|
progressListeners.add((ChannelFutureProgressListener) listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,6 +165,10 @@ public class DefaultChannelFuture implements ChannelFuture {
|
|||||||
} else if (otherListeners != null) {
|
} else if (otherListeners != null) {
|
||||||
otherListeners.remove(listener);
|
otherListeners.remove(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (listener instanceof ChannelFutureProgressListener) {
|
||||||
|
progressListeners.remove(listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -372,4 +385,41 @@ public class DefaultChannelFuture implements ChannelFuture {
|
|||||||
ChannelFutureListener.class.getSimpleName() + ".", t);
|
ChannelFutureListener.class.getSimpleName() + ".", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean setProgress(long amount, long current, long total) {
|
||||||
|
ChannelFutureProgressListener[] plisteners;
|
||||||
|
synchronized (this) {
|
||||||
|
// Do not generate progress event after completion.
|
||||||
|
if (done) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (progressListeners.isEmpty()) {
|
||||||
|
// Nothing to notify - no need to create an empty array.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
plisteners = progressListeners.toArray(
|
||||||
|
new ChannelFutureProgressListener[progressListeners.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ChannelFutureProgressListener pl: plisteners) {
|
||||||
|
notifyProgressListener(pl, amount, current, total);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyProgressListener(
|
||||||
|
ChannelFutureProgressListener l,
|
||||||
|
long amount, long current, long total) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
l.operationProgressed(this, amount, current, total);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn(
|
||||||
|
"An exception was thrown by " +
|
||||||
|
ChannelFutureProgressListener.class.getSimpleName() + ".", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
44
src/main/java/org/jboss/netty/channel/FileRegion.java
Normal file
44
src/main/java/org/jboss/netty/channel/FileRegion.java
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2010 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel;
|
||||||
|
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A region of a file that is sent via a {@link Channel} which supports
|
||||||
|
* zero-copy file transfer.
|
||||||
|
*
|
||||||
|
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||||
|
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||||
|
* @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
|
||||||
|
*/
|
||||||
|
public interface FileRegion {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link FileChannel} from which data will be fetched.
|
||||||
|
*/
|
||||||
|
FileChannel getFileChannel();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the offset in the file where the transfer began.
|
||||||
|
*/
|
||||||
|
long getStartOffset();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the offset in the file where the transfer will end.
|
||||||
|
*/
|
||||||
|
long getEndOffset();
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user