Use default methods in Channel (#11608)

Motivation:

We can make things easier for implementations by providing some default methods

Modifications:

- Add default methods to Channel
- Remove code from AbstractChannel

Result:

Easier to implement custom Channel
This commit is contained in:
Norman Maurer 2021-08-24 09:34:50 +02:00 committed by GitHub
parent cde132051a
commit 11cdf1d3cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 278 deletions

View File

@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
@ -372,128 +371,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
return pipeline;
}
@Override
public ByteBufAllocator alloc() {
return config().getAllocator();
}
@Override
public Channel read() {
pipeline().read();
return this;
}
@Override
public Channel flush() {
pipeline().flush();
return this;
}
@Override
public Future<Void> bind(SocketAddress localAddress) {
return pipeline().bind(localAddress);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress) {
return pipeline().connect(remoteAddress);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline().connect(remoteAddress, localAddress);
}
@Override
public Future<Void> disconnect() {
return pipeline().disconnect();
}
@Override
public Future<Void> close() {
return pipeline().close();
}
@Override
public Future<Void> register() {
return pipeline().register();
}
@Override
public Future<Void> deregister() {
return pipeline().deregister();
}
@Override
public Future<Void> bind(SocketAddress localAddress, Promise<Void> promise) {
return pipeline().bind(localAddress, promise);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, Promise<Void> promise) {
return pipeline().connect(remoteAddress, promise);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
return pipeline().connect(remoteAddress, localAddress, promise);
}
@Override
public Future<Void> disconnect(Promise<Void> promise) {
return pipeline().disconnect(promise);
}
@Override
public Future<Void> close(Promise<Void> promise) {
return pipeline().close(promise);
}
@Override
public Future<Void> register(Promise<Void> promise) {
return pipeline().register(promise);
}
@Override
public Future<Void> deregister(Promise<Void> promise) {
return pipeline().deregister(promise);
}
@Override
public Future<Void> write(Object msg) {
return pipeline().write(msg);
}
@Override
public Future<Void> write(Object msg, Promise<Void> promise) {
return pipeline().write(msg, promise);
}
@Override
public Future<Void> writeAndFlush(Object msg, Promise<Void> promise) {
return pipeline().writeAndFlush(msg, promise);
}
@Override
public Future<Void> writeAndFlush(Object msg) {
return pipeline().writeAndFlush(msg);
}
@Override
public Promise<Void> newPromise() {
return pipeline().newPromise();
}
@Override
public Future<Void> newSucceededFuture() {
return pipeline().newSucceededFuture();
}
@Override
public Future<Void> newFailedFuture(Throwable cause) {
return pipeline().newFailedFuture(cause);
}
@Override
public int hashCode() {
return id().hashCode();

View File

@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
private final Collection<EventExecutor> selfCollection = Collections.singleton(this);
private final Future<?> successfulVoidFuture = DefaultPromise.newSuccessfulPromise(this, null);
@Override
public EventExecutor next() {
@ -82,6 +82,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
@Override
public <V> Future<V> newSucceededFuture(V result) {
if (result == null) {
@SuppressWarnings("unchecked")
Future<V> f = (Future<V>) successfulVoidFuture;
return f;
}
return DefaultPromise.newSuccessfulPromise(this, result);
}

View File

@ -17,7 +17,6 @@ package io.netty.channel;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.ChannelOutputShutdownEvent;
import io.netty.channel.socket.ChannelOutputShutdownException;
import io.netty.util.DefaultAttributeMap;
@ -54,7 +53,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final ChannelId id;
private final Unsafe unsafe;
private final ChannelPipeline pipeline;
private final Future<Void> succeedFuture;
private final ClosePromise closePromise;
private volatile SocketAddress localAddress;
@ -79,7 +77,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.eventLoop = validateEventLoop(eventLoop);
closePromise = new ClosePromise(eventLoop);
succeedFuture = DefaultPromise.newSuccessfulPromise(eventLoop, null);
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
@ -95,7 +92,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.eventLoop = validateEventLoop(eventLoop);
closePromise = new ClosePromise(eventLoop);
succeedFuture = DefaultPromise.newSuccessfulPromise(eventLoop, null);
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
@ -104,6 +100,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private EventLoop validateEventLoop(EventLoop eventLoop) {
return requireNonNull(eventLoop, "eventLoop");
}
protected final int maxMessagesPerWrite() {
ChannelConfig config = config();
if (config instanceof DefaultChannelConfig) {
@ -137,28 +134,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return new DefaultChannelPipeline(this);
}
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.isWritable();
}
@Override
public long bytesBeforeUnwritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return buf != null ? buf.bytesBeforeUnwritable() : 0;
}
@Override
public long bytesBeforeWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
}
@Override
public Channel parent() {
return parent;
@ -169,11 +144,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline;
}
@Override
public ByteBufAllocator alloc() {
return config().getAllocator();
}
@Override
public EventLoop eventLoop() {
return eventLoop;
@ -232,123 +202,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return registered;
}
@Override
public Future<Void> bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public Future<Void> disconnect() {
return pipeline.disconnect();
}
@Override
public Future<Void> close() {
return pipeline.close();
}
@Override
public Future<Void> register() {
return pipeline.register();
}
@Override
public Future<Void> deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public Future<Void> bind(SocketAddress localAddress, Promise<Void> promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, Promise<Void> promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
@Override
public Future<Void> disconnect(Promise<Void> promise) {
return pipeline.disconnect(promise);
}
@Override
public Future<Void> close(Promise<Void> promise) {
return pipeline.close(promise);
}
@Override
public Future<Void> register(Promise<Void> promise) {
return pipeline.register(promise);
}
@Override
public Future<Void> deregister(Promise<Void> promise) {
return pipeline.deregister(promise);
}
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public Future<Void> write(Object msg) {
return pipeline.write(msg);
}
@Override
public Future<Void> write(Object msg, Promise<Void> promise) {
return pipeline.write(msg, promise);
}
@Override
public Future<Void> writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
@Override
public Future<Void> writeAndFlush(Object msg, Promise<Void> promise) {
return pipeline.writeAndFlush(msg, promise);
}
@Override
public Promise<Void> newPromise() {
return new DefaultPromise<>(eventLoop);
}
@Override
public Future<Void> newSucceededFuture() {
return succeedFuture;
}
@Override
public Future<Void> newFailedFuture(Throwable cause) {
return DefaultPromise.newFailedPromise(eventLoop, cause);
}
@Override
public Future<Void> closeFuture() {
return closePromise;

View File

@ -160,19 +160,32 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl
* this method returns {@code false} are queued until the I/O thread is
* ready to process the queued write requests.
*/
boolean isWritable();
default boolean isWritable() {
ChannelOutboundBuffer buf = unsafe().outboundBuffer();
return buf != null && buf.isWritable();
}
/**
* Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
* This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
*/
long bytesBeforeUnwritable();
default long bytesBeforeUnwritable() {
ChannelOutboundBuffer buf = unsafe().outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return buf != null ? buf.bytesBeforeUnwritable() : 0;
}
/**
* Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}.
* This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
*/
long bytesBeforeWritable();
default long bytesBeforeWritable() {
ChannelOutboundBuffer buf = unsafe().outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
}
/**
* Returns an <em>internal-use-only</em> object that provides unsafe operations.
@ -187,13 +200,126 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl
/**
* Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
*/
ByteBufAllocator alloc();
default ByteBufAllocator alloc() {
return config().getAllocator();
}
@Override
Channel read();
default Channel read() {
pipeline().read();
return this;
}
@Override
Channel flush();
default Future<Void> bind(SocketAddress localAddress) {
return pipeline().bind(localAddress);
}
@Override
default Future<Void> connect(SocketAddress remoteAddress) {
return pipeline().connect(remoteAddress);
}
@Override
default Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline().connect(remoteAddress, localAddress);
}
@Override
default Future<Void> disconnect() {
return pipeline().disconnect();
}
@Override
default Future<Void> close() {
return pipeline().close();
}
@Override
default Future<Void> register() {
return pipeline().register();
}
@Override
default Future<Void> deregister() {
return pipeline().deregister();
}
@Override
default Future<Void> bind(SocketAddress localAddress, Promise<Void> promise) {
return pipeline().bind(localAddress, promise);
}
@Override
default Future<Void> connect(SocketAddress remoteAddress, Promise<Void> promise) {
return pipeline().connect(remoteAddress, promise);
}
@Override
default Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
return pipeline().connect(remoteAddress, localAddress, promise);
}
@Override
default Future<Void> disconnect(Promise<Void> promise) {
return pipeline().disconnect(promise);
}
@Override
default Future<Void> close(Promise<Void> promise) {
return pipeline().close(promise);
}
@Override
default Future<Void> register(Promise<Void> promise) {
return pipeline().register(promise);
}
@Override
default Future<Void> deregister(Promise<Void> promise) {
return pipeline().deregister(promise);
}
@Override
default Future<Void> write(Object msg) {
return pipeline().write(msg);
}
@Override
default Future<Void> write(Object msg, Promise<Void> promise) {
return pipeline().write(msg, promise);
}
@Override
default Future<Void> writeAndFlush(Object msg, Promise<Void> promise) {
return pipeline().writeAndFlush(msg, promise);
}
@Override
default Future<Void> writeAndFlush(Object msg) {
return pipeline().writeAndFlush(msg);
}
@Override
default Channel flush() {
pipeline().flush();
return this;
}
@Override
default Promise<Void> newPromise() {
return eventLoop().newPromise();
}
@Override
default Future<Void> newSucceededFuture() {
return eventLoop().newSucceededFuture(null);
}
@Override
default Future<Void> newFailedFuture(Throwable cause) {
return eventLoop().newFailedFuture(cause);
}
/**
* <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods