Merge pull request #10 from normanmaurer/release_memory
Correctly release memory for remote address and some code cleanup
This commit is contained in:
commit
74c0d3dfa7
@ -74,9 +74,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
private ChannelPromise connectPromise;
|
||||
private ScheduledFuture<?> connectTimeoutFuture;
|
||||
private SocketAddress requestedRemoteAddress;
|
||||
|
||||
private final ByteBuffer remoteAddressMemory;
|
||||
private final long remoteAddressMemoryAddress;
|
||||
private ByteBuffer remoteAddressMemory;
|
||||
|
||||
private volatile SocketAddress local;
|
||||
private volatile SocketAddress remote;
|
||||
@ -101,9 +99,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
} else {
|
||||
logger.trace("Create Server Socket: {}", socket.intValue());
|
||||
}
|
||||
|
||||
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
|
||||
remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
|
||||
}
|
||||
|
||||
AbstractIOUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
|
||||
@ -121,8 +116,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
} else {
|
||||
logger.trace("Create Server Socket: {}", socket.intValue());
|
||||
}
|
||||
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
|
||||
remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
|
||||
}
|
||||
|
||||
AbstractIOUringChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
|
||||
@ -134,9 +127,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
// See https://github.com/netty/netty/issues/2359
|
||||
this.remote = remote;
|
||||
this.local = fd.localAddress();
|
||||
|
||||
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
|
||||
remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
|
||||
}
|
||||
|
||||
public boolean isOpen() {
|
||||
@ -208,13 +198,22 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
return ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
|
||||
}
|
||||
|
||||
private void freeRemoteAddressMemory() {
|
||||
if (remoteAddressMemory != null) {
|
||||
Buffer.free(remoteAddressMemory);
|
||||
remoteAddressMemory = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
freeRemoteAddressMemory();
|
||||
active = false;
|
||||
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollRemove(socket.intValue());
|
||||
submissionQueue.submit();
|
||||
|
||||
active = false;
|
||||
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
|
||||
// socket which has not even been connected yet. This has been observed to block during unit tests.
|
||||
//inputClosedSeenErrorOnRead = true;
|
||||
@ -350,6 +349,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
active = true;
|
||||
|
||||
computeRemote();
|
||||
|
||||
// Register POLLRDHUP
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollRdHup(fd().intValue());
|
||||
submissionQueue.submit();
|
||||
|
||||
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
|
||||
// We still need to ensure we call fireChannelActive() in this case.
|
||||
boolean active = isActive();
|
||||
@ -459,11 +465,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
boolean connectStillInProgress = false;
|
||||
try {
|
||||
boolean wasActive = isActive();
|
||||
if (!doFinishConnect()) {
|
||||
if (!socket.finishConnect()) {
|
||||
connectStillInProgress = true;
|
||||
return;
|
||||
}
|
||||
computeRemote();
|
||||
fulfillConnectPromise(connectPromise, wasActive);
|
||||
} catch (Throwable t) {
|
||||
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
|
||||
@ -473,6 +478,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
// See https://github.com/netty/netty/issues/1770
|
||||
cancelConnectTimeoutFuture();
|
||||
connectPromise = null;
|
||||
} else {
|
||||
// The connect was not done yet, register for POLLOUT again
|
||||
addPollOut();
|
||||
}
|
||||
}
|
||||
} else if (!getSocket().isOutputShutdown()) {
|
||||
@ -503,6 +511,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
|
||||
final void connectComplete(int res) {
|
||||
freeRemoteAddressMemory();
|
||||
|
||||
if (res == 0) {
|
||||
fulfillConnectPromise(connectPromise, active);
|
||||
} else {
|
||||
@ -539,7 +549,11 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
doConnect(remoteAddress, localAddress);
|
||||
InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
|
||||
NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress());
|
||||
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(),remoteAddressMemoryAddress);
|
||||
|
||||
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
|
||||
long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
|
||||
|
||||
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress);
|
||||
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
|
||||
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
|
||||
ioUringSubmissionQueue.submit();
|
||||
@ -670,24 +684,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doFinishConnect() throws Exception {
|
||||
if (socket.finishConnect()) {
|
||||
if (requestedRemoteAddress instanceof InetSocketAddress) {
|
||||
remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
|
||||
}
|
||||
requestedRemoteAddress = null;
|
||||
|
||||
// Register POLLRDHUP
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollRdHup(fd().intValue());
|
||||
submissionQueue.submit();
|
||||
|
||||
return true;
|
||||
}
|
||||
addPollOut();
|
||||
return false;
|
||||
}
|
||||
|
||||
private void computeRemote() {
|
||||
if (requestedRemoteAddress instanceof InetSocketAddress) {
|
||||
remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
|
||||
|
Loading…
Reference in New Issue
Block a user