diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index f87c9e3..602889c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -228,7 +228,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriterConfig.setMergeScheduler(mergeScheduler); if (luceneOptions.indexWriterBufferSize() == -1) { //todo: allow to configure maxbuffereddocs fallback - indexWriterConfig.setMaxBufferedDocs(1000); + indexWriterConfig.setMaxBufferedDocs(80000); // disable ram buffer size after enabling maxBufferedDocs indexWriterConfig.setRAMBufferSizeMB(-1); } else { diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 037430d..d8ec85e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -561,7 +561,7 @@ public class LuceneUtils { }).subscribeOn(Schedulers.boundedElastic())); } - public static Collector withTimeout(TopDocsCollector collector, Duration timeout) { + public static Collector withTimeout(Collector collector, Duration timeout) { return new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeout.toMillis()); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java deleted file mode 100644 index 5413418..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java +++ /dev/null @@ -1,59 +0,0 @@ -package it.cavallium.dbengine.lucene.collector; - -import it.cavallium.dbengine.lucene.searcher.LongSemaphore; -import java.io.IOException; -import java.util.Collection; 
-import java.util.List; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.Collector; -import org.apache.lucene.search.CollectorManager; -import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Sinks.Many; - -public class ReactiveCollectorMultiManager implements CollectorMultiManager { - - - public ReactiveCollectorMultiManager() { - } - - public CollectorManager get(LongSemaphore requested, - FluxSink scoreDocsSink, - int shardIndex) { - return new CollectorManager<>() { - - @Override - public Collector newCollector() { - return new Collector() { - - @Override - public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) { - return new ReactiveLeafCollector(leafReaderContext, scoreDocsSink, shardIndex, requested); - } - - @Override - public ScoreMode scoreMode() { - return ReactiveCollectorMultiManager.this.scoreMode(); - } - }; - } - - @Override - public Void reduce(Collection collection) { - return null; - } - }; - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; - } - - @Override - public Void reduce(List results) { - return null; - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java deleted file mode 100644 index 9e10591..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveLeafCollector.java +++ /dev/null @@ -1,60 +0,0 @@ -package it.cavallium.dbengine.lucene.collector; - -import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.lucene.searcher.LongSemaphore; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; -import org.apache.lucene.index.LeafReaderContext; -import 
org.apache.lucene.search.CollectionTerminatedException; -import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.Scorable; -import org.apache.lucene.search.ScoreDoc; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Sinks.EmitResult; -import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Schedulers; - -public class ReactiveLeafCollector implements LeafCollector { - - private final LeafReaderContext leafReaderContext; - private final FluxSink scoreDocsSink; - private final int shardIndex; - private final LongSemaphore requested; - - public ReactiveLeafCollector(LeafReaderContext leafReaderContext, - FluxSink scoreDocsSink, - int shardIndex, - LongSemaphore requested) { - this.leafReaderContext = leafReaderContext; - this.scoreDocsSink = scoreDocsSink; - this.shardIndex = shardIndex; - this.requested = requested; - } - - @Override - public void setScorer(Scorable scorable) { - - } - - @Override - public void collect(int i) { - // Assert that this is a non-blocking context - assert !Schedulers.isInNonBlockingThread(); - - // Wait if no requests from downstream are found - try { - while (!requested.tryAcquire(1, TimeUnit.SECONDS)) { - if (scoreDocsSink.isCancelled()) { - throw new CollectionTerminatedException(); - } - } - // Send the response - var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); - scoreDocsSink.next(scoreDoc); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LongSemaphore.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LongSemaphore.java deleted file mode 100644 index fdea877..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LongSemaphore.java +++ /dev/null @@ -1,728 +0,0 @@ -/* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
- * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -/* - * This file is available under and governed by the GNU General Public - * License version 2 only, as published by the Free Software Foundation. - * However, the following notice accompanied the original version of this - * file: - * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package it.cavallium.dbengine.lucene.searcher; - -import java.io.Serial; -import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; - -/** - * A counting semaphore. Conceptually, a semaphore maintains a set of - * permits. Each {@link #acquire} blocks if necessary until a permit is - * available, and then takes it. 
Each {@link #release} adds a permit, - * potentially releasing a blocking acquirer. - * However, no actual permit objects are used; the {@code Semaphore} just - * keeps a count of the number available and acts accordingly. - * - *

Semaphores are often used to restrict the number of threads than can - * access some (physical or logical) resource. For example, here is - * a class that uses a semaphore to control access to a pool of items: - *

 {@code
- * class Pool {
- *   private static final long MAX_AVAILABLE = 100;
- *   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
- *
- *   public Object getItem() throws InterruptedException {
- *     available.acquire();
- *     return getNextAvailableItem();
- *   }
- *
- *   public void putItem(Object x) {
- *     if (markAsUnused(x))
- *       available.release();
- *   }
- *
- *   // Not a particularly efficient data structure; just for demo
- *
- *   protected Object[] items = ...; // whatever kinds of items being managed
- *   protected boolean[] used = new boolean[MAX_AVAILABLE];
- *
- *   protected synchronized Object getNextAvailableItem() {
- *     for (long i = 0; i < MAX_AVAILABLE; ++i) {
- *       if (!used[i]) {
- *         used[i] = true;
- *         return items[i];
- *       }
- *     }
- *     return null; // not reached
- *   }
- *
- *   protected synchronized boolean markAsUnused(Object item) {
- *     for (long i = 0; i < MAX_AVAILABLE; ++i) {
- *       if (item == items[i]) {
- *         if (used[i]) {
- *           used[i] = false;
- *           return true;
- *         } else
- *           return false;
- *       }
- *     }
- *     return false;
- *   }
- * }}
- * - *

Before obtaining an item each thread must acquire a permit from - * the semaphore, guaranteeing that an item is available for use. When - * the thread has finished with the item it is returned back to the - * pool and a permit is returned to the semaphore, allowing another - * thread to acquire that item. Note that no synchronization lock is - * held when {@link #acquire} is called as that would prevent an item - * from being returned to the pool. The semaphore encapsulates the - * synchronization needed to restrict access to the pool, separately - * from any synchronization needed to maintain the consistency of the - * pool itself. - * - *

A semaphore initialized to one, and which is used such that it - * only has at most one permit available, can serve as a mutual - * exclusion lock. This is more commonly known as a binary - * semaphore, because it only has two states: one permit - * available, or zero permits available. When used in this way, the - * binary semaphore has the property (unlike many {@link java.util.concurrent.locks.Lock} - * implementations), that the "lock" can be released by a - * thread other than the owner (as semaphores have no notion of - * ownership). This can be useful in some specialized contexts, such - * as deadlock recovery. - * - *

The constructor for this class optionally accepts a - * fairness parameter. When set false, this class makes no - * guarantees about the order in which threads acquire permits. In - * particular, barging is permitted, that is, a thread - * invoking {@link #acquire} can be allocated a permit ahead of a - * thread that has been waiting - logically the new thread places itself at - * the head of the queue of waiting threads. When fairness is set true, the - * semaphore guarantees that threads invoking any of the {@link - * #acquire() acquire} methods are selected to obtain permits in the order in - * which their invocation of those methods was processed - * (first-in-first-out; FIFO). Note that FIFO ordering necessarily - * applies to specific internal points of execution within these - * methods. So, it is possible for one thread to invoke - * {@code acquire} before another, but reach the ordering point after - * the other, and similarly upon return from the method. - * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not - * honor the fairness setting, but will take any permits that are - * available. - * - *

Generally, semaphores used to control resource access should be - * initialized as fair, to ensure that no thread is starved out from - * accessing a resource. When using semaphores for other kinds of - * synchronization control, the throughput advantages of non-fair - * ordering often outweigh fairness considerations. - * - *

This class also provides convenience methods to {@link - * #acquire(long) acquire} and {@link #release(long) release} multiple - * permits at a time. These methods are generally more efficient and - * effective than loops. However, they do not establish any preference - * order. For example, if thread A invokes {@code s.acquire(3}) and - * thread B invokes {@code s.acquire(2)}, and two permits become - * available, then there is no guarantee that thread B will obtain - * them unless its acquire came first and Semaphore {@code s} is in - * fair mode. - * - *

Memory consistency effects: Actions in a thread prior to calling - * a "release" method such as {@code release()} - * happen-before - * actions following a successful "acquire" method such as {@code acquire()} - * in another thread. - * - * @since 1.5 - * @author Doug Lea - */ -@SuppressWarnings("unused") -public class LongSemaphore implements java.io.Serializable { - @Serial - private static final long serialVersionUID = -3222578661600680210L; - /** All mechanics via AbstractQueuedSynchronizer subclass */ - private final Sync sync; - - /** - * Synchronization implementation for semaphore. Uses AQS state - * to represent permits. Subclassed into fair and nonfair - * versions. - */ - abstract static class Sync extends AbstractQueuedLongSynchronizer { - @Serial - private static final long serialVersionUID = 1192457210091910933L; - - Sync(long permits) { - setState(permits); - } - - final long getPermits() { - return getState(); - } - - final long nonfairTryAcquireShared(long acquires) { - for (;;) { - long available = getState(); - long remaining = available - acquires; - if (remaining < 0L || - compareAndSetState(available, remaining)) - return remaining; - } - } - - protected final boolean tryReleaseShared(long releases) { - for (;;) { - long current = getState(); - long next = current + releases; - if (next < current) // overflow - throw new Error("Maximum permit count exceeded"); - if (compareAndSetState(current, next)) - return true; - } - } - - final void reducePermits(long reductions) { - for (;;) { - long current = getState(); - long next = current - reductions; - if (next > current) // underflow - throw new Error("Permit count underflow"); - if (compareAndSetState(current, next)) - return; - } - } - - final long drainPermits() { - for (;;) { - long current = getState(); - if (current == 0 || compareAndSetState(current, 0)) - return current; - } - } - } - - /** - * NonFair version - */ - static final class NonfairSync extends Sync { - @Serial - private static 
final long serialVersionUID = -2694183684443567898L; - - NonfairSync(long permits) { - super(permits); - } - - protected long tryAcquireShared(long acquires) { - return nonfairTryAcquireShared(acquires); - } - } - - /** - * Fair version - */ - static final class FairSync extends Sync { - @Serial - private static final long serialVersionUID = 2014338818796000944L; - - FairSync(long permits) { - super(permits); - } - - protected long tryAcquireShared(long acquires) { - for (;;) { - if (hasQueuedPredecessors()) - return -1; - long available = getState(); - long remaining = available - acquires; - if (remaining < 0 || - compareAndSetState(available, remaining)) - return remaining; - } - } - } - - /** - * Creates a {@code Semaphore} with the given number of - * permits and nonfair fairness setting. - * - * @param permits the initial number of permits available. - * This value may be negative, in which case releases - * must occur before any acquires will be granted. - */ - public LongSemaphore(long permits) { - sync = new NonfairSync(permits); - } - - /** - * Creates a {@code Semaphore} with the given number of - * permits and the given fairness setting. - * - * @param permits the initial number of permits available. - * This value may be negative, in which case releases - * must occur before any acquires will be granted. - * @param fair {@code true} if this semaphore will guarantee - * first-in first-out granting of permits under contention, - * else {@code false} - */ - public LongSemaphore(long permits, boolean fair) { - sync = fair ? new FairSync(permits) : new NonfairSync(permits); - } - - /** - * Acquires a permit from this semaphore, blocking until one is - * available, or the thread is {@linkplain Thread#interrupt interrupted}. - * - *

Acquires a permit, if one is available and returns immediately, - * reducing the number of available permits by one. - * - *

If no permit is available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until - * one of two things happens: - *

    - *
  • Some other thread invokes the {@link #release} method for this - * semaphore and the current thread is next to be assigned a permit; or - *
  • Some other thread {@linkplain Thread#interrupt interrupts} - * the current thread. - *
- * - *

If the current thread: - *

    - *
  • has its interrupted status set on entry to this method; or - *
  • is {@linkplain Thread#interrupt interrupted} while waiting - * for a permit, - *
- * then {@link InterruptedException} is thrown and the current thread's - * interrupted status is cleared. - * - * @throws InterruptedException if the current thread is interrupted - */ - public void acquire() throws InterruptedException { - sync.acquireSharedInterruptibly(1); - } - - /** - * Acquires a permit from this semaphore, blocking until one is - * available. - * - *

Acquires a permit, if one is available and returns immediately, - * reducing the number of available permits by one. - * - *

If no permit is available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until - * some other thread invokes the {@link #release} method for this - * semaphore and the current thread is next to be assigned a permit. - * - *

If the current thread is {@linkplain Thread#interrupt interrupted} - * while waiting for a permit then it will continue to wait, but the - * time at which the thread is assigned a permit may change compared to - * the time it would have received the permit had no interruption - * occurred. When the thread does return from this method its interrupt - * status will be set. - */ - public void acquireUninterruptibly() { - sync.acquireShared(1); - } - - /** - * Acquires a permit from this semaphore, only if one is available at the - * time of invocation. - * - *

Acquires a permit, if one is available and returns immediately, - * with the value {@code true}, - * reducing the number of available permits by one. - * - *

If no permit is available then this method will return - * immediately with the value {@code false}. - * - *

Even when this semaphore has been set to use a - * fair ordering policy, a call to {@code tryAcquire()} will - * immediately acquire a permit if one is available, whether or not - * other threads are currently waiting. - * This "barging" behavior can be useful in certain - * circumstances, even though it breaks fairness. If you want to honor - * the fairness setting, then use - * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS)} - * which is almost equivalent (it also detects interruption). - * - * @return {@code true} if a permit was acquired and {@code false} - * otherwise - */ - public boolean tryAcquire() { - return sync.nonfairTryAcquireShared(1) >= 0; - } - - /** - * Acquires a permit from this semaphore, if one becomes available - * within the given waiting time and the current thread has not - * been {@linkplain Thread#interrupt interrupted}. - * - *

Acquires a permit, if one is available and returns immediately, - * with the value {@code true}, - * reducing the number of available permits by one. - * - *

If no permit is available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until - * one of three things happens: - *

    - *
  • Some other thread invokes the {@link #release} method for this - * semaphore and the current thread is next to be assigned a permit; or - *
  • Some other thread {@linkplain Thread#interrupt interrupts} - * the current thread; or - *
  • The specified waiting time elapses. - *
- * - *

If a permit is acquired then the value {@code true} is returned. - * - *

If the current thread: - *

    - *
  • has its interrupted status set on entry to this method; or - *
  • is {@linkplain Thread#interrupt interrupted} while waiting - * to acquire a permit, - *
- * then {@link InterruptedException} is thrown and the current thread's - * interrupted status is cleared. - * - *

If the specified waiting time elapses then the value {@code false} - * is returned. If the time is less than or equal to zero, the method - * will not wait at all. - * - * @param timeout the maximum time to wait for a permit - * @param unit the time unit of the {@code timeout} argument - * @return {@code true} if a permit was acquired and {@code false} - * if the waiting time elapsed before a permit was acquired - * @throws InterruptedException if the current thread is interrupted - */ - public boolean tryAcquire(long timeout, TimeUnit unit) - throws InterruptedException { - return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); - } - - /** - * Releases a permit, returning it to the semaphore. - * - *

Releases a permit, increasing the number of available permits by - * one. If any threads are trying to acquire a permit, then one is - * selected and given the permit that was just released. That thread - * is (re)enabled for thread scheduling purposes. - * - *

There is no requirement that a thread that releases a permit must - * have acquired that permit by calling {@link #acquire}. - * Correct usage of a semaphore is established by programming convention - * in the application. - */ - public void release() { - sync.releaseShared(1); - } - - /** - * Acquires the given number of permits from this semaphore, - * blocking until all are available, - * or the thread is {@linkplain Thread#interrupt interrupted}. - * - *

Acquires the given number of permits, if they are available, - * and returns immediately, reducing the number of available permits - * by the given amount. This method has the same effect as the - * loop {@code for (long i = 0; i < permits; ++i) acquire();} except - * that it atomically acquires the permits all at once: - * - *

If insufficient permits are available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until - * one of two things happens: - *

    - *
  • Some other thread invokes one of the {@link #release() release} - * methods for this semaphore and the current thread is next to be assigned - * permits and the number of available permits satisfies this request; or - *
  • Some other thread {@linkplain Thread#interrupt interrupts} - * the current thread. - *
- * - *

If the current thread: - *

    - *
  • has its interrupted status set on entry to this method; or - *
  • is {@linkplain Thread#interrupt interrupted} while waiting - * for a permit, - *
- * then {@link InterruptedException} is thrown and the current thread's - * interrupted status is cleared. - * Any permits that were to be assigned to this thread are instead - * assigned to other threads trying to acquire permits, as if - * permits had been made available by a call to {@link #release()}. - * - * @param permits the number of permits to acquire - * @throws InterruptedException if the current thread is interrupted - * @throws IllegalArgumentException if {@code permits} is negative - */ - public void acquire(long permits) throws InterruptedException { - if (permits < 0) throw new IllegalArgumentException(); - sync.acquireSharedInterruptibly(permits); - } - - /** - * Acquires the given number of permits from this semaphore, - * blocking until all are available. - * - *

Acquires the given number of permits, if they are available, - * and returns immediately, reducing the number of available permits - * by the given amount. This method has the same effect as the - * loop {@code for (long i = 0; i < permits; ++i) acquireUninterruptibly();} - * except that it atomically acquires the permits all at once: - * - *

If insufficient permits are available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until - * some other thread invokes one of the {@link #release() release} - * methods for this semaphore and the current thread is next to be assigned - * permits and the number of available permits satisfies this request. - * - *

If the current thread is {@linkplain Thread#interrupt interrupted} - * while waiting for permits then it will continue to wait and its - * position in the queue is not affected. When the thread does return - * from this method its interrupt status will be set. - * - * @param permits the number of permits to acquire - * @throws IllegalArgumentException if {@code permits} is negative - */ - public void acquireUninterruptibly(long permits) { - if (permits < 0) throw new IllegalArgumentException(); - sync.acquireShared(permits); - } - - /** - * Acquires the given number of permits from this semaphore, only - * if all are available at the time of invocation. - * - *

Acquires the given number of permits, if they are available, and - * returns immediately, with the value {@code true}, - * reducing the number of available permits by the given amount. - * - *

If insufficient permits are available then this method will return - * immediately with the value {@code false} and the number of available - * permits is unchanged. - * - *

Even when this semaphore has been set to use a fair ordering - * policy, a call to {@code tryAcquire} will - * immediately acquire a permit if one is available, whether or - * not other threads are currently waiting. This - * "barging" behavior can be useful in certain - * circumstances, even though it breaks fairness. If you want to - * honor the fairness setting, then use {@link #tryAcquire(long, - * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS)} - * which is almost equivalent (it also detects interruption). - * - * @param permits the number of permits to acquire - * @return {@code true} if the permits were acquired and - * {@code false} otherwise - * @throws IllegalArgumentException if {@code permits} is negative - */ - public boolean tryAcquire(long permits) { - if (permits < 0) throw new IllegalArgumentException(); - return sync.nonfairTryAcquireShared(permits) >= 0; - } - - /** - * Acquires the given number of permits from this semaphore, if all - * become available within the given waiting time and the current - * thread has not been {@linkplain Thread#interrupt interrupted}. - * - *

Acquires the given number of permits, if they are available and - * returns immediately, with the value {@code true}, - * reducing the number of available permits by the given amount. - * - *

If insufficient permits are available then - * the current thread becomes disabled for thread scheduling - * purposes and lies dormant until one of three things happens: - *

    - *
  • Some other thread invokes one of the {@link #release() release} - * methods for this semaphore and the current thread is next to be assigned - * permits and the number of available permits satisfies this request; or - *
  • Some other thread {@linkplain Thread#interrupt interrupts} - * the current thread; or - *
  • The specified waiting time elapses. - *
- * - *

If the permits are acquired then the value {@code true} is returned. - * - *

If the current thread: - *

    - *
  • has its interrupted status set on entry to this method; or - *
  • is {@linkplain Thread#interrupt interrupted} while waiting - * to acquire the permits, - *
- * then {@link InterruptedException} is thrown and the current thread's - * interrupted status is cleared. - * Any permits that were to be assigned to this thread, are instead - * assigned to other threads trying to acquire permits, as if - * the permits had been made available by a call to {@link #release()}. - * - *

If the specified waiting time elapses then the value {@code false} - * is returned. If the time is less than or equal to zero, the method - * will not wait at all. Any permits that were to be assigned to this - * thread, are instead assigned to other threads trying to acquire - * permits, as if the permits had been made available by a call to - * {@link #release()}. - * - * @param permits the number of permits to acquire - * @param timeout the maximum time to wait for the permits - * @param unit the time unit of the {@code timeout} argument - * @return {@code true} if all permits were acquired and {@code false} - * if the waiting time elapsed before all permits were acquired - * @throws InterruptedException if the current thread is interrupted - * @throws IllegalArgumentException if {@code permits} is negative - */ - public boolean tryAcquire(long permits, long timeout, TimeUnit unit) - throws InterruptedException { - if (permits < 0) throw new IllegalArgumentException(); - return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); - } - - /** - * Releases the given number of permits, returning them to the semaphore. - * - *

Releases the given number of permits, increasing the number of - * available permits by that amount. - * If any threads are trying to acquire permits, then one thread - * is selected and given the permits that were just released. - * If the number of available permits satisfies that thread's request - * then that thread is (re)enabled for thread scheduling purposes; - * otherwise the thread will wait until sufficient permits are available. - * If there are still permits available - * after this thread's request has been satisfied, then those permits - * are assigned in turn to other threads trying to acquire permits. - * - *

There is no requirement that a thread that releases a permit must - * have acquired that permit by calling {@link LongSemaphore#acquire acquire}. - * Correct usage of a semaphore is established by programming convention - * in the application. - * - * @param permits the number of permits to release - * @throws IllegalArgumentException if {@code permits} is negative - */ - public void release(long permits) { - if (permits < 0) throw new IllegalArgumentException(); - sync.releaseShared(permits); - } - - /** - * Returns the current number of permits available in this semaphore. - * - *

This method is typically used for debugging and testing purposes. - * - * @return the number of permits available in this semaphore - */ - public long availablePermits() { - return sync.getPermits(); - } - - /** - * Acquires and returns all permits that are immediately - * available, or if negative permits are available, releases them. - * Upon return, zero permits are available. - * - * @return the number of permits acquired or, if negative, the - * number released - */ - public long drainPermits() { - return sync.drainPermits(); - } - - /** - * Shrinks the number of available permits by the indicated - * reduction. This method can be useful in subclasses that use - * semaphores to track resources that become unavailable. This - * method differs from {@code acquire} in that it does not block - * waiting for permits to become available. - * - * @param reduction the number of permits to remove - * @throws IllegalArgumentException if {@code reduction} is negative - */ - protected void reducePermits(long reduction) { - if (reduction < 0) throw new IllegalArgumentException(); - sync.reducePermits(reduction); - } - - /** - * Returns {@code true} if this semaphore has fairness set true. - * - * @return {@code true} if this semaphore has fairness set true - */ - public boolean isFair() { - return sync instanceof FairSync; - } - - /** - * Queries whether any threads are waiting to acquire. Note that - * because cancellations may occur at any time, a {@code true} - * return does not guarantee that any other thread will ever - * acquire. This method is designed primarily for use in - * monitoring of the system state. - * - * @return {@code true} if there may be other threads waiting to - * acquire the lock - */ - public final boolean hasQueuedThreads() { - return sync.hasQueuedThreads(); - } - - /** - * Returns an estimate of the number of threads waiting to acquire. 
- * The value is only an estimate because the number of threads may - * change dynamically while this method traverses internal data - * structures. This method is designed for use in monitoring - * system state, not for synchronization control. - * - * @return the estimated number of threads waiting for this lock - */ - public final long getQueueLength() { - return sync.getQueueLength(); - } - - /** - * Returns a collection containing threads that may be waiting to acquire. - * Because the actual set of threads may change dynamically while - * constructing this result, the returned collection is only a best-effort - * estimate. The elements of the returned collection are in no particular - * order. This method is designed to facilitate construction of - * subclasses that provide more extensive monitoring facilities. - * - * @return the collection of threads - */ - protected Collection getQueuedThreads() { - return sync.getQueuedThreads(); - } - - /** - * Returns a string identifying this semaphore, as well as its state. - * The state, in brackets, includes the String {@code "Permits ="} - * followed by the number of permits. 
- * - * @return a string identifying this semaphore, as well as its state - */ - public String toString() { - return super.toString() + "[Permits = " + sync.getPermits() + "]"; - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index bdf18d6..5102309 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.lucene.searcher; +import static it.cavallium.dbengine.lucene.LuceneUtils.withTimeout; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -9,34 +10,31 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; -import it.cavallium.dbengine.lucene.collector.ReactiveCollectorMultiManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import org.apache.commons.lang3.concurrent.TimedSemaphore; +import org.apache.lucene.index.LeafReaderContext; +import 
org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SimpleCollector; import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuples; public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { + private static final int SEARCH_THREADS = Math.min(Math.max(8, Runtime.getRuntime().availableProcessors()), 128); + private static final ThreadFactory THREAD_FACTORY = new ShortNamedThreadFactory("UnscoredStreamingSearcher"); + private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY); + @Override public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, @@ -64,32 +62,9 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { } var shards = indexSearchers.shards(); - var cmm = new ReactiveCollectorMultiManager(); + Flux scoreDocsFlux = getScoreDocs(localQueryParams, shards); - Flux scoreDocsFlux = Flux.fromIterable(shards) - .index() - .flatMap(tuple -> Flux.create(scoreDocsSink -> { - LLUtils.ensureBlocking(); - var index = toIntExact(requireNonNull(tuple.getT1())); - var shard = tuple.getT2(); - var requested = new LongSemaphore(0); - var collectorManager = cmm.get(requested, scoreDocsSink, index); - - assert queryParams.computePreciseHitsCount() == cmm.scoreMode().isExhaustive(); - - scoreDocsSink.onRequest(requested::release); - - try { - shard.search(localQueryParams.query(), collectorManager.newCollector()); - scoreDocsSink.complete(); - } catch (IOException e) { - scoreDocsSink.error(e); - } - }, 
OverflowStrategy.BUFFER).subscribeOn(Schedulers.boundedElastic()));
-
-
-			Flux resultsFlux = LuceneUtils
-					.convertHits(scoreDocsFlux.publishOn(Schedulers.boundedElastic()), shards, keyFieldName, false);
+			Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);
 
 			var totalHitsCount = new TotalHitsCount(0, false);
 			Flux mergedFluxes = resultsFlux
@@ -101,6 +76,74 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
 		}, false);
 	}
 
+	/**
+	 * Streams the documents matching the query as unscored {@link ScoreDoc}s, searching the shards
+	 * one after the other on a thread of {@code EXECUTOR}. The collecting thread parks while the
+	 * downstream has no outstanding demand and stops collecting once the sink is cancelled.
+	 *
+	 * @param localQueryParams provides the query and the timeout applied to each shard search
+	 * @param shards the index searchers to query; the position in the list is used as shard index
+	 * @return a flux that completes after the last shard was searched, or fails on the first error
+	 */
+	private Flux<ScoreDoc> getScoreDocs(LocalQueryParams localQueryParams, List<IndexSearcher> shards) {
+		return Flux.<ScoreDoc>create(sink -> {
+			AtomicReference<Thread> threadAtomicReference = new AtomicReference<>();
+
+			var disposable = EXECUTOR.submit(() -> {
+				LLUtils.ensureBlocking();
+				threadAtomicReference.set(Thread.currentThread());
+				int shardIndexTemp = 0;
+				for (IndexSearcher shard : shards) {
+					if (sink.isCancelled()) {
+						// Nobody is listening anymore: skip the remaining shards
+						break;
+					}
+					try {
+						final int shardIndex = shardIndexTemp;
+						var collector = withTimeout(new SimpleCollector() {
+							private LeafReaderContext leafReaderContext;
+
+							@Override
+							protected void doSetNextReader(LeafReaderContext context) {
+								this.leafReaderContext = context;
+							}
+
+							@Override
+							public void collect(int i) {
+								// Assert that this is a blocking context
+								assert !Schedulers.isInNonBlockingThread();
+								var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
+								while (sink.requestedFromDownstream() <= 0 || sink.isCancelled()) {
+									if (sink.isCancelled()) {
+										throw new CollectionTerminatedException();
+									}
+									// Wait for demand; onRequest unparks this thread, 1000ms is only a fallback
+									LockSupport.parkNanos(1000000000L);
+								}
+								sink.next(scoreDoc);
+							}
+
+							@Override
+							public ScoreMode scoreMode() {
+								return ScoreMode.COMPLETE_NO_SCORES;
+							}
+						}, localQueryParams.timeout());
+						shard.search(localQueryParams.query(), collector);
+					} catch (Throwable e) {
+						// A sink can be terminated only once: stop at the first failure
+						sink.error(e);
+						return;
+					}
+					shardIndexTemp++;
+				}
+				// Complete only after every shard has been searched
+				sink.complete();
+			});
+			sink.onRequest(lc -> LockSupport.unpark(threadAtomicReference.get()));
+			sink.onDispose(() -> disposable.cancel(false));
+		}, OverflowStrategy.BUFFER).publishOn(Schedulers.boundedElastic());
+	}
+
 	private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
 		return new LocalQueryParams(queryParams.query(),
 				0L,