ByteBufs which are not resizable should not throw in ensureWritable(int,boolean)
Motivation: ByteBuf#ensureWritable(int,boolean) returns an int indicating the status of the resize operation. For buffers that are unmodifiable or cannot be resized this method shouldn't throw but just return 1. ByteBuf#ensureWriteable(int) should throw unmodifiable buffers. Modifications: - ReadOnlyByteBuf should be updated as described above. - Add a unit test to SslHandler which verifies the read only buffer can be tolerated in the aggregation algorithm. Result: Fixes https://github.com/netty/netty/issues/7002.
This commit is contained in:
parent
2fbce6d470
commit
452fd36240
@ -65,6 +65,16 @@ public class ReadOnlyByteBuf extends AbstractDerivedByteBuf {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int ensureWritable(int minWritableBytes, boolean force) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuf ensureWritable(int minWritableBytes) {
|
||||||
|
throw new ReadOnlyBufferException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuf unwrap() {
|
public ByteBuf unwrap() {
|
||||||
return buffer;
|
return buffer;
|
||||||
|
@ -15,7 +15,13 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer;
|
package io.netty.buffer;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest {
|
public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest {
|
||||||
|
|
||||||
@ -34,4 +40,23 @@ public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest {
|
|||||||
assertEquals(0, buffer.readerIndex());
|
assertEquals(0, buffer.readerIndex());
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void ensureWritableWithEnoughSpaceShouldNotThrow() {
|
||||||
|
ByteBuf buf = newBuffer(1, 10);
|
||||||
|
buf.ensureWritable(3);
|
||||||
|
assertThat(buf.writableBytes(), is(greaterThanOrEqualTo(3)));
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IndexOutOfBoundsException.class)
|
||||||
|
public void ensureWritableWithNotEnoughSpaceShouldThrow() {
|
||||||
|
ByteBuf buf = newBuffer(1, 10);
|
||||||
|
try {
|
||||||
|
buf.ensureWritable(11);
|
||||||
|
fail();
|
||||||
|
} finally {
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,12 +21,23 @@ import java.io.IOException;
|
|||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ReadOnlyBufferException;
|
||||||
import java.nio.channels.GatheringByteChannel;
|
import java.nio.channels.GatheringByteChannel;
|
||||||
import java.nio.channels.ScatteringByteChannel;
|
import java.nio.channels.ScatteringByteChannel;
|
||||||
|
|
||||||
import static io.netty.buffer.Unpooled.*;
|
import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
|
||||||
import static org.junit.Assert.*;
|
import static io.netty.buffer.Unpooled.BIG_ENDIAN;
|
||||||
import static org.mockito.Mockito.*;
|
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
|
||||||
|
import static io.netty.buffer.Unpooled.LITTLE_ENDIAN;
|
||||||
|
import static io.netty.buffer.Unpooled.buffer;
|
||||||
|
import static io.netty.buffer.Unpooled.unmodifiableBuffer;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests read-only channel buffers
|
* Tests read-only channel buffers
|
||||||
@ -181,6 +192,37 @@ public class ReadOnlyByteBufTest {
|
|||||||
assertFalse(unmodifiableBuffer(buffer(1)).isWritable(1));
|
assertFalse(unmodifiableBuffer(buffer(1)).isWritable(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void ensureWritableIntStatusShouldFailButNotThrow() {
|
||||||
|
ensureWritableIntStatusShouldFailButNotThrow(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void ensureWritableForceIntStatusShouldFailButNotThrow() {
|
||||||
|
ensureWritableIntStatusShouldFailButNotThrow(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ensureWritableIntStatusShouldFailButNotThrow(boolean force) {
|
||||||
|
ByteBuf buf = buffer(1);
|
||||||
|
ByteBuf readOnly = buf.asReadOnly();
|
||||||
|
int result = readOnly.ensureWritable(1, force);
|
||||||
|
assertEquals(1, result);
|
||||||
|
assertFalse(ensureWritableSuccess(result));
|
||||||
|
readOnly.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ReadOnlyBufferException.class)
|
||||||
|
public void ensureWritableShouldThrow() {
|
||||||
|
ByteBuf buf = buffer(1);
|
||||||
|
ByteBuf readOnly = buf.asReadOnly();
|
||||||
|
try {
|
||||||
|
readOnly.ensureWritable(1);
|
||||||
|
fail();
|
||||||
|
} finally {
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void asReadOnly() {
|
public void asReadOnly() {
|
||||||
ByteBuf buf = buffer(1);
|
ByteBuf buf = buffer(1);
|
||||||
|
@ -22,7 +22,10 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests sliced channel buffers
|
* Tests sliced channel buffers
|
||||||
@ -227,4 +230,33 @@ public class SlicedByteBufTest extends AbstractByteBufTest {
|
|||||||
public void testWriteUtf16CharSequenceExpand() {
|
public void testWriteUtf16CharSequenceExpand() {
|
||||||
super.testWriteUtf16CharSequenceExpand();
|
super.testWriteUtf16CharSequenceExpand();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void ensureWritableWithEnoughSpaceShouldNotThrow() {
|
||||||
|
ByteBuf slice = newBuffer(10);
|
||||||
|
ByteBuf unwrapped = slice.unwrap();
|
||||||
|
unwrapped.writerIndex(unwrapped.writerIndex() + 5);
|
||||||
|
slice.writerIndex(slice.readerIndex());
|
||||||
|
|
||||||
|
// Run ensureWritable and verify this doesn't change any indexes.
|
||||||
|
int originalWriterIndex = slice.writerIndex();
|
||||||
|
int originalReadableBytes = slice.readableBytes();
|
||||||
|
slice.ensureWritable(originalWriterIndex - slice.writerIndex());
|
||||||
|
assertEquals(originalWriterIndex, slice.writerIndex());
|
||||||
|
assertEquals(originalReadableBytes, slice.readableBytes());
|
||||||
|
slice.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IndexOutOfBoundsException.class)
|
||||||
|
public void ensureWritableWithNotEnoughSpaceShouldThrow() {
|
||||||
|
ByteBuf slice = newBuffer(10);
|
||||||
|
ByteBuf unwrapped = slice.unwrap();
|
||||||
|
unwrapped.writerIndex(unwrapped.writerIndex() + 5);
|
||||||
|
try {
|
||||||
|
slice.ensureWritable(1);
|
||||||
|
fail();
|
||||||
|
} finally {
|
||||||
|
slice.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
|
|||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
@ -56,9 +57,11 @@ import java.security.KeyStore;
|
|||||||
import java.security.cert.CertificateException;
|
import java.security.cert.CertificateException;
|
||||||
import java.security.cert.X509Certificate;
|
import java.security.cert.X509Certificate;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.net.ssl.ManagerFactoryParameters;
|
import javax.net.ssl.ManagerFactoryParameters;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
@ -67,6 +70,7 @@ import javax.net.ssl.SSLProtocolException;
|
|||||||
import javax.net.ssl.TrustManager;
|
import javax.net.ssl.TrustManager;
|
||||||
import javax.net.ssl.X509TrustManager;
|
import javax.net.ssl.X509TrustManager;
|
||||||
|
|
||||||
|
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.nullValue;
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
@ -87,14 +91,14 @@ public class SslHandlerTest {
|
|||||||
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
|
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
|
||||||
|
|
||||||
// Push the first part of a 5-byte handshake message.
|
// Push the first part of a 5-byte handshake message.
|
||||||
ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{22, 3, 1, 0, 5}));
|
ch.writeInbound(wrappedBuffer(new byte[]{22, 3, 1, 0, 5}));
|
||||||
|
|
||||||
// Should decode nothing yet.
|
// Should decode nothing yet.
|
||||||
assertThat(ch.readInbound(), is(nullValue()));
|
assertThat(ch.readInbound(), is(nullValue()));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Push the second part of the 5-byte handshake message.
|
// Push the second part of the 5-byte handshake message.
|
||||||
ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{2, 0, 0, 1, 0}));
|
ch.writeInbound(wrappedBuffer(new byte[]{2, 0, 0, 1, 0}));
|
||||||
fail();
|
fail();
|
||||||
} catch (DecoderException e) {
|
} catch (DecoderException e) {
|
||||||
// Be sure we cleanup the channel and release any pending messages that may have been generated because
|
// Be sure we cleanup the channel and release any pending messages that may have been generated because
|
||||||
@ -467,6 +471,76 @@ public class SslHandlerTest {
|
|||||||
testCloseNotify(SslProvider.OPENSSL_REFCNT, 0, false);
|
testCloseNotify(SslProvider.OPENSSL_REFCNT, 0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void writingReadOnlyBufferDoesNotBreakAggregation() throws Exception {
|
||||||
|
SelfSignedCertificate ssc = new SelfSignedCertificate();
|
||||||
|
|
||||||
|
final SslContext sslServerCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
|
||||||
|
|
||||||
|
final SslContext sslClientCtx = SslContextBuilder.forClient()
|
||||||
|
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
|
||||||
|
|
||||||
|
EventLoopGroup group = new NioEventLoopGroup();
|
||||||
|
Channel sc = null;
|
||||||
|
Channel cc = null;
|
||||||
|
final CountDownLatch serverReceiveLatch = new CountDownLatch(1);
|
||||||
|
try {
|
||||||
|
final int expectedBytes = 11;
|
||||||
|
sc = new ServerBootstrap()
|
||||||
|
.group(group)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childHandler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
ch.pipeline().addLast(sslServerCtx.newHandler(ch.alloc()));
|
||||||
|
ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
|
||||||
|
private int readBytes;
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
||||||
|
readBytes += msg.readableBytes();
|
||||||
|
if (readBytes >= expectedBytes) {
|
||||||
|
serverReceiveLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}).bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
cc = new Bootstrap()
|
||||||
|
.group(group)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.handler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
ch.pipeline().addLast(sslClientCtx.newHandler(ch.alloc()));
|
||||||
|
}
|
||||||
|
}).connect(sc.localAddress()).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
// We first write a ReadOnlyBuffer because SslHandler will attempt to take the first buffer and append to it
|
||||||
|
// until there is no room, or the aggregation size threshold is exceeded. We want to verify that we don't
|
||||||
|
// throw when a ReadOnlyBuffer is used and just verify that we don't aggregate in this case.
|
||||||
|
ByteBuf firstBuffer = Unpooled.buffer(10);
|
||||||
|
firstBuffer.writeByte(0);
|
||||||
|
firstBuffer = firstBuffer.asReadOnly();
|
||||||
|
ByteBuf secondBuffer = Unpooled.buffer(10);
|
||||||
|
secondBuffer.writerIndex(secondBuffer.capacity());
|
||||||
|
cc.write(firstBuffer);
|
||||||
|
cc.writeAndFlush(secondBuffer).syncUninterruptibly();
|
||||||
|
serverReceiveLatch.countDown();
|
||||||
|
} finally {
|
||||||
|
if (cc != null) {
|
||||||
|
cc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
if (sc != null) {
|
||||||
|
sc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
group.shutdownGracefully();
|
||||||
|
|
||||||
|
ReferenceCountUtil.release(sslServerCtx);
|
||||||
|
ReferenceCountUtil.release(sslClientCtx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void testCloseNotify(SslProvider provider, final long closeNotifyReadTimeout, final boolean timeout)
|
private static void testCloseNotify(SslProvider provider, final long closeNotifyReadTimeout, final boolean timeout)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
SelfSignedCertificate ssc = new SelfSignedCertificate();
|
SelfSignedCertificate ssc = new SelfSignedCertificate();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user