From 05e5ab1ecb98963604d686c1f59b2196cf73e244 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 27 Jun 2018 10:02:25 +0200 Subject: [PATCH] Store NativeDatagramPacketArray directly in the EpollEventLoop Motivation: We can store the NativeDatagramPacketArray directly in the EpollEventLoop. This removes the need of using FastThreadLocal. Modifications: - Store NativeDatagramPacketArray directly in the EpollEventLoop (just as we do with IovArray as well). Result: Less FastThreadLocal usage and more consistent code. --- .../epoll/AbstractEpollStreamChannel.java | 2 +- .../channel/epoll/EpollDatagramChannel.java | 5 ++- .../netty/channel/epoll/EpollEventLoop.java | 13 +++++- .../epoll/NativeDatagramPacketArray.java | 40 +++++-------------- 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index ea871db3e1..f1fda08206 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -514,7 +514,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception { final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite(); if (PlatformDependent.hasUnsafe()) { - IovArray array = ((EpollEventLoop) eventLoop()).cleanArray(); + IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); array.maxBytes(maxBytesPerGatheringWrite); in.forEachFlushedMessage(array); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index d9f48464f5..714f612634 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -270,7 +270,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements try { // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { - NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in); + NativeDatagramPacketArray array = ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray(); + in.forEachFlushedMessage(array); int cnt = array.count(); if (cnt >= 1) { @@ -347,7 +348,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements remoteAddress.getAddress(), remoteAddress.getPort()); } } else if (data.nioBufferCount() > 1) { - IovArray array = ((EpollEventLoop) eventLoop()).cleanArray(); + IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); array.add(data); int cnt = array.count(); assert cnt != 0; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 913f5edf42..09439764bc 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -62,6 +62,8 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final boolean allowGrowing; private final EpollEventArray events; private final IovArray iovArray = new IovArray(); + private final NativeDatagramPacketArray datagramPacketArray = new NativeDatagramPacketArray(); + private final SelectStrategy selectStrategy; private final IntSupplier selectNowSupplier = new IntSupplier() { @Override @@ -141,11 +143,19 @@ final class EpollEventLoop extends SingleThreadEventLoop { /** * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}. */ - IovArray cleanArray() { + IovArray cleanIovArray() { iovArray.clear(); return iovArray; } + /** + * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}. + */ + NativeDatagramPacketArray cleanDatagramPacketArray() { + datagramPacketArray.clear(); + return datagramPacketArray; + } + @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { @@ -449,6 +459,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { } finally { // release native memory iovArray.release(); + datagramPacketArray.release(); events.free(); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index aafa67ee66..c100f3c27a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.unix.IovArray; -import io.netty.util.concurrent.FastThreadLocal; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -32,28 +31,11 @@ import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; */ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor { - private static final FastThreadLocal ARRAY = - new FastThreadLocal() { - @Override - protected NativeDatagramPacketArray initialValue() throws Exception { - return new NativeDatagramPacketArray(); - } - - @Override - protected void onRemoval(NativeDatagramPacketArray value) throws Exception { - NativeDatagramPacket[] packetsArray = value.packets; - // Release all packets - for (NativeDatagramPacket datagramPacket : packetsArray) { - datagramPacket.release(); - } - } - }; - // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call. private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV]; private int count; - private NativeDatagramPacketArray() { + NativeDatagramPacketArray() { for (int i = 0; i < packets.length; i++) { packets[i] = new NativeDatagramPacket(); } @@ -83,7 +65,7 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr } @Override - public boolean processMessage(Object msg) throws Exception { + public boolean processMessage(Object msg) { return msg instanceof DatagramPacket && add((DatagramPacket) msg); } @@ -101,15 +83,15 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr return packets; } - /** - * Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of - * {@link ChannelOutboundBuffer}. - */ - static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception { - NativeDatagramPacketArray array = ARRAY.get(); - array.count = 0; - buffer.forEachFlushedMessage(array); - return array; + void clear() { + this.count = 0; + } + + void release() { + // Release all packets + for (NativeDatagramPacket datagramPacket : packets) { + datagramPacket.release(); + } } /**