Add more discard handlers

This commit is contained in:
Andrea Cavalli 2021-11-08 12:06:32 +01:00
parent 59aa1ef5c6
commit 232e46bcea
6 changed files with 28 additions and 8 deletions

View File

@ -480,7 +480,8 @@ public class LLUtils {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close()))
.doOnDiscard(Send.class, send -> send.close()); .doOnDiscard(Send.class, send -> send.close())
.doOnDiscard(Resource.class, Resource::close);
} }
// todo: remove this ugly method // todo: remove this ugly method
@ -916,6 +917,7 @@ public class LLUtils {
.doOnDiscard(Delta.class, LLUtils::discardDelta) .doOnDiscard(Delta.class, LLUtils::discardDelta)
.doOnDiscard(LLDelta.class, LLUtils::discardLLDelta) .doOnDiscard(LLDelta.class, LLUtils::discardLLDelta)
.doOnDiscard(Send.class, LLUtils::discardSend) .doOnDiscard(Send.class, LLUtils::discardSend)
.doOnDiscard(Resource.class, LLUtils::discardResource)
.doOnDiscard(Map.class, LLUtils::discardMap) .doOnDiscard(Map.class, LLUtils::discardMap)
.doOnDiscard(DatabaseStage.class, LLUtils::discardStage); .doOnDiscard(DatabaseStage.class, LLUtils::discardStage);
@ -944,6 +946,8 @@ public class LLUtils {
discardLLDelta(o); discardLLDelta(o);
} else if (obj instanceof Send o) { } else if (obj instanceof Send o) {
discardSend(o); discardSend(o);
} else if (obj instanceof Resource o) {
discardResource(o);
} else if (obj instanceof Map o) { } else if (obj instanceof Map o) {
discardMap(o); discardMap(o);
} else if (obj instanceof DatabaseStage o) { } else if (obj instanceof DatabaseStage o) {
@ -1059,6 +1063,10 @@ public class LLUtils {
send.close(); send.close();
} }
private static void discardResource(Resource<?> res) {
res.close();
}
private static void discardMap(Map<?, ?> map) { private static void discardMap(Map<?, ?> map) {
for (Entry<?, ?> entry : map.entrySet()) { for (Entry<?, ?> entry : map.entrySet()) {
boolean hasByteBuf = false; boolean hasByteBuf = false;

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections; package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport; import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.CompositeSnapshot;
@ -364,11 +365,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
sink.error(e); sink.error(e);
} }
}) })
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
return dictionary return dictionary
.putMulti(serializedEntries, false) .putMulti(serializedEntries, false)
.then() .then()
.doOnDiscard(Send.class, Send::close) .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close)
.doOnDiscard(LLEntry.class, ResourceSupport::close) .doOnDiscard(LLEntry.class, ResourceSupport::close)
.doOnDiscard(List.class, list -> { .doOnDiscard(List.class, list -> {
for (Object o : list) { for (Object o : list) {

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException; import java.io.IOException;
@ -148,7 +149,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
assert indexSearcher.getIndexReader().getRefCount() > 0; assert indexSearcher.getIndexReader().getRefCount() > 0;
return new LLIndexSearcher(indexSearcher, decRef, this::dropCachedIndexSearcher).send(); return new LLIndexSearcher(indexSearcher, decRef, this::dropCachedIndexSearcher).send();
}) })
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
}); });
} }

View File

@ -384,13 +384,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return localSearcher return localSearcher
.collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION) .collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
} }
public Mono<Send<LLIndexSearcher>> retrieveSearcher(@Nullable LLSnapshot snapshot) { public Mono<Send<LLIndexSearcher>> retrieveSearcher(@Nullable LLSnapshot snapshot) {
return searcherManager return searcherManager
.retrieveSearcher(snapshot) .retrieveSearcher(snapshot)
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
} }
@Override @Override

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.memory;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDelta;
@ -528,7 +529,8 @@ public class LLMemoryDictionary implements LLDictionary {
return getRange(snapshot, rangeMono) return getRange(snapshot, rangeMono)
.take(1, true) .take(1, true)
.singleOrEmpty() .singleOrEmpty()
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
} }
@Override @Override
@ -536,7 +538,8 @@ public class LLMemoryDictionary implements LLDictionary {
return getRangeKeys(snapshot, rangeMono) return getRangeKeys(snapshot, rangeMono)
.take(1, true) .take(1, true)
.singleOrEmpty() .singleOrEmpty()
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
} }
@Override @Override

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
@ -39,7 +40,8 @@ public class CountLocalSearcher implements LocalSearcher {
is -> Mono.empty() is -> Mono.empty()
) )
.map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null)) .map(count -> new LuceneSearchResult(TotalHitsCount.of(count, true), Flux.empty(), null))
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
} }
@Override @Override