diff --git a/src/main/java/org/jboss/netty/channel/ChannelFuture.java b/src/main/java/org/jboss/netty/channel/ChannelFuture.java index f8d240a19e..79d993d4e2 100644 --- a/src/main/java/org/jboss/netty/channel/ChannelFuture.java +++ b/src/main/java/org/jboss/netty/channel/ChannelFuture.java @@ -239,6 +239,16 @@ public interface ChannelFuture { */ boolean setFailure(Throwable cause); + /** + * Notifies the progress of the operation to the listeners that implements + * {@link ChannelFutureProgressListener}. Please note that this method will + * not do anything and return {@code false} if this future is complete + * already. + * + * @return {@code true} if and only if notification was made. + */ + boolean setProgress(long amount, long current, long total); + /** * Adds the specified listener to this future. The * specified listener is notified when this future is diff --git a/src/main/java/org/jboss/netty/channel/ChannelFutureProgressListener.java b/src/main/java/org/jboss/netty/channel/ChannelFutureProgressListener.java new file mode 100644 index 0000000000..7fcbfc62d0 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/ChannelFutureProgressListener.java @@ -0,0 +1,54 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel; + + +/** + * Listens to the progress of a time-consuming I/O operation such as + * {@link FileRegion} transfer. If this listener is added to a + * {@link ChannelFuture} of an I/O operation that supports progress + * notification, the listener's {@link #operationProgressed(ChannelFuture, long, long, long)} + * method will be called back by an I/O thread. If the operation does not + * support progress notification, {@link #operationProgressed(ChannelFuture, long, long, long)} + * will not be invoked. Like a usual {@link ChannelFutureListener} that this + * interface extends, {@link #operationComplete(ChannelFuture)} will be called + * when the future is marked as complete. + * + *

Return the control to the caller quickly

+ * + * {@link #operationProgressed(ChannelFuture, long, long, long)} and + * {@link #operationComplete(ChannelFuture)} is directly called by an I/O + * thread. Therefore, performing a time consuming task or a blocking operation + * in the handler method can cause an unexpected pause during I/O. If you need + * to perform a blocking operation on I/O completion, try to execute the + * operation in a different thread using a thread pool. + * + * @author The Netty Project + * @author Trustin Lee + * + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + */ +public interface ChannelFutureProgressListener extends ChannelFutureListener { + + /** + * Invoked when the I/O operation associated with the {@link ChannelFuture} + * has been progressed. + * + * @param future the source {@link ChannelFuture} which called this + * callback + */ + void operationProgressed(ChannelFuture future, long amount, long current, long total) throws Exception; +} diff --git a/src/main/java/org/jboss/netty/channel/CompleteChannelFuture.java b/src/main/java/org/jboss/netty/channel/CompleteChannelFuture.java index 001a84fca8..75a2c34d72 100644 --- a/src/main/java/org/jboss/netty/channel/CompleteChannelFuture.java +++ b/src/main/java/org/jboss/netty/channel/CompleteChannelFuture.java @@ -103,6 +103,10 @@ public abstract class CompleteChannelFuture implements ChannelFuture { return true; } + public boolean setProgress(long amount, long current, long total) { + return false; + } + public boolean setFailure(Throwable cause) { return false; } diff --git a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java index 2d2302ffe7..4984cfe065 100644 --- a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java +++ b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java @@ -18,6 +18,7 @@ package org.jboss.netty.channel; import static java.util.concurrent.TimeUnit.*; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; @@ -73,6 +74,7 @@ public class DefaultChannelFuture implements ChannelFuture { private ChannelFutureListener firstListener; private List otherListeners; + private Collection progressListeners; private boolean done; private Throwable cause; private int waiters; @@ -132,6 +134,13 @@ public class DefaultChannelFuture implements ChannelFuture { } otherListeners.add(listener); } + + if (listener instanceof ChannelFutureProgressListener) { + if (progressListeners == null) { + progressListeners = new ArrayList(1); + } + progressListeners.add((ChannelFutureProgressListener) listener); + } } } @@ -156,6 +165,10 @@ public class DefaultChannelFuture implements ChannelFuture { } else if (otherListeners != null) { otherListeners.remove(listener); } + + if (listener instanceof ChannelFutureProgressListener) { + progressListeners.remove(listener); + } } } } @@ -372,4 +385,41 @@ public class DefaultChannelFuture implements ChannelFuture { ChannelFutureListener.class.getSimpleName() + ".", t); } } + + public boolean setProgress(long amount, long current, long total) { + ChannelFutureProgressListener[] plisteners; + synchronized (this) { + // Do not generate progress event after completion. + if (done) { + return false; + } + + if (progressListeners.isEmpty()) { + // Nothing to notify - no need to create an empty array. + return true; + } + + plisteners = progressListeners.toArray( + new ChannelFutureProgressListener[progressListeners.size()]); + } + + for (ChannelFutureProgressListener pl: plisteners) { + notifyProgressListener(pl, amount, current, total); + } + + return true; + } + + private void notifyProgressListener( + ChannelFutureProgressListener l, + long amount, long current, long total) { + + try { + l.operationProgressed(this, amount, current, total); + } catch (Throwable t) { + logger.warn( + "An exception was thrown by " + + ChannelFutureProgressListener.class.getSimpleName() + ".", t); + } + } } diff --git a/src/main/java/org/jboss/netty/channel/FileRegion.java b/src/main/java/org/jboss/netty/channel/FileRegion.java new file mode 100644 index 0000000000..de503f7dbb --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/FileRegion.java @@ -0,0 +1,44 @@ +/* + * Copyright 2010 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel; + +import java.nio.channels.FileChannel; + +/** + * A region of a file that is sent via a {@link Channel} which supports + * zero-copy file transfer. + * + * @author The Netty Project + * @author Trustin Lee + * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ + */ +public interface FileRegion { + + /** + * Returns the {@link FileChannel} from which data will be fetched. + */ + FileChannel getFileChannel(); + + /** + * Returns the offset in the file where the transfer began. + */ + long getStartOffset(); + + /** + * Returns the offset in the file where the transfer will end. + */ + long getEndOffset(); +}