Store NativeDatagramPacketArray directly in the EpollEventLoop
Motivation: We can store the NativeDatagramPacketArray directly in the EpollEventLoop. This removes the need of using FastThreadLocal. Modifications: - Store NativeDatagramPacketArray directly in the EpollEventLoop (just as we do with IovArray as well). Result: Less FastThreadLocal usage and more consistent code.
This commit is contained in:
parent
a214f2eb96
commit
05e5ab1ecb
@ -514,7 +514,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
||||
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
|
||||
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
|
||||
if (PlatformDependent.hasUnsafe()) {
|
||||
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
|
||||
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
|
||||
array.maxBytes(maxBytesPerGatheringWrite);
|
||||
in.forEachFlushedMessage(array);
|
||||
|
||||
|
@ -270,7 +270,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
try {
|
||||
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
|
||||
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
|
||||
NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
|
||||
NativeDatagramPacketArray array = ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
|
||||
in.forEachFlushedMessage(array);
|
||||
int cnt = array.count();
|
||||
|
||||
if (cnt >= 1) {
|
||||
@ -347,7 +348,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
}
|
||||
} else if (data.nioBufferCount() > 1) {
|
||||
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
|
||||
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
|
||||
array.add(data);
|
||||
int cnt = array.count();
|
||||
assert cnt != 0;
|
||||
|
@ -62,6 +62,8 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
private final boolean allowGrowing;
|
||||
private final EpollEventArray events;
|
||||
private final IovArray iovArray = new IovArray();
|
||||
private final NativeDatagramPacketArray datagramPacketArray = new NativeDatagramPacketArray();
|
||||
|
||||
private final SelectStrategy selectStrategy;
|
||||
private final IntSupplier selectNowSupplier = new IntSupplier() {
|
||||
@Override
|
||||
@ -141,11 +143,19 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
/**
|
||||
* Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
|
||||
*/
|
||||
IovArray cleanArray() {
|
||||
IovArray cleanIovArray() {
|
||||
iovArray.clear();
|
||||
return iovArray;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
|
||||
*/
|
||||
NativeDatagramPacketArray cleanDatagramPacketArray() {
|
||||
datagramPacketArray.clear();
|
||||
return datagramPacketArray;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void wakeup(boolean inEventLoop) {
|
||||
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
|
||||
@ -449,6 +459,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
} finally {
|
||||
// release native memory
|
||||
iovArray.release();
|
||||
datagramPacketArray.release();
|
||||
events.free();
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -32,28 +31,11 @@ import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address;
|
||||
*/
|
||||
final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
|
||||
|
||||
private static final FastThreadLocal<NativeDatagramPacketArray> ARRAY =
|
||||
new FastThreadLocal<NativeDatagramPacketArray>() {
|
||||
@Override
|
||||
protected NativeDatagramPacketArray initialValue() throws Exception {
|
||||
return new NativeDatagramPacketArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onRemoval(NativeDatagramPacketArray value) throws Exception {
|
||||
NativeDatagramPacket[] packetsArray = value.packets;
|
||||
// Release all packets
|
||||
for (NativeDatagramPacket datagramPacket : packetsArray) {
|
||||
datagramPacket.release();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
|
||||
private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV];
|
||||
private int count;
|
||||
|
||||
private NativeDatagramPacketArray() {
|
||||
NativeDatagramPacketArray() {
|
||||
for (int i = 0; i < packets.length; i++) {
|
||||
packets[i] = new NativeDatagramPacket();
|
||||
}
|
||||
@ -83,7 +65,7 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processMessage(Object msg) throws Exception {
|
||||
public boolean processMessage(Object msg) {
|
||||
return msg instanceof DatagramPacket && add((DatagramPacket) msg);
|
||||
}
|
||||
|
||||
@ -101,15 +83,15 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
|
||||
return packets;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
|
||||
* {@link ChannelOutboundBuffer}.
|
||||
*/
|
||||
static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
|
||||
NativeDatagramPacketArray array = ARRAY.get();
|
||||
array.count = 0;
|
||||
buffer.forEachFlushedMessage(array);
|
||||
return array;
|
||||
void clear() {
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
void release() {
|
||||
// Release all packets
|
||||
for (NativeDatagramPacket datagramPacket : packets) {
|
||||
datagramPacket.release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user