Fix drops

This commit is contained in:
Andrea Cavalli 2022-10-24 01:17:08 +02:00
parent d28ff73ee6
commit 548016c66e
2 changed files with 27 additions and 20 deletions

22
pom.xml
View File

@ -12,7 +12,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<revision>0-SNAPSHOT</revision>
<dbengine.ci>false</dbengine.ci>
<micrometer.version>1.9.3</micrometer.version>
<micrometer.version>1.9.5</micrometer.version>
<lucene.version>9.4.0</lucene.version>
<junit.jupiter.version>5.9.0</junit.jupiter.version>
</properties>
@ -98,7 +98,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.23</version>
<version>2020.0.24</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@ -110,7 +110,7 @@
<artifactId>reactor-tools</artifactId>
<classifier>original</classifier>
<scope>runtime</scope>
<version>3.4.23</version>
<version>3.4.24</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@ -135,12 +135,12 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
<version>1.33</version>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>8.5.8</version>
<version>8.5.9</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
@ -318,7 +318,7 @@
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-codec-native-quic</artifactId>
<version>0.0.28.Final</version>
<version>0.0.33.Final</version>
<classifier>linux-x86_64</classifier>
<exclusions>
<exclusion>
@ -346,7 +346,7 @@
<dependency>
<groupId>com.squareup.moshi</groupId>
<artifactId>moshi</artifactId>
<version>1.13.0</version>
<version>1.14.0</version>
<exclusions>
<exclusion>
<groupId>org.jetbrains.kotlin</groupId>
@ -372,7 +372,7 @@
<dependency>
<groupId>io.projectreactor.netty.incubator</groupId>
<artifactId>reactor-netty-incubator-quic</artifactId>
<version>0.0.11</version>
<version>0.0.13</version>
<exclusions>
<exclusion>
<groupId>io.netty.incubator</groupId>
@ -429,6 +429,10 @@
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -458,7 +462,7 @@
<dependency>
<groupId>io.soabase.record-builder</groupId>
<artifactId>record-builder-core</artifactId>
<version>33</version>
<version>34</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -129,6 +130,8 @@ public class LLUtils {
public static void initHooks() {
if (hookRegistered.compareAndSet(false, true)) {
Hooks.onNextDropped(LLUtils::onNextDropped);
//todo: add Hooks.onDiscard when it will be implemented
// Hooks.onDiscard(LLUtils::onDiscard);
}
}
@ -1022,41 +1025,41 @@ public class LLUtils {
}
} else if (next instanceof Resource<?> resource && resource.isAccessible()) {
resource.close();
} else if (next instanceof Collection<?> iterable) {
if (!(next instanceof QueueSubscription)) {
iterable.forEach(LLUtils::onNextDropped);
}
} else if (next instanceof List<?> iterable) {
iterable.forEach(obj -> closeResource(obj, manual));
} else if (next instanceof Set<?> iterable) {
iterable.forEach(obj -> closeResource(obj, manual));
} else if (next instanceof AbstractImmutableNativeReference rocksObj) {
if (rocksObj.isOwningHandle()) {
rocksObj.close();
}
} else if (next instanceof Optional<?> optional) {
optional.ifPresent(LLUtils::onNextDropped);
optional.ifPresent(obj -> closeResource(obj, manual));
} else if (next instanceof Map.Entry<?, ?> entry) {
var key = entry.getKey();
if (key != null) {
onNextDropped(key);
closeResource(key, manual);
}
var value = entry.getValue();
if (value != null) {
onNextDropped(value);
closeResource(value, manual);
}
} else if (next instanceof Delta<?> delta) {
var previous = delta.previous();
if (previous != null) {
onNextDropped(previous);
closeResource(previous, manual);
}
var current = delta.current();
if (current != null) {
onNextDropped(current);
closeResource(current, manual);
}
} else if (next instanceof Map<?, ?> map) {
map.forEach((key, value) -> {
if (key != null) {
onNextDropped(key);
closeResource(key, manual);
}
if (value != null) {
onNextDropped(value);
closeResource(value, manual);
}
});
}