[#1244] Support ChannelTransferPromise for sendFile
This commit is contained in:
parent
d8387fa4c3
commit
713b200adf
@ -13,39 +13,37 @@
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EventListener;
|
||||
|
||||
final class DefaultPromiseListeners {
|
||||
private GenericFutureListener<? extends Future<?>>[] listeners;
|
||||
public final class DefaultEventListeners {
|
||||
private EventListener[] listeners;
|
||||
private int size;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
DefaultPromiseListeners(GenericFutureListener<? extends Future<?>> firstListener,
|
||||
GenericFutureListener<? extends Future<?>> secondListener) {
|
||||
|
||||
listeners = new GenericFutureListener[] { firstListener, secondListener };
|
||||
public DefaultEventListeners(EventListener firstListener, EventListener secondListener) {
|
||||
listeners = new EventListener[2];
|
||||
listeners [0] = firstListener;
|
||||
listeners [1] = secondListener;
|
||||
size = 2;
|
||||
}
|
||||
|
||||
void add(GenericFutureListener<? extends Future<?>> l) {
|
||||
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
|
||||
public void add(EventListener t) {
|
||||
EventListener[] listeners = this.listeners;
|
||||
final int size = this.size;
|
||||
if (size == listeners.length) {
|
||||
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
|
||||
}
|
||||
listeners[size] = l;
|
||||
listeners[size] = t;
|
||||
this.size = size + 1;
|
||||
}
|
||||
|
||||
void remove(EventListener l) {
|
||||
public void remove(EventListener t) {
|
||||
final EventListener[] listeners = this.listeners;
|
||||
int size = this.size;
|
||||
for (int i = 0; i < size; i ++) {
|
||||
if (listeners[i] == l) {
|
||||
if (listeners[i] == t) {
|
||||
int listenersToMove = size - i - 1;
|
||||
if (listenersToMove > 0) {
|
||||
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
|
||||
@ -54,14 +52,13 @@ final class DefaultPromiseListeners {
|
||||
this.size = size;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
} }
|
||||
|
||||
GenericFutureListener<? extends Future<?>>[] listeners() {
|
||||
public EventListener[] listeners() {
|
||||
return listeners;
|
||||
}
|
||||
|
||||
int size() {
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@ import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.EventListener;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.*;
|
||||
@ -114,11 +115,11 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
if (listeners == null) {
|
||||
listeners = listener;
|
||||
} else {
|
||||
if (listeners instanceof DefaultPromiseListeners) {
|
||||
((DefaultPromiseListeners) listeners).add(listener);
|
||||
if (listeners instanceof DefaultEventListeners) {
|
||||
((DefaultEventListeners) listeners).add(listener);
|
||||
} else {
|
||||
listeners = new DefaultPromiseListeners(
|
||||
(GenericFutureListener<? extends Future<V>>) listeners, listener);
|
||||
listeners = new DefaultEventListeners(
|
||||
(EventListener) listeners, listener);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
@ -145,6 +146,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
@ -156,8 +158,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
|
||||
synchronized (this) {
|
||||
if (!isDone()) {
|
||||
if (listeners instanceof DefaultPromiseListeners) {
|
||||
((DefaultPromiseListeners) listeners).remove(listener);
|
||||
if (listeners instanceof DefaultEventListeners) {
|
||||
((DefaultEventListeners) listeners).remove(listener);
|
||||
} else if (listeners == listener) {
|
||||
listeners = null;
|
||||
}
|
||||
@ -474,8 +476,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
|
||||
EventExecutor executor = executor();
|
||||
if (executor.inEventLoop()) {
|
||||
if (listeners instanceof DefaultPromiseListeners) {
|
||||
notifyListeners0(this, (DefaultPromiseListeners) listeners);
|
||||
if (listeners instanceof DefaultEventListeners) {
|
||||
notifyListeners0(this, (DefaultEventListeners) listeners);
|
||||
} else {
|
||||
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
|
||||
}
|
||||
@ -487,8 +489,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (listeners instanceof DefaultPromiseListeners) {
|
||||
notifyListeners0(DefaultPromise.this, (DefaultPromiseListeners) listeners);
|
||||
if (listeners instanceof DefaultEventListeners) {
|
||||
notifyListeners0(DefaultPromise.this, (DefaultEventListeners) listeners);
|
||||
} else {
|
||||
notifyListener0(
|
||||
DefaultPromise.this, (GenericFutureListener<? extends Future<V>>) listeners);
|
||||
@ -501,13 +503,13 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private static void notifyListeners0(final Future<?> future,
|
||||
DefaultPromiseListeners listeners) {
|
||||
final GenericFutureListener<? extends Future<?>>[] a = listeners.listeners();
|
||||
DefaultEventListeners listeners) {
|
||||
final EventListener[] a = listeners.listeners();
|
||||
final int size = listeners.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
notifyListener0(future, a[i]);
|
||||
notifyListener0(future, (GenericFutureListener) a[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,17 +22,22 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelTransferPromise;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.TransferFutureListener;
|
||||
import io.netty.handler.codec.LineBasedFrameDecoder;
|
||||
import io.netty.handler.codec.string.StringDecoder;
|
||||
import io.netty.handler.codec.string.StringEncoder;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
@ -101,7 +106,21 @@ public class FileServer {
|
||||
return;
|
||||
}
|
||||
ctx.write(file + " " + file.length() + '\n');
|
||||
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()));
|
||||
FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length());
|
||||
ChannelTransferPromise promise = ctx.newTransferPromise(region.count());
|
||||
promise.addTransferListener(new TransferFutureListener() {
|
||||
@Override
|
||||
public void onTransferred(long amount, long total) throws Exception {
|
||||
System.out.println("amount :" + amount + " total :" + total);
|
||||
}
|
||||
});
|
||||
promise.addListener(new FutureListener<Void>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Void> future) throws Exception {
|
||||
System.out.println("File sent OK");
|
||||
}
|
||||
});
|
||||
ctx.sendFile(region, promise);
|
||||
ctx.write("\n");
|
||||
} else {
|
||||
ctx.write("File not found: " + file + '\n');
|
||||
|
@ -306,6 +306,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
return new DefaultChannelPromise(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise newTransferPromise(long total) {
|
||||
return new DefaultChannelTransferPromise(this, total);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return succeededFuture;
|
||||
|
@ -40,6 +40,11 @@ interface ChannelPropertyAccess {
|
||||
*/
|
||||
ChannelPromise newPromise();
|
||||
|
||||
/**
|
||||
*Return an new {@link ChannelTransferPromise}
|
||||
**/
|
||||
ChannelTransferPromise newTransferPromise(long total);
|
||||
|
||||
/**
|
||||
* Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()}
|
||||
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
|
||||
/**
|
||||
* An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress
|
||||
*/
|
||||
public interface ChannelTransferFuture extends ChannelFuture {
|
||||
|
||||
/**
|
||||
* Adds the specified listener to this future. The
|
||||
* specified listener is notified when this bytes associated with this if being transferred.
|
||||
* If this future is already completed, the specified listener is notified immediately.
|
||||
*/
|
||||
ChannelTransferFuture addTransferListener(TransferFutureListener listener);
|
||||
|
||||
/**
|
||||
* Adds the specified listeners to this future. The
|
||||
* specified listeners is notified when this bytes associated with this if being transferred.
|
||||
* If this future is already completed, the specified listeners is notified immediately.
|
||||
*/
|
||||
ChannelTransferFuture addTransferListeners(TransferFutureListener ... listeners);
|
||||
|
||||
/**
|
||||
* Removes the specified listener from this future.
|
||||
* The specified listener is no longer notified when this
|
||||
* future is {@linkplain #isDone() done}. If the specified
|
||||
* listener is not associated with this future, this method
|
||||
* does nothing and returns silently.
|
||||
*/
|
||||
ChannelTransferFuture removeTransferListener(TransferFutureListener listener);
|
||||
|
||||
/**
|
||||
* Removes the specified listener from this future.
|
||||
* The specified listener is no longer notified when this
|
||||
* future is {@linkplain #isDone() done}. If the specified
|
||||
* listener is not associated with this future, this method
|
||||
* does nothing and returns silently.
|
||||
*/
|
||||
ChannelTransferFuture removeTransferListeners(TransferFutureListener ... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture syncUninterruptibly();
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelTransferFuture awaitUninterruptibly();
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
|
||||
/**
|
||||
* Special {@link ChannelPromise} which will be notified once the associated bytes is transferring.
|
||||
*/
|
||||
public interface ChannelTransferPromise extends ChannelTransferFuture, ChannelPromise {
|
||||
|
||||
/**
|
||||
* Increment the current transferred bytes amount
|
||||
* */
|
||||
ChannelTransferPromise incrementTransferredBytes(long amount);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise addTransferListener(TransferFutureListener listener);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise addTransferListeners(TransferFutureListener... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise removeTransferListener(TransferFutureListener listener);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise removeTransferListeners(TransferFutureListener... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise addListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise removeListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise syncUninterruptibly();
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelTransferPromise awaitUninterruptibly();
|
||||
}
|
@ -1557,6 +1557,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
return new DefaultChannelPromise(channel(), executor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise newTransferPromise(long total) {
|
||||
return new DefaultChannelTransferPromise(channel(), executor(), total);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
ChannelFuture succeededFuture = this.succeededFuture;
|
||||
|
@ -0,0 +1,276 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.DefaultEventListeners;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.EventListener;
|
||||
|
||||
/**
|
||||
* The default {@link ChannelTransferPromise} implementation. It is recommended to use
|
||||
* {@link Channel#newTransferPromise(long)} to create a new {@link ChannelTransferPromise} rather than calling the
|
||||
* constructor explicitly.
|
||||
*/
|
||||
public class DefaultChannelTransferPromise extends DefaultChannelPromise implements ChannelTransferPromise {
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(DefaultChannelTransferPromise.class);
|
||||
private static final int MAX_LISTENER_STACK_DEPTH = 8;
|
||||
private static final ThreadLocal<Integer> TRANSFER_LISTENER_STACK_DEPTH = new ThreadLocal<Integer>();
|
||||
private final long total;
|
||||
private long amount;
|
||||
private Object transferListeners; //can be TransferFutureListener or DefaultTransferFutureListeners;
|
||||
public DefaultChannelTransferPromise(Channel channel, long total) {
|
||||
super(channel);
|
||||
this.total = total;
|
||||
}
|
||||
|
||||
public DefaultChannelTransferPromise(Channel channel, EventExecutor executor, long total) {
|
||||
super(channel, executor);
|
||||
this.total = total;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise incrementTransferredBytes(long amount) {
|
||||
if (amount < 0) {
|
||||
throw new IllegalArgumentException("amount must be >= 0");
|
||||
}
|
||||
long sum;
|
||||
synchronized (this) {
|
||||
this.amount += amount;
|
||||
sum = this.amount;
|
||||
}
|
||||
notifyTransferListeners(sum);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.removeListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.addListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.removeListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise await() throws InterruptedException {
|
||||
super.await();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise awaitUninterruptibly() {
|
||||
super.awaitUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise sync() throws InterruptedException {
|
||||
super.sync();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelTransferPromise syncUninterruptibly() {
|
||||
super.syncUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ChannelTransferPromise addTransferListener(TransferFutureListener listener) {
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
}
|
||||
|
||||
if (isDone()) {
|
||||
notifyTransferListener(executor(), amount, total, listener);
|
||||
return this;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (!isDone()) {
|
||||
if (transferListeners == null) {
|
||||
transferListeners = listener;
|
||||
} else {
|
||||
if (transferListeners instanceof DefaultEventListeners) {
|
||||
((DefaultEventListeners) transferListeners).add(listener);
|
||||
} else {
|
||||
transferListeners = new DefaultEventListeners(
|
||||
(EventListener) transferListeners, listener);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
notifyTransferListener(executor(), amount, total, listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ChannelTransferPromise addTransferListeners(TransferFutureListener... listeners) {
|
||||
if (listeners == null) {
|
||||
throw new NullPointerException("listeners");
|
||||
}
|
||||
for (TransferFutureListener l : listeners) {
|
||||
if (l == null) {
|
||||
break;
|
||||
}
|
||||
addTransferListener(l);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ChannelTransferPromise removeTransferListener(TransferFutureListener listener) {
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
}
|
||||
if (isDone()) {
|
||||
return this;
|
||||
}
|
||||
if (transferListeners == null) {
|
||||
return this;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (!isDone()) {
|
||||
if (transferListeners instanceof DefaultEventListeners) {
|
||||
((DefaultEventListeners) transferListeners).remove(listener);
|
||||
} else if (transferListeners == listener) {
|
||||
transferListeners = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ChannelTransferPromise removeTransferListeners(TransferFutureListener... listeners) {
|
||||
if (listeners == null) {
|
||||
throw new NullPointerException("listeners");
|
||||
}
|
||||
for (TransferFutureListener l : listeners) {
|
||||
if (l == null) {
|
||||
break;
|
||||
}
|
||||
removeTransferListener(l);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
protected static void notifyTransferListener(final EventExecutor eventExecutor, final long amount,
|
||||
final long total, final TransferFutureListener l) {
|
||||
if (eventExecutor.inEventLoop()) {
|
||||
Integer stackDepth = TRANSFER_LISTENER_STACK_DEPTH.get();
|
||||
if (stackDepth == null) {
|
||||
stackDepth = 0;
|
||||
}
|
||||
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
|
||||
TRANSFER_LISTENER_STACK_DEPTH.set(stackDepth + 1);
|
||||
try {
|
||||
notifyTransferListener0(amount, total, l);
|
||||
} finally {
|
||||
TRANSFER_LISTENER_STACK_DEPTH.set(stackDepth);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
eventExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
notifyTransferListener(eventExecutor, amount, total, l);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
logger.error("Failed to notify a listener. Event loop terminated?", t);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void notifyTransferListeners(final long amount) {
|
||||
if (transferListeners == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
EventExecutor executor = executor();
|
||||
if (executor.inEventLoop()) {
|
||||
if (transferListeners instanceof DefaultEventListeners) {
|
||||
notifyTransferListeners0(amount, total, (DefaultEventListeners) transferListeners);
|
||||
} else {
|
||||
notifyTransferListener0(amount, total, (TransferFutureListener) transferListeners);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (transferListeners instanceof DefaultEventListeners) {
|
||||
notifyTransferListeners0(amount, total,
|
||||
(DefaultEventListeners) transferListeners);
|
||||
} else {
|
||||
notifyTransferListener0(amount, total, (TransferFutureListener) transferListeners);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
logger.error("Failed to notify the transfer listener(s). Event loop terminated?", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void notifyTransferListeners0(long amount, long total,
|
||||
DefaultEventListeners listeners) {
|
||||
final EventListener[] allListeners = listeners.listeners();
|
||||
for (EventListener l : allListeners) {
|
||||
notifyTransferListener0(amount, total, (TransferFutureListener) l);
|
||||
}
|
||||
}
|
||||
|
||||
private static void notifyTransferListener0(long amount, long total, TransferFutureListener l) {
|
||||
try {
|
||||
l.onTransferred(amount, total);
|
||||
} catch (Throwable t) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("an exception is throw by {}:",
|
||||
DefaultChannelTransferPromise.class.getSimpleName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import java.util.EventListener;
|
||||
|
||||
/**
|
||||
* An {@link EventListener} listener which will be called once the sending task associated with future is
|
||||
* being transferred.
|
||||
*/
|
||||
public interface TransferFutureListener extends EventListener {
|
||||
/**
|
||||
* Called once the bytes is being transferred.
|
||||
*
|
||||
* @param amount how many bytes has been transferred
|
||||
* @param total how many bytes need to be transfer
|
||||
* */
|
||||
void onTransferred(long amount, long total) throws Exception;
|
||||
}
|
@ -20,9 +20,9 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelTransferPromise;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
@ -185,6 +185,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
return;
|
||||
} else {
|
||||
writtenBytes += localWrittenBytes;
|
||||
if (promise instanceof ChannelTransferPromise) {
|
||||
((ChannelTransferPromise) promise).incrementTransferredBytes(localWrittenBytes);
|
||||
}
|
||||
if (writtenBytes >= region.count()) {
|
||||
region.release();
|
||||
promise.setSuccess();
|
||||
|
@ -18,6 +18,8 @@ package io.netty.channel.oio;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelTransferPromise;
|
||||
import io.netty.channel.DefaultChannelTransferPromise;
|
||||
import io.netty.channel.FileRegion;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -122,6 +124,9 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
|
||||
return;
|
||||
}
|
||||
written += localWritten;
|
||||
if (promise instanceof ChannelTransferPromise) {
|
||||
((ChannelTransferPromise) promise).incrementTransferredBytes(localWritten);
|
||||
}
|
||||
if (written >= region.count()) {
|
||||
promise.setSuccess();
|
||||
return;
|
||||
|
@ -24,6 +24,7 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelTransferPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.aio.AbstractAioChannel;
|
||||
@ -562,6 +563,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
}
|
||||
written += result;
|
||||
|
||||
if (promise instanceof ChannelTransferPromise) {
|
||||
((ChannelTransferPromise) promise).incrementTransferredBytes(result);
|
||||
}
|
||||
|
||||
if (written >= region.count()) {
|
||||
region.release();
|
||||
promise.setSuccess();
|
||||
|
Loading…
Reference in New Issue
Block a user