From 186b9eb6ab9cb31823e3f5179485dfab0008b7c9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 31 Aug 2020 13:22:34 +0200 Subject: [PATCH] Correctly release memory for remote address and some code cleanup --- .../channel/uring/AbstractIOUringChannel.java | 62 +++++++++---------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 9565855027..e3ffd658fa 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -74,9 +74,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private ChannelPromise connectPromise; private ScheduledFuture connectTimeoutFuture; private SocketAddress requestedRemoteAddress; - - private final ByteBuffer remoteAddressMemory; - private final long remoteAddressMemoryAddress; + private ByteBuffer remoteAddressMemory; private volatile SocketAddress local; private volatile SocketAddress remote; @@ -101,9 +99,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } else { logger.trace("Create Server Socket: {}", socket.intValue()); } - - remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); - remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory); } AbstractIOUringChannel(final Channel parent, LinuxSocket socket, boolean active) { @@ -121,8 +116,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } else { logger.trace("Create Server Socket: {}", socket.intValue()); } - remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); - remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory); } AbstractIOUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) { @@ -134,9 +127,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // See https://github.com/netty/netty/issues/2359 this.remote = remote; this.local = fd.localAddress(); - - remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); - remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory); } public boolean isOpen() { @@ -208,13 +198,22 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); } + private void freeRemoteAddressMemory() { + if (remoteAddressMemory != null) { + Buffer.free(remoteAddressMemory); + remoteAddressMemory = null; + } + } + @Override protected void doClose() throws Exception { + freeRemoteAddressMemory(); + active = false; + IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollRemove(socket.intValue()); submissionQueue.submit(); - active = false; // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a // socket which has not even been connected yet. This has been observed to block during unit tests. //inputClosedSeenErrorOnRead = true; @@ -350,6 +349,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } active = true; + computeRemote(); + + // Register POLLRDHUP + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRdHup(fd().intValue()); + submissionQueue.submit(); + // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // We still need to ensure we call fireChannelActive() in this case. boolean active = isActive(); @@ -459,11 +465,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha boolean connectStillInProgress = false; try { boolean wasActive = isActive(); - if (!doFinishConnect()) { + if (!socket.finishConnect()) { connectStillInProgress = true; return; } - computeRemote(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); @@ -473,6 +478,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // See https://github.com/netty/netty/issues/1770 cancelConnectTimeoutFuture(); connectPromise = null; + } else { + // The connect was not done yet, register for POLLOUT again + addPollOut(); } } } else if (!getSocket().isOutputShutdown()) { @@ -503,6 +511,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } final void connectComplete(int res) { + freeRemoteAddressMemory(); + if (res == 0) { fulfillConnectPromise(connectPromise, active); } else { @@ -539,7 +549,11 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha doConnect(remoteAddress, localAddress); InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress; NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress()); - socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(),remoteAddressMemoryAddress); + + remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); + long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory); + + socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress); final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue(); ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN); ioUringSubmissionQueue.submit(); @@ -670,24 +684,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - private boolean doFinishConnect() throws Exception { - if (socket.finishConnect()) { - if (requestedRemoteAddress instanceof InetSocketAddress) { - remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); - } - requestedRemoteAddress = null; - - // Register POLLRDHUP - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollRdHup(fd().intValue()); - submissionQueue.submit(); - - return true; - } - addPollOut(); - return false; - } - private void computeRemote() { if (requestedRemoteAddress instanceof InetSocketAddress) { remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());