diff --git a/pom.xml b/pom.xml
index 8aabc35..c4c242e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,10 +99,6 @@
io.net5
netty-buffer
-
- io.net5.incubator
- netty-incubator-buffer-memseg
-
javax.xml.bind
jaxb-api
@@ -275,11 +271,6 @@
netty-buffer
5.0.0.Final-SNAPSHOT
-
- io.net5.incubator
- netty-incubator-buffer-memseg
- 0.0.1.Final-SNAPSHOT
-
javax.xml.bind
jaxb-api
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
index e3d6ef7..f34168b 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
@@ -94,7 +94,7 @@ public class LuceneIndexImpl implements LuceneIndex {
.fromCallable(signal::key)
.map(indicizer::getKey), signal.score())),
llSearchResult.totalHitsCount(),
- d -> llSearchResult.close()
+ llSearchResult::close
).send();
});
}
@@ -107,7 +107,7 @@ public class LuceneIndexImpl implements LuceneIndex {
return new SearchResult<>(llSearchResult.results().map(signal -> {
var key = Mono.fromCallable(signal::key).map(indicizer::getKey);
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
- }), llSearchResult.totalHitsCount(), d -> llSearchResult.close()).send();
+ }), llSearchResult.totalHitsCount(), llSearchResult::close).send();
});
}
@@ -131,10 +131,7 @@ public class LuceneIndexImpl implements LuceneIndex {
Mono.just(tuple3.getT3().orElseThrow()),
tuple3.getT1()
));
- return new SearchResult<>(resultItemsFlux,
- llSearchResult.totalHitsCount(),
- d -> llSearchResult.close()
- ).send();
+ return new SearchResult<>(resultItemsFlux, llSearchResult.totalHitsCount(), llSearchResult::close).send();
});
}
diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResult.java b/src/main/java/it/cavallium/dbengine/client/SearchResult.java
index 0a9bec2..6542f2e 100644
--- a/src/main/java/it/cavallium/dbengine/client/SearchResult.java
+++ b/src/main/java/it/cavallium/dbengine/client/SearchResult.java
@@ -14,18 +14,39 @@ import reactor.core.publisher.Mono;
public final class SearchResult extends LiveResourceSupport, SearchResult> {
+ private static final Drop> DROP = new Drop<>() {
+ @Override
+ public void drop(SearchResult, ?> obj) {
+ if (obj.onClose != null) {
+ obj.onClose.run();
+ }
+ }
+
+ @Override
+ public Drop> fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(SearchResult, ?> obj) {
+
+ }
+ };
+
private Flux> results;
private TotalHitsCount totalHitsCount;
+ private Runnable onClose;
- public SearchResult(Flux> results, TotalHitsCount totalHitsCount,
- Drop> drop) {
- super(drop);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public SearchResult(Flux> results, TotalHitsCount totalHitsCount, Runnable onClose) {
+ super((Drop>) (Drop) DROP);
this.results = results;
this.totalHitsCount = totalHitsCount;
+ this.onClose = onClose;
}
public static SearchResult empty() {
- return new SearchResult(Flux.empty(), TotalHitsCount.of(0, true), d -> {});
+ return new SearchResult(Flux.empty(), TotalHitsCount.of(0, true), null);
}
public Flux> results() {
@@ -50,11 +71,17 @@ public final class SearchResult extends LiveResourceSupport> prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
- return drop -> new SearchResult<>(results, totalHitsCount, drop);
+ var onClose = this.onClose;
+ return drop -> {
+ var instance = new SearchResult<>(results, totalHitsCount, onClose);
+ drop.attach(instance);
+ return instance;
+ };
}
protected void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
+ this.onClose = null;
}
}
diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java
index 7478031..13f97d3 100644
--- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java
+++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java
@@ -2,42 +2,58 @@ package it.cavallium.dbengine.client;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
-import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
-import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LiveResourceSupport;
import it.cavallium.dbengine.database.collections.ValueGetter;
-import java.util.Objects;
-import org.reactivestreams.Publisher;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
public final class SearchResultKeys extends LiveResourceSupport, SearchResultKeys> {
private static final Logger logger = LoggerFactory.getLogger(SearchResultKeys.class);
+ private static final Drop> DROP = new Drop<>() {
+ @Override
+ public void drop(SearchResultKeys> obj) {
+ if (obj.onClose != null) {
+ obj.onClose.run();
+ }
+ }
+
+ @Override
+ public Drop> fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(SearchResultKeys> obj) {
+
+ }
+ };
+
private Flux> results;
private TotalHitsCount totalHitsCount;
+ private Runnable onClose;
- public SearchResultKeys(Flux> results, TotalHitsCount totalHitsCount,
- Drop> drop) {
- super(drop);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public SearchResultKeys(Flux> results, TotalHitsCount totalHitsCount, Runnable onClose) {
+ super((Drop>) (Drop) DROP);
this.results = results;
this.totalHitsCount = totalHitsCount;
+ this.onClose = onClose;
}
public static SearchResultKeys empty() {
- return new SearchResultKeys(Flux.empty(), TotalHitsCount.of(0, true), d -> {});
+ return new SearchResultKeys<>(Flux.empty(), TotalHitsCount.of(0, true), null);
}
public SearchResult withValues(ValueGetter valuesGetter) {
return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(),
item.key().flatMap(valuesGetter::get),
item.score()
- )), totalHitsCount, d -> this.close());
+ )), totalHitsCount, this::close);
}
public Flux> results() {
@@ -62,13 +78,19 @@ public final class SearchResultKeys extends LiveResourceSupport> prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
+ var onClose = this.onClose;
makeInaccessible();
- return drop -> new SearchResultKeys<>(results, totalHitsCount, drop);
+ return drop -> {
+ var instance = new SearchResultKeys<>(results, totalHitsCount, onClose);
+ drop.attach(instance);
+ return instance;
+ };
}
protected void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
+ this.onClose = null;
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDelta.java b/src/main/java/it/cavallium/dbengine/database/LLDelta.java
index 458616b..46c9b81 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDelta.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDelta.java
@@ -5,20 +5,66 @@ import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
+import it.cavallium.dbengine.client.SearchResultKeys;
import java.util.StringJoiner;
import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
public class LLDelta extends LiveResourceSupport {
- @Nullable
- private final Buffer previous;
- @Nullable
- private final Buffer current;
- private LLDelta(@Nullable Send previous, @Nullable Send current, Drop drop) {
- super(new LLDelta.CloseOnDrop(drop));
+ private static final Logger logger = LoggerFactory.getLogger(LLDelta.class);
+
+ private static final Drop DROP = new Drop<>() {
+ @Override
+ public void drop(LLDelta obj) {
+ try {
+ if (obj.previous != null) {
+ obj.previous.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close previous", ex);
+ }
+ try {
+ if (obj.current != null) {
+ obj.current.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close current", ex);
+ }
+ try {
+ if (obj.onClose != null) {
+ obj.onClose.run();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close onDrop", ex);
+ }
+ }
+
+ @Override
+ public Drop fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(LLDelta obj) {
+
+ }
+ };
+
+ @Nullable
+ private Buffer previous;
+ @Nullable
+ private Buffer current;
+ @Nullable
+ private Runnable onClose;
+
+ private LLDelta(@Nullable Send previous, @Nullable Send current, @Nullable Runnable onClose) {
+ super(DROP);
assert isAllAccessible();
this.previous = previous != null ? previous.receive().makeReadOnly() : null;
this.current = current != null ? current.receive().makeReadOnly() : null;
+ this.onClose = onClose;
}
private boolean isAllAccessible() {
@@ -31,7 +77,7 @@ public class LLDelta extends LiveResourceSupport {
public static LLDelta of(Send min, Send max) {
assert (min == null && max == null) || (min != max);
- return new LLDelta(min, max, d -> {});
+ return new LLDelta(min, max, null);
}
public Send previous() {
@@ -86,42 +132,28 @@ public class LLDelta extends LiveResourceSupport {
.toString();
}
- public LLDelta copy() {
- ensureOwned();
- var prevCopy = previous != null ? previous.copy().send() : null;
- Send curCopy = current != null ? current.copy().send() : null;
- return new LLDelta(prevCopy, curCopy, d -> {});
- }
-
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
+ @Override
+ protected void makeInaccessible() {
+ this.current = null;
+ this.previous = null;
+ this.onClose = null;
+ }
+
@Override
protected Owned prepareSend() {
Send minSend = this.previous != null ? this.previous.send() : null;
Send maxSend = this.current != null ? this.current.send() : null;
- return drop -> new LLDelta(minSend, maxSend, drop);
+ Runnable onClose = this.onClose;
+ return drop -> {
+ var instance = new LLDelta(minSend, maxSend, onClose);
+ drop.attach(instance);
+ return instance;
+ };
}
- private static class CloseOnDrop implements Drop {
-
- private final Drop delegate;
-
- public CloseOnDrop(Drop drop) {
- if (drop instanceof CloseOnDrop closeOnDrop) {
- this.delegate = closeOnDrop.delegate;
- } else {
- this.delegate = drop;
- }
- }
-
- @Override
- public void drop(LLDelta obj) {
- if (obj.previous != null) obj.previous.close();
- if (obj.current != null) obj.current.close();
- delegate.drop(obj);
- }
- }
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
index 5b9a462..7f9fc9b 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
@@ -5,38 +5,74 @@ import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
+import java.util.Objects;
import java.util.StringJoiner;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
public class LLEntry extends LiveResourceSupport {
- @NotNull
- private final Buffer key;
- @NotNull
- private final Buffer value;
- private LLEntry(@NotNull Send key, @NotNull Send value, Drop drop) {
- super(new LLEntry.CloseOnDrop(drop));
+ private static final Logger logger = LoggerFactory.getLogger(LLEntry.class);
+
+ private static final Drop DROP = new Drop<>() {
+ @Override
+ public void drop(LLEntry obj) {
+ try {
+ if (obj.key != null) {
+ obj.key.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close key", ex);
+ }
+ try {
+ if (obj.value != null) {
+ obj.value.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close value", ex);
+ }
+ }
+
+ @Override
+ public Drop fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(LLEntry obj) {
+
+ }
+ };
+
+ @Nullable
+ private Buffer key;
+ @Nullable
+ private Buffer value;
+
+ private LLEntry(@NotNull Send key, @NotNull Send value) {
+ super(DROP);
this.key = key.receive().makeReadOnly();
this.value = value.receive().makeReadOnly();
assert isAllAccessible();
}
private boolean isAllAccessible() {
- assert key.isAccessible();
- assert value.isAccessible();
+ assert key != null && key.isAccessible();
+ assert value != null && value.isAccessible();
assert this.isAccessible();
assert this.isOwned();
return true;
}
public static LLEntry of(@NotNull Send key, @NotNull Send value) {
- return new LLEntry(key, value, d -> {});
+ return new LLEntry(key, value);
}
public Send getKey() {
ensureOwned();
- return key.copy().send();
+ return Objects.requireNonNull(key).copy().send();
}
public Buffer getKeyUnsafe() {
@@ -45,7 +81,7 @@ public class LLEntry extends LiveResourceSupport {
public Send getValue() {
ensureOwned();
- return value.copy().send();
+ return Objects.requireNonNull(value).copy().send();
}
@@ -64,6 +100,12 @@ public class LLEntry extends LiveResourceSupport {
}
}
+ @Override
+ protected void makeInaccessible() {
+ this.key = null;
+ this.value = null;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -91,11 +133,6 @@ public class LLEntry extends LiveResourceSupport {
.toString();
}
- public LLEntry copy() {
- ensureOwned();
- return new LLEntry(key.copy().send(), value.copy().send(), d -> {});
- }
-
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
@@ -105,28 +142,12 @@ public class LLEntry extends LiveResourceSupport {
protected Owned prepareSend() {
Send keySend;
Send valueSend;
- keySend = this.key.send();
- valueSend = this.value.send();
- return drop -> new LLEntry(keySend, valueSend, drop);
- }
-
- private static class CloseOnDrop implements Drop {
-
- private final Drop delegate;
-
- public CloseOnDrop(Drop drop) {
- if (drop instanceof CloseOnDrop closeOnDrop) {
- this.delegate = closeOnDrop.delegate;
- } else {
- this.delegate = drop;
- }
- }
-
- @Override
- public void drop(LLEntry obj) {
- obj.key.close();
- obj.value.close();
- delegate.drop(obj);
- }
+ keySend = Objects.requireNonNull(this.key).send();
+ valueSend = Objects.requireNonNull(this.value).send();
+ return drop -> {
+ var instance = new LLEntry(keySend, valueSend);
+ drop.attach(instance);
+ return instance;
+ };
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java
index 6602b59..c45dc27 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLRange.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java
@@ -9,13 +9,54 @@ import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import java.util.StringJoiner;
import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
/**
* Range of data, from min (inclusive),to max (exclusive)
*/
public class LLRange extends LiveResourceSupport {
- private static final LLRange RANGE_ALL = new LLRange(null, null, null, d -> {});
+ private static final Logger logger = LoggerFactory.getLogger(LLRange.class);
+
+ private static final Drop DROP = new Drop<>() {
+ @Override
+ public void drop(LLRange obj) {
+ try {
+ if (obj.min != null) {
+ obj.min.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close min", ex);
+ }
+ try {
+ if (obj.max != null) {
+ obj.max.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close max", ex);
+ }
+ try {
+ if (obj.single != null) {
+ obj.single.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close single", ex);
+ }
+ }
+
+ @Override
+ public Drop fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(LLRange obj) {
+
+ }
+ };
+
+ private static final LLRange RANGE_ALL = new LLRange(null, null, null);
@Nullable
private Buffer min;
@Nullable
@@ -23,8 +64,8 @@ public class LLRange extends LiveResourceSupport {
@Nullable
private Buffer single;
- private LLRange(Send min, Send max, Send single, Drop drop) {
- super(new CloseOnDrop(drop));
+ private LLRange(Send min, Send max, Send single) {
+ super(DROP);
assert isAllAccessible();
assert single == null || (min == null && max == null);
this.min = min != null ? min.receive().makeReadOnly() : null;
@@ -46,19 +87,19 @@ public class LLRange extends LiveResourceSupport {
}
public static LLRange from(Send min) {
- return new LLRange(min, null, null, d -> {});
+ return new LLRange(min, null, null);
}
public static LLRange to(Send max) {
- return new LLRange(null, max, null, d -> {});
+ return new LLRange(null, max, null);
}
public static LLRange single(Send single) {
- return new LLRange(null, null, single, d -> {});
+ return new LLRange(null, null, single);
}
public static LLRange of(Send min, Send max) {
- return new LLRange(min, max, null, d -> {});
+ return new LLRange(min, max, null);
}
public boolean isAll() {
@@ -179,8 +220,7 @@ public class LLRange extends LiveResourceSupport {
ensureOwned();
return new LLRange(min != null ? min.copy().send() : null,
max != null ? max.copy().send() : null,
- single != null ? single.copy().send(): null,
- d -> {}
+ single != null ? single.copy().send(): null
);
}
@@ -197,7 +237,11 @@ public class LLRange extends LiveResourceSupport {
minSend = this.min != null ? this.min.send() : null;
maxSend = this.max != null ? this.max.send() : null;
singleSend = this.single != null ? this.single.send() : null;
- return drop -> new LLRange(minSend, maxSend, singleSend, drop);
+ return drop -> {
+ var instance = new LLRange(minSend, maxSend, singleSend);
+ drop.attach(instance);
+ return instance;
+ };
}
protected void makeInaccessible() {
@@ -205,25 +249,4 @@ public class LLRange extends LiveResourceSupport {
this.max = null;
this.single = null;
}
-
- private static class CloseOnDrop implements Drop {
-
- private final Drop delegate;
-
- public CloseOnDrop(Drop drop) {
- if (drop instanceof CloseOnDrop closeOnDrop) {
- this.delegate = closeOnDrop.delegate;
- } else {
- this.delegate = drop;
- }
- }
-
- @Override
- public void drop(LLRange obj) {
- if (obj.min != null) obj.min.close();
- if (obj.max != null) obj.max.close();
- if (obj.single != null) obj.single.close();
- delegate.drop(obj);
- }
- }
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java
index 2ffdbfc..34c3fbf 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java
@@ -13,13 +13,38 @@ public final class LLSearchResultShard extends LiveResourceSupport DROP = new Drop<>() {
+ @Override
+ public void drop(LLSearchResultShard obj) {
+ try {
+ if (obj.onClose != null) {
+ obj.onClose.run();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close onClose", ex);
+ }
+ }
+
+ @Override
+ public Drop fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(LLSearchResultShard obj) {
+
+ }
+ };
+
private Flux results;
private TotalHitsCount totalHitsCount;
+ private Runnable onClose;
- public LLSearchResultShard(Flux results, TotalHitsCount totalHitsCount, Drop drop) {
- super(drop);
+ public LLSearchResultShard(Flux results, TotalHitsCount totalHitsCount, Runnable onClose) {
+ super(DROP);
this.results = results;
this.totalHitsCount = totalHitsCount;
+ this.onClose = onClose;
}
public Flux results() {
@@ -65,11 +90,13 @@ public final class LLSearchResultShard extends LiveResourceSupport prepareSend() {
var results = this.results;
var totalHitsCount = this.totalHitsCount;
- return drop -> new LLSearchResultShard(results, totalHitsCount, drop);
+ var onClose = this.onClose;
+ return drop -> new LLSearchResultShard(results, totalHitsCount, onClose);
}
protected void makeInaccessible() {
this.results = null;
this.totalHitsCount = null;
+ this.onClose = null;
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
index cdd9dbd..1d9b9ea 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
@@ -36,10 +36,8 @@ public class DatabaseEmpty {
private DatabaseEmpty() {
}
- public static DatabaseStageEntry create(LLDictionary dictionary,
- Send key,
- Drop> drop) {
- return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), drop);
+ public static DatabaseStageEntry create(LLDictionary dictionary, Send key, Runnable onClose) {
+ return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()), onClose);
}
public static final class Nothing {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
index 6c56ecd..c69537c 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
@@ -44,26 +44,26 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep prefixKey,
SerializerFixedBinaryLength keySuffixSerializer,
Serializer valueSerializer,
- Drop>> drop) {
+ Runnable onClose) {
// Do not retain or release or use the prefixKey here
- super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0, drop);
+ super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0, onClose);
this.valueSerializer = valueSerializer;
}
public static DatabaseMapDictionary simple(LLDictionary dictionary,
SerializerFixedBinaryLength keySerializer,
Serializer valueSerializer,
- Drop>> drop) {
+ Runnable onClose) {
return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer,
- valueSerializer, drop);
+ valueSerializer, onClose);
}
public static DatabaseMapDictionary tail(LLDictionary dictionary,
Send prefixKey,
SerializerFixedBinaryLength keySuffixSerializer,
Serializer valueSerializer,
- Drop>> drop) {
- return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, drop);
+ Runnable onClose) {
+ return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose);
}
private Send toKey(Send suffixKeyToSend) {
@@ -152,7 +152,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.fromCallable(() ->
- new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer, d -> {}));
+ new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer, null));
}
@Override
@@ -401,7 +401,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, toKey(keyBuf.send()), valueSerializer, d -> {})
+ new DatabaseSingle<>(dictionary, toKey(keyBuf.send()), valueSerializer, null)
));
} catch (SerializationException ex) {
sink.error(ex);
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
index 1642944..336f759 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
@@ -25,6 +25,8 @@ import java.util.Map.Entry;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -33,6 +35,45 @@ public class DatabaseMapDictionaryDeep> extend
LiveResourceSupport>, DatabaseMapDictionaryDeep>
implements DatabaseStageMap {
+ private static final Logger logger = LoggerFactory.getLogger(DatabaseMapDictionaryDeep.class);
+
+ private static final Drop> DROP = new Drop<>() {
+ @Override
+ public void drop(DatabaseMapDictionaryDeep, ?, ?> obj) {
+ try {
+ if (obj.range != null) {
+ obj.range.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close range", ex);
+ }
+ try {
+ if (obj.keyPrefix != null) {
+ obj.keyPrefix.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close keyPrefix", ex);
+ }
+ try {
+ if (obj.onClose != null) {
+ obj.onClose.run();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close onClose", ex);
+ }
+ }
+
+ @Override
+ public Drop> fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(DatabaseMapDictionaryDeep, ?, ?> obj) {
+
+ }
+ };
+
protected final LLDictionary dictionary;
private final BufferAllocator alloc;
protected final SubStageGetter subStageGetter;
@@ -44,6 +85,7 @@ public class DatabaseMapDictionaryDeep> extend
protected LLRange range;
protected Buffer keyPrefix;
+ protected Runnable onClose;
private static void incrementPrefix(Buffer prefix, int prefixLength) {
assert prefix.readableBytes() >= prefixLength;
@@ -151,29 +193,30 @@ public class DatabaseMapDictionaryDeep> extend
@Deprecated
public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary,
SerializerFixedBinaryLength keySerializer, SubStageGetterSingle subStageGetter,
- Drop>> drop) {
+ Runnable onClose) {
return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer,
- subStageGetter, 0, drop);
+ subStageGetter, 0, onClose);
}
public static > DatabaseMapDictionaryDeep deepTail(
LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, int keyExtLength,
- SubStageGetter subStageGetter, Drop> drop) {
+ SubStageGetter subStageGetter, Runnable onClose) {
return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer,
- subStageGetter, keyExtLength, drop);
+ subStageGetter, keyExtLength, onClose);
}
public static > DatabaseMapDictionaryDeep deepIntermediate(
LLDictionary dictionary, Send prefixKey, SerializerFixedBinaryLength keySuffixSerializer,
- SubStageGetter subStageGetter, int keyExtLength, Drop> drop) {
+ SubStageGetter subStageGetter, int keyExtLength, Runnable onClose) {
return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter,
- keyExtLength, drop);
+ keyExtLength, onClose);
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
protected DatabaseMapDictionaryDeep(LLDictionary dictionary, @NotNull Send prefixKeyToReceive,
SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength,
- Drop> drop) {
- super(new CloseOnDrop<>(drop));
+ Runnable onClose) {
+ super((Drop>) (Drop) DROP);
try (var prefixKey = prefixKeyToReceive.receive()) {
this.dictionary = dictionary;
this.alloc = dictionary.getAllocator();
@@ -198,9 +241,11 @@ public class DatabaseMapDictionaryDeep> extend
}
this.keyPrefix = prefixKey.send().receive();
+ this.onClose = onClose;
}
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
private DatabaseMapDictionaryDeep(LLDictionary dictionary,
BufferAllocator alloc,
SubStageGetter subStageGetter,
@@ -211,8 +256,8 @@ public class DatabaseMapDictionaryDeep> extend
Mono> rangeMono,
Send range,
Send keyPrefix,
- Drop> drop) {
- super(new CloseOnDrop<>(drop));
+ Runnable onClose) {
+ super((Drop>) (Drop) DROP);
this.dictionary = dictionary;
this.alloc = alloc;
this.subStageGetter = subStageGetter;
@@ -224,6 +269,7 @@ public class DatabaseMapDictionaryDeep> extend
this.range = range.receive();
this.keyPrefix = keyPrefix.receive();
+ this.onClose = onClose;
}
@SuppressWarnings("unused")
@@ -418,34 +464,29 @@ public class DatabaseMapDictionaryDeep> extend
protected Owned> prepareSend() {
var keyPrefix = this.keyPrefix.send();
var range = this.range.send();
- return drop -> new DatabaseMapDictionaryDeep<>(dictionary, alloc, subStageGetter, keySuffixSerializer,
- keyPrefixLength, keySuffixLength, keyExtLength, rangeMono, range, keyPrefix, drop);
+ var onClose = this.onClose;
+ return drop -> {
+ var instance = new DatabaseMapDictionaryDeep<>(dictionary,
+ alloc,
+ subStageGetter,
+ keySuffixSerializer,
+ keyPrefixLength,
+ keySuffixLength,
+ keyExtLength,
+ rangeMono,
+ range,
+ keyPrefix,
+ onClose
+ );
+ drop.attach(instance);
+ return instance;
+ };
}
@Override
protected void makeInaccessible() {
this.keyPrefix = null;
this.range = null;
- }
-
- private static class CloseOnDrop>
- implements Drop> {
-
- private final Drop> delegate;
-
- public CloseOnDrop(Drop> drop) {
- if (drop instanceof CloseOnDrop closeOnDrop) {
- this.delegate = closeOnDrop.delegate;
- } else {
- this.delegate = drop;
- }
- }
-
- @Override
- public void drop(DatabaseMapDictionaryDeep obj) {
- obj.range.close();
- obj.keyPrefix.close();
- delegate.drop(obj);
- }
+ this.onClose = null;
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
index 57ab208..9c9437a 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
@@ -25,6 +25,8 @@ import java.util.Set;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -33,19 +35,45 @@ public class DatabaseMapDictionaryHashed extends
LiveResourceSupport>, DatabaseMapDictionaryHashed>
implements DatabaseStageMap> {
+ private static final Logger logger = LoggerFactory.getLogger(DatabaseMapDictionaryHashed.class);
+
+ private static final Drop> DROP = new Drop<>() {
+ @Override
+ public void drop(DatabaseMapDictionaryHashed, ?, ?> obj) {
+ try {
+ if (obj.subDictionary != null) {
+ obj.subDictionary.close();
+ }
+ } catch (Throwable ex) {
+ logger.error("Failed to close subDictionary", ex);
+ }
+ }
+
+ @Override
+ public Drop> fork() {
+ return this;
+ }
+
+ @Override
+ public void attach(DatabaseMapDictionaryHashed, ?, ?> obj) {
+
+ }
+ };
+
private final BufferAllocator alloc;
private final Function keySuffixHashFunction;
private DatabaseMapDictionary>> subDictionary;
+ @SuppressWarnings({"unchecked", "rawtypes"})
protected DatabaseMapDictionaryHashed(LLDictionary dictionary,
@NotNull Send prefixKey,
Serializer keySuffixSerializer,
Serializer valueSerializer,
Function keySuffixHashFunction,
SerializerFixedBinaryLength keySuffixHashSerializer,
- Drop> drop) {
- super(new DatabaseMapDictionaryHashed.CloseOnDrop<>(drop));
+ Runnable onClose) {
+ super((Drop>) (Drop) DROP);
if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) {
throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW");
}
@@ -55,15 +83,16 @@ public class DatabaseMapDictionaryHashed extends
ValuesSetSerializer> valuesSetSerializer
= new ValuesSetSerializer<>(alloc, valueWithHashSerializer);
this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey, keySuffixHashSerializer,
- valuesSetSerializer, d -> {});
+ valuesSetSerializer, onClose);
this.keySuffixHashFunction = keySuffixHashFunction;
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
private DatabaseMapDictionaryHashed(BufferAllocator alloc,
Function keySuffixHashFunction,
Send>>>> subDictionary,
Drop> drop) {
- super(new CloseOnDrop<>(drop));
+ super((Drop>) (Drop) DROP);
this.alloc = alloc;
this.keySuffixHashFunction = keySuffixHashFunction;
@@ -75,7 +104,7 @@ public class DatabaseMapDictionaryHashed extends
Serializer valueSerializer,
Function keyHashFunction,
SerializerFixedBinaryLength keyHashSerializer,
- Drop> drop) {
+ Runnable onClose) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
LLUtils.empty(dictionary.getAllocator()),
@@ -83,7 +112,7 @@ public class DatabaseMapDictionaryHashed extends
valueSerializer,
keyHashFunction,
keyHashSerializer,
- drop
+ onClose
);
}
@@ -93,14 +122,14 @@ public class DatabaseMapDictionaryHashed extends
Serializer valueSerializer,
Function keySuffixHashFunction,
SerializerFixedBinaryLength keySuffixHashSerializer,
- Drop> drop) {
+ Runnable onClose) {
return new DatabaseMapDictionaryHashed<>(dictionary,
prefixKey,
keySuffixSerializer,
valueSerializer,
keySuffixHashFunction,
keySuffixHashSerializer,
- drop
+ onClose
);
}
@@ -173,7 +202,7 @@ public class DatabaseMapDictionaryHashed extends
private Mono> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) {
return subDictionary
.at(snapshot, hash)
- .map(entry -> new DatabaseSingleBucket(entry, key, d -> {}))
+ .map(entry -> new DatabaseSingleBucket(entry, key, null))
.doOnDiscard(Resource.class, Resource::close);
}
@@ -325,23 +354,4 @@ public class DatabaseMapDictionaryHashed extends
protected void makeInaccessible() {
this.subDictionary = null;
}
-
- private static class CloseOnDrop implements Drop> {
-
- private final Drop> delegate;
-
- public CloseOnDrop(Drop> drop) {
- if (drop instanceof CloseOnDrop closeOnDrop) {
- this.delegate = closeOnDrop.delegate;
- } else {
- this.delegate = drop;
- }
- }
-
- @Override
- public void drop(DatabaseMapDictionaryHashed obj) {
- obj.subDictionary.close();
- delegate.drop(obj);
- }
- }
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java
index c082547..2df8d86 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java
@@ -20,21 +20,21 @@ public class DatabaseSetDictionary extends DatabaseMapDictionary
protected DatabaseSetDictionary(LLDictionary dictionary,
Send prefixKey,
SerializerFixedBinaryLength keySuffixSerializer,
- Drop>> drop) {
- super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), drop);
+ Runnable onClose) {
+ super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()), onClose);
}
public static DatabaseSetDictionary simple(LLDictionary dictionary,
SerializerFixedBinaryLength keySerializer,
- Drop>> drop) {
- return new DatabaseSetDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, drop);
+ Runnable onClose) {
+ return new DatabaseSetDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, onClose);
}
public static DatabaseSetDictionary tail(LLDictionary dictionary,
Send prefixKey,
SerializerFixedBinaryLength keySuffixSerializer,
- Drop>> drop) {
- return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer, drop);
+ Runnable onClose) {
+ return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer, onClose);
}
public Mono> getKeySet(@Nullable CompositeSnapshot snapshot) {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java
index 68bf230..89f944b 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionaryHashed.java
@@ -25,14 +25,14 @@ public class DatabaseSetDictionaryHashed extends DatabaseMapDictionaryHas
Serializer keySuffixSerializer,
Function keySuffixHashFunction,
SerializerFixedBinaryLength keySuffixHashSerializer,
- Drop | | |