Correctly release memory for remote address and some code cleanup

This commit is contained in:
Norman Maurer 2020-08-31 13:22:34 +02:00
parent e41c68b151
commit 186b9eb6ab

View File

@ -74,9 +74,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private ChannelPromise connectPromise; private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture; private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress; private SocketAddress requestedRemoteAddress;
private ByteBuffer remoteAddressMemory;
private final ByteBuffer remoteAddressMemory;
private final long remoteAddressMemoryAddress;
private volatile SocketAddress local; private volatile SocketAddress local;
private volatile SocketAddress remote; private volatile SocketAddress remote;
@ -101,9 +99,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} else { } else {
logger.trace("Create Server Socket: {}", socket.intValue()); logger.trace("Create Server Socket: {}", socket.intValue());
} }
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
} }
AbstractIOUringChannel(final Channel parent, LinuxSocket socket, boolean active) { AbstractIOUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
@ -121,8 +116,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} else { } else {
logger.trace("Create Server Socket: {}", socket.intValue()); logger.trace("Create Server Socket: {}", socket.intValue());
} }
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
} }
AbstractIOUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) { AbstractIOUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
@ -134,9 +127,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// See https://github.com/netty/netty/issues/2359 // See https://github.com/netty/netty/issues/2359
this.remote = remote; this.remote = remote;
this.local = fd.localAddress(); this.local = fd.localAddress();
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
} }
public boolean isOpen() { public boolean isOpen() {
@ -208,13 +198,22 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
return ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); return ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
} }
private void freeRemoteAddressMemory() {
if (remoteAddressMemory != null) {
Buffer.free(remoteAddressMemory);
remoteAddressMemory = null;
}
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
freeRemoteAddressMemory();
active = false;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRemove(socket.intValue()); submissionQueue.addPollRemove(socket.intValue());
submissionQueue.submit(); submissionQueue.submit();
active = false;
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
// socket which has not even been connected yet. This has been observed to block during unit tests. // socket which has not even been connected yet. This has been observed to block during unit tests.
//inputClosedSeenErrorOnRead = true; //inputClosedSeenErrorOnRead = true;
@ -350,6 +349,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
active = true; active = true;
computeRemote();
// Register POLLRDHUP
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRdHup(fd().intValue());
submissionQueue.submit();
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case. // We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive(); boolean active = isActive();
@ -459,11 +465,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
boolean connectStillInProgress = false; boolean connectStillInProgress = false;
try { try {
boolean wasActive = isActive(); boolean wasActive = isActive();
if (!doFinishConnect()) { if (!socket.finishConnect()) {
connectStillInProgress = true; connectStillInProgress = true;
return; return;
} }
computeRemote();
fulfillConnectPromise(connectPromise, wasActive); fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) { } catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
@ -473,6 +478,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// See https://github.com/netty/netty/issues/1770 // See https://github.com/netty/netty/issues/1770
cancelConnectTimeoutFuture(); cancelConnectTimeoutFuture();
connectPromise = null; connectPromise = null;
} else {
// The connect was not done yet, register for POLLOUT again
addPollOut();
} }
} }
} else if (!getSocket().isOutputShutdown()) { } else if (!getSocket().isOutputShutdown()) {
@ -503,6 +511,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
final void connectComplete(int res) { final void connectComplete(int res) {
freeRemoteAddressMemory();
if (res == 0) { if (res == 0) {
fulfillConnectPromise(connectPromise, active); fulfillConnectPromise(connectPromise, active);
} else { } else {
@ -539,7 +549,11 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
doConnect(remoteAddress, localAddress); doConnect(remoteAddress, localAddress);
InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress; InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress()); NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress());
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(),remoteAddressMemoryAddress);
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress);
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue(); final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN); ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
ioUringSubmissionQueue.submit(); ioUringSubmissionQueue.submit();
@ -670,24 +684,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
} }
private boolean doFinishConnect() throws Exception {
if (socket.finishConnect()) {
if (requestedRemoteAddress instanceof InetSocketAddress) {
remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
}
requestedRemoteAddress = null;
// Register POLLRDHUP
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRdHup(fd().intValue());
submissionQueue.submit();
return true;
}
addPollOut();
return false;
}
private void computeRemote() { private void computeRemote() {
if (requestedRemoteAddress instanceof InetSocketAddress) { if (requestedRemoteAddress instanceof InetSocketAddress) {
remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());