diff --git a/pom.xml b/pom.xml index 010d651..13d2df6 100644 --- a/pom.xml +++ b/pom.xml @@ -484,7 +484,7 @@ maven-compiler-plugin 3.8.1 - 16 + 17 io.soabase.record-builder @@ -499,8 +499,8 @@ --enable-preview --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED - 16 - 16 + 17 + 17 @@ -533,7 +533,7 @@ - --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED + --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access ci diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 83fe3f9..947de21 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -52,6 +52,7 @@ import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; @@ -339,7 +340,7 @@ public class LLUtils { PlatformDependent.freeDirectBuffer(directBuffer); directBuffer = null; } - directBuffer = LLUtils.obtainDirect(buffer); + directBuffer = LLUtils.obtainDirect(buffer, true); buffer.ensureWritable(size); } } @@ -373,12 +374,12 @@ public class LLUtils { boolean cleanupOnSuccess) { return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(r::close); + return Mono.fromRunnable(() -> r.close()); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) - .doOnDiscard(Send.class, Send::close); + }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) + .doOnDiscard(Send.class, send -> send.close()); } /** @@ -390,13 +391,13 @@ public class LLUtils { boolean cleanupOnSuccess) { return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(r::close); + return Mono.fromRunnable(() -> r.close()); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) - .doOnDiscard(Resource.class, Resource::close) - .doOnDiscard(Send.class, Send::close); + }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) + .doOnDiscard(Resource.class, resource -> resource.close()) + .doOnDiscard(Send.class, send -> send.close()); } /** @@ -408,13 +409,13 @@ public class LLUtils { boolean cleanupOnSuccess) { return Mono.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(r::close); + return Mono.fromRunnable(() -> r.close()); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) - .doOnDiscard(Resource.class, Resource::close) - .doOnDiscard(Send.class, Send::close); + }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) + .doOnDiscard(Resource.class, resource -> resource.close()) + .doOnDiscard(Send.class, send -> send.close()); } /** @@ -426,13 +427,22 @@ public class LLUtils { boolean cleanupOnSuccess) { return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> { if (cleanupOnSuccess) { - return Mono.fromRunnable(r::close); + return Mono.fromRunnable(() -> r.close()); } else { return Mono.empty(); } - }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) - .doOnDiscard(Resource.class, Resource::close) - .doOnDiscard(Send.class, Send::close); + }, (r, ex) -> Mono.fromRunnable(() -> r.close()), r -> Mono.fromRunnable(() -> r.close())) + .doOnDiscard(Resource.class, resource -> resource.close()) + .doOnDiscard(Send.class, send -> send.close()); + } + + public static boolean isSet(ScoreDoc[] scoreDocs) { + for (ScoreDoc scoreDoc : scoreDocs) { + if (scoreDoc == null) { + return false; + } + } + return true; } public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} @@ -440,16 +450,16 @@ public class LLUtils { @NotNull public static DirectBuffer newDirect(BufferAllocator allocator, int size) { try (var buf = allocator.allocate(size)) { - var direct = obtainDirect(buf); + var direct = obtainDirect(buf, true); return new DirectBuffer(buf.send(), direct); } } @NotNull - public static DirectBuffer convertToDirect(BufferAllocator allocator, Send content) { + public static DirectBuffer convertToReadableDirect(BufferAllocator allocator, Send content) { try (var buf = content.receive()) { - if (buf.countComponents() != 0) { - var direct = obtainDirect(buf); + if (buf.countComponents() == 1) { + var direct = obtainDirect(buf, false); return new DirectBuffer(buf.send(), direct); } else { var direct = newDirect(allocator, buf.readableBytes()); @@ -462,7 +472,7 @@ public class LLUtils { } @NotNull - public static ByteBuffer obtainDirect(Buffer buffer) { + public static ByteBuffer obtainDirect(Buffer buffer, boolean writable) { if (!PlatformDependent.hasUnsafe()) { throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", PlatformDependent.getUnsafeUnavailabilityCause() @@ -470,15 +480,28 @@ public class LLUtils { } if (!MemorySegmentUtils.isSupported()) { throw new UnsupportedOperationException("Foreign Memory Access API support is disabled." - + " Please set \"--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit\""); + + " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"", + MemorySegmentUtils.getUnsupportedCause() + ); } assert buffer.isAccessible(); AtomicLong nativeAddress = new AtomicLong(0); - if (buffer.countComponents() == 1 && buffer.countReadableComponents() == 1) { - buffer.forEachReadable(0, (i, c) -> { - nativeAddress.setPlain(c.readableNativeAddress()); - return false; - }); + if (buffer.countComponents() == 1) { + if (writable) { + if (buffer.countWritableComponents() == 1) { + buffer.forEachWritable(0, (i, c) -> { + nativeAddress.setPlain(c.writableNativeAddress()); + return false; + }); + } + } else { + if (buffer.countReadableComponents() == 1) { + buffer.forEachReadable(0, (i, c) -> { + nativeAddress.setPlain(c.readableNativeAddress()); + return false; + }); + } + } } if (nativeAddress.getPlain() == 0) { if (buffer.capacity() == 0) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 84447c9..343e8c8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -7,10 +7,12 @@ import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.IndexSearcher; @@ -27,6 +29,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Empty; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; public class CachedIndexSearcherManager implements IndexSearcherManager { @@ -128,35 +131,30 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { } private Mono> generateCachedSearcher(@Nullable LLSnapshot snapshot) { - var onClose = this.closeRequested.asMono(); - var onQueryRefresh = Mono.delay(queryRefreshDebounceTime).then(); - var onInvalidateCache = Mono.firstWithSignal(onClose, onQueryRefresh); + // todo: check if defer is really needed + return Mono.defer(() -> { + var onClose = this.closeRequested.asMono(); + var onQueryRefresh = Mono.delay(queryRefreshDebounceTime).then(); + var onInvalidateCache = Mono.firstWithSignal(onClose, onQueryRefresh).doOnNext(s -> System.err.println("Invalidation triggered")); - return Mono.fromCallable(() -> { - activeSearchers.register(); - IndexSearcher indexSearcher; - SearcherManager associatedSearcherManager; - boolean ownsIndexSearcher; - if (snapshot == null) { - indexSearcher = searcherManager.acquire(); + return Mono.fromCallable(() -> { + activeSearchers.register(); + IndexSearcher indexSearcher; + if (snapshot == null) { + indexSearcher = searcherManager.acquire(); + } else { + indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(); + } indexSearcher.setSimilarity(similarity); - associatedSearcherManager = searcherManager; - ownsIndexSearcher = true; - } else { - indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(); - associatedSearcherManager = null; - ownsIndexSearcher = false; - } - return new LLIndexSearcher(indexSearcher, - associatedSearcherManager, - ownsIndexSearcher, - this::dropCachedIndexSearcher - ); - }) - .cacheInvalidateWhen(indexSearcher -> onInvalidateCache, ResourceSupport::close) - .map(searcher -> searcher.copy(this::dropCachedIndexSearcher).send()) - .takeUntilOther(onClose) - .doOnDiscard(ResourceSupport.class, ResourceSupport::close); + assert indexSearcher.getIndexReader().getRefCount() > 0; + return indexSearcher; + }) + // todo: re-enable caching if needed + //.cacheInvalidateWhen(tuple -> onInvalidateCache) + .map(indexSearcher -> new LLIndexSearcher(indexSearcher, this::dropCachedIndexSearcher).send()) + .takeUntilOther(onClose) + .doOnDiscard(Send.class, Send::close); + }); } private void dropCachedIndexSearcher(LLIndexSearcher cachedIndexSearcher) { @@ -188,24 +186,6 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { } } - @Override - public Flux searchMany(@Nullable LLSnapshot snapshot, Function> searcherFunction) { - return Flux.usingWhen( - this.retrieveSearcher(snapshot).map(Send::receive), - indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()), - cachedIndexSearcher -> Mono.fromRunnable(cachedIndexSearcher::close) - ); - } - - @Override - public Mono search(@Nullable LLSnapshot snapshot, Function> searcherFunction) { - return Mono.usingWhen( - this.retrieveSearcher(snapshot).map(Send::receive), - indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()), - cachedIndexSearcher -> Mono.fromRunnable(cachedIndexSearcher::close) - ); - } - @Override public Mono> retrieveSearcher(@Nullable LLSnapshot snapshot) { if (snapshot == null) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java index c86f222..bc3e133 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java @@ -15,10 +15,6 @@ public interface IndexSearcherManager { void maybeRefresh() throws IOException; - Flux searchMany(@Nullable LLSnapshot snapshot, Function> searcherFunction); - - Mono search(@Nullable LLSnapshot snapshot, Function> searcherFunction); - Mono> retrieveSearcher(@Nullable LLSnapshot snapshot); Mono close(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java index 5ffeca6..093eccd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java @@ -13,51 +13,27 @@ import org.slf4j.LoggerFactory; public class LLIndexSearcher extends ResourceSupport { - private static final Logger logger = LoggerFactory.getLogger(LLIndexSearcher.class); - private final boolean ownsIndexSearcher; - private IndexSearcher indexSearcher; - private SearcherManager associatedSearcherManager; - public LLIndexSearcher(IndexSearcher indexSearcher, - @Nullable SearcherManager associatedSearcherManager, - boolean ownsIndexSearcher, - Drop drop) { + public LLIndexSearcher(IndexSearcher indexSearcher, Drop drop) { super(new LLIndexSearcher.CloseOnDrop(drop)); this.indexSearcher = indexSearcher; - this.associatedSearcherManager = associatedSearcherManager; - this.ownsIndexSearcher = ownsIndexSearcher; } public IndexReader getIndexReader() { if (!isOwned()) { - throw attachTrace(new IllegalStateException("CachedIndexSearcher must be owned to be used")); + throw attachTrace(new IllegalStateException("LLIndexSearcher must be owned to be used")); } return indexSearcher.getIndexReader(); } public IndexSearcher getIndexSearcher() { if (!isOwned()) { - throw attachTrace(new IllegalStateException("CachedIndexSearcher must be owned to be used")); + throw attachTrace(new IllegalStateException("LLIndexSearcher must be owned to be used")); } return indexSearcher; } - public LLIndexSearcher copy(Drop drop) { - if (!isOwned()) { - throw attachTrace(new IllegalStateException("CachedIndexSearcher must be owned to be used")); - } - var copyIndexSearcher = this.indexSearcher; - boolean ownsIndexSearcher; - if (this.ownsIndexSearcher && associatedSearcherManager != null) { - copyIndexSearcher.getIndexReader().incRef(); - ownsIndexSearcher = true; - } else { - ownsIndexSearcher = false; - } - return new LLIndexSearcher(copyIndexSearcher, associatedSearcherManager, ownsIndexSearcher, drop); - } - @Override protected RuntimeException createResourceClosedException() { return new IllegalStateException("Closed"); @@ -66,14 +42,12 @@ public class LLIndexSearcher extends ResourceSupport prepareSend() { var indexSearcher = this.indexSearcher; - var associatedSearcherManager = this.associatedSearcherManager; makeInaccessible(); - return drop -> new LLIndexSearcher(indexSearcher, associatedSearcherManager, ownsIndexSearcher, drop); + return drop -> new LLIndexSearcher(indexSearcher, drop); } private void makeInaccessible() { this.indexSearcher = null; - this.associatedSearcherManager = null; } private static class CloseOnDrop implements Drop { @@ -87,14 +61,7 @@ public class LLIndexSearcher extends ResourceSupport 0) { - obj.associatedSearcherManager.release(obj.indexSearcher); - } - } delegate.drop(obj); - } catch (IOException e) { - logger.error("Failed to drop CachedIndexSearcher", e); } finally { obj.makeInaccessible(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java index 6ac8213..684385b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java @@ -19,6 +19,7 @@ import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.MultiReader; import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; public interface LLIndexSearchers extends Resource { @@ -30,9 +31,9 @@ public interface LLIndexSearchers extends Resource { return new UnshardedIndexSearchers(indexSearcher, d -> {}); } - Iterable shards(); + List shards(); - LLIndexSearcher shard(int shardIndex); + IndexSearcher shard(int shardIndex); IndexReader allShards(); @@ -47,19 +48,19 @@ public interface LLIndexSearchers extends Resource { } @Override - public Iterable shards() { - return Collections.singleton(indexSearcher); + public List shards() { + return List.of(indexSearcher.getIndexSearcher()); } @Override - public LLIndexSearcher shard(int shardIndex) { + public IndexSearcher shard(int shardIndex) { if (!isOwned()) { throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used")); } if (shardIndex != -1) { throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index"); } - return indexSearcher; + return indexSearcher.getIndexSearcher(); } @Override @@ -67,7 +68,7 @@ public interface LLIndexSearchers extends Resource { return indexSearcher.getIndexReader(); } - public LLIndexSearcher shard() { + public IndexSearcher shard() { return this.shard(-1); } @@ -111,43 +112,53 @@ public interface LLIndexSearchers extends Resource { implements LLIndexSearchers { private List indexSearchers; + private List indexSearchersVals; public ShardedIndexSearchers(List> indexSearchers, Drop drop) { super(new CloseOnDrop(drop)); this.indexSearchers = new ArrayList<>(indexSearchers.size()); - for (Send indexSearcher : indexSearchers) { - this.indexSearchers.add(indexSearcher.receive()); + this.indexSearchersVals = new ArrayList<>(indexSearchers.size()); + for (Send llIndexSearcher : indexSearchers) { + var indexSearcher = llIndexSearcher.receive(); + this.indexSearchers.add(indexSearcher); + this.indexSearchersVals.add(indexSearcher.getIndexSearcher()); } } @Override - public Iterable shards() { - return Collections.unmodifiableList(indexSearchers); + public List shards() { + if (!isOwned()) { + throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); + } + return Collections.unmodifiableList(indexSearchersVals); } @Override - public LLIndexSearcher shard(int shardIndex) { + public IndexSearcher shard(int shardIndex) { if (!isOwned()) { throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); } if (shardIndex < 0) { throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid"); } - return indexSearchers.get(shardIndex); + return indexSearchersVals.get(shardIndex); } @Override public IndexReader allShards() { - var irs = new IndexReader[indexSearchers.size()]; - for (int i = 0, s = indexSearchers.size(); i < s; i++) { - irs[i] = indexSearchers.get(i).getIndexReader(); + if (!isOwned()) { + throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used")); + } + var irs = new IndexReader[indexSearchersVals.size()]; + for (int i = 0, s = indexSearchersVals.size(); i < s; i++) { + irs[i] = indexSearchersVals.get(i).getIndexReader(); } Object2IntOpenHashMap indexes = new Object2IntOpenHashMap<>(); for (int i = 0; i < irs.length; i++) { indexes.put(irs[i], i); } try { - return new MultiReader(irs, Comparator.comparingInt(indexes::getInt), true); + return new MultiReader(irs, Comparator.comparingInt(indexes::getInt), false); } catch (IOException ex) { // This shouldn't happen throw new UncheckedIOException(ex); @@ -171,10 +182,12 @@ public interface LLIndexSearchers extends Resource { private void makeInaccessible() { this.indexSearchers = null; + this.indexSearchersVals = null; } private static class CloseOnDrop implements Drop { + private volatile boolean dropped = false; private final Drop delegate; public CloseOnDrop(Drop drop) { @@ -184,11 +197,15 @@ public interface LLIndexSearchers extends Resource { @Override public void drop(ShardedIndexSearchers obj) { try { + assert !dropped; if (obj.indexSearchers != null) { for (LLIndexSearcher indexSearcher : obj.indexSearchers) { - indexSearcher.close(); + if (indexSearcher.isAccessible()) { + indexSearcher.close(); + } } } + dropped = true; delegate.drop(obj); } finally { obj.makeInaccessible(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 65db216..51351cd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -300,7 +300,7 @@ public class LLLocalDictionary implements LLDictionary { // Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers // Create the key nio buffer to pass to RocksDB - var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); + var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); // Create a direct result buffer because RocksDB works only with direct buffers try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) { int valueSize; @@ -308,7 +308,7 @@ public class LLLocalDictionary implements LLDictionary { ByteBuffer resultNioBuf; do { // Create the result nio buffer to pass to RocksDB - resultNioBuf = LLUtils.obtainDirect(resultBuf); + resultNioBuf = LLUtils.obtainDirect(resultBuf, true); assert keyNioBuffer.byteBuffer().isDirect(); assert resultNioBuf.isDirect(); valueSize = db.get(cfh, @@ -415,10 +415,10 @@ public class LLLocalDictionary implements LLDictionary { throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); } if (databaseOptions.allowNettyDirect()) { - var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); + var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); try (var ignored1 = keyNioBuffer.buffer().receive()) { assert keyNioBuffer.byteBuffer().isDirect(); - var valueNioBuffer = LLUtils.convertToDirect(alloc, value.send()); + var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send()); try (var ignored2 = valueNioBuffer.buffer().receive()) { assert valueNioBuffer.byteBuffer().isDirect(); db.put(cfh, validWriteOptions, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer()); @@ -479,7 +479,7 @@ public class LLLocalDictionary implements LLDictionary { if (range.hasMin()) { try (var rangeMin = range.getMin().receive()) { if (databaseOptions.allowNettyDirect()) { - var directBuf = LLUtils.convertToDirect(alloc, rangeMin.send()); + var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); cloned1 = directBuf.buffer().receive(); direct1 = directBuf.byteBuffer(); readOpts.setIterateLowerBound(slice1 = new DirectSlice(directBuf.byteBuffer())); @@ -491,7 +491,7 @@ public class LLLocalDictionary implements LLDictionary { if (range.hasMax()) { try (var rangeMax = range.getMax().receive()) { if (databaseOptions.allowNettyDirect()) { - var directBuf = LLUtils.convertToDirect(alloc, rangeMax.send()); + var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMax.send()); cloned2 = directBuf.buffer().receive(); direct2 = directBuf.byteBuffer(); readOpts.setIterateUpperBound(slice2 = new DirectSlice(directBuf.byteBuffer())); @@ -504,7 +504,7 @@ public class LLLocalDictionary implements LLDictionary { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { try (var rangeMin = range.getMin().receive()) { if (databaseOptions.allowNettyDirect()) { - var directBuf = LLUtils.convertToDirect(alloc, rangeMin.send()); + var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); cloned3 = directBuf.buffer().receive(); direct3 = directBuf.byteBuffer(); rocksIterator.seek(directBuf.byteBuffer()); @@ -910,7 +910,7 @@ public class LLLocalDictionary implements LLDictionary { } var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS); if (databaseOptions.allowNettyDirect()) { - var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); + var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); try { db.delete(cfh, validWriteOptions, keyNioBuffer.byteBuffer()); } finally { @@ -1176,9 +1176,9 @@ public class LLLocalDictionary implements LLDictionary { batch.close(); } else { for (LLEntry entry : entriesWindow) { - var k = LLUtils.convertToDirect(alloc, entry.getKey()); + var k = LLUtils.convertToReadableDirect(alloc, entry.getKey()); try { - var v = LLUtils.convertToDirect(alloc, entry.getValue()); + var v = LLUtils.convertToReadableDirect(alloc, entry.getValue()); try { db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer()); } finally { @@ -1309,9 +1309,9 @@ public class LLLocalDictionary implements LLDictionary { } else { int i = 0; for (Tuple2 entry : entriesWindow) { - var k = LLUtils.convertToDirect(alloc, entry.getT1().send()); + var k = LLUtils.convertToReadableDirect(alloc, entry.getT1().send()); try { - var v = LLUtils.convertToDirect(alloc, updatedValuesToWrite.get(i)); + var v = LLUtils.convertToReadableDirect(alloc, updatedValuesToWrite.get(i)); try { db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer()); } finally { @@ -1679,9 +1679,9 @@ public class LLLocalDictionary implements LLDictionary { if (!USE_WRITE_BATCHES_IN_SET_RANGE) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); - var k = LLUtils.convertToDirect(alloc, entry.getKey()); + var k = LLUtils.convertToReadableDirect(alloc, entry.getKey()); try { - var v = LLUtils.convertToDirect(alloc, entry.getValue()); + var v = LLUtils.convertToReadableDirect(alloc, entry.getValue()); try { db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer()); } finally { @@ -1874,7 +1874,7 @@ public class LLLocalDictionary implements LLDictionary { Send bufferToReceive) { try (var buffer = bufferToReceive.receive()) { if (allowNettyDirect) { - var direct = LLUtils.convertToDirect(alloc, buffer.send()); + var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); assert direct.byteBuffer().isDirect(); rocksIterator.seek(direct.byteBuffer()); return () -> { @@ -1895,7 +1895,7 @@ public class LLLocalDictionary implements LLDictionary { requireNonNull(buffer); AbstractSlice slice; if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS) { - var direct = LLUtils.convertToDirect(alloc, buffer.send()); + var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); buffer = direct.buffer().receive(); assert direct.byteBuffer().isDirect(); slice = new DirectSlice(direct.byteBuffer(), buffer.readableBytes()); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index e5e0f92..950ec41 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -100,7 +100,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } if (!MemorySegmentUtils.isSupported()) { throw new UnsupportedOperationException("Foreign Memory Access API support is disabled." - + " Please set \"--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit\""); + + " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"", + MemorySegmentUtils.getUnsupportedCause() + ); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 08ea765..7b3814a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -96,9 +96,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { // Resolve the snapshot of each shard .flatMap(tuple -> Mono .fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1())) - .flatMap(luceneSnapshot -> tuple.getT2().retrieveSearcher( - luceneSnapshot.orElse(null)) - ) + .flatMap(luceneSnapshot -> tuple.getT2().retrieveSearcher(luceneSnapshot.orElse(null))) ) .collectList() .map(searchers -> LLIndexSearchers.of(searchers).send()); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java index 94f6236..608404e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java @@ -17,32 +17,29 @@ public class MemorySegmentUtils { private static final Object NATIVE; static { - Lookup lookup = MethodHandles.publicLookup(); + Lookup lookup = MethodHandles.lookup(); Object nativeVal = null; - MethodHandle ofNativeRestricted; - try { - ofNativeRestricted = lookup.findStatic(Class.forName("jdk.incubator.foreign.MemorySegment"), - "ofNativeRestricted", - MethodType.methodType(Class.forName("jdk.incubator.foreign.MemorySegment")) - ); + var ofNativeRestricted = getJava16NativeRestricted(lookup); + if (ofNativeRestricted == null) { + cause = null; + ofNativeRestricted = getJava17NativeRestricted(lookup); + } + if (ofNativeRestricted != null) { try { nativeVal = ofNativeRestricted.invoke(); } catch (Throwable e) { cause = e; } - } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { - ofNativeRestricted = null; - cause = e; } OF_NATIVE_RESTRICTED = ofNativeRestricted; MethodHandle asSlice; try { - asSlice = lookup.findVirtual(Class.forName("jdk.incubator.foreign.MemorySegment"), + asSlice = lookup.findVirtual(lookup.findClass("jdk.incubator.foreign.MemorySegment"), "asSlice", - MethodType.methodType(Class.forName("jdk.incubator.foreign.MemorySegment"), long.class, long.class) + MethodType.methodType(lookup.findClass("jdk.incubator.foreign.MemorySegment"), long.class, long.class) ); } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { asSlice = null; @@ -52,7 +49,7 @@ public class MemorySegmentUtils { MethodHandle asByteBuffer; try { - asByteBuffer = lookup.findVirtual(Class.forName("jdk.incubator.foreign.MemorySegment"), + asByteBuffer = lookup.findVirtual(lookup.findClass("jdk.incubator.foreign.MemorySegment"), "asByteBuffer", MethodType.methodType(ByteBuffer.class)); } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { asByteBuffer = null; @@ -63,6 +60,36 @@ public class MemorySegmentUtils { NATIVE = nativeVal; } + @SuppressWarnings("JavaLangInvokeHandleSignature") + private static MethodHandle getJava16NativeRestricted(Lookup lookup) { + MethodHandle ofNativeRestricted; + try { + ofNativeRestricted = lookup.findStatic(lookup.findClass("jdk.incubator.foreign.MemorySegment"), + "ofNativeRestricted", + MethodType.methodType(lookup.findClass("jdk.incubator.foreign.MemorySegment")) + ); + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { + ofNativeRestricted = null; + cause = e; + } + return ofNativeRestricted; + } + + @SuppressWarnings("JavaLangInvokeHandleSignature") + private static MethodHandle getJava17NativeRestricted(Lookup lookup) { + MethodHandle ofNativeRestricted; + try { + ofNativeRestricted = lookup.findStatic(lookup.findClass("jdk.incubator.foreign.MemorySegment"), + "globalNativeSegment", + MethodType.methodType(lookup.findClass("jdk.incubator.foreign.MemorySegment")) + ); + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { + ofNativeRestricted = null; + cause = e; + } + return ofNativeRestricted; + } + public static ByteBuffer directBuffer(long address, long size) { if (address <= 0) { throw new IllegalArgumentException("Address is " + address); @@ -76,13 +103,15 @@ public class MemorySegmentUtils { return PlatformDependent.directBuffer(address, (int) size); } throw new UnsupportedOperationException("Foreign Memory Access API is disabled!" - + " Please set \"--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit\""); + + " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"", + getUnsupportedCause() + ); } var memorySegment = AS_SLICE.invoke(NATIVE, address, size); return (ByteBuffer) AS_BYTE_BUFFER.invoke(memorySegment); } catch (Throwable e) { throw new UnsupportedOperationException("Foreign Memory Access API is disabled!" - + " Please set \"--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit\"", e); + + " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"", e); } } @@ -93,4 +122,8 @@ public class MemorySegmentUtils { public static Throwable getUnsupportedCause() { return cause; } + + public static String getSuggestedArgs() { + return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access"; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 0139dda..2d4dd88 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -51,6 +51,7 @@ import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; @@ -376,7 +377,7 @@ public class LuceneUtils { } public static Flux convertHits(Flux hitsFlux, - LLIndexSearchers indexSearchers, + List indexSearchers, String keyFieldName, boolean preserveOrder) { if (preserveOrder) { @@ -401,7 +402,7 @@ public class LuceneUtils { @Nullable private static LLKeyScore mapHitBlocking(ScoreDoc hit, - LLIndexSearchers indexSearchers, + List indexSearchers, String keyFieldName) { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread"); @@ -409,7 +410,10 @@ public class LuceneUtils { int shardDocId = hit.doc; int shardIndex = hit.shardIndex; float score = hit.score; - var indexSearcher = indexSearchers.shard(shardIndex); + if (shardIndex == -1 && indexSearchers.size() == 1) { + shardIndex = 0; + } + var indexSearcher = indexSearchers.get(shardIndex); try { String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName); return new LLKeyScore(shardDocId, score, collectedDoc); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java index 2746c82..3e2037a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchResult.java @@ -89,7 +89,11 @@ public final class LuceneSearchResult extends ResourceSupport this // Search first page results - .searchFirstPage(indexSearchers, queryParams, paginationInfo) + .searchFirstPage(indexSearchers.shards(), queryParams, paginationInfo) // Compute the results of the first page .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, keyFieldName, queryParams)) // Compute other results - .transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers, queryParams, keyFieldName)) + .map(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, keyFieldName, indexSearchers::close)) // Ensure that one LuceneSearchResult is always returned .single(), false); @@ -65,7 +68,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { /** * Search effectively the raw results of the first page */ - private Mono searchFirstPage(LLIndexSearchers indexSearchers, + private Mono searchFirstPage(Iterable indexSearchers, LocalQueryParams queryParams, PaginationInfo paginationInfo) { var limit = paginationInfo.totalLimit(); @@ -86,9 +89,11 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { LocalQueryParams queryParams) { return firstPageDataMono.map(firstPageData -> { var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits); + var scoreDocs = firstPageData.topDocs().scoreDocs; + assert LLUtils.isSet(scoreDocs); - Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageData.topDocs().scoreDocs), - indexSearchers, keyFieldName, true) + Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(scoreDocs), + indexSearchers.shards(), keyFieldName, true) .take(queryParams.limit(), true); CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo(); @@ -97,33 +102,35 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { }); } - private Mono> computeOtherResults(Mono firstResultMono, - LLIndexSearchers indexSearchers, + private Send computeOtherResults(FirstPageResults firstResult, + List indexSearchers, LocalQueryParams queryParams, - String keyFieldName) { - return firstResultMono.map(firstResult -> { - var totalHitsCount = firstResult.totalHitsCount(); - var firstPageHitsFlux = firstResult.firstPageHitsFlux(); - var secondPageInfo = firstResult.nextPageInfo(); + String keyFieldName, + Runnable drop) { + var totalHitsCount = firstResult.totalHitsCount(); + var firstPageHitsFlux = firstResult.firstPageHitsFlux(); + var secondPageInfo = firstResult.nextPageInfo(); - Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); + Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); - Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); - return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> indexSearchers.close()).send(); - }); + Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); + return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> drop.run()).send(); } /** * Search effectively the merged raw results of the next pages */ - private Flux searchOtherPages(LLIndexSearchers indexSearchers, + private Flux searchOtherPages(List indexSearchers, LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { return Flux .defer(() -> { AtomicReference currentPageInfoRef = new AtomicReference<>(secondPageInfo); - return this - .searchPage(queryParams, indexSearchers, true, queryParams.pageLimits(), - 0, currentPageInfoRef.get()) + return Mono + .fromSupplier(currentPageInfoRef::get) + .doOnNext(s -> System.err.println("Current page info: " + s)) + .flatMap(currentPageInfo -> this.searchPage(queryParams, indexSearchers, true, + queryParams.pageLimits(), 0, currentPageInfo)) + .doOnNext(s -> System.err.println("Next page info: " + s.nextPageInfo())) .doOnNext(s -> currentPageInfoRef.set(s.nextPageInfo())) .repeatWhen(s -> s.takeWhile(n -> n > 0)); }) @@ -140,7 +147,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { * skip the first n results in the first page */ private Mono searchPage(LocalQueryParams queryParams, - LLIndexSearchers indexSearchers, + Iterable indexSearchers, boolean allowPagination, PageLimits pageLimits, int resultsOffset, @@ -154,18 +161,19 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher { if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { var sort = getSort(queryParams); var pageLimit = pageLimits.getPageLimit(s.pageIndex()); + var after = (FieldDoc) s.last(); var totalHitsThreshold = LuceneUtils.totalHitsThreshold(); - return new ScoringShardsCollectorManager(sort, pageLimit, null, - totalHitsThreshold, resultsOffset, pageLimit); + return new ScoringShardsCollectorManager(sort, pageLimit, after, totalHitsThreshold, + resultsOffset); } else { return null; } }) .flatMap(sharedManager -> Flux - .fromIterable(indexSearchers.shards()) + .fromIterable(indexSearchers) .flatMap(shard -> Mono.fromCallable(() -> { var collector = sharedManager.newCollector(); - shard.getIndexSearcher().search(queryParams.query(), collector); + shard.search(queryParams.query(), collector); return collector; })) .collectList() diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java index 85908d1..92011ad 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; import java.util.Collection; @@ -34,6 +35,14 @@ public class ScoringShardsCollectorManager implements CollectorManager 2147483630) { + this.topN = 2147483630 - startN; + } else if (topN != null && topN > 2147483630) { + this.topN = 2147483630; + } else { + this.topN = topN; + } this.sharedCollectorManager = TopFieldCollector.createSharedManager(sort, numHits, after, totalHitsThreshold); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 683351b..b5cb26a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -13,7 +13,9 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearch import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Objects; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocsCollector; @@ -37,12 +39,13 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this // Search first page results - .searchFirstPage(indexSearchers, queryParams, paginationInfo) + .searchFirstPage(indexSearchers.shards(), queryParams, paginationInfo) // Compute the results of the first page - .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers, + .transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers.shards(), keyFieldName, queryParams)) // Compute other results - .transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers, queryParams, keyFieldName)) + .transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams, + keyFieldName, indexSearchers::close)) // Ensure that one LuceneSearchResult is always returned .single(), false); @@ -62,7 +65,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { /** * Search effectively the raw results of the first page */ - private Mono searchFirstPage(LLIndexSearchers indexSearchers, + private Mono searchFirstPage(List indexSearchers, LocalQueryParams queryParams, PaginationInfo paginationInfo) { var limit = paginationInfo.totalLimit(); @@ -77,13 +80,15 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { * Compute the results of the first page, extracting useful data */ private Mono computeFirstPageResults(Mono firstPageDataMono, - LLIndexSearchers indexSearchers, + List indexSearchers, String keyFieldName, LocalQueryParams queryParams) { return firstPageDataMono.map(firstPageData -> { var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits); + var scoreDocs = firstPageData.topDocs().scoreDocs; + assert LLUtils.isSet(scoreDocs); - Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(firstPageData.topDocs().scoreDocs), + Flux firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(scoreDocs), indexSearchers, keyFieldName, true) .take(queryParams.limit(), true); @@ -94,9 +99,10 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { } private Mono> computeOtherResults(Mono firstResultMono, - UnshardedIndexSearchers indexSearchers, + List indexSearchers, LocalQueryParams queryParams, - String keyFieldName) { + String keyFieldName, + Runnable drop) { return firstResultMono.map(firstResult -> { var totalHitsCount = firstResult.totalHitsCount(); var firstPageHitsFlux = firstResult.firstPageHitsFlux(); @@ -105,14 +111,14 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { Flux nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo); Flux combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux); - return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> indexSearchers.close()).send(); + return new LuceneSearchResult(totalHitsCount, combinedFlux, d -> drop.run()).send(); }); } /** * Search effectively the merged raw results of the next pages */ - private Flux searchOtherPages(UnshardedIndexSearchers indexSearchers, + private Flux searchOtherPages(List indexSearchers, LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { return Flux .generate( @@ -133,7 +139,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { * skip the first n results in the first page */ private CurrentPageInfo searchPageSync(LocalQueryParams queryParams, - LLIndexSearchers indexSearchers, + List indexSearchers, boolean allowPagination, int resultsOffset, CurrentPageInfo s, @@ -142,12 +148,6 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { if (resultsOffset < 0) { throw new IndexOutOfBoundsException(resultsOffset); } - UnshardedIndexSearchers unshardedIndexSearchers; - if (indexSearchers instanceof UnshardedIndexSearchers unshardedIndexSearchers1) { - unshardedIndexSearchers = unshardedIndexSearchers1; - } else { - throw new IllegalArgumentException(); - } var currentPageLimit = queryParams.pageLimits().getPageLimit(s.pageIndex()); if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { TopDocs pageTopDocs; @@ -155,7 +155,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { TopDocsCollector collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), currentPageLimit, s.last(), LuceneUtils.totalHitsThreshold(), allowPagination, queryParams.isScored()); - unshardedIndexSearchers.shard().getIndexSearcher().search(queryParams.query(), collector); + indexSearchers.get(0).search(queryParams.query(), collector); if (resultsOffset > 0) { pageTopDocs = collector.topDocs(resultsOffset, currentPageLimit); } else { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java index 6638c5a..a1c7503 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleUnsortedUnscoredLuceneMultiSearcher.java @@ -1,15 +1,12 @@ package it.cavallium.dbengine.lucene.searcher; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,11 +37,14 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea } }) .then(indexSearchersMono.map(Send::receive)); + var localQueryParams = getLocalQueryParams(queryParams); return LLUtils.usingResource(indexSearchersResource, indexSearchers -> Flux.fromIterable(indexSearchers.shards()) - .flatMap(searcher -> localSearcher - .collect(Mono.just(searcher.send()), queryParams, keyFieldName, transformer)) + .flatMap(searcher -> { + var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, d -> {}).send()); + return localSearcher.collect(llSearcher, localQueryParams, keyFieldName, transformer); + }) .collectList() .map(results -> { List resultsToDrop = new ArrayList<>(results.size()); @@ -60,7 +60,10 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea } var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); - Flux mergedFluxes = Flux.merge(resultsFluxes); + Flux mergedFluxes = Flux + .merge(resultsFluxes) + .skip(queryParams.offset()) + .take(queryParams.limit(), true); return new LuceneSearchResult(totalHitsCount, mergedFluxes, d -> { for (LuceneSearchResult luceneSearchResult : resultsToDrop) { @@ -68,7 +71,18 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea } }).send(); }), - true + false + ); + } + + private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { + return new LocalQueryParams(queryParams.query(), + 0, + queryParams.limit(), + queryParams.pageLimits(), + queryParams.minCompetitiveScore(), + queryParams.sort(), + queryParams.scoreMode() ); } } diff --git a/src/main/java/org/rocksdb/CappedWriteBatch.java b/src/main/java/org/rocksdb/CappedWriteBatch.java index f5e11d6..1d242f5 100644 --- a/src/main/java/org/rocksdb/CappedWriteBatch.java +++ b/src/main/java/org/rocksdb/CappedWriteBatch.java @@ -105,13 +105,13 @@ public class CappedWriteBatch extends WriteBatch { var value = valueToReceive.receive(); if (USE_FAST_DIRECT_BUFFERS && isDirect(key) && isDirect(value)) { buffersToRelease.add(value); - var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); + var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); key = keyNioBuffer.buffer().receive(); buffersToRelease.add(key); byteBuffersToRelease.add(keyNioBuffer.byteBuffer()); assert keyNioBuffer.byteBuffer().isDirect(); - var valueNioBuffer = LLUtils.convertToDirect(alloc, value.send()); + var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send()); value = valueNioBuffer.buffer().receive(); buffersToRelease.add(value); byteBuffersToRelease.add(valueNioBuffer.byteBuffer()); @@ -172,7 +172,7 @@ public class CappedWriteBatch extends WriteBatch { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send keyToReceive) throws RocksDBException { var key = keyToReceive.receive(); if (USE_FAST_DIRECT_BUFFERS) { - var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send()); + var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); key = keyNioBuffer.buffer().receive(); buffersToRelease.add(key); byteBuffersToRelease.add(keyNioBuffer.byteBuffer()); diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index dfb27c7..79007fe 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -98,7 +98,7 @@ public class DbTestUtils { if (!MemorySegmentUtils.isSupported()) { System.err.println("Warning! Foreign Memory Access API is not available!" + " Netty direct buffers will not be used in tests!" - + " Please set \"--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit\""); + + " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\""); if (MemorySegmentUtils.getUnsupportedCause() != null) { System.err.println("\tCause: " + MemorySegmentUtils.getUnsupportedCause().getClass().getName() + ":" + MemorySegmentUtils.getUnsupportedCause().getLocalizedMessage());