Make changes to prepare for io_uring incubator repository (#10741)

Motivation:

During the last few month we did develop an io_uring based transport which shows very promising performance numbers. To give it more time to bake we will develop it outside of netty in an "incubator" module which will make it clear to users what to expect and also allow us to seperate its release cycle. While the implementation of it is very self contained there are few small adjustments that need to be made in netty itself to allow us to reuse code.

Modifications:

- AbstractChannel: Add method which can be used when a write fails and remove final from one method
- IovArray: Allow to create an IovArray from a ByteBuf instance
- FileDescriptor: Allow to reuse mark close logic via sub-class

Result:

Be able to reuse netty core classes in io_uring incubator repository
This commit is contained in:
Norman Maurer 2020-10-28 15:31:02 +01:00 committed by GitHub
parent 701eed8762
commit ddebc1027d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 50 deletions

View File

@ -62,23 +62,28 @@ public class FileDescriptor {
return fd;
}
protected boolean markClosed() {
for (;;) {
int state = this.state;
if (isClosed(state)) {
return false;
}
// Once a close operation happens, the channel is considered shutdown.
if (casState(state, state | STATE_ALL_MASK)) {
return true;
}
}
}
/**
* Close the file descriptor.
*/
public void close() throws IOException {
for (;;) {
int state = this.state;
if (isClosed(state)) {
return;
if (markClosed()) {
int res = close(fd);
if (res < 0) {
throw newIOException("close", res);
}
// Once a close operation happens, the channel is considered shutdown.
if (casState(state, state | STATE_ALL_MASK)) {
break;
}
}
int res = close(fd);
if (res < 0) {
throw newIOException("close", res);
}
}

View File

@ -16,10 +16,12 @@
package io.netty.channel.unix;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import static io.netty.channel.unix.Limits.IOV_MAX;
import static io.netty.channel.unix.Limits.SSIZE_MAX;
@ -52,23 +54,29 @@ public final class IovArray implements MessageProcessor {
* The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the
* address.
*/
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
public static final int IOV_SIZE = 2 * ADDRESS_SIZE;
/**
* The needed memory to hold up to {@code IOV_MAX} iov entries, where {@code IOV_MAX} signified
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
*/
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
private static final int MAX_CAPACITY = IOV_MAX * IOV_SIZE;
private final ByteBuffer memory;
private final long memoryAddress;
private final ByteBuf memory;
private int count;
private long size;
private long maxBytes = SSIZE_MAX;
public IovArray() {
memory = Buffer.allocateDirectWithNativeOrder(CAPACITY);
memoryAddress = Buffer.memoryAddress(memory);
this(Unpooled.wrappedBuffer(Buffer.allocateDirectWithNativeOrder(MAX_CAPACITY)).setIndex(0, 0));
}
@SuppressWarnings("deprecation")
public IovArray(ByteBuf memory) {
assert memory.writerIndex() == 0;
assert memory.readerIndex() == 0;
this.memory = PlatformDependent.hasUnsafe() ? memory : memory.order(
PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
}
public void clear() {
@ -88,22 +96,25 @@ public final class IovArray implements MessageProcessor {
if (count == IOV_MAX) {
// No more room!
return false;
} else if (buf.nioBufferCount() == 1) {
}
long memoryAddress = memory.memoryAddress();
if (buf.nioBufferCount() == 1) {
if (len == 0) {
return true;
}
if (buf.hasMemoryAddress()) {
return add(buf.memoryAddress() + offset, len);
return add(memoryAddress, buf.memoryAddress() + offset, len);
} else {
ByteBuffer nioBuffer = buf.internalNioBuffer(offset, len);
return add(Buffer.memoryAddress(nioBuffer) + nioBuffer.position(), len);
return add(memoryAddress, Buffer.memoryAddress(nioBuffer) + nioBuffer.position(), len);
}
} else {
ByteBuffer[] buffers = buf.nioBuffers(offset, len);
for (ByteBuffer nioBuffer : buffers) {
final int remaining = nioBuffer.remaining();
if (remaining != 0 &&
(!add(Buffer.memoryAddress(nioBuffer) + nioBuffer.position(), remaining) || count == IOV_MAX)) {
(!add(memoryAddress, Buffer.memoryAddress(nioBuffer) + nioBuffer.position(), remaining)
|| count == IOV_MAX)) {
return false;
}
}
@ -111,12 +122,14 @@ public final class IovArray implements MessageProcessor {
}
}
private boolean add(long addr, int len) {
private boolean add(long memoryAddress, long addr, int len) {
assert addr != 0;
// If there is at least 1 entry then we enforce the maximum bytes. We want to accept at least one entry so we
// will attempt to write some data and make progress.
if (maxBytes - len < size && count > 0) {
if ((maxBytes - len < size && count > 0) ||
// Check if we have enough space left
memory.capacity() < (count + 1) * IOV_SIZE) {
// If the size + len will overflow SSIZE_MAX we stop populate the IovArray. This is done as linux
// not allow to write more bytes then SSIZE_MAX with one writev(...) call and so will
// return 'EINVAL', which will raise an IOException.
@ -137,8 +150,8 @@ public final class IovArray implements MessageProcessor {
PlatformDependent.putLong(baseOffset + memoryAddress, addr);
PlatformDependent.putLong(lengthOffset + memoryAddress, len);
} else {
memory.putLong(baseOffset, addr);
memory.putLong(lengthOffset, len);
memory.setLong(baseOffset, addr);
memory.setLong(lengthOffset, len);
}
} else {
assert ADDRESS_SIZE == 4;
@ -146,8 +159,8 @@ public final class IovArray implements MessageProcessor {
PlatformDependent.putInt(baseOffset + memoryAddress, (int) addr);
PlatformDependent.putInt(lengthOffset + memoryAddress, len);
} else {
memory.putInt(baseOffset, (int) addr);
memory.putInt(lengthOffset, len);
memory.setInt(baseOffset, (int) addr);
memory.setInt(lengthOffset, len);
}
}
return true;
@ -193,14 +206,14 @@ public final class IovArray implements MessageProcessor {
* Returns the {@code memoryAddress} for the given {@code offset}.
*/
public long memoryAddress(int offset) {
return memoryAddress + idx(offset);
return memory.memoryAddress() + idx(offset);
}
/**
* Release the {@link IovArray}. Once release further using of it may crash the JVM!
*/
public void release() {
Buffer.free(memory);
memory.release();
}
@Override

View File

@ -600,7 +600,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public final void close(final ChannelPromise promise) {
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
@ -940,30 +940,34 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
}
}
handleWriteError(t);
} finally {
inFlush0 = false;
}
}
protected final void handleWriteError(Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
}
}
}
private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
ClosedChannelException exception =
StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);