Introduce ByteBufConvertible interface (#11036)

Motivation:
To make it possible to experiment with alternative buffer implementations, we need a way to abstract away the concrete buffers used throughout most of the Netty pipelines, while still having a common currency for doing IO in the end.

Modification:
- Introduce an ByteBufConvertible interface, that allow arbitrary objects to convert themselves into ByteBuf objects.
- Every place in the code, where we did an instanceof check for ByteBuf, we now do an instanceof check for ByteBufConvertible.
- ByteBuf itself implements ByteBufConvertible, and returns itself from the asByteBuf method.

Result:
It is now possible to use Netty with alternative buffer implementations, as long as they can be converted to ByteBuf.
This has been verified elsewhere, with an alternative buffer implementation.
This commit is contained in:
Chris Vest 2021-02-26 15:03:58 +01:00 committed by GitHub
parent d421ae10d7
commit ec18aa8731
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 156 additions and 96 deletions

View File

@ -236,7 +236,7 @@ import java.nio.charset.UnsupportedCharsetException;
* Please refer to {@link ByteBufInputStream} and
* {@link ByteBufOutputStream}.
*/
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>, ByteBufConvertible {
/**
* Returns the number of bytes (octets) this buffer can contain.
@ -2350,6 +2350,15 @@ public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
return false;
}
/**
* A {@code ByteBuf} can turn into itself.
* @return This {@code ByteBuf} instance.
*/
@Override
public final ByteBuf asByteBuf() {
return this;
}
/**
* Decodes this buffer's readable bytes into a string with the specified
* character set name. This method is identical to

View File

@ -0,0 +1,32 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
/**
* An interface that can be implemented by any object that know how to turn itself into a {@link ByteBuf}.
* All {@link ByteBuf} classes implement this interface, and return themselves.
*/
public interface ByteBufConvertible {
/**
* Turn this object into a {@link ByteBuf}.
* This does <strong>not</strong> increment the reference count of the {@link ByteBuf} instance.
* The conversion or exposure of the {@link ByteBuf} must be idempotent, so that this method can be called
* either once, or multiple times, without causing any change in program behaviour.
*
* @return A {@link ByteBuf} instance from this object.
*/
ByteBuf asByteBuf();
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@ -110,15 +111,15 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
// ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//
// See https://github.com/netty/netty/issues/2983 for more information.
if (msg instanceof ByteBuf) {
final ByteBuf potentialEmptyBuf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
final ByteBuf potentialEmptyBuf = ((ByteBufConvertible) msg).asByteBuf();
if (!potentialEmptyBuf.isReadable()) {
out.add(potentialEmptyBuf.retain());
return;
}
}
if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
if (msg instanceof HttpContent || msg instanceof ByteBufConvertible || msg instanceof FileRegion) {
switch (state) {
case ST_INIT:
throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
@ -244,12 +245,12 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof HttpObject || msg instanceof ByteBuf || msg instanceof FileRegion;
return msg instanceof HttpObject || msg instanceof ByteBufConvertible || msg instanceof FileRegion;
}
private static Object encodeAndRetain(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).retain();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().retain();
}
if (msg instanceof HttpContent) {
return ((HttpContent) msg).content().retain();
@ -264,8 +265,8 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
if (msg instanceof HttpContent) {
return ((HttpContent) msg).content().readableBytes();
}
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.memcache;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -49,7 +50,7 @@ public abstract class AbstractMemcacheObjectEncoder<M extends MemcacheMessage> e
out.add(encodeMessage(ctx, m));
}
if (msg instanceof MemcacheContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
if (msg instanceof MemcacheContent || msg instanceof ByteBufConvertible || msg instanceof FileRegion) {
int contentLength = contentLength(msg);
if (contentLength > 0) {
out.add(encodeAndRetain(msg));
@ -63,7 +64,7 @@ public abstract class AbstractMemcacheObjectEncoder<M extends MemcacheMessage> e
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof MemcacheObject || msg instanceof ByteBuf || msg instanceof FileRegion;
return msg instanceof MemcacheObject || msg instanceof ByteBufConvertible || msg instanceof FileRegion;
}
/**
@ -85,8 +86,8 @@ public abstract class AbstractMemcacheObjectEncoder<M extends MemcacheMessage> e
if (msg instanceof MemcacheContent) {
return ((MemcacheContent) msg).content().readableBytes();
}
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().readableBytes();
}
if (msg instanceof FileRegion) {
return (int) ((FileRegion) msg).count();
@ -101,8 +102,8 @@ public abstract class AbstractMemcacheObjectEncoder<M extends MemcacheMessage> e
* @return the encoded object.
*/
private static Object encodeAndRetain(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).retain();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().retain();
}
if (msg instanceof MemcacheContent) {
return ((MemcacheContent) msg).content().retain();

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
@ -268,9 +269,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
if (msg instanceof ByteBufConvertible) {
try {
ByteBuf data = (ByteBuf) msg;
ByteBuf data = ((ByteBufConvertible) msg).asByteBuf();
first = cumulation == null;
if (first) {
cumulation = data;

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufConvertible;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -81,9 +81,9 @@ public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedE
StringUtil.simpleClassName(encoder) + " must produce only one message.");
}
Object content = out.get(0);
if (content instanceof ByteBuf) {
if (content instanceof ByteBufConvertible) {
// Replace the ByteBuf with a DatagramPacket.
out.set(0, new DatagramPacket((ByteBuf) content, msg.recipient(), msg.sender()));
out.set(0, new DatagramPacket(((ByteBufConvertible) content).asByteBuf(), msg.recipient(), msg.sender()));
} else {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only ByteBuf.");

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.logging;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandler;
@ -314,8 +315,8 @@ public class LoggingHandler implements ChannelHandler {
* @param arg the argument of the event
*/
protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {
if (arg instanceof ByteBuf) {
return formatByteBuf(ctx, eventName, (ByteBuf) arg);
if (arg instanceof ByteBufConvertible) {
return formatByteBuf(ctx, eventName, ((ByteBufConvertible) arg).asByteBuf());
} else if (arg instanceof ByteBufHolder) {
return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg);
} else {

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.pcap;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
@ -163,7 +164,7 @@ public final class PcapWriteHandler extends ChannelDuplexHandler implements Clos
ByteBuf byteBuf = byteBufAllocator.buffer();
try {
this.pCapWriter = new PcapWriter(this.outputStream, byteBuf);
pCapWriter = new PcapWriter(outputStream, byteBuf);
} catch (IOException ex) {
ctx.channel().close();
ctx.fireExceptionCaught(ex);
@ -172,7 +173,7 @@ public final class PcapWriteHandler extends ChannelDuplexHandler implements Clos
byteBuf.release();
}
} else {
this.pCapWriter = new PcapWriter(this.outputStream);
pCapWriter = new PcapWriter(outputStream);
}
// If Channel belongs to `SocketChannel` then we're handling TCP.
@ -261,16 +262,16 @@ public final class PcapWriteHandler extends ChannelDuplexHandler implements Clos
* else set {@code false}
*/
private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
if (msg instanceof ByteBuf) {
if (msg instanceof ByteBufConvertible) {
// If bytes are 0 and `captureZeroByte` is false, we won't capture this.
if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
if (((ByteBufConvertible) msg).asByteBuf().readableBytes() == 0 && !captureZeroByte) {
logger.debug("Discarding Zero Byte TCP Packet. isWriteOperation {}", isWriteOperation);
return;
}
ByteBufAllocator byteBufAllocator = ctx.alloc();
ByteBuf packet = ((ByteBuf) msg).duplicate();
ByteBuf packet = ((ByteBufConvertible) msg).asByteBuf().duplicate();
ByteBuf tcpBuf = byteBufAllocator.buffer();
int bytes = packet.readableBytes();
@ -408,15 +409,15 @@ public final class PcapWriteHandler extends ChannelDuplexHandler implements Clos
UDPPacket.writePacket(udpBuf, datagramPacket.content(), srcAddr.getPort(), dstAddr.getPort());
completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
} else if (msg instanceof ByteBuf && ((DatagramChannel) ctx.channel()).isConnected()) {
} else if (msg instanceof ByteBufConvertible && ((DatagramChannel) ctx.channel()).isConnected()) {
// If bytes are 0 and `captureZeroByte` is false, we won't capture this.
if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
if (((ByteBufConvertible) msg).asByteBuf().readableBytes() == 0 && !captureZeroByte) {
logger.debug("Discarding Zero Byte UDP Packet");
return;
}
ByteBuf byteBuf = ((ByteBuf) msg).duplicate();
ByteBuf byteBuf = ((ByteBufConvertible) msg).asByteBuf().duplicate();
logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
byteBuf.readableBytes(), srcAddr, dstAddr);

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.ssl;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
@ -760,7 +761,7 @@ public class SslHandler extends ByteToMessageDecoder {
@Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (!(msg instanceof ByteBuf)) {
if (!(msg instanceof ByteBufConvertible)) {
UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
ReferenceCountUtil.safeRelease(msg);
promise.setFailure(exception);
@ -768,7 +769,7 @@ public class SslHandler extends ByteToMessageDecoder {
ReferenceCountUtil.safeRelease(msg);
promise.setFailure(newPendingWritesNullException());
} else {
pendingUnencryptedWrites.add((ByteBuf) msg, promise);
pendingUnencryptedWrites.add(((ByteBufConvertible) msg).asByteBuf(), promise);
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.traffic;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
@ -650,8 +651,8 @@ public abstract class AbstractTrafficShapingHandler implements ChannelHandler {
* @return size the size of the msg or {@code -1} if unknown.
*/
protected long calculateSize(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();

View File

@ -15,7 +15,7 @@
*/
package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufConvertible;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@ -150,8 +150,8 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
}
} else {
for (ToSend toSend : messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
if (toSend.toSend instanceof ByteBufConvertible) {
((ByteBufConvertible) toSend.toSend).asByteBuf().release();
}
}
}

View File

@ -15,7 +15,7 @@
*/
package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufConvertible;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelConfig;
@ -501,8 +501,8 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
} else {
queuesSize.addAndGet(-perChannel.queueSize);
for (ToSend toSend : perChannel.messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
if (toSend.toSend instanceof ByteBufConvertible) {
((ByteBufConvertible) toSend.toSend).asByteBuf().release();
}
}
}

View File

@ -17,7 +17,7 @@ package io.netty.handler.traffic;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufConvertible;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -277,8 +277,8 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
} else {
queuesSize.addAndGet(-perChannel.queueSize);
for (ToSend toSend : perChannel.messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
if (toSend.toSend instanceof ByteBufConvertible) {
((ByteBufConvertible) toSend.toSend).asByteBuf().release();
}
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@ -288,7 +289,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (msgCount > 1 && in.current() instanceof ByteBufConvertible) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
@ -337,8 +338,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
return writeBytes(in, (ByteBuf) msg);
if (msg instanceof ByteBufConvertible) {
return writeBytes(in, ((ByteBufConvertible) msg).asByteBuf());
} else if (msg instanceof DefaultFileRegion) {
return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
} else if (msg instanceof FileRegion) {
@ -380,8 +381,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
@ -357,7 +358,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
data = envelope.content();
remoteAddress = envelope.recipient();
} else {
data = (ByteBuf) msg;
data = ((ByteBufConvertible) msg).asByteBuf();
remoteAddress = null;
}
@ -378,18 +379,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf &&
if (e.content() instanceof ByteBufConvertible &&
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
ByteBuf content = (ByteBuf) e.content();
ByteBuf content = ((ByteBufConvertible) e.content()).asByteBuf();
return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
new DefaultAddressedEnvelope<>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@ -121,8 +122,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
outbound.addFlush();
Object curr;
if ((curr = outbound.current()) instanceof ByteBuf) {
ByteBuf initialData = (ByteBuf) curr;
if ((curr = outbound.current()) instanceof ByteBufConvertible) {
ByteBuf initialData = ((ByteBufConvertible) curr).asByteBuf();
// If no cookie is present, the write fails with EINPROGRESS and this call basically
// becomes a normal async connect. All writes will be sent normally afterwards.
long localFlushedAmount = doWriteOrSendBytes(

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
@ -117,8 +118,8 @@ final class NativeDatagramPacketArray {
ByteBuf buf = packet.content();
return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient());
}
if (msg instanceof ByteBuf && connected) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible && connected) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
return add0(buf, buf.readerIndex(), buf.readableBytes(), null);
}
return false;

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.kqueue;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@ -270,7 +271,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (msgCount > 1 && in.current() instanceof ByteBufConvertible) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
@ -319,8 +320,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
return writeBytes(in, (ByteBuf) msg);
if (msg instanceof ByteBufConvertible) {
return writeBytes(in, ((ByteBufConvertible) msg).asByteBuf());
} else if (msg instanceof DefaultFileRegion) {
return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
} else if (msg instanceof FileRegion) {
@ -362,8 +363,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.kqueue;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AddressedEnvelope;
@ -288,7 +289,7 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
data = envelope.content();
remoteAddress = envelope.recipient();
} else {
data = (ByteBuf) msg;
data = ((ByteBufConvertible) msg).asByteBuf();
remoteAddress = null;
}
@ -340,18 +341,18 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf &&
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
if (e.content() instanceof ByteBufConvertible &&
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
ByteBuf content = (ByteBuf) e.content();
ByteBuf content = ((ByteBufConvertible) e.content()).asByteBuf();
return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
new DefaultAddressedEnvelope<>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.unix;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
@ -224,8 +225,8 @@ public final class IovArray implements MessageProcessor {
@Override
public boolean processMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buffer = ((ByteBufConvertible) msg).asByteBuf();
return add(buffer, buffer.readerIndex(), buffer.readableBytes());
}
return false;

View File

@ -14,6 +14,7 @@
*/
package io.netty.channel;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
@ -109,8 +110,8 @@ public abstract class AbstractCoalescingBufferQueue {
if (entry == null) {
return null;
}
assert entry instanceof ByteBuf;
ByteBuf result = (ByteBuf) entry;
assert entry instanceof ByteBufConvertible;
ByteBuf result = ((ByteBufConvertible) entry).asByteBuf();
decrementReadableBytes(result.readableBytes());
@ -234,12 +235,12 @@ public abstract class AbstractCoalescingBufferQueue {
break;
}
if (entry instanceof ByteBuf) {
if (entry instanceof ByteBufConvertible) {
if (previousBuf != null) {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf, ctx.voidPromise());
}
previousBuf = (ByteBuf) entry;
previousBuf = ((ByteBufConvertible) entry).asByteBuf();
} else if (entry instanceof ChannelPromise) {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf, (ChannelPromise) entry);
@ -336,8 +337,8 @@ public abstract class AbstractCoalescingBufferQueue {
break;
}
try {
if (entry instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) entry;
if (entry instanceof ByteBufConvertible) {
ByteBuf buffer = ((ByteBufConvertible) entry).asByteBuf();
decrementReadableBytes(buffer.readableBytes());
safeRelease(buffer);
} else {

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
@ -197,8 +198,8 @@ public final class ChannelOutboundBuffer {
}
private static long total(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();
@ -334,12 +335,12 @@ public final class ChannelOutboundBuffer {
public void removeBytes(long writtenBytes) {
for (;;) {
Object msg = current();
if (!(msg instanceof ByteBuf)) {
if (!(msg instanceof ByteBufConvertible)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
@ -406,9 +407,9 @@ public final class ChannelOutboundBuffer {
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
while (isFlushedEntry(entry) && entry.msg instanceof ByteBufConvertible) {
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
ByteBuf buf = ((ByteBufConvertible) entry.msg).asByteBuf();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
@ -35,8 +36,8 @@ public final class DefaultMessageSizeEstimator implements MessageSizeEstimator {
@Override
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBufConvertible) {
return ((ByteBufConvertible) msg).asByteBuf().readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();

View File

@ -17,7 +17,7 @@ package io.netty.channel.group;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -235,8 +235,8 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
// Create a safe duplicate of the message to write it to a channel but not affect other writes.
// See https://github.com/netty/netty/issues/1461
private static Object safeDuplicate(Object message) {
if (message instanceof ByteBuf) {
return ((ByteBuf) message).retainedDuplicate();
if (message instanceof ByteBufConvertible) {
return ((ByteBufConvertible) message).asByteBuf().retainedDuplicate();
} else if (message instanceof ByteBufHolder) {
return ((ByteBufHolder) message).retainedDuplicate();
} else {

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.nio;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@ -216,8 +217,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
if (!buf.isReadable()) {
in.remove();
return 0;
@ -272,8 +273,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
if (buf.isDirect()) {
return msg;
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
@ -268,7 +269,7 @@ public final class NioDatagramChannel
remoteAddress = envelope.recipient();
data = envelope.content();
} else {
data = (ByteBuf) msg;
data = ((ByteBufConvertible) msg).asByteBuf();
remoteAddress = null;
}
@ -299,8 +300,8 @@ public final class NioDatagramChannel
return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (msg instanceof ByteBufConvertible) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
if (isSingleDirectBuffer(buf)) {
return buf;
}
@ -310,8 +311,8 @@ public final class NioDatagramChannel
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
ByteBuf content = (ByteBuf) e.content();
if (e.content() instanceof ByteBufConvertible) {
ByteBuf content = ((ByteBufConvertible) e.content()).asByteBuf();
if (isSingleDirectBuffer(content)) {
return e;
}