diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index d4e8d3a7ea..691107cda2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -15,7 +15,6 @@ */ package io.netty.handler.codec.http2; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandler; @@ -372,128 +371,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements return pipeline; } - @Override - public ByteBufAllocator alloc() { - return config().getAllocator(); - } - - @Override - public Channel read() { - pipeline().read(); - return this; - } - - @Override - public Channel flush() { - pipeline().flush(); - return this; - } - - @Override - public Future bind(SocketAddress localAddress) { - return pipeline().bind(localAddress); - } - - @Override - public Future connect(SocketAddress remoteAddress) { - return pipeline().connect(remoteAddress); - } - - @Override - public Future connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return pipeline().connect(remoteAddress, localAddress); - } - - @Override - public Future disconnect() { - return pipeline().disconnect(); - } - - @Override - public Future close() { - return pipeline().close(); - } - - @Override - public Future register() { - return pipeline().register(); - } - - @Override - public Future deregister() { - return pipeline().deregister(); - } - - @Override - public Future bind(SocketAddress localAddress, Promise promise) { - return pipeline().bind(localAddress, promise); - } - - @Override - public Future connect(SocketAddress remoteAddress, Promise promise) { - return pipeline().connect(remoteAddress, promise); - } - - @Override - public Future connect(SocketAddress remoteAddress, SocketAddress localAddress, Promise promise) { - return pipeline().connect(remoteAddress, localAddress, promise); - } - - @Override - public Future disconnect(Promise promise) { - return pipeline().disconnect(promise); - } - - @Override - public Future close(Promise promise) { - return pipeline().close(promise); - } - - @Override - public Future register(Promise promise) { - return pipeline().register(promise); - } - - @Override - public Future deregister(Promise promise) { - return pipeline().deregister(promise); - } - - @Override - public Future write(Object msg) { - return pipeline().write(msg); - } - - @Override - public Future write(Object msg, Promise promise) { - return pipeline().write(msg, promise); - } - - @Override - public Future writeAndFlush(Object msg, Promise promise) { - return pipeline().writeAndFlush(msg, promise); - } - - @Override - public Future writeAndFlush(Object msg) { - return pipeline().writeAndFlush(msg); - } - - @Override - public Promise newPromise() { - return pipeline().newPromise(); - } - - @Override - public Future newSucceededFuture() { - return pipeline().newSucceededFuture(); - } - - @Override - public Future newFailedFuture(Throwable cause) { - return pipeline().newFailedFuture(cause); - } - @Override public int hashCode() { return id().hashCode(); diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 148085362b..259b0c8f4e 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class); - static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; private final Collection selfCollection = Collections.singleton(this); + private final Future successfulVoidFuture = DefaultPromise.newSuccessfulPromise(this, null); @Override public EventExecutor next() { @@ -82,6 +82,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl @Override public Future newSucceededFuture(V result) { + if (result == null) { + @SuppressWarnings("unchecked") + Future f = (Future) successfulVoidFuture; + return f; + } return DefaultPromise.newSuccessfulPromise(this, result); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 31cdbec3e4..abae8b3f4b 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -17,7 +17,6 @@ package io.netty.channel; import static java.util.Objects.requireNonNull; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.socket.ChannelOutputShutdownEvent; import io.netty.channel.socket.ChannelOutputShutdownException; import io.netty.util.DefaultAttributeMap; @@ -54,7 +53,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final ChannelId id; private final Unsafe unsafe; private final ChannelPipeline pipeline; - private final Future succeedFuture; private final ClosePromise closePromise; private volatile SocketAddress localAddress; @@ -79,7 +77,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.eventLoop = validateEventLoop(eventLoop); closePromise = new ClosePromise(eventLoop); - succeedFuture = DefaultPromise.newSuccessfulPromise(eventLoop, null); id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); @@ -95,7 +92,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.eventLoop = validateEventLoop(eventLoop); closePromise = new ClosePromise(eventLoop); - succeedFuture = DefaultPromise.newSuccessfulPromise(eventLoop, null); this.id = id; unsafe = newUnsafe(); pipeline = newChannelPipeline(); @@ -104,6 +100,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private EventLoop validateEventLoop(EventLoop eventLoop) { return requireNonNull(eventLoop, "eventLoop"); } + protected final int maxMessagesPerWrite() { ChannelConfig config = config(); if (config instanceof DefaultChannelConfig) { @@ -137,28 +134,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return new DefaultChannelPipeline(this); } - @Override - public boolean isWritable() { - ChannelOutboundBuffer buf = unsafe.outboundBuffer(); - return buf != null && buf.isWritable(); - } - - @Override - public long bytesBeforeUnwritable() { - ChannelOutboundBuffer buf = unsafe.outboundBuffer(); - // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. - // We should be consistent with that here. - return buf != null ? buf.bytesBeforeUnwritable() : 0; - } - - @Override - public long bytesBeforeWritable() { - ChannelOutboundBuffer buf = unsafe.outboundBuffer(); - // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. - // We should be consistent with that here. - return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE; - } - @Override public Channel parent() { return parent; @@ -169,11 +144,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline; } - @Override - public ByteBufAllocator alloc() { - return config().getAllocator(); - } - @Override public EventLoop eventLoop() { return eventLoop; @@ -232,123 +202,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return registered; } - @Override - public Future bind(SocketAddress localAddress) { - return pipeline.bind(localAddress); - } - - @Override - public Future connect(SocketAddress remoteAddress) { - return pipeline.connect(remoteAddress); - } - - @Override - public Future connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return pipeline.connect(remoteAddress, localAddress); - } - - @Override - public Future disconnect() { - return pipeline.disconnect(); - } - - @Override - public Future close() { - return pipeline.close(); - } - - @Override - public Future register() { - return pipeline.register(); - } - - @Override - public Future deregister() { - return pipeline.deregister(); - } - - @Override - public Channel flush() { - pipeline.flush(); - return this; - } - - @Override - public Future bind(SocketAddress localAddress, Promise promise) { - return pipeline.bind(localAddress, promise); - } - - @Override - public Future connect(SocketAddress remoteAddress, Promise promise) { - return pipeline.connect(remoteAddress, promise); - } - - @Override - public Future connect(SocketAddress remoteAddress, SocketAddress localAddress, Promise promise) { - return pipeline.connect(remoteAddress, localAddress, promise); - } - - @Override - public Future disconnect(Promise promise) { - return pipeline.disconnect(promise); - } - - @Override - public Future close(Promise promise) { - return pipeline.close(promise); - } - - @Override - public Future register(Promise promise) { - return pipeline.register(promise); - } - - @Override - public Future deregister(Promise promise) { - return pipeline.deregister(promise); - } - - @Override - public Channel read() { - pipeline.read(); - return this; - } - - @Override - public Future write(Object msg) { - return pipeline.write(msg); - } - - @Override - public Future write(Object msg, Promise promise) { - return pipeline.write(msg, promise); - } - - @Override - public Future writeAndFlush(Object msg) { - return pipeline.writeAndFlush(msg); - } - - @Override - public Future writeAndFlush(Object msg, Promise promise) { - return pipeline.writeAndFlush(msg, promise); - } - - @Override - public Promise newPromise() { - return new DefaultPromise<>(eventLoop); - } - - @Override - public Future newSucceededFuture() { - return succeedFuture; - } - - @Override - public Future newFailedFuture(Throwable cause) { - return DefaultPromise.newFailedPromise(eventLoop, cause); - } - @Override public Future closeFuture() { return closePromise; diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 34e84b0b42..3b97e621a4 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -160,19 +160,32 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl * this method returns {@code false} are queued until the I/O thread is * ready to process the queued write requests. */ - boolean isWritable(); + default boolean isWritable() { + ChannelOutboundBuffer buf = unsafe().outboundBuffer(); + return buf != null && buf.isWritable(); + } /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. */ - long bytesBeforeUnwritable(); + default long bytesBeforeUnwritable() { + ChannelOutboundBuffer buf = unsafe().outboundBuffer(); + // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. + // We should be consistent with that here. + return buf != null ? buf.bytesBeforeUnwritable() : 0; + } /** * Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. */ - long bytesBeforeWritable(); + default long bytesBeforeWritable() { + ChannelOutboundBuffer buf = unsafe().outboundBuffer(); + // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. + // We should be consistent with that here. + return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE; + } /** * Returns an internal-use-only object that provides unsafe operations. @@ -187,13 +200,126 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl /** * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. */ - ByteBufAllocator alloc(); + default ByteBufAllocator alloc() { + return config().getAllocator(); + } @Override - Channel read(); + default Channel read() { + pipeline().read(); + return this; + } @Override - Channel flush(); + default Future bind(SocketAddress localAddress) { + return pipeline().bind(localAddress); + } + + @Override + default Future connect(SocketAddress remoteAddress) { + return pipeline().connect(remoteAddress); + } + + @Override + default Future connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return pipeline().connect(remoteAddress, localAddress); + } + + @Override + default Future disconnect() { + return pipeline().disconnect(); + } + + @Override + default Future close() { + return pipeline().close(); + } + + @Override + default Future register() { + return pipeline().register(); + } + + @Override + default Future deregister() { + return pipeline().deregister(); + } + + @Override + default Future bind(SocketAddress localAddress, Promise promise) { + return pipeline().bind(localAddress, promise); + } + + @Override + default Future connect(SocketAddress remoteAddress, Promise promise) { + return pipeline().connect(remoteAddress, promise); + } + + @Override + default Future connect(SocketAddress remoteAddress, SocketAddress localAddress, Promise promise) { + return pipeline().connect(remoteAddress, localAddress, promise); + } + + @Override + default Future disconnect(Promise promise) { + return pipeline().disconnect(promise); + } + + @Override + default Future close(Promise promise) { + return pipeline().close(promise); + } + + @Override + default Future register(Promise promise) { + return pipeline().register(promise); + } + + @Override + default Future deregister(Promise promise) { + return pipeline().deregister(promise); + } + + @Override + default Future write(Object msg) { + return pipeline().write(msg); + } + + @Override + default Future write(Object msg, Promise promise) { + return pipeline().write(msg, promise); + } + + @Override + default Future writeAndFlush(Object msg, Promise promise) { + return pipeline().writeAndFlush(msg, promise); + } + + @Override + default Future writeAndFlush(Object msg) { + return pipeline().writeAndFlush(msg); + } + + @Override + default Channel flush() { + pipeline().flush(); + return this; + } + + @Override + default Promise newPromise() { + return eventLoop().newPromise(); + } + + @Override + default Future newSucceededFuture() { + return eventLoop().newSucceededFuture(null); + } + + @Override + default Future newFailedFuture(Throwable cause) { + return eventLoop().newFailedFuture(cause); + } /** * Unsafe operations that should never be called from user-code. These methods