From af2f343648f4cb3deca5314174e0e579f9fec846 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 14 Dec 2017 16:47:31 -0800 Subject: [PATCH] FileDescriptor writev core dump Motivation: FileDescriptor#writev calls JNI code, and that JNI code dereferences a NULL pointer which crashes the application. This occurs when writing a single CompositeByteBuf object with more than one component. Modifications: - Initialize the iovec iterator properly to avoid the core dump - Fix the array length calculation if we aren't able to fit all the ByteBuffer objects in the iovec array Result: No more core dump. --- .../CompositeBufferGatheringWriteTest.java | 118 ++++++++++++++++++ ...pollCompositeBufferGatheringWriteTest.java | 30 +++++ ...ueueCompositeBufferGatheringWriteTest.java | 30 +++++ .../src/main/c/netty_unix_filedescriptor.c | 20 +-- 4 files changed, 188 insertions(+), 10 deletions(-) create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/socket/CompositeBufferGatheringWriteTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java create mode 100644 transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueCompositeBufferGatheringWriteTest.java diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/CompositeBufferGatheringWriteTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/CompositeBufferGatheringWriteTest.java new file mode 100644 index 0000000000..c436223943 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/CompositeBufferGatheringWriteTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; + +public class CompositeBufferGatheringWriteTest extends AbstractSocketTest { + private static final int EXPECTED_BYTES = 20; + + @Test(timeout = 10000) + public void testSingleCompositeBufferWrite() throws Throwable { + run(); + } + + public void testSingleCompositeBufferWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference clientReceived = new AtomicReference(); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + ctx.writeAndFlush(newCompositeBuffer(ctx.alloc())); + } + }); + } + }); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + private ByteBuf aggregator; + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + aggregator = ctx.alloc().buffer(EXPECTED_BYTES); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + if (msg instanceof ByteBuf) { + aggregator.writeBytes((ByteBuf) msg); + if (aggregator.readableBytes() == EXPECTED_BYTES) { + if (clientReceived.compareAndSet(null, aggregator)) { + latch.countDown(); + } + } + } + } finally { + ReferenceCountUtil.release(msg); + } + } + }); + } + }); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); + + ByteBuf expected = newCompositeBuffer(clientChannel.alloc()); + clientChannel.writeAndFlush(expected.retainedSlice()); + latch.await(); + ByteBuf actual = clientReceived.get(); + assertEquals(expected, actual); + expected.release(); + actual.release(); + } finally { + if (clientChannel != null) { + clientChannel.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } + } + } + + private static ByteBuf newCompositeBuffer(ByteBufAllocator alloc) { + CompositeByteBuf compositeByteBuf = alloc.compositeBuffer(); + compositeByteBuf.addComponent(true, alloc.directBuffer(4).writeInt(100)); + compositeByteBuf.addComponent(true, alloc.directBuffer(8).writeLong(123)); + compositeByteBuf.addComponent(true, alloc.directBuffer(8).writeLong(456)); + assertEquals(EXPECTED_BYTES, compositeByteBuf.readableBytes()); + return compositeByteBuf; + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java new file mode 100644 index 0000000000..9ce0cbff30 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollCompositeBufferGatheringWriteTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.CompositeBufferGatheringWriteTest; + +import java.util.List; + +public class EpollCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } +} diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueCompositeBufferGatheringWriteTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueCompositeBufferGatheringWriteTest.java new file mode 100644 index 0000000000..9fe1e604b1 --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueCompositeBufferGatheringWriteTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.CompositeBufferGatheringWriteTest; + +import java.util.List; + +public class KQueueCompositeBufferGatheringWriteTest extends CompositeBufferGatheringWriteTest { + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.socket(); + } +} diff --git a/transport-native-unix-common/src/main/c/netty_unix_filedescriptor.c b/transport-native-unix-common/src/main/c/netty_unix_filedescriptor.c index 446b7a9535..36632b1cad 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_filedescriptor.c +++ b/transport-native-unix-common/src/main/c/netty_unix_filedescriptor.c @@ -111,19 +111,19 @@ static jlong netty_unix_filedescriptor_writevAddresses(JNIEnv* env, jclass clazz return _writev(env, clazz, fd, iov, length); } -static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length, jlong maxBytesToWrite) { +static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd, jobjectArray buffers, const jint offset, jint length, jlong maxBytesToWrite) { struct iovec iov[length]; - struct iovec* iovptr = NULL; + struct iovec* iovptr = iov; int i; int num = offset + length; if (posFieldId != NULL && limitFieldId != NULL) { - for (i = offset; i < num; i++) { + for (i = offset; i < num; ++i) { jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); jint pos = (*env)->GetIntField(env, bufObj, posFieldId); jint limit = (*env)->GetIntField(env, bufObj, limitFieldId); size_t bytesLength = (size_t) (limit - pos); if (bytesLength > maxBytesToWrite && i != offset) { - bytesLength = num - i; + length = i - offset; break; } maxBytesToWrite -= bytesLength; @@ -140,13 +140,13 @@ static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd (*env)->DeleteLocalRef(env, bufObj); } } else if (posFieldId != NULL && limitFieldId == NULL) { - for (i = offset; i < num; i++) { + for (i = offset; i < num; ++i) { jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); jint pos = (*env)->GetIntField(env, bufObj, posFieldId); jint limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); size_t bytesLength = (size_t) (limit - pos); if (bytesLength > maxBytesToWrite && i != offset) { - bytesLength = num - i; + length = i - offset; break; } maxBytesToWrite -= bytesLength; @@ -163,13 +163,13 @@ static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd (*env)->DeleteLocalRef(env, bufObj); } } else if (limitFieldId != NULL) { - for (i = offset; i < num; i++) { + for (i = offset; i < num; ++i) { jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); jint pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); jint limit = (*env)->GetIntField(env, bufObj, limitFieldId); size_t bytesLength = (size_t) (limit - pos); if (bytesLength > maxBytesToWrite && i != offset) { - bytesLength = num - i; + length = i - offset; break; } maxBytesToWrite -= bytesLength; @@ -186,13 +186,13 @@ static jlong netty_unix_filedescriptor_writev(JNIEnv* env, jclass clazz, jint fd (*env)->DeleteLocalRef(env, bufObj); } } else { - for (i = offset; i < num; i++) { + for (i = offset; i < num; ++i) { jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); jint pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); jint limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); size_t bytesLength = (size_t) (limit - pos); if (bytesLength > maxBytesToWrite && i != offset) { - bytesLength = num - i; + length = i - offset; break; } maxBytesToWrite -= bytesLength;