Remove Progressive*Promise / Progressive*Future (#11374)
Motivation: This special case implementation of Promise / Future requires the implementations responsible for completing the promise to have knowledge of this class to provide value. It also requires that the implementations are able to provide intermediate status while the work is being done. Even throughout the core of Netty it is not really supported most of the times and so just brings more complexity without real gain. Let's remove it completely which is better then only support it sometimes. Modifications: Remove Progressive* API Result: Code cleanup.... Fixes https://github.com/netty/netty/issues/8519
This commit is contained in:
parent
1415938163
commit
07baabaac5
@ -21,7 +21,6 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.util.Attribute;
|
||||
import io.netty.util.AttributeKey;
|
||||
@ -246,11 +245,6 @@ abstract class DelegatingChannelHandlerContext implements ChannelHandlerContext
|
||||
return ctx.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return ctx.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return ctx.newSucceededFuture();
|
||||
|
@ -26,7 +26,6 @@ import io.netty.channel.ChannelId;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.DefaultChannelPipeline;
|
||||
@ -489,11 +488,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
||||
return pipeline().newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return pipeline().newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return pipeline().newSucceededFuture();
|
||||
|
@ -23,7 +23,6 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.util.Attribute;
|
||||
@ -326,11 +325,6 @@ final class Http2FrameInboundWriter {
|
||||
return channel.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return channel.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return channel.newSucceededFuture();
|
||||
|
@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.CombinedChannelDuplexHandler;
|
||||
import io.netty.handler.codec.PrematureChannelClosureException;
|
||||
@ -280,10 +279,6 @@ public final class BinaryMemcacheClientCodec extends
|
||||
return ctx.newPromise();
|
||||
}
|
||||
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return ctx.newProgressivePromise();
|
||||
}
|
||||
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return ctx.newSucceededFuture();
|
||||
}
|
||||
|
@ -30,7 +30,6 @@ import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.util.Attribute;
|
||||
@ -730,11 +729,6 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
return ctx.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return ctx.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return ctx.newSucceededFuture();
|
||||
|
@ -80,11 +80,6 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
|
||||
return new DefaultPromise<>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ProgressivePromise<V> newProgressivePromise() {
|
||||
return new DefaultProgressivePromise<>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Future<V> newSucceededFuture(V result) {
|
||||
return new SucceededFuture<>(this, result);
|
||||
|
@ -21,7 +21,6 @@ final class DefaultFutureListeners {
|
||||
|
||||
private GenericFutureListener<? extends Future<?>>[] listeners;
|
||||
private int size;
|
||||
private int progressiveSize; // the number of progressive listeners
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
DefaultFutureListeners(
|
||||
@ -30,12 +29,6 @@ final class DefaultFutureListeners {
|
||||
listeners[0] = first;
|
||||
listeners[1] = second;
|
||||
size = 2;
|
||||
if (first instanceof GenericProgressiveFutureListener) {
|
||||
progressiveSize ++;
|
||||
}
|
||||
if (second instanceof GenericProgressiveFutureListener) {
|
||||
progressiveSize ++;
|
||||
}
|
||||
}
|
||||
|
||||
public void add(GenericFutureListener<? extends Future<?>> l) {
|
||||
@ -46,10 +39,6 @@ final class DefaultFutureListeners {
|
||||
}
|
||||
listeners[size] = l;
|
||||
this.size = size + 1;
|
||||
|
||||
if (l instanceof GenericProgressiveFutureListener) {
|
||||
progressiveSize ++;
|
||||
}
|
||||
}
|
||||
|
||||
public void remove(GenericFutureListener<? extends Future<?>> l) {
|
||||
@ -63,10 +52,6 @@ final class DefaultFutureListeners {
|
||||
}
|
||||
listeners[-- size] = null;
|
||||
this.size = size;
|
||||
|
||||
if (l instanceof GenericProgressiveFutureListener) {
|
||||
progressiveSize --;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -79,8 +64,4 @@ final class DefaultFutureListeners {
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public int progressiveSize() {
|
||||
return progressiveSize;
|
||||
}
|
||||
}
|
||||
|
@ -1,110 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
|
||||
|
||||
public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements ProgressivePromise<V> {
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* It is preferable to use {@link EventExecutor#newProgressivePromise()} to create a new progressive promise
|
||||
*
|
||||
* @param executor
|
||||
* the {@link EventExecutor} which is used to notify the promise when it progresses or it is complete
|
||||
*/
|
||||
public DefaultProgressivePromise(EventExecutor executor) {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> setProgress(long progress, long total) {
|
||||
if (total < 0) {
|
||||
// total unknown
|
||||
total = -1; // normalize
|
||||
checkPositiveOrZero(progress, "progress");
|
||||
} else if (progress < 0 || progress > total) {
|
||||
throw new IllegalArgumentException(
|
||||
"progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))");
|
||||
}
|
||||
|
||||
if (isDone()) {
|
||||
throw new IllegalStateException("complete already");
|
||||
}
|
||||
|
||||
notifyProgressiveListeners(progress, total);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryProgress(long progress, long total) {
|
||||
if (total < 0) {
|
||||
total = -1;
|
||||
if (progress < 0 || isDone()) {
|
||||
return false;
|
||||
}
|
||||
} else if (progress < 0 || progress > total || isDone()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
notifyProgressiveListeners(progress, total);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
|
||||
super.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> sync() throws InterruptedException {
|
||||
super.sync();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> syncUninterruptibly() {
|
||||
super.syncUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> await() throws InterruptedException {
|
||||
super.await();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> awaitUninterruptibly() {
|
||||
super.awaitUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> setSuccess(V result) {
|
||||
super.setSuccess(result);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProgressivePromise<V> setFailure(Throwable cause) {
|
||||
super.setFailure(cause);
|
||||
return this;
|
||||
}
|
||||
}
|
@ -580,103 +580,6 @@ public class DefaultPromise<V> implements Promise<V> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify all progressive listeners.
|
||||
* <p>
|
||||
* No attempt is made to ensure notification order if multiple calls are made to this method before
|
||||
* the original invocation completes.
|
||||
* <p>
|
||||
* This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
|
||||
* @param progress the new progress.
|
||||
* @param total the total progress.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
void notifyProgressiveListeners(final long progress, final long total) {
|
||||
final Object listeners = progressiveListeners();
|
||||
if (listeners == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
|
||||
|
||||
if (listeners instanceof GenericProgressiveFutureListener[]) {
|
||||
final GenericProgressiveFutureListener<?>[] array =
|
||||
(GenericProgressiveFutureListener<?>[]) listeners;
|
||||
safeExecute(executor(), () -> notifyProgressiveListeners0(self, array, progress, total));
|
||||
} else {
|
||||
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
|
||||
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
|
||||
safeExecute(executor(), () -> notifyProgressiveListener0(self, l, progress, total));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
|
||||
* {@code null}.
|
||||
*/
|
||||
private synchronized Object progressiveListeners() {
|
||||
Object listeners = this.listeners;
|
||||
if (listeners == null) {
|
||||
// No listeners added
|
||||
return null;
|
||||
}
|
||||
|
||||
if (listeners instanceof DefaultFutureListeners) {
|
||||
// Copy DefaultFutureListeners into an array of listeners.
|
||||
DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
|
||||
int progressiveSize = dfl.progressiveSize();
|
||||
switch (progressiveSize) {
|
||||
case 0:
|
||||
return null;
|
||||
case 1:
|
||||
for (GenericFutureListener<?> l: dfl.listeners()) {
|
||||
if (l instanceof GenericProgressiveFutureListener) {
|
||||
return l;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
GenericFutureListener<?>[] array = dfl.listeners();
|
||||
GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
|
||||
for (int i = 0, j = 0; j < progressiveSize; i ++) {
|
||||
GenericFutureListener<?> l = array[i];
|
||||
if (l instanceof GenericProgressiveFutureListener) {
|
||||
copy[j ++] = (GenericProgressiveFutureListener<?>) l;
|
||||
}
|
||||
}
|
||||
|
||||
return copy;
|
||||
} else if (listeners instanceof GenericProgressiveFutureListener) {
|
||||
return listeners;
|
||||
} else {
|
||||
// Only one listener was added and it's not a progressive listener.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static void notifyProgressiveListeners0(
|
||||
ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
|
||||
for (GenericProgressiveFutureListener<?> l: listeners) {
|
||||
if (l == null) {
|
||||
break;
|
||||
}
|
||||
notifyProgressiveListener0(future, l, progress, total);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private static void notifyProgressiveListener0(
|
||||
ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
|
||||
try {
|
||||
l.operationProgressed(future, progress, total);
|
||||
} catch (Throwable t) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isCancelled0(Object result) {
|
||||
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
|
||||
}
|
||||
|
@ -46,11 +46,6 @@ public interface EventExecutor extends EventExecutorGroup {
|
||||
*/
|
||||
<V> Promise<V> newPromise();
|
||||
|
||||
/**
|
||||
* Create a new {@link ProgressivePromise}.
|
||||
*/
|
||||
<V> ProgressivePromise<V> newProgressivePromise();
|
||||
|
||||
/**
|
||||
* Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
|
||||
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
|
@ -1,28 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
|
||||
/**
|
||||
* Invoked when the operation has progressed.
|
||||
*
|
||||
* @param progress the progress of the operation so far (cumulative)
|
||||
* @param total the number that signifies the end of the operation when {@code progress} reaches at it.
|
||||
* {@code -1} if the end of operation is unknown.
|
||||
*/
|
||||
void operationProgressed(F future, long progress, long total) throws Exception;
|
||||
}
|
@ -129,11 +129,6 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor {
|
||||
return new ImmediatePromise<>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ProgressivePromise<V> newProgressivePromise() {
|
||||
return new ImmediateProgressivePromise<>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay,
|
||||
TimeUnit unit) {
|
||||
@ -165,15 +160,4 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor {
|
||||
// No check
|
||||
}
|
||||
}
|
||||
|
||||
static class ImmediateProgressivePromise<V> extends DefaultProgressivePromise<V> {
|
||||
ImmediateProgressivePromise(EventExecutor executor) {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void checkDeadLock() {
|
||||
// No check
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,38 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
/**
|
||||
* A {@link Future} which is used to indicate the progress of an operation.
|
||||
*/
|
||||
public interface ProgressiveFuture<V> extends Future<V> {
|
||||
|
||||
@Override
|
||||
ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
|
||||
|
||||
@Override
|
||||
ProgressiveFuture<V> sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ProgressiveFuture<V> syncUninterruptibly();
|
||||
|
||||
@Override
|
||||
ProgressiveFuture<V> await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ProgressiveFuture<V> awaitUninterruptibly();
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
/**
|
||||
* Special {@link ProgressiveFuture} which is writable.
|
||||
*/
|
||||
public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V> {
|
||||
|
||||
/**
|
||||
* Sets the current progress of the operation and notifies the listeners that implement
|
||||
* {@link GenericProgressiveFutureListener}.
|
||||
*/
|
||||
ProgressivePromise<V> setProgress(long progress, long total);
|
||||
|
||||
/**
|
||||
* Tries to set the current progress of the operation and notifies the listeners that implement
|
||||
* {@link GenericProgressiveFutureListener}. If the operation is already complete or the progress is out of range,
|
||||
* this method does nothing but returning {@code false}.
|
||||
*/
|
||||
boolean tryProgress(long progress, long total);
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> setSuccess(V result);
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> setFailure(Throwable cause);
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> awaitUninterruptibly();
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ProgressivePromise<V> syncUninterruptibly();
|
||||
}
|
@ -97,11 +97,6 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
|
||||
return new DefaultPromise<>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ProgressivePromise<V> newProgressivePromise() {
|
||||
return new DefaultProgressivePromise<>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Future<V> newSucceededFuture(V result) {
|
||||
return new SucceededFuture<>(this, result);
|
||||
|
@ -20,8 +20,6 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelProgressiveFuture;
|
||||
import io.netty.channel.ChannelProgressiveFutureListener;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
@ -197,29 +195,20 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
|
||||
ChannelFuture lastContentFuture;
|
||||
if (ctx.pipeline().get(SslHandler.class) == null) {
|
||||
sendFileFuture =
|
||||
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
|
||||
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength));
|
||||
// Write the end marker.
|
||||
lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
} else {
|
||||
sendFileFuture =
|
||||
ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
|
||||
ctx.newProgressivePromise());
|
||||
ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)));
|
||||
// HttpChunkedInput will write the end marker (LastHttpContent) for us.
|
||||
lastContentFuture = sendFileFuture;
|
||||
}
|
||||
|
||||
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
|
||||
@Override
|
||||
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
|
||||
if (total < 0) { // total unknown
|
||||
System.err.println(future.channel() + " Transfer progress: " + progress);
|
||||
} else {
|
||||
System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
|
||||
}
|
||||
}
|
||||
sendFileFuture.addListener(new ChannelFutureListener() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelProgressiveFuture future) {
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
System.err.println(future.channel() + " Transfer complete.");
|
||||
}
|
||||
});
|
||||
|
@ -25,7 +25,6 @@ import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -170,7 +169,7 @@ public class ChunkedWriteHandler implements ChannelHandler {
|
||||
}
|
||||
currentWrite.fail(cause);
|
||||
} else {
|
||||
currentWrite.success(inputLength);
|
||||
currentWrite.success();
|
||||
}
|
||||
} else {
|
||||
if (cause == null) {
|
||||
@ -307,8 +306,7 @@ public class ChunkedWriteHandler implements ChannelHandler {
|
||||
long inputProgress = input.progress();
|
||||
long inputLength = input.length();
|
||||
closeInput(input);
|
||||
currentWrite.progress(inputProgress, inputLength);
|
||||
currentWrite.success(inputLength);
|
||||
currentWrite.success();
|
||||
}
|
||||
}
|
||||
|
||||
@ -318,7 +316,6 @@ public class ChunkedWriteHandler implements ChannelHandler {
|
||||
closeInput(input);
|
||||
currentWrite.fail(future.cause());
|
||||
} else {
|
||||
currentWrite.progress(input.progress(), input.length());
|
||||
if (resume && future.channel().isWritable()) {
|
||||
resumeTransfer();
|
||||
}
|
||||
@ -349,19 +346,12 @@ public class ChunkedWriteHandler implements ChannelHandler {
|
||||
promise.tryFailure(cause);
|
||||
}
|
||||
|
||||
void success(long total) {
|
||||
void success() {
|
||||
if (promise.isDone()) {
|
||||
// No need to notify the progress or fulfill the promise because it's done already.
|
||||
return;
|
||||
}
|
||||
progress(total, total);
|
||||
promise.trySuccess();
|
||||
}
|
||||
|
||||
void progress(long progress, long total) {
|
||||
if (promise instanceof ChannelProgressivePromise) {
|
||||
((ChannelProgressivePromise) promise).tryProgress(progress, total);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
@ -302,11 +301,6 @@ public abstract class EmbeddedChannelHandlerContext implements ChannelHandlerCon
|
||||
return channel().newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ChannelProgressivePromise newProgressivePromise() {
|
||||
return channel().newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ChannelFuture newSucceededFuture() {
|
||||
return channel().newSucceededFuture();
|
||||
|
@ -19,7 +19,6 @@ import static org.junit.Assert.assertNull;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.util.concurrent.AbstractEventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.ProgressivePromise;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.concurrent.ScheduledFuture;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -139,11 +138,6 @@ public class AbstractSharedExecutorMicrobenchmark extends AbstractMicrobenchmark
|
||||
return executor.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ProgressivePromise<V> newProgressivePromise() {
|
||||
return executor.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
return executor.schedule(command, delay, unit);
|
||||
|
@ -335,11 +335,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
return new DefaultChannelPromise(this, eventLoop);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return new DefaultChannelProgressivePromise(this, eventLoop);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return succeedFuture;
|
||||
|
@ -19,7 +19,6 @@ import io.netty.buffer.ByteBufConvertible;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import io.netty.util.internal.InternalThreadLocalMap;
|
||||
@ -243,9 +242,6 @@ public final class ChannelOutboundBuffer {
|
||||
ChannelPromise p = e.promise;
|
||||
long progress = e.progress + amount;
|
||||
e.progress = progress;
|
||||
if (p instanceof ChannelProgressivePromise) {
|
||||
((ChannelProgressivePromise) p).tryProgress(progress, e.total);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -378,7 +374,6 @@ public final class ChannelOutboundBuffer {
|
||||
* <p>
|
||||
* Note that the returned array is reused and thus should not escape
|
||||
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
|
||||
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
|
||||
* </p>
|
||||
*/
|
||||
public ByteBuffer[] nioBuffers() {
|
||||
@ -392,7 +387,6 @@ public final class ChannelOutboundBuffer {
|
||||
* <p>
|
||||
* Note that the returned array is reused and thus should not escape
|
||||
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
|
||||
* Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
|
||||
* </p>
|
||||
* @param maxCount The maximum amount of buffers that will be added to the return value.
|
||||
* @param maxBytes A hint toward the maximum number of bytes to include as part of the return value. Note that this
|
||||
|
@ -261,11 +261,6 @@ public interface ChannelOutboundInvoker {
|
||||
*/
|
||||
ChannelPromise newPromise();
|
||||
|
||||
/**
|
||||
* Return an new {@link ChannelProgressivePromise}
|
||||
*/
|
||||
ChannelProgressivePromise newProgressivePromise();
|
||||
|
||||
/**
|
||||
* Create a new {@link ChannelFuture} which is marked as succeeded already. So {@link ChannelFuture#isSuccess()}
|
||||
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.ProgressiveFuture;
|
||||
|
||||
/**
|
||||
* An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress
|
||||
*/
|
||||
public interface ChannelProgressiveFuture extends ChannelFuture, ProgressiveFuture<Void> {
|
||||
@Override
|
||||
ChannelProgressiveFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelProgressiveFuture sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelProgressiveFuture syncUninterruptibly();
|
||||
|
||||
@Override
|
||||
ChannelProgressiveFuture await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelProgressiveFuture awaitUninterruptibly();
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.GenericProgressiveFutureListener;
|
||||
|
||||
import java.util.EventListener;
|
||||
|
||||
/**
|
||||
* An {@link EventListener} listener which will be called once the sending task associated with future is
|
||||
* being transferred.
|
||||
*/
|
||||
public interface ChannelProgressiveFutureListener extends GenericProgressiveFutureListener<ChannelProgressiveFuture> {
|
||||
// Just a type alias
|
||||
}
|
@ -1,53 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.ProgressivePromise;
|
||||
|
||||
/**
|
||||
* Special {@link ChannelPromise} which will be notified once the associated bytes is transferring.
|
||||
*/
|
||||
public interface ChannelProgressivePromise extends ProgressivePromise<Void>, ChannelProgressiveFuture, ChannelPromise {
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise syncUninterruptibly();
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise awaitUninterruptibly();
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise setSuccess(Void result);
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise setSuccess();
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise setFailure(Throwable cause);
|
||||
|
||||
@Override
|
||||
ChannelProgressivePromise setProgress(long progress, long total);
|
||||
}
|
@ -543,11 +543,6 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
|
||||
return ctx.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return ctx.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return ctx.newSucceededFuture();
|
||||
|
@ -826,11 +826,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
return pipeline().newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return pipeline().newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return pipeline().newSucceededFuture();
|
||||
|
@ -936,11 +936,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
return new DefaultChannelPromise(channel(), executor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ChannelProgressivePromise newProgressivePromise() {
|
||||
return new DefaultChannelProgressivePromise(channel(), executor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ChannelFuture newSucceededFuture() {
|
||||
return succeededFuture;
|
||||
|
@ -1,142 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.channel.ChannelFlushPromiseNotifier.FlushCheckpoint;
|
||||
import io.netty.util.concurrent.DefaultProgressivePromise;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* The default {@link ChannelProgressivePromise} implementation. It is recommended to use
|
||||
* {@link Channel#newProgressivePromise()} to create a new {@link ChannelProgressivePromise} rather than calling the
|
||||
* constructor explicitly.
|
||||
*/
|
||||
public class DefaultChannelProgressivePromise
|
||||
extends DefaultProgressivePromise<Void> implements ChannelProgressivePromise, FlushCheckpoint {
|
||||
|
||||
private final Channel channel;
|
||||
private long checkpoint;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param channel
|
||||
* the {@link Channel} associated with this future
|
||||
*/
|
||||
public DefaultChannelProgressivePromise(Channel channel) {
|
||||
this(channel, channel.eventLoop());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param channel
|
||||
* the {@link Channel} associated with this future
|
||||
*/
|
||||
public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor) {
|
||||
super(executor);
|
||||
this.channel = Objects.requireNonNull(channel, "channel");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel channel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise setSuccess() {
|
||||
return setSuccess(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise setSuccess(Void result) {
|
||||
super.setSuccess(result);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess() {
|
||||
return trySuccess(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise setFailure(Throwable cause) {
|
||||
super.setFailure(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise setProgress(long progress, long total) {
|
||||
super.setProgress(progress, total);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
|
||||
super.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise sync() throws InterruptedException {
|
||||
super.sync();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise syncUninterruptibly() {
|
||||
super.syncUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise await() throws InterruptedException {
|
||||
super.await();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise awaitUninterruptibly() {
|
||||
super.awaitUninterruptibly();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long flushCheckpoint() {
|
||||
return checkpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushCheckpoint(long checkpoint) {
|
||||
this.checkpoint = checkpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise promise() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void checkDeadLock() {
|
||||
if (channel().isRegistered()) {
|
||||
super.checkDeadLock();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user