Replace TransferFuture(Listener) with (Channel)ProgressiveFuture(Listener)

- Now works without the transport package
- Renamed TransferFuture to ProgressiveFuture and ChannelProgressiveFuture / same for promises
- ProgressiveFutureListener now extends GenericProgressiveFutureListener and GenericFutureListener (add/removeTransferListener*() were removed)
- Renamed DefaultEventListeners to DefaultFutureListeners and only accept GenericFutureListeners
- Various clean-up
This commit is contained in:
Trustin Lee 2013-04-15 15:26:20 +09:00
parent 391c011764
commit e69033a4c3
23 changed files with 761 additions and 473 deletions

View File

@ -42,6 +42,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
return new DefaultPromise<V>(this);
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise(long total) {
return new DefaultProgressivePromise<V>(this, total);
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return new SucceededFuture<V>(this, result);

View File

@ -16,32 +16,44 @@
package io.netty.util.concurrent;
import java.util.Arrays;
import java.util.EventListener;
public final class DefaultEventListeners {
public final class DefaultFutureListeners {
private EventListener[] listeners;
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
public DefaultEventListeners(EventListener first, EventListener second) {
listeners = new EventListener[2];
@SuppressWarnings("unchecked")
public DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(EventListener l) {
EventListener[] listeners = this.listeners;
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(EventListener l) {
final EventListener[] listeners = this.listeners;
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
@ -51,16 +63,24 @@ public final class DefaultEventListeners {
}
listeners[-- size] = null;
this.size = size;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
public EventListener[] listeners() {
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
public int size() {
return size;
}
public int progressiveSize() {
return progressiveSize;
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements ProgressivePromise<V> {
private final long total;
private volatile long progress;
/**
* Creates a new instance.
*
* It is preferable to use {@link EventExecutor#newProgressivePromise(long)} to create a new progressive promise
*
* @param executor
* the {@link EventExecutor} which is used to notify the promise when it progresses or it is complete
*/
public DefaultProgressivePromise(EventExecutor executor, long total) {
super(executor);
validateTotal(total);
this.total = total;
}
protected DefaultProgressivePromise(long total) {
/* only for subclasses */
validateTotal(total);
this.total = total;
}
private static void validateTotal(long total) {
if (total < 0) {
throw new IllegalArgumentException("total: " + total + " (expected: >= 0)");
}
}
@Override
public long progress() {
return progress;
}
@Override
public long total() {
return total;
}
@Override
public ProgressivePromise<V> setProgress(long progress) {
if (progress < 0 || progress > total) {
throw new IllegalArgumentException(
"progress: " + progress + " (expected: 0 <= progress <= " + total + ')');
}
if (isDone()) {
throw new IllegalStateException("complete already");
}
long oldProgress = this.progress;
this.progress = progress;
notifyProgressiveListeners(progress - oldProgress);
return this;
}
@Override
public boolean tryProgress(long progress) {
if (progress < 0 || progress > total || isDone()) {
return false;
}
this.progress = progress;
notifyProgressiveListeners(progress);
return true;
}
@Override
public ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
super.addListener(listener);
return this;
}
@Override
public ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
super.removeListeners(listeners);
return this;
}
@Override
public ProgressivePromise<V> sync() throws InterruptedException {
super.sync();
return this;
}
@Override
public ProgressivePromise<V> syncUninterruptibly() {
super.syncUninterruptibly();
return this;
}
@Override
public ProgressivePromise<V> await() throws InterruptedException {
super.await();
return this;
}
@Override
public ProgressivePromise<V> awaitUninterruptibly() {
super.awaitUninterruptibly();
return this;
}
@Override
public ProgressivePromise<V> setSuccess(V result) {
super.setSuccess(result);
return this;
}
@Override
public ProgressivePromise<V> setFailure(Throwable cause) {
super.setFailure(cause);
return this;
}
}

View File

@ -20,7 +20,6 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.EventListener;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
@ -42,7 +41,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private final EventExecutor executor;
private volatile Object result;
private Object listeners; // Can be ChannelFutureListener or DefaultChannelPromiseListeners
private Object listeners; // Can be ChannelFutureListener or DefaultFutureListeners
/**
* The the most significant 24 bits of this field represents the number of waiter threads waiting for this promise
@ -99,7 +98,6 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@Override
@SuppressWarnings("unchecked")
public Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
@ -115,11 +113,13 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
if (listeners == null) {
listeners = listener;
} else {
if (listeners instanceof DefaultEventListeners) {
((DefaultEventListeners) listeners).add(listener);
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultEventListeners(
(EventListener) listeners, listener);
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> firstListener =
(GenericFutureListener<? extends Future<V>>) listeners;
listeners = new DefaultFutureListeners(firstListener, listener);
}
}
return this;
@ -146,7 +146,6 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@Override
@SuppressWarnings("unchecked")
public Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
@ -158,8 +157,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
synchronized (this) {
if (!isDone()) {
if (listeners instanceof DefaultEventListeners) {
((DefaultEventListeners) listeners).remove(listener);
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
@ -436,8 +435,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
return true;
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("unchecked")
public V getNow() {
Object result = this.result;
if (result instanceof CauseHolder || result == SUCCESS) {
@ -462,7 +461,6 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
state -= 0x10000000000L;
}
@SuppressWarnings("unchecked")
private void notifyListeners() {
// This method doesn't need synchronization because:
// 1) This method is always called after synchronized (this) block.
@ -470,53 +468,60 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 2) This method is called only when 'done' is true. Once 'done'
// becomes true, the listener list is never modified - see add/removeListener()
Object listeners = this.listeners;
if (listeners == null) {
return;
}
this.listeners = null;
EventExecutor executor = executor();
if (executor.inEventLoop()) {
if (listeners instanceof DefaultEventListeners) {
notifyListeners0(this, (DefaultEventListeners) listeners);
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0(this, (DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners;
notifyListener0(this, l);
}
listeners = null;
} else {
final Object listeners = this.listeners;
this.listeners = null;
try {
executor.execute(new Runnable() {
@Override
public void run() {
if (listeners instanceof DefaultEventListeners) {
notifyListeners0(DefaultPromise.this, (DefaultEventListeners) listeners);
} else {
notifyListener0(
DefaultPromise.this, (GenericFutureListener<? extends Future<V>>) listeners);
if (listeners instanceof DefaultFutureListeners) {
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyListeners0(DefaultPromise.this, dfl);
}
}
});
});
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyListener0(DefaultPromise.this, l);
}
});
}
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop terminated?", t);
}
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListeners0(final Future<?> future,
DefaultEventListeners listeners) {
final EventListener[] a = listeners.listeners();
private static void notifyListeners0(Future<?> future, DefaultFutureListeners listeners) {
final GenericFutureListener<?>[] a = listeners.listeners();
final int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(future, (GenericFutureListener) a[i]);
notifyListener0(future, a[i]);
}
}
@SuppressWarnings("unchecked")
protected static void notifyListener(
final EventExecutor eventExecutor, final Future<?> future,
final GenericFutureListener<? extends Future<?>> l) {
final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) {
if (eventExecutor.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
@ -533,11 +538,11 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
try {
eventExecutor.execute(new Runnable() {
@Override
public void run() {
notifyListener(eventExecutor, future, l);
}
});
@Override
public void run() {
notifyListener(eventExecutor, future, l);
}
});
} catch (Throwable t) {
logger.error("Failed to notify a listener. Event loop terminated?", t);
}
@ -549,9 +554,118 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by " +
GenericFutureListener.class.getSimpleName() + '.', t);
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
/**
* Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
* {@code null}.
*/
private synchronized Object progressiveListeners() {
Object listeners = this.listeners;
if (listeners == null) {
// No listeners added
return null;
}
if (listeners instanceof DefaultFutureListeners) {
// Copy DefaultFutureListeners into an array of listeners.
DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
int progressiveSize = dfl.progressiveSize();
switch (progressiveSize) {
case 0:
return null;
case 1:
for (GenericFutureListener<?> l: dfl.listeners()) {
if (l instanceof GenericProgressiveFutureListener) {
return l;
}
}
return null;
}
GenericFutureListener<?>[] array = dfl.listeners();
GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
for (int i = 0, j = 0; j < progressiveSize; i ++) {
GenericFutureListener<?> l = array[i];
if (l instanceof GenericProgressiveFutureListener) {
copy[j ++] = (GenericProgressiveFutureListener<?>) l;
}
}
return copy;
} else if (listeners instanceof GenericProgressiveFutureListener) {
return listeners;
} else {
// Only one listener was added and it's not a progressive listener.
return null;
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
void notifyProgressiveListeners(final long delta) {
final Object listeners = progressiveListeners();
if (listeners == null) {
return;
}
final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
EventExecutor executor = executor();
if (executor.inEventLoop()) {
if (listeners instanceof GenericProgressiveFutureListener[]) {
notifyProgressiveListeners0(self, (GenericProgressiveFutureListener<?>[]) listeners, delta);
} else {
notifyProgressiveListener0(
self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, delta);
}
} else {
try {
if (listeners instanceof GenericProgressiveFutureListener[]) {
final GenericProgressiveFutureListener<?>[] array =
(GenericProgressiveFutureListener<?>[]) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyProgressiveListeners0(self, array, delta);
}
});
} else {
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyProgressiveListener0(self, l, delta);
}
});
}
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop terminated?", t);
}
}
}
private static void notifyProgressiveListeners0(
ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long delta) {
for (GenericProgressiveFutureListener<?> l: listeners) {
if (l == null) {
break;
}
notifyProgressiveListener0(future, l, delta);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyProgressiveListener0(
ProgressiveFuture future, GenericProgressiveFutureListener l, long delta) {
try {
l.operationProgressed(future, delta);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
}
}
}

View File

@ -51,6 +51,11 @@ public interface EventExecutor extends EventExecutorGroup {
*/
<V> Promise<V> newPromise();
/**
* Create a new {@link ProgressivePromise}.
*/
<V> ProgressivePromise<V> newProgressivePromise(long total);
/**
* Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also

View File

@ -0,0 +1,21 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
void operationProgressed(F future, long delta) throws Exception;
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* A {@link Future} which is used to indicate the progress of an operation.
*/
public interface ProgressiveFuture<V> extends Future<V> {
/**
* Returns the current progress of the operation as a positive long integer.
*/
long progress();
/**
* Returns the maximum progress of the operation that signifies the end of operation.
*/
long total();
@Override
ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<V>> listener);
@Override
ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
@Override
ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
@Override
ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
@Override
ProgressiveFuture<V> sync() throws InterruptedException;
@Override
ProgressiveFuture<V> syncUninterruptibly();
@Override
ProgressiveFuture<V> await() throws InterruptedException;
@Override
ProgressiveFuture<V> awaitUninterruptibly();
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* Special {@link ProgressiveFuture} which is writable.
*/
public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V> {
/**
* Sets the current progress of the operation and notifies the listeners that implement
* {@link GenericProgressiveFutureListener}.
*/
ProgressivePromise<V> setProgress(long progress);
/**
* Tries to set the current progress of the operation and notifies the listeners that implement
* {@link GenericProgressiveFutureListener}. If the operation is already complete or the progress is out of range,
* this method does nothing but returning {@code false}.
*/
boolean tryProgress(long progress);
@Override
ProgressivePromise<V> setSuccess(V result);
@Override
ProgressivePromise<V> setFailure(Throwable cause);
@Override
ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<V>> listener);
@Override
ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
@Override
ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
@Override
ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
@Override
ProgressivePromise<V> await() throws InterruptedException;
@Override
ProgressivePromise<V> awaitUninterruptibly();
@Override
ProgressivePromise<V> sync() throws InterruptedException;
@Override
ProgressivePromise<V> syncUninterruptibly();
}

View File

@ -22,22 +22,21 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelTransferPromise;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FileRegion;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.TransferFutureListener;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.io.File;
import java.io.FileInputStream;
@ -107,19 +106,19 @@ public class FileServer {
}
ctx.write(file + " " + file.length() + '\n');
FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length());
ChannelTransferPromise promise = ctx.newTransferPromise(region.count());
promise.addTransferListener(new TransferFutureListener() {
ChannelProgressivePromise promise = ctx.newProgressivePromise(region.count());
promise.addListener(new ChannelProgressiveFutureListener() {
@Override
public void onTransferred(long amount, long total) throws Exception {
System.out.println("amount :" + amount + " total :" + total);
}
});
promise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
System.out.println("File sent OK");
public void operationProgressed(ChannelProgressiveFuture f, long delta) throws Exception {
System.err.println("progress: " + f.progress() + " / " + f.total() + " (+" + delta + ')');
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.err.println("file transfer complete");
}
});
ctx.sendFile(region, promise);
ctx.write("\n");
} else {

View File

@ -283,7 +283,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
@SuppressWarnings("unchecked")
public <T> MessageBuf<T> outboundMessageBuffer() {
return (MessageBuf<T>) pipeline.outboundMessageBuffer();
return pipeline.outboundMessageBuffer();
}
@Override
@ -307,8 +307,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public ChannelTransferPromise newTransferPromise(long total) {
return new DefaultChannelTransferPromise(this, total);
public ChannelProgressivePromise newProgressivePromise(long total) {
return new DefaultChannelProgressivePromise(this, total);
}
@Override

View File

@ -70,4 +70,6 @@ public interface ChannelFutureListener extends GenericFutureListener<ChannelFutu
}
}
};
// Just a type alias
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ProgressiveFuture;
/**
* An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress
*/
public interface ChannelProgressiveFuture extends ChannelFuture, ProgressiveFuture<Void> {
@Override
ChannelProgressiveFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
@Override
ChannelProgressiveFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
@Override
ChannelProgressiveFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
@Override
ChannelProgressiveFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
@Override
ChannelProgressiveFuture sync() throws InterruptedException;
@Override
ChannelProgressiveFuture syncUninterruptibly();
@Override
ChannelProgressiveFuture await() throws InterruptedException;
@Override
ChannelProgressiveFuture awaitUninterruptibly();
}

View File

@ -15,18 +15,14 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.GenericProgressiveFutureListener;
import java.util.EventListener;
/**
* An {@link EventListener} listener which will be called once the sending task associated with future is
* being transferred.
*/
public interface TransferFutureListener extends EventListener {
/**
* Called once the bytes is being transferred.
*
* @param amount how many bytes has been transferred
* @param total how many bytes need to be transfer
* */
void onTransferred(long amount, long total) throws Exception;
public interface ChannelProgressiveFutureListener extends GenericProgressiveFutureListener<ChannelProgressiveFuture> {
// Just a type alias
}

View File

@ -17,50 +17,46 @@ package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ProgressivePromise;
/**
* Special {@link ChannelPromise} which will be notified once the associated bytes is transferring.
*/
public interface ChannelTransferPromise extends ChannelTransferFuture, ChannelPromise {
/**
* Increment the current transferred bytes amount
* */
ChannelTransferPromise incrementTransferredBytes(long amount);
public interface ChannelProgressivePromise extends ProgressivePromise<Void>, ChannelProgressiveFuture, ChannelPromise {
@Override
ChannelTransferPromise addTransferListener(TransferFutureListener listener);
ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<Void>> listener);
@Override
ChannelTransferPromise addTransferListeners(TransferFutureListener... listeners);
ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
@Override
ChannelTransferPromise removeTransferListener(TransferFutureListener listener);
ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<Void>> listener);
@Override
ChannelTransferPromise removeTransferListeners(TransferFutureListener... listeners);
ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
@Override
ChannelTransferPromise addListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelProgressivePromise sync() throws InterruptedException;
@Override
ChannelTransferPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelProgressivePromise syncUninterruptibly();
@Override
ChannelTransferPromise removeListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelProgressivePromise await() throws InterruptedException;
@Override
ChannelTransferPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelProgressivePromise awaitUninterruptibly();
@Override
ChannelTransferPromise sync() throws InterruptedException;
ChannelProgressivePromise setSuccess(Void result);
@Override
ChannelTransferPromise syncUninterruptibly();
ChannelProgressivePromise setSuccess();
@Override
ChannelTransferPromise await() throws InterruptedException;
ChannelProgressivePromise setFailure(Throwable cause);
@Override
ChannelTransferPromise awaitUninterruptibly();
ChannelProgressivePromise setProgress(long progress);
}

View File

@ -41,9 +41,9 @@ interface ChannelPropertyAccess {
ChannelPromise newPromise();
/**
* Return an new {@link ChannelTransferPromise}
* Return an new {@link ChannelProgressivePromise}
*/
ChannelTransferPromise newTransferPromise(long total);
ChannelProgressivePromise newProgressivePromise(long total);
/**
* Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()}

View File

@ -1,81 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress
*/
public interface ChannelTransferFuture extends ChannelFuture {
/**
* Adds the specified listener to this future. The
* specified listener is notified when this bytes associated with this if being transferred.
* If this future is already completed, the specified listener is notified immediately.
*/
ChannelTransferFuture addTransferListener(TransferFutureListener listener);
/**
* Adds the specified listeners to this future. The
* specified listeners is notified when this bytes associated with this if being transferred.
* If this future is already completed, the specified listeners is notified immediately.
*/
ChannelTransferFuture addTransferListeners(TransferFutureListener ... listeners);
/**
* Removes the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
ChannelTransferFuture removeTransferListener(TransferFutureListener listener);
/**
* Removes the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
ChannelTransferFuture removeTransferListeners(TransferFutureListener ... listeners);
@Override
ChannelTransferFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
@Override
ChannelTransferFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
@Override
ChannelTransferFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
@Override
ChannelTransferFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
@Override
ChannelTransferFuture sync() throws InterruptedException;
@Override
ChannelTransferFuture syncUninterruptibly();
@Override
ChannelTransferFuture await() throws InterruptedException;
@Override
ChannelTransferFuture awaitUninterruptibly();
}

View File

@ -1558,8 +1558,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
@Override
public ChannelTransferPromise newTransferPromise(long total) {
return new DefaultChannelTransferPromise(channel(), executor(), total);
public ChannelProgressivePromise newProgressivePromise(long total) {
return new DefaultChannelProgressivePromise(channel(), executor(), total);
}
@Override

View File

@ -0,0 +1,171 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.channel.ChannelFlushPromiseNotifier.FlushCheckpoint;
import io.netty.util.concurrent.DefaultProgressivePromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* The default {@link ChannelProgressivePromise} implementation. It is recommended to use
* {@link Channel#newProgressivePromise(long)} to create a new {@link ChannelProgressivePromise} rather than calling the
* constructor explicitly.
*/
public class DefaultChannelProgressivePromise
extends DefaultProgressivePromise<Void> implements ChannelProgressivePromise, FlushCheckpoint {
private final Channel channel;
/**
* Creates a new instance.
*
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelProgressivePromise(Channel channel, long total) {
super(total);
this.channel = channel;
}
/**
* Creates a new instance.
*
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor, long total) {
super(executor, total);
this.channel = channel;
}
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
@Override
public Channel channel() {
return channel;
}
@Override
public ChannelProgressivePromise setSuccess() {
return setSuccess(null);
}
@Override
public ChannelProgressivePromise setSuccess(Void result) {
super.setSuccess(result);
return this;
}
@Override
public boolean trySuccess() {
return trySuccess(null);
}
@Override
public ChannelProgressivePromise setFailure(Throwable cause) {
super.setFailure(cause);
return this;
}
@Override
public ChannelProgressivePromise setProgress(long progress) {
super.setProgress(progress);
return this;
}
@Override
public ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
super.removeListeners(listeners);
return this;
}
@Override
public ChannelProgressivePromise sync() throws InterruptedException {
super.sync();
return this;
}
@Override
public ChannelProgressivePromise syncUninterruptibly() {
super.syncUninterruptibly();
return this;
}
@Override
public ChannelProgressivePromise await() throws InterruptedException {
super.await();
return this;
}
@Override
public ChannelProgressivePromise awaitUninterruptibly() {
super.awaitUninterruptibly();
return this;
}
@Override
public long flushCheckpoint() {
return state & 0x000000FFFFFFFFFFL;
}
@Override
public void flushCheckpoint(long checkpoint) {
if ((checkpoint & 0xFFFFFF0000000000L) != 0) {
throw new IllegalStateException("flushCheckpoint overflow");
}
state = state & 0xFFFFFF0000000000L | checkpoint;
}
@Override
public ChannelProgressivePromise promise() {
return this;
}
@Override
protected void checkDeadLock() {
if (channel().isRegistered()) {
super.checkDeadLock();
}
}
}

View File

@ -81,11 +81,6 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
return trySuccess(null);
}
@Override
public boolean trySuccess(Void result) {
return super.trySuccess(result);
}
@Override
public ChannelPromise setFailure(Throwable cause) {
super.setFailure(cause);

View File

@ -1,276 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.DefaultEventListeners;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.EventListener;
/**
* The default {@link ChannelTransferPromise} implementation. It is recommended to use
* {@link Channel#newTransferPromise(long)} to create a new {@link ChannelTransferPromise} rather than calling the
* constructor explicitly.
*/
public class DefaultChannelTransferPromise extends DefaultChannelPromise implements ChannelTransferPromise {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultChannelTransferPromise.class);
private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> TRANSFER_LISTENER_STACK_DEPTH = new ThreadLocal<Integer>();
private final long total;
private long amount;
private Object transferListeners; //can be TransferFutureListener or DefaultTransferFutureListeners;
public DefaultChannelTransferPromise(Channel channel, long total) {
super(channel);
this.total = total;
}
public DefaultChannelTransferPromise(Channel channel, EventExecutor executor, long total) {
super(channel, executor);
this.total = total;
}
@Override
public ChannelTransferPromise incrementTransferredBytes(long amount) {
if (amount < 0) {
throw new IllegalArgumentException("amount must be >= 0");
}
long sum;
synchronized (this) {
this.amount += amount;
sum = this.amount;
}
notifyTransferListeners(sum);
return this;
}
@Override
public ChannelTransferPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
super.removeListeners(listeners);
return this;
}
@Override
public ChannelTransferPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelTransferPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ChannelTransferPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelTransferPromise await() throws InterruptedException {
super.await();
return this;
}
@Override
public ChannelTransferPromise awaitUninterruptibly() {
super.awaitUninterruptibly();
return this;
}
@Override
public ChannelTransferPromise sync() throws InterruptedException {
super.sync();
return this;
}
@Override
public ChannelTransferPromise syncUninterruptibly() {
super.syncUninterruptibly();
return this;
}
@Override
@SuppressWarnings("unchecked")
public ChannelTransferPromise addTransferListener(TransferFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
notifyTransferListener(executor(), amount, total, listener);
return this;
}
synchronized (this) {
if (!isDone()) {
if (transferListeners == null) {
transferListeners = listener;
} else {
if (transferListeners instanceof DefaultEventListeners) {
((DefaultEventListeners) transferListeners).add(listener);
} else {
transferListeners = new DefaultEventListeners(
(EventListener) transferListeners, listener);
}
}
return this;
}
}
notifyTransferListener(executor(), amount, total, listener);
return this;
}
@Override
@SuppressWarnings("unchecked")
public ChannelTransferPromise addTransferListeners(TransferFutureListener... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (TransferFutureListener l : listeners) {
if (l == null) {
break;
}
addTransferListener(l);
}
return this;
}
@Override
@SuppressWarnings("unchecked")
public ChannelTransferPromise removeTransferListener(TransferFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
return this;
}
if (transferListeners == null) {
return this;
}
synchronized (this) {
if (!isDone()) {
if (transferListeners instanceof DefaultEventListeners) {
((DefaultEventListeners) transferListeners).remove(listener);
} else if (transferListeners == listener) {
transferListeners = null;
}
}
}
return this;
}
@Override
@SuppressWarnings("unchecked")
public ChannelTransferPromise removeTransferListeners(TransferFutureListener... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (TransferFutureListener l : listeners) {
if (l == null) {
break;
}
removeTransferListener(l);
}
return this;
}
protected static void notifyTransferListener(final EventExecutor eventExecutor, final long amount,
final long total, final TransferFutureListener l) {
if (eventExecutor.inEventLoop()) {
Integer stackDepth = TRANSFER_LISTENER_STACK_DEPTH.get();
if (stackDepth == null) {
stackDepth = 0;
}
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
TRANSFER_LISTENER_STACK_DEPTH.set(stackDepth + 1);
try {
notifyTransferListener0(amount, total, l);
} finally {
TRANSFER_LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}
try {
eventExecutor.execute(new Runnable() {
@Override
public void run() {
notifyTransferListener(eventExecutor, amount, total, l);
}
});
} catch (Throwable t) {
logger.error("Failed to notify a listener. Event loop terminated?", t);
}
}
@SuppressWarnings("unchecked")
private void notifyTransferListeners(final long amount) {
if (transferListeners == null) {
return;
}
EventExecutor executor = executor();
if (executor.inEventLoop()) {
if (transferListeners instanceof DefaultEventListeners) {
notifyTransferListeners0(amount, total, (DefaultEventListeners) transferListeners);
} else {
notifyTransferListener0(amount, total, (TransferFutureListener) transferListeners);
}
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
if (transferListeners instanceof DefaultEventListeners) {
notifyTransferListeners0(amount, total,
(DefaultEventListeners) transferListeners);
} else {
notifyTransferListener0(amount, total, (TransferFutureListener) transferListeners);
}
}
});
} catch (Throwable t) {
logger.error("Failed to notify the transfer listener(s). Event loop terminated?", t);
}
}
}
private static void notifyTransferListeners0(long amount, long total,
DefaultEventListeners listeners) {
final EventListener[] allListeners = listeners.listeners();
for (EventListener l : allListeners) {
notifyTransferListener0(amount, total, (TransferFutureListener) l);
}
}
private static void notifyTransferListener0(long amount, long total, TransferFutureListener l) {
try {
l.onTransferred(amount, total);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("an exception is throw by {}:",
DefaultChannelTransferPromise.class.getSimpleName());
}
}
}
}

View File

@ -19,10 +19,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelTransferPromise;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
@ -185,8 +186,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
return;
} else {
writtenBytes += localWrittenBytes;
if (promise instanceof ChannelTransferPromise) {
((ChannelTransferPromise) promise).incrementTransferredBytes(localWrittenBytes);
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(pp.progress() + localWrittenBytes);
}
if (writtenBytes >= region.count()) {
region.release();

View File

@ -17,9 +17,8 @@ package io.netty.channel.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelTransferPromise;
import io.netty.channel.DefaultChannelTransferPromise;
import io.netty.channel.FileRegion;
import java.io.IOException;
@ -124,8 +123,9 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
return;
}
written += localWritten;
if (promise instanceof ChannelTransferPromise) {
((ChannelTransferPromise) promise).incrementTransferredBytes(localWritten);
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(pp.progress() + localWritten);
}
if (written >= region.count()) {
promise.setSuccess();

View File

@ -23,8 +23,8 @@ import io.netty.channel.ChannelFlushPromiseNotifier;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelTransferPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.aio.AbstractAioChannel;
@ -563,8 +563,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
written += result;
if (promise instanceof ChannelTransferPromise) {
((ChannelTransferPromise) promise).incrementTransferredBytes(result);
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(pp.progress() + result);
}
if (written >= region.count()) {