From 909a3d942e6830d230021fe35afbc7a8e7566201 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 7 May 2019 10:19:42 +0200 Subject: [PATCH] KQueueEventLoop won't unregister active channels reusing a file descriptor (#9114) Motivation: The current KQueueEventLoop implementation does not process concurrent domain socket channel registration/unregistration in the order they actual happen since unregistration are delated by an event loop task scheduling. When a domain socket is closed, it's file descriptor might be reused quickly and therefore trigger a new channel registration using the same descriptor. Consequently the KQueueEventLoop#add(AbstractKQueueChannel) method will overwrite the current inactive channels having the same descriptor and the delayed KQueueEventLoop#remove(AbstractKQueueChannel) will remove the active channel that replaced the inactive one. As active channels are registered, events for this file descriptor won't be processed anymore and the channels will never be closed. Modifications: Change the logic of KQueueEventLoop#remove(AbstractKQueueChannel) channels so it will check channels equality prior removal. Result: KQueueEventLoop won't remove anymore active channels reusing a file descriptor. --- .../netty/channel/kqueue/KQueueEventLoop.java | 8 +- .../kqueue/KQueueDomainSocketReuseFdTest.java | 172 ++++++++++++++++++ 2 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index eabee894ee..abcf989f68 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -101,7 +101,13 @@ final class KQueueEventLoop extends SingleThreadEventLoop { void remove(AbstractKQueueChannel ch) { assert inEventLoop(); - channels.remove(ch.fd().intValue()); + int fd = ch.fd().intValue(); + // Remove only if that's the same channel + // Due to file descriptor reuse another channel might have overwritten + // the removed channel in the map + if (channels.get(fd) == ch) { + channels.remove(fd); + } } /** diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java new file mode 100644 index 0000000000..5d99c07c3a --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java @@ -0,0 +1,172 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; +import org.junit.Test; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class KQueueDomainSocketReuseFdTest extends SocketStringEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return KQueueSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.domainSocket(); + } + + @Test(timeout = 60000) + public void testReuseFd() throws Throwable { + run(); + } + + public void testReuseFd(ServerBootstrap sb, Bootstrap cb) throws Throwable { + sb.childOption(ChannelOption.AUTO_READ, true); + cb.option(ChannelOption.AUTO_READ, true); + int numChannels = 1000; + final AtomicReference globalException = new AtomicReference(); + final AtomicInteger serverRemaining = new AtomicInteger(numChannels); + final AtomicInteger clientRemaining = new AtomicInteger(numChannels); + final Promise serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise(); + + sb.childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) { + ReuseFdHandler sh = new ReuseFdHandler( + false, + globalException, + serverRemaining, + serverDonePromise); + sch.pipeline().addLast("handler", sh); + } + }); + + cb.handler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) { + ReuseFdHandler ch = new ReuseFdHandler( + true, + globalException, + clientRemaining, + clientDonePromise); + sch.pipeline().addLast("handler", ch); + } + }); + + Channel sc = sb.bind().sync().channel(); + for (int i = 0; i < numChannels; i++) { + cb.connect(sc.localAddress()); + } + + clientDonePromise.sync(); + serverDonePromise.sync(); + sc.close().sync(); + + if (globalException.get() != null && !(globalException.get() instanceof IOException)) { + throw globalException.get(); + } + } + + static class ReuseFdHandler extends ChannelInboundHandlerAdapter { + private final Promise donePromise; + private final AtomicInteger remaining; + private final boolean client; + volatile Channel channel; + volatile boolean complete; + final AtomicReference globalException; + final AtomicReference exception = new AtomicReference(); + final StringBuilder received = new StringBuilder(); + + ReuseFdHandler( + boolean client, + AtomicReference globalException, + AtomicInteger remaining, + Promise donePromise) { + this.client = client; + this.globalException = globalException; + this.remaining = remaining; + this.donePromise = donePromise; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + channel = ctx.channel(); + if (client) { + ctx.writeAndFlush(Unpooled.copiedBuffer("payload", CharsetUtil.US_ASCII)); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + received.append(buf.toString(CharsetUtil.US_ASCII)); + buf.release(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + complete = true; + if (client) { + ctx.close(); + } else { + ctx.writeAndFlush(Unpooled.copiedBuffer("payload".getBytes())); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (exception.compareAndSet(null, cause)) { + donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause)); + ctx.close(); + } + globalException.compareAndSet(null, cause); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + if (remaining.decrementAndGet() == 0) { + if (received.toString().equals("payload")) { + donePromise.setSuccess(null); + } else { + donePromise.tryFailure(new Exception("Unexpected payload:" + received)); + } + } + } + } +}