diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromiseListeners.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventListeners.java similarity index 66% rename from common/src/main/java/io/netty/util/concurrent/DefaultPromiseListeners.java rename to common/src/main/java/io/netty/util/concurrent/DefaultEventListeners.java index f5bc360b2b..4904ee2d09 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromiseListeners.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventListeners.java @@ -13,39 +13,37 @@ * License for the specific language governing permissions and limitations * under the License. */ - package io.netty.util.concurrent; import java.util.Arrays; import java.util.EventListener; -final class DefaultPromiseListeners { - private GenericFutureListener>[] listeners; +public final class DefaultEventListeners { + private EventListener[] listeners; private int size; - @SuppressWarnings("unchecked") - DefaultPromiseListeners(GenericFutureListener> firstListener, - GenericFutureListener> secondListener) { - - listeners = new GenericFutureListener[] { firstListener, secondListener }; + public DefaultEventListeners(EventListener firstListener, EventListener secondListener) { + listeners = new EventListener[2]; + listeners [0] = firstListener; + listeners [1] = secondListener; size = 2; } - void add(GenericFutureListener> l) { - GenericFutureListener>[] listeners = this.listeners; + public void add(EventListener t) { + EventListener[] listeners = this.listeners; final int size = this.size; if (size == listeners.length) { this.listeners = listeners = Arrays.copyOf(listeners, size << 1); } - listeners[size] = l; + listeners[size] = t; this.size = size + 1; } - void remove(EventListener l) { + public void remove(EventListener t) { final EventListener[] listeners = this.listeners; int size = this.size; for (int i = 0; i < size; i ++) { - if (listeners[i] == l) { + if (listeners[i] == t) { int listenersToMove = size - i - 1; if (listenersToMove > 0) { System.arraycopy(listeners, i + 1, listeners, i, listenersToMove); @@ -54,14 +52,13 @@ final class DefaultPromiseListeners { this.size = size; return; } - } - } + } } - GenericFutureListener>[] listeners() { + public EventListener[] listeners() { return listeners; } - int size() { + public int size() { return size; } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 5735e127e1..e161d72c8a 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -20,6 +20,7 @@ import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.util.EventListener; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.*; @@ -114,11 +115,11 @@ public class DefaultPromise extends AbstractFuture implements Promise { if (listeners == null) { listeners = listener; } else { - if (listeners instanceof DefaultPromiseListeners) { - ((DefaultPromiseListeners) listeners).add(listener); + if (listeners instanceof DefaultEventListeners) { + ((DefaultEventListeners) listeners).add(listener); } else { - listeners = new DefaultPromiseListeners( - (GenericFutureListener>) listeners, listener); + listeners = new DefaultEventListeners( + (EventListener) listeners, listener); } } return this; @@ -145,6 +146,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } @Override + @SuppressWarnings("unchecked") public Promise removeListener(GenericFutureListener> listener) { if (listener == null) { throw new NullPointerException("listener"); @@ -156,8 +158,8 @@ public class DefaultPromise extends AbstractFuture implements Promise { synchronized (this) { if (!isDone()) { - if (listeners instanceof DefaultPromiseListeners) { - ((DefaultPromiseListeners) listeners).remove(listener); + if (listeners instanceof DefaultEventListeners) { + ((DefaultEventListeners) listeners).remove(listener); } else if (listeners == listener) { listeners = null; } @@ -474,8 +476,8 @@ public class DefaultPromise extends AbstractFuture implements Promise { EventExecutor executor = executor(); if (executor.inEventLoop()) { - if (listeners instanceof DefaultPromiseListeners) { - notifyListeners0(this, (DefaultPromiseListeners) listeners); + if (listeners instanceof DefaultEventListeners) { + notifyListeners0(this, (DefaultEventListeners) listeners); } else { notifyListener0(this, (GenericFutureListener>) listeners); } @@ -487,8 +489,8 @@ public class DefaultPromise extends AbstractFuture implements Promise { executor.execute(new Runnable() { @Override public void run() { - if (listeners instanceof DefaultPromiseListeners) { - notifyListeners0(DefaultPromise.this, (DefaultPromiseListeners) listeners); + if (listeners instanceof DefaultEventListeners) { + notifyListeners0(DefaultPromise.this, (DefaultEventListeners) listeners); } else { notifyListener0( DefaultPromise.this, (GenericFutureListener>) listeners); @@ -501,13 +503,13 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) private static void notifyListeners0(final Future future, - DefaultPromiseListeners listeners) { - final GenericFutureListener>[] a = listeners.listeners(); + DefaultEventListeners listeners) { + final EventListener[] a = listeners.listeners(); final int size = listeners.size(); for (int i = 0; i < size; i ++) { - notifyListener0(future, a[i]); + notifyListener0(future, (GenericFutureListener) a[i]); } } diff --git a/example/src/main/java/io/netty/example/filetransfer/FileServer.java b/example/src/main/java/io/netty/example/filetransfer/FileServer.java index d4cd763f57..3a0e39108f 100644 --- a/example/src/main/java/io/netty/example/filetransfer/FileServer.java +++ b/example/src/main/java/io/netty/example/filetransfer/FileServer.java @@ -22,17 +22,22 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelTransferPromise; import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoopGroup; +import io.netty.channel.FileRegion; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.TransferFutureListener; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.io.File; import java.io.FileInputStream; @@ -101,7 +106,21 @@ public class FileServer { return; } ctx.write(file + " " + file.length() + '\n'); - ctx.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length())); + FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()); + ChannelTransferPromise promise = ctx.newTransferPromise(region.count()); + promise.addTransferListener(new TransferFutureListener() { + @Override + public void onTransferred(long amount, long total) throws Exception { + System.out.println("amount :" + amount + " total :" + total); + } + }); + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + System.out.println("File sent OK"); + } + }); + ctx.sendFile(region, promise); ctx.write("\n"); } else { ctx.write("File not found: " + file + '\n'); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 9bed510a61..bb1dfbff2b 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -306,6 +306,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return new DefaultChannelPromise(this); } + @Override + public ChannelTransferPromise newTransferPromise(long total) { + return new DefaultChannelTransferPromise(this, total); + } + @Override public ChannelFuture newSucceededFuture() { return succeededFuture; diff --git a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java index 227d291a47..cf4e5d2915 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java +++ b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java @@ -40,6 +40,11 @@ interface ChannelPropertyAccess { */ ChannelPromise newPromise(); + /** + *Return an new {@link ChannelTransferPromise} + **/ + ChannelTransferPromise newTransferPromise(long total); + /** * Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()} * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also diff --git a/transport/src/main/java/io/netty/channel/ChannelTransferFuture.java b/transport/src/main/java/io/netty/channel/ChannelTransferFuture.java new file mode 100644 index 0000000000..81716b7b45 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelTransferFuture.java @@ -0,0 +1,81 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +/** + * An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress + */ +public interface ChannelTransferFuture extends ChannelFuture { + + /** + * Adds the specified listener to this future. The + * specified listener is notified when this bytes associated with this if being transferred. + * If this future is already completed, the specified listener is notified immediately. + */ + ChannelTransferFuture addTransferListener(TransferFutureListener listener); + + /** + * Adds the specified listeners to this future. The + * specified listeners is notified when this bytes associated with this if being transferred. + * If this future is already completed, the specified listeners is notified immediately. + */ + ChannelTransferFuture addTransferListeners(TransferFutureListener ... listeners); + + /** + * Removes the specified listener from this future. + * The specified listener is no longer notified when this + * future is {@linkplain #isDone() done}. If the specified + * listener is not associated with this future, this method + * does nothing and returns silently. + */ + ChannelTransferFuture removeTransferListener(TransferFutureListener listener); + + /** + * Removes the specified listener from this future. + * The specified listener is no longer notified when this + * future is {@linkplain #isDone() done}. If the specified + * listener is not associated with this future, this method + * does nothing and returns silently. + */ + ChannelTransferFuture removeTransferListeners(TransferFutureListener ... listeners); + + @Override + ChannelTransferFuture addListener(GenericFutureListener> listener); + + @Override + ChannelTransferFuture addListeners(GenericFutureListener>... listeners); + + @Override + ChannelTransferFuture removeListener(GenericFutureListener> listener); + + @Override + ChannelTransferFuture removeListeners(GenericFutureListener>... listeners); + + @Override + ChannelTransferFuture sync() throws InterruptedException; + + @Override + ChannelTransferFuture syncUninterruptibly(); + + @Override + ChannelTransferFuture await() throws InterruptedException; + + @Override + ChannelTransferFuture awaitUninterruptibly(); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelTransferPromise.java b/transport/src/main/java/io/netty/channel/ChannelTransferPromise.java new file mode 100644 index 0000000000..b06db12cd8 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelTransferPromise.java @@ -0,0 +1,66 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +/** + * Special {@link ChannelPromise} which will be notified once the associated bytes is transferring. + */ +public interface ChannelTransferPromise extends ChannelTransferFuture, ChannelPromise { + + /** + * Increment the current transferred bytes amount + * */ + ChannelTransferPromise incrementTransferredBytes(long amount); + + @Override + ChannelTransferPromise addTransferListener(TransferFutureListener listener); + + @Override + ChannelTransferPromise addTransferListeners(TransferFutureListener... listeners); + + @Override + ChannelTransferPromise removeTransferListener(TransferFutureListener listener); + + @Override + ChannelTransferPromise removeTransferListeners(TransferFutureListener... listeners); + + @Override + ChannelTransferPromise addListener(GenericFutureListener> listener); + + @Override + ChannelTransferPromise addListeners(GenericFutureListener>... listeners); + + @Override + ChannelTransferPromise removeListener(GenericFutureListener> listener); + + @Override + ChannelTransferPromise removeListeners(GenericFutureListener>... listeners); + + @Override + ChannelTransferPromise sync() throws InterruptedException; + + @Override + ChannelTransferPromise syncUninterruptibly(); + + @Override + ChannelTransferPromise await() throws InterruptedException; + + @Override + ChannelTransferPromise awaitUninterruptibly(); +} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 0c4ebc1921..596d902575 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -1557,6 +1557,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return new DefaultChannelPromise(channel(), executor()); } + @Override + public ChannelTransferPromise newTransferPromise(long total) { + return new DefaultChannelTransferPromise(channel(), executor(), total); + } + @Override public ChannelFuture newSucceededFuture() { ChannelFuture succeededFuture = this.succeededFuture; diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelTransferPromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelTransferPromise.java new file mode 100644 index 0000000000..192fb1f8c5 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultChannelTransferPromise.java @@ -0,0 +1,276 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.concurrent.DefaultEventListeners; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.EventListener; + +/** + * The default {@link ChannelTransferPromise} implementation. It is recommended to use + * {@link Channel#newTransferPromise(long)} to create a new {@link ChannelTransferPromise} rather than calling the + * constructor explicitly. + */ +public class DefaultChannelTransferPromise extends DefaultChannelPromise implements ChannelTransferPromise { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(DefaultChannelTransferPromise.class); + private static final int MAX_LISTENER_STACK_DEPTH = 8; + private static final ThreadLocal TRANSFER_LISTENER_STACK_DEPTH = new ThreadLocal(); + private final long total; + private long amount; + private Object transferListeners; //can be TransferFutureListener or DefaultTransferFutureListeners; + public DefaultChannelTransferPromise(Channel channel, long total) { + super(channel); + this.total = total; + } + + public DefaultChannelTransferPromise(Channel channel, EventExecutor executor, long total) { + super(channel, executor); + this.total = total; + } + + @Override + public ChannelTransferPromise incrementTransferredBytes(long amount) { + if (amount < 0) { + throw new IllegalArgumentException("amount must be >= 0"); + } + long sum; + synchronized (this) { + this.amount += amount; + sum = this.amount; + } + notifyTransferListeners(sum); + return this; + } + + @Override + public ChannelTransferPromise removeListeners(GenericFutureListener>... listeners) { + super.removeListeners(listeners); + return this; + } + + @Override + public ChannelTransferPromise addListener(GenericFutureListener> listener) { + super.addListener(listener); + return this; + } + + @Override + public ChannelTransferPromise addListeners(GenericFutureListener>... listeners) { + super.addListeners(listeners); + return this; + } + + @Override + public ChannelTransferPromise removeListener(GenericFutureListener> listener) { + super.removeListener(listener); + return this; + } + + @Override + public ChannelTransferPromise await() throws InterruptedException { + super.await(); + return this; + } + + @Override + public ChannelTransferPromise awaitUninterruptibly() { + super.awaitUninterruptibly(); + return this; + } + + @Override + public ChannelTransferPromise sync() throws InterruptedException { + super.sync(); + return this; + } + + @Override + public ChannelTransferPromise syncUninterruptibly() { + super.syncUninterruptibly(); + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ChannelTransferPromise addTransferListener(TransferFutureListener listener) { + if (listener == null) { + throw new NullPointerException("listener"); + } + + if (isDone()) { + notifyTransferListener(executor(), amount, total, listener); + return this; + } + synchronized (this) { + if (!isDone()) { + if (transferListeners == null) { + transferListeners = listener; + } else { + if (transferListeners instanceof DefaultEventListeners) { + ((DefaultEventListeners) transferListeners).add(listener); + } else { + transferListeners = new DefaultEventListeners( + (EventListener) transferListeners, listener); + } + } + return this; + } + } + + notifyTransferListener(executor(), amount, total, listener); + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ChannelTransferPromise addTransferListeners(TransferFutureListener... listeners) { + if (listeners == null) { + throw new NullPointerException("listeners"); + } + for (TransferFutureListener l : listeners) { + if (l == null) { + break; + } + addTransferListener(l); + } + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ChannelTransferPromise removeTransferListener(TransferFutureListener listener) { + if (listener == null) { + throw new NullPointerException("listener"); + } + if (isDone()) { + return this; + } + if (transferListeners == null) { + return this; + } + synchronized (this) { + if (!isDone()) { + if (transferListeners instanceof DefaultEventListeners) { + ((DefaultEventListeners) transferListeners).remove(listener); + } else if (transferListeners == listener) { + transferListeners = null; + } + } + } + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ChannelTransferPromise removeTransferListeners(TransferFutureListener... listeners) { + if (listeners == null) { + throw new NullPointerException("listeners"); + } + for (TransferFutureListener l : listeners) { + if (l == null) { + break; + } + removeTransferListener(l); + } + return this; + } + + protected static void notifyTransferListener(final EventExecutor eventExecutor, final long amount, + final long total, final TransferFutureListener l) { + if (eventExecutor.inEventLoop()) { + Integer stackDepth = TRANSFER_LISTENER_STACK_DEPTH.get(); + if (stackDepth == null) { + stackDepth = 0; + } + if (stackDepth < MAX_LISTENER_STACK_DEPTH) { + TRANSFER_LISTENER_STACK_DEPTH.set(stackDepth + 1); + try { + notifyTransferListener0(amount, total, l); + } finally { + TRANSFER_LISTENER_STACK_DEPTH.set(stackDepth); + } + return; + } + } + try { + eventExecutor.execute(new Runnable() { + @Override + public void run() { + notifyTransferListener(eventExecutor, amount, total, l); + } + }); + } catch (Throwable t) { + logger.error("Failed to notify a listener. Event loop terminated?", t); + } + } + + @SuppressWarnings("unchecked") + private void notifyTransferListeners(final long amount) { + if (transferListeners == null) { + return; + } + + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + if (transferListeners instanceof DefaultEventListeners) { + notifyTransferListeners0(amount, total, (DefaultEventListeners) transferListeners); + } else { + notifyTransferListener0(amount, total, (TransferFutureListener) transferListeners); + } + } else { + try { + executor.execute(new Runnable() { + @Override + public void run() { + if (transferListeners instanceof DefaultEventListeners) { + notifyTransferListeners0(amount, total, + (DefaultEventListeners) transferListeners); + } else { + notifyTransferListener0(amount, total, (TransferFutureListener) transferListeners); + } + } + }); + } catch (Throwable t) { + logger.error("Failed to notify the transfer listener(s). Event loop terminated?", t); + } + } + } + + private static void notifyTransferListeners0(long amount, long total, + DefaultEventListeners listeners) { + final EventListener[] allListeners = listeners.listeners(); + for (EventListener l : allListeners) { + notifyTransferListener0(amount, total, (TransferFutureListener) l); + } + } + + private static void notifyTransferListener0(long amount, long total, TransferFutureListener l) { + try { + l.onTransferred(amount, total); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("an exception is throw by {}:", + DefaultChannelTransferPromise.class.getSimpleName()); + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/TransferFutureListener.java b/transport/src/main/java/io/netty/channel/TransferFutureListener.java new file mode 100644 index 0000000000..5eac78cef2 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/TransferFutureListener.java @@ -0,0 +1,32 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.util.EventListener; + +/** + * An {@link EventListener} listener which will be called once the sending task associated with future is + * being transferred. + */ +public interface TransferFutureListener extends EventListener { + /** + * Called once the bytes is being transferred. + * + * @param amount how many bytes has been transferred + * @param total how many bytes need to be transfer + * */ + void onTransferred(long amount, long total) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 605f3572a2..dd68096814 100755 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -20,9 +20,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelTransferPromise; import io.netty.channel.FileRegion; import io.netty.channel.socket.ChannelInputShutdownEvent; - import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; @@ -185,6 +185,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { return; } else { writtenBytes += localWrittenBytes; + if (promise instanceof ChannelTransferPromise) { + ((ChannelTransferPromise) promise).incrementTransferredBytes(localWrittenBytes); + } if (writtenBytes >= region.count()) { region.release(); promise.setSuccess(); diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java index a16fbe1ce5..b5f6000fa8 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java @@ -18,6 +18,8 @@ package io.netty.channel.oio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelTransferPromise; +import io.netty.channel.DefaultChannelTransferPromise; import io.netty.channel.FileRegion; import java.io.IOException; @@ -122,6 +124,9 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { return; } written += localWritten; + if (promise instanceof ChannelTransferPromise) { + ((ChannelTransferPromise) promise).incrementTransferredBytes(localWritten); + } if (written >= region.count()) { promise.setSuccess(); return; diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 60dd8b5418..8af5572cce 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelTransferPromise; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; import io.netty.channel.aio.AbstractAioChannel; @@ -562,6 +563,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } written += result; + if (promise instanceof ChannelTransferPromise) { + ((ChannelTransferPromise) promise).incrementTransferredBytes(result); + } + if (written >= region.count()) { region.release(); promise.setSuccess();