From ec18aa87318d88684dd48494571c1840bcee935d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 26 Feb 2021 15:03:58 +0100 Subject: [PATCH] Introduce ByteBufConvertible interface (#11036) Motivation: To make it possible to experiment with alternative buffer implementations, we need a way to abstract away the concrete buffers used throughout most of the Netty pipelines, while still having a common currency for doing IO in the end. Modification: - Introduce an ByteBufConvertible interface, that allow arbitrary objects to convert themselves into ByteBuf objects. - Every place in the code, where we did an instanceof check for ByteBuf, we now do an instanceof check for ByteBufConvertible. - ByteBuf itself implements ByteBufConvertible, and returns itself from the asByteBuf method. Result: It is now possible to use Netty with alternative buffer implementations, as long as they can be converted to ByteBuf. This has been verified elsewhere, with an alternative buffer implementation. --- .../main/java/io/netty/buffer/ByteBuf.java | 11 ++++++- .../io/netty/buffer/ByteBufConvertible.java | 32 +++++++++++++++++++ .../handler/codec/http/HttpObjectEncoder.java | 17 +++++----- .../AbstractMemcacheObjectEncoder.java | 13 ++++---- .../handler/codec/ByteToMessageDecoder.java | 5 +-- .../handler/codec/DatagramPacketEncoder.java | 6 ++-- .../netty/handler/logging/LoggingHandler.java | 5 +-- .../netty/handler/pcap/PcapWriteHandler.java | 17 +++++----- .../java/io/netty/handler/ssl/SslHandler.java | 5 +-- .../AbstractTrafficShapingHandler.java | 5 +-- .../traffic/ChannelTrafficShapingHandler.java | 6 ++-- .../GlobalChannelTrafficShapingHandler.java | 6 ++-- .../traffic/GlobalTrafficShapingHandler.java | 6 ++-- .../epoll/AbstractEpollStreamChannel.java | 11 ++++--- .../channel/epoll/EpollDatagramChannel.java | 11 ++++--- .../channel/epoll/EpollSocketChannel.java | 5 +-- .../epoll/NativeDatagramPacketArray.java | 5 +-- .../kqueue/AbstractKQueueStreamChannel.java | 11 ++++--- .../channel/kqueue/KQueueDatagramChannel.java | 13 ++++---- .../java/io/netty/channel/unix/IovArray.java | 5 +-- .../AbstractCoalescingBufferQueue.java | 13 ++++---- .../netty/channel/ChannelOutboundBuffer.java | 13 ++++---- .../channel/DefaultMessageSizeEstimator.java | 5 +-- .../channel/group/DefaultChannelGroup.java | 6 ++-- .../channel/nio/AbstractNioByteChannel.java | 9 +++--- .../socket/nio/NioDatagramChannel.java | 11 ++++--- 26 files changed, 156 insertions(+), 96 deletions(-) create mode 100644 buffer/src/main/java/io/netty/buffer/ByteBufConvertible.java diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index df77ac1728..04a6a63836 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -236,7 +236,7 @@ import java.nio.charset.UnsupportedCharsetException; * Please refer to {@link ByteBufInputStream} and * {@link ByteBufOutputStream}. */ -public abstract class ByteBuf implements ReferenceCounted, Comparable { +public abstract class ByteBuf implements ReferenceCounted, Comparable, ByteBufConvertible { /** * Returns the number of bytes (octets) this buffer can contain. @@ -2350,6 +2350,15 @@ public abstract class ByteBuf implements ReferenceCounted, Comparable { return false; } + /** + * A {@code ByteBuf} can turn into itself. + * @return This {@code ByteBuf} instance. + */ + @Override + public final ByteBuf asByteBuf() { + return this; + } + /** * Decodes this buffer's readable bytes into a string with the specified * character set name. This method is identical to diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufConvertible.java b/buffer/src/main/java/io/netty/buffer/ByteBufConvertible.java new file mode 100644 index 0000000000..b2559f9c3b --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/ByteBufConvertible.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * An interface that can be implemented by any object that know how to turn itself into a {@link ByteBuf}. + * All {@link ByteBuf} classes implement this interface, and return themselves. + */ +public interface ByteBufConvertible { + /** + * Turn this object into a {@link ByteBuf}. + * This does not increment the reference count of the {@link ByteBuf} instance. + * The conversion or exposure of the {@link ByteBuf} must be idempotent, so that this method can be called + * either once, or multiple times, without causing any change in program behaviour. + * + * @return A {@link ByteBuf} instance from this object. + */ + ByteBuf asByteBuf(); +} diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java index bf51aad686..75a89ffc4c 100755 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectEncoder.java @@ -15,6 +15,7 @@ */ package io.netty.handler.codec.http; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -110,15 +111,15 @@ public abstract class HttpObjectEncoder extends MessageTo // ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); // // See https://github.com/netty/netty/issues/2983 for more information. - if (msg instanceof ByteBuf) { - final ByteBuf potentialEmptyBuf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + final ByteBuf potentialEmptyBuf = ((ByteBufConvertible) msg).asByteBuf(); if (!potentialEmptyBuf.isReadable()) { out.add(potentialEmptyBuf.retain()); return; } } - if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) { + if (msg instanceof HttpContent || msg instanceof ByteBufConvertible || msg instanceof FileRegion) { switch (state) { case ST_INIT: throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg) @@ -244,12 +245,12 @@ public abstract class HttpObjectEncoder extends MessageTo @Override public boolean acceptOutboundMessage(Object msg) throws Exception { - return msg instanceof HttpObject || msg instanceof ByteBuf || msg instanceof FileRegion; + return msg instanceof HttpObject || msg instanceof ByteBufConvertible || msg instanceof FileRegion; } private static Object encodeAndRetain(Object msg) { - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).retain(); + if (msg instanceof ByteBufConvertible) { + return ((ByteBufConvertible) msg).asByteBuf().retain(); } if (msg instanceof HttpContent) { return ((HttpContent) msg).content().retain(); @@ -264,8 +265,8 @@ public abstract class HttpObjectEncoder extends MessageTo if (msg instanceof HttpContent) { return ((HttpContent) msg).content().readableBytes(); } - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).readableBytes(); + if (msg instanceof ByteBufConvertible) { + return ((ByteBufConvertible) msg).asByteBuf().readableBytes(); } if (msg instanceof FileRegion) { return ((FileRegion) msg).count(); diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectEncoder.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectEncoder.java index 0cded95494..5f7659790a 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectEncoder.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/AbstractMemcacheObjectEncoder.java @@ -15,6 +15,7 @@ */ package io.netty.handler.codec.memcache; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -49,7 +50,7 @@ public abstract class AbstractMemcacheObjectEncoder e out.add(encodeMessage(ctx, m)); } - if (msg instanceof MemcacheContent || msg instanceof ByteBuf || msg instanceof FileRegion) { + if (msg instanceof MemcacheContent || msg instanceof ByteBufConvertible || msg instanceof FileRegion) { int contentLength = contentLength(msg); if (contentLength > 0) { out.add(encodeAndRetain(msg)); @@ -63,7 +64,7 @@ public abstract class AbstractMemcacheObjectEncoder e @Override public boolean acceptOutboundMessage(Object msg) throws Exception { - return msg instanceof MemcacheObject || msg instanceof ByteBuf || msg instanceof FileRegion; + return msg instanceof MemcacheObject || msg instanceof ByteBufConvertible || msg instanceof FileRegion; } /** @@ -85,8 +86,8 @@ public abstract class AbstractMemcacheObjectEncoder e if (msg instanceof MemcacheContent) { return ((MemcacheContent) msg).content().readableBytes(); } - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).readableBytes(); + if (msg instanceof ByteBufConvertible) { + return ((ByteBufConvertible) msg).asByteBuf().readableBytes(); } if (msg instanceof FileRegion) { return (int) ((FileRegion) msg).count(); @@ -101,8 +102,8 @@ public abstract class AbstractMemcacheObjectEncoder e * @return the encoded object. */ private static Object encodeAndRetain(Object msg) { - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).retain(); + if (msg instanceof ByteBufConvertible) { + return ((ByteBufConvertible) msg).asByteBuf().retain(); } if (msg instanceof MemcacheContent) { return ((MemcacheContent) msg).content().retain(); diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index fd0d0202bf..9e8dbcb68a 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -18,6 +18,7 @@ package io.netty.handler.codec; import static io.netty.util.internal.ObjectUtil.checkPositive; import static java.util.Objects.requireNonNull; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -268,9 +269,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof ByteBuf) { + if (msg instanceof ByteBufConvertible) { try { - ByteBuf data = (ByteBuf) msg; + ByteBuf data = ((ByteBufConvertible) msg).asByteBuf(); first = cumulation == null; if (first) { cumulation = data; diff --git a/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java b/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java index b72f23203d..4772d95d51 100644 --- a/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/DatagramPacketEncoder.java @@ -17,7 +17,7 @@ package io.netty.handler.codec; import static java.util.Objects.requireNonNull; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufConvertible; import io.netty.channel.AddressedEnvelope; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -81,9 +81,9 @@ public class DatagramPacketEncoder extends MessageToMessageEncoder 1 && in.current() instanceof ByteBuf) { + if (msgCount > 1 && in.current() instanceof ByteBufConvertible) { writeSpinCount -= doWriteMultiple(in); } else if (msgCount == 0) { // Wrote all messages. @@ -337,8 +338,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception { // The outbound buffer contains only one message or it contains a file region. Object msg = in.current(); - if (msg instanceof ByteBuf) { - return writeBytes(in, (ByteBuf) msg); + if (msg instanceof ByteBufConvertible) { + return writeBytes(in, ((ByteBufConvertible) msg).asByteBuf()); } else if (msg instanceof DefaultFileRegion) { return writeDefaultFileRegion(in, (DefaultFileRegion) msg); } else if (msg instanceof FileRegion) { @@ -380,8 +381,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im @Override protected Object filterOutboundMessage(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index fd9bfe6408..2f5680bb15 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; @@ -357,7 +358,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements data = envelope.content(); remoteAddress = envelope.recipient(); } else { - data = (ByteBuf) msg; + data = ((ByteBufConvertible) msg).asByteBuf(); remoteAddress = null; } @@ -378,18 +379,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg; } - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf; } if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope e = (AddressedEnvelope) msg; - if (e.content() instanceof ByteBuf && + if (e.content() instanceof ByteBufConvertible && (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) { - ByteBuf content = (ByteBuf) e.content(); + ByteBuf content = ((ByteBufConvertible) e.content()).asByteBuf(); return UnixChannelUtil.isBufferCopyNeededForWrite(content)? new DefaultAddressedEnvelope<>( newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 6b91d6c126..682873f8c2 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -121,8 +122,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme ChannelOutboundBuffer outbound = unsafe().outboundBuffer(); outbound.addFlush(); Object curr; - if ((curr = outbound.current()) instanceof ByteBuf) { - ByteBuf initialData = (ByteBuf) curr; + if ((curr = outbound.current()) instanceof ByteBufConvertible) { + ByteBuf initialData = ((ByteBufConvertible) curr).asByteBuf(); // If no cookie is present, the write fails with EINPROGRESS and this call basically // becomes a normal async connect. All writes will be sent normally afterwards. long localFlushedAmount = doWriteOrSendBytes( diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index 4efffe6e10..05a58b0c13 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -15,6 +15,7 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; @@ -117,8 +118,8 @@ final class NativeDatagramPacketArray { ByteBuf buf = packet.content(); return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient()); } - if (msg instanceof ByteBuf && connected) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible && connected) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); return add0(buf, buf.readerIndex(), buf.readableBytes(), null); } return false; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index cecda7f779..0f858147a1 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.kqueue; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -270,7 +271,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel do { final int msgCount = in.size(); // Do gathering write if the outbound buffer entries start with more than one ByteBuf. - if (msgCount > 1 && in.current() instanceof ByteBuf) { + if (msgCount > 1 && in.current() instanceof ByteBufConvertible) { writeSpinCount -= doWriteMultiple(in); } else if (msgCount == 0) { // Wrote all messages. @@ -319,8 +320,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception { // The outbound buffer contains only one message or it contains a file region. Object msg = in.current(); - if (msg instanceof ByteBuf) { - return writeBytes(in, (ByteBuf) msg); + if (msg instanceof ByteBufConvertible) { + return writeBytes(in, ((ByteBufConvertible) msg).asByteBuf()); } else if (msg instanceof DefaultFileRegion) { return writeDefaultFileRegion(in, (DefaultFileRegion) msg); } else if (msg instanceof FileRegion) { @@ -362,8 +363,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @Override protected Object filterOutboundMessage(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 5f5dadeacf..be7c154ab0 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.kqueue; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.AddressedEnvelope; @@ -288,7 +289,7 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement data = envelope.content(); remoteAddress = envelope.recipient(); } else { - data = (ByteBuf) msg; + data = ((ByteBufConvertible) msg).asByteBuf(); remoteAddress = null; } @@ -340,18 +341,18 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg; } - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf; } if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope e = (AddressedEnvelope) msg; - if (e.content() instanceof ByteBuf && - (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) { + if (e.content() instanceof ByteBufConvertible && + (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) { - ByteBuf content = (ByteBuf) e.content(); + ByteBuf content = ((ByteBufConvertible) e.content()).asByteBuf(); return UnixChannelUtil.isBufferCopyNeededForWrite(content)? new DefaultAddressedEnvelope<>( newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e; diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/IovArray.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/IovArray.java index 6071cccc9f..f4fdf8ab7a 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/IovArray.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/IovArray.java @@ -15,6 +15,7 @@ */ package io.netty.channel.unix; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; @@ -224,8 +225,8 @@ public final class IovArray implements MessageProcessor { @Override public boolean processMessage(Object msg) throws Exception { - if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buffer = ((ByteBufConvertible) msg).asByteBuf(); return add(buffer, buffer.readerIndex(), buffer.readableBytes()); } return false; diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java index 5d8f64f710..ac439fbc30 100644 --- a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -14,6 +14,7 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -109,8 +110,8 @@ public abstract class AbstractCoalescingBufferQueue { if (entry == null) { return null; } - assert entry instanceof ByteBuf; - ByteBuf result = (ByteBuf) entry; + assert entry instanceof ByteBufConvertible; + ByteBuf result = ((ByteBufConvertible) entry).asByteBuf(); decrementReadableBytes(result.readableBytes()); @@ -234,12 +235,12 @@ public abstract class AbstractCoalescingBufferQueue { break; } - if (entry instanceof ByteBuf) { + if (entry instanceof ByteBufConvertible) { if (previousBuf != null) { decrementReadableBytes(previousBuf.readableBytes()); ctx.write(previousBuf, ctx.voidPromise()); } - previousBuf = (ByteBuf) entry; + previousBuf = ((ByteBufConvertible) entry).asByteBuf(); } else if (entry instanceof ChannelPromise) { decrementReadableBytes(previousBuf.readableBytes()); ctx.write(previousBuf, (ChannelPromise) entry); @@ -336,8 +337,8 @@ public abstract class AbstractCoalescingBufferQueue { break; } try { - if (entry instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) entry; + if (entry instanceof ByteBufConvertible) { + ByteBuf buffer = ((ByteBufConvertible) entry).asByteBuf(); decrementReadableBytes(buffer.readableBytes()); safeRelease(buffer); } else { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index b3720b4060..3c7afcce36 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; @@ -197,8 +198,8 @@ public final class ChannelOutboundBuffer { } private static long total(Object msg) { - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).readableBytes(); + if (msg instanceof ByteBufConvertible) { + return ((ByteBufConvertible) msg).asByteBuf().readableBytes(); } if (msg instanceof FileRegion) { return ((FileRegion) msg).count(); @@ -334,12 +335,12 @@ public final class ChannelOutboundBuffer { public void removeBytes(long writtenBytes) { for (;;) { Object msg = current(); - if (!(msg instanceof ByteBuf)) { + if (!(msg instanceof ByteBufConvertible)) { assert writtenBytes == 0; break; } - final ByteBuf buf = (ByteBuf) msg; + final ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; @@ -406,9 +407,9 @@ public final class ChannelOutboundBuffer { final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; - while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { + while (isFlushedEntry(entry) && entry.msg instanceof ByteBufConvertible) { if (!entry.cancelled) { - ByteBuf buf = (ByteBuf) entry.msg; + ByteBuf buf = ((ByteBufConvertible) entry.msg).asByteBuf(); final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; diff --git a/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java b/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java index ef4d542811..c18b5ea9f1 100644 --- a/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java +++ b/transport/src/main/java/io/netty/channel/DefaultMessageSizeEstimator.java @@ -17,6 +17,7 @@ package io.netty.channel; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; @@ -35,8 +36,8 @@ public final class DefaultMessageSizeEstimator implements MessageSizeEstimator { @Override public int size(Object msg) { - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).readableBytes(); + if (msg instanceof ByteBufConvertible) { + return ((ByteBufConvertible) msg).asByteBuf().readableBytes(); } if (msg instanceof ByteBufHolder) { return ((ByteBufHolder) msg).content().readableBytes(); diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index b9eff8f1b9..e95c3a0076 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -17,7 +17,7 @@ package io.netty.channel.group; import static java.util.Objects.requireNonNull; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -235,8 +235,8 @@ public class DefaultChannelGroup extends AbstractSet implements Channel // Create a safe duplicate of the message to write it to a channel but not affect other writes. // See https://github.com/netty/netty/issues/1461 private static Object safeDuplicate(Object message) { - if (message instanceof ByteBuf) { - return ((ByteBuf) message).retainedDuplicate(); + if (message instanceof ByteBufConvertible) { + return ((ByteBufConvertible) message).asByteBuf().retainedDuplicate(); } else if (message instanceof ByteBufHolder) { return ((ByteBufHolder) message).retainedDuplicate(); } else { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 820847997c..276595b3be 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.nio; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -216,8 +217,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); if (!buf.isReadable()) { in.remove(); return 0; @@ -272,8 +273,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override protected final Object filterOutboundMessage(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); if (buf.isDirect()) { return msg; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 07fc2aae73..f7848bbf54 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.socket.nio; +import io.netty.buffer.ByteBufConvertible; import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; @@ -268,7 +269,7 @@ public final class NioDatagramChannel remoteAddress = envelope.recipient(); data = envelope.content(); } else { - data = (ByteBuf) msg; + data = ((ByteBufConvertible) msg).asByteBuf(); remoteAddress = null; } @@ -299,8 +300,8 @@ public final class NioDatagramChannel return new DatagramPacket(newDirectBuffer(p, content), p.recipient()); } - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; + if (msg instanceof ByteBufConvertible) { + ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf(); if (isSingleDirectBuffer(buf)) { return buf; } @@ -310,8 +311,8 @@ public final class NioDatagramChannel if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope e = (AddressedEnvelope) msg; - if (e.content() instanceof ByteBuf) { - ByteBuf content = (ByteBuf) e.content(); + if (e.content() instanceof ByteBufConvertible) { + ByteBuf content = ((ByteBufConvertible) e.content()).asByteBuf(); if (isSingleDirectBuffer(content)) { return e; }