Reimplement LMDB PriorityQueue using RocksDB

Andrea Cavalli 2022-04-06 02:41:32 +02:00
parent 6ac9505653
commit dc69bf8e25
51 changed files with 1187 additions and 1707 deletions

View File

@@ -509,11 +509,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.8.2</version>
</dependency>
</dependencies>
<build>
<testSourceDirectory>src/test/java</testSourceDirectory>

View File

@@ -63,7 +63,6 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
protected final Logger logger = LogManager.getLogger(this.getClass());
private final T db;
private final DatabaseOptions opts;
private final boolean nettyDirect;
private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh;
@@ -97,14 +96,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final Timer updateUnchangedTime;
public AbstractRocksDBColumn(T db,
DatabaseOptions databaseOptions,
boolean nettyDirect,
BufferAllocator alloc,
String databaseName,
ColumnFamilyHandle cfh,
MeterRegistry meterRegistry) {
this.db = db;
this.opts = databaseOptions;
this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
this.nettyDirect = nettyDirect && alloc.getAllocationType() == OFF_HEAP;
this.alloc = alloc;
this.cfh = cfh;
String columnName;
@@ -318,10 +316,6 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return db;
}
protected DatabaseOptions getOpts() {
return opts;
}
protected ColumnFamilyHandle getCfh() {
return cfh;
}

View File

@@ -0,0 +1,80 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.disk.LLTempHugePqEnv.getColumnOptions;
import com.google.common.primitives.Ints;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.netty5.buffer.api.BufferAllocator;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
public class HugePqEnv implements Closeable {
private final RocksDB db;
private final ArrayList<ColumnFamilyHandle> defaultCfh;
private final Int2ObjectMap<ColumnFamilyHandle> cfhs = new Int2ObjectOpenHashMap<>();
public HugePqEnv(RocksDB db, ArrayList<ColumnFamilyHandle> defaultCfh) {
this.db = db;
this.defaultCfh = defaultCfh;
}
@Override
public void close() throws IOException {
for (ColumnFamilyHandle cfh : defaultCfh) {
db.destroyColumnFamilyHandle(cfh);
cfh.close();
}
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
}
}
public int createColumnFamily(int name, AbstractComparator comparator) throws RocksDBException {
var cfh = db.createColumnFamily(new ColumnFamilyDescriptor(Ints.toByteArray(name), getColumnOptions(comparator)));
synchronized (cfhs) {
var prev = cfhs.put(name, cfh);
if (prev != null) {
throw new UnsupportedOperationException("Db " + name + " already exists");
}
return name;
}
}
public void deleteColumnFamily(int db) throws RocksDBException {
ColumnFamilyHandle cfh;
synchronized (cfhs) {
cfh = cfhs.remove(db);
}
if (cfh != null) {
this.db.dropColumnFamily(cfh);
this.db.destroyColumnFamilyHandle(cfh);
cfh.close();
}
}
public StandardRocksDBColumn openDb(int hugePqId) {
ColumnFamilyHandle cfh;
synchronized (cfhs) {
cfh = Objects.requireNonNull(cfhs.get(hugePqId), () -> "column " + hugePqId + " does not exist");
}
return new StandardRocksDBColumn(db,
true,
BufferAllocator.offHeapPooled(),
db.getName(),
cfh,
new CompositeMeterRegistry()
);
}
}
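HugePqEnv multiplexes many logical queues onto one shared RocksDB instance, one column family per queue. A minimal usage sketch (hedged: `db` and `defaultCfh` are assumed to come from `RocksDB.open(...)` as in LLTempHugePqEnv below, and the integer passed to `createColumnFamily` is an opaque id, not a meaningful name):

```java
static StandardRocksDBColumn openExampleColumn(RocksDB db,
		ArrayList<ColumnFamilyHandle> defaultCfh) throws RocksDBException {
	var env = new HugePqEnv(db, defaultCfh);
	// A null comparator falls back to RocksDB's default bytewise ordering.
	int id = env.createColumnFamily(1, null);
	// The returned column wraps the shared db plus this queue's column family.
	return env.openDb(id);
	// Later: env.deleteColumnFamily(id) drops the column family again.
}
```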

View File

@@ -36,7 +36,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
private final Path basePath;
private final boolean inMemory;
private final LuceneRocksDBManager rocksDBManager;
private final AtomicReference<LLTempLMDBEnv> env = new AtomicReference<>();
private final AtomicReference<LLTempHugePqEnv> env = new AtomicReference<>();
public LLLocalDatabaseConnection(BufferAllocator allocator,
MeterRegistry meterRegistry,
@@ -69,7 +69,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
if (Files.notExists(basePath)) {
Files.createDirectories(basePath);
}
var prev = env.getAndSet(new LLTempLMDBEnv());
var prev = env.getAndSet(new LLTempHugePqEnv());
if (prev != null) {
throw new IllegalStateException("Env was already set");
}

View File

@@ -61,6 +61,7 @@ import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
@@ -157,7 +158,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB);
columnOptions.setMaxBytesForLevelBase((databaseOptions.spinning() ? 512 : 256) * SizeUnit.MB);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnOptions.setMaxBytesForLevelMultiplier(10);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
@@ -237,8 +238,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
columnOptions.setTableFormatConfig(tableOptions);
columnOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
columnOptions.setOptimizeFiltersForHits(true);
if (databaseOptions.spinning()) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
columnOptions.setOptimizeFiltersForHits(true);
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
}
// Increasing this value reduces compaction frequency and write amplification,
// but it also delays cleanup of stale data, which increases read amplification.
// This parameter is hard to tune; setting it above 256MB is generally not recommended.
columnOptions.setTargetFileSizeBase((databaseOptions.spinning() ? 256 : 128) * SizeUnit.MB);
// Each level up multiplies the threshold by target_file_size_multiplier
// (the default is 1, so the maximum sstable size is the same at every level).
columnOptions.setTargetFileSizeMultiplier(1);
descriptors.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII), columnOptions));
}
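Pulled out of the loop above, the spinning-disk tuning amounts to roughly the following standalone sketch (`spinning` stands in for `databaseOptions.spinning()`; the values mirror the code above):

```java
static ColumnFamilyOptions tunedColumnOptions(boolean spinning) {
	var opts = new ColumnFamilyOptions();
	// Bigger level-1 target and bigger SSTs on spinning disks, favoring
	// sequential I/O over many small files.
	opts.setMaxBytesForLevelBase((spinning ? 512 : 256) * SizeUnit.MB);
	opts.setTargetFileSizeBase((spinning ? 256 : 128) * SizeUnit.MB);
	// Multiplier 1: every level keeps the same maximum sstable size.
	opts.setTargetFileSizeMultiplier(1);
	if (spinning) {
		// Skip bloom filters on the bottommost level; worthwhile when most
		// point lookups are expected to find their key.
		opts.setOptimizeFiltersForHits(true);
	}
	return opts;
}
```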
@@ -687,12 +698,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
var nettyDirect = databaseOptions.allowNettyDirect();
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, name, cfh, meterRegistry);
return new OptimisticRocksDBColumn(optimisticTransactionDB, nettyDirect, allocator, name, cfh, meterRegistry);
} else if (db instanceof TransactionDB transactionDB) {
return new PessimisticRocksDBColumn(transactionDB, databaseOptions, allocator, name, cfh, meterRegistry);
return new PessimisticRocksDBColumn(transactionDB, nettyDirect, allocator, name, cfh, meterRegistry);
} else {
return new StandardRocksDBColumn(db, databaseOptions, allocator, name, cfh, meterRegistry);
return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry);
}
}

View File

@@ -130,7 +130,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final Phaser activeTasks = new Phaser(1);
private final AtomicBoolean closeRequested = new AtomicBoolean();
public LLLocalLuceneIndex(LLTempLMDBEnv env,
public LLLocalLuceneIndex(LLTempHugePqEnv env,
MeterRegistry meterRegistry,
@NotNull String clusterName,
int shardIndex,
@@ -160,12 +160,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
this.rocksDBManager = rocksDBManager;
var useLMDB = luceneOptions.allowNonVolatileCollection();
var useHugePq = luceneOptions.allowNonVolatileCollection();
var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) {
localSearcher = luceneHacks.customLocalSearcher().get();
} else {
localSearcher = new AdaptiveLocalSearcher(env, useLMDB, maxInMemoryResultEntries);
localSearcher = new AdaptiveLocalSearcher(env, useHugePq, maxInMemoryResultEntries);
}
var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);

View File

@@ -29,7 +29,6 @@ import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.IntList;
import java.io.Closeable;
import java.io.IOException;
@@ -54,7 +53,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.math.MathFlux;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@@ -83,7 +81,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final MultiSearcher multiSearcher;
private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
public LLLocalMultiLuceneIndex(LLTempHugePqEnv env,
MeterRegistry meterRegistry,
String clusterName,
IntList activeShards,
@@ -130,12 +128,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
this.lowMemory = luceneOptions.lowMemory();
var useLMDB = luceneOptions.allowNonVolatileCollection();
var useHugePq = luceneOptions.allowNonVolatileCollection();
var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) {
multiSearcher = luceneHacks.customMultiSearcher().get();
} else {
multiSearcher = new AdaptiveMultiSearcher(env, useLMDB, maxInMemoryResultEntries);
multiSearcher = new AdaptiveMultiSearcher(env, useHugePq, maxInMemoryResultEntries);
}
}

View File

@@ -0,0 +1,128 @@
package it.cavallium.dbengine.database.disk;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.rocksdb.AbstractComparator;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ChecksumType;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
public class LLTempHugePqEnv implements Closeable {
private Path tempDirectory;
private AtomicInteger nextColumnName;
private HugePqEnv env;
private volatile boolean initialized;
private volatile boolean closed;
public LLTempHugePqEnv() {
}
public HugePqEnv getEnv() {
if (closed) {
throw new IllegalStateException("Environment closed");
}
initializeIfPossible();
return env;
}
private void initializeIfPossible() {
if (!initialized) {
synchronized(this) {
if (!initialized) {
try {
tempDirectory = Files.createTempDirectory("huge-pq");
var opts = new DBOptions();
opts.setCreateIfMissing(true);
opts.setAtomicFlush(false);
opts.optimizeForSmallDb();
opts.setParanoidChecks(false);
opts.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
opts.setMaxOpenFiles(-1);
opts.setUseFsync(false);
opts.setUnorderedWrite(true);
opts.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
var cfh = new ArrayList<ColumnFamilyHandle>();
nextColumnName = new AtomicInteger(0);
env = new HugePqEnv(RocksDB.open(opts,
tempDirectory.toString(),
List.of(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, getColumnOptions(null))),
cfh
), cfh);
initialized = true;
} catch (RocksDBException | IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
static ColumnFamilyOptions getColumnOptions(AbstractComparator comparator) {
var opts = new ColumnFamilyOptions()
.setOptimizeFiltersForHits(true)
.setParanoidFileChecks(false)
.optimizeLevelStyleCompaction()
.setLevelCompactionDynamicLevelBytes(true)
.setTableFormatConfig(new BlockBasedTableConfig()
.setOptimizeFiltersForMemory(true)
.setChecksumType(ChecksumType.kNoChecksum));
if (comparator != null) {
opts.setComparator(comparator);
}
return opts;
}
public int allocateDb(AbstractComparator comparator) {
initializeIfPossible();
try {
return env.createColumnFamily(nextColumnName.getAndIncrement(), comparator);
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
public void freeDb(int db) {
initializeIfPossible();
try {
env.deleteColumnFamily(db);
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
@Override
public void close() throws IOException {
if (this.closed) {
return;
}
if (!this.initialized) {
synchronized (this) {
closed = true;
initialized = true;
return;
}
}
this.closed = true;
env.close();
//noinspection ResultOfMethodCallIgnored
Files.walk(tempDirectory)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
}
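A hedged end-to-end sketch of the lifecycle this class manages (lazy initialization on first use, temp-directory cleanup on close):

```java
static void scratchColumnExample() throws IOException {
	var tempEnv = new LLTempHugePqEnv();
	// The first call triggers the double-checked initialization above.
	int dbId = tempEnv.allocateDb(null); // null = default bytewise comparator
	var column = tempEnv.getEnv().openDb(dbId);
	// ... reads and writes against `column` ...
	tempEnv.freeDb(dbId); // drops the scratch column family
	tempEnv.close();      // closes RocksDB and deletes the temp directory
}
```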

View File

@@ -1,118 +0,0 @@
package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.ByteBuf;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BitSet;
import org.lmdbjava.Net5ByteBufProxy;
import org.lmdbjava.Env;
import static org.lmdbjava.EnvFlags.*;
public class LLTempLMDBEnv implements Closeable {
private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L;
public static final int MAX_DATABASES = 1024;
private static final AtomicInteger NEXT_LMDB_ENV_ID = new AtomicInteger(0);
private BitSet freeIds;
private int envId;
private Path tempDirectory;
private Env<ByteBuf> env;
private volatile boolean initialized;
private volatile boolean closed;
public LLTempLMDBEnv() {
this.envId = NEXT_LMDB_ENV_ID.getAndIncrement();
}
public Env<ByteBuf> getEnv() {
if (closed) {
throw new IllegalStateException("Environment closed");
}
initializeIfPossible();
return env;
}
private void initializeIfPossible() {
if (!initialized) {
synchronized(this) {
if (!initialized) {
try {
tempDirectory = Files.createTempDirectory("lmdb");
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
.setMapSize(TWENTY_GIBIBYTES)
.setMaxDbs(MAX_DATABASES);
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC);
freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
public int allocateDb() {
initializeIfPossible();
//noinspection SynchronizeOnNonFinalField
synchronized (freeIds) {
var freeBit = freeIds.nextSetBit(0);
if (freeBit == DocIdSetIterator.NO_MORE_DOCS) {
throw new IllegalStateException("LMDB databases limit has been reached in environment "
+ envId + ": " + MAX_DATABASES);
}
freeIds.clear(freeBit);
return freeBit;
}
}
public static String stringifyDbId(int bit) {
return "$db_" + bit;
}
public void freeDb(int db) {
initializeIfPossible();
//noinspection SynchronizeOnNonFinalField
synchronized (freeIds) {
freeIds.set(db);
}
}
@Override
public void close() throws IOException {
if (this.closed) {
return;
}
if (!this.initialized) {
synchronized (this) {
closed = true;
initialized = true;
return;
}
}
this.closed = true;
env.close();
//noinspection ResultOfMethodCallIgnored
Files.walk(tempDirectory)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
public int countUsedDbs() {
int freeIds;
if (this.freeIds == null) {
freeIds = MAX_DATABASES;
} else {
freeIds = this.freeIds.cardinality();
}
return MAX_DATABASES - freeIds;
}
}

View File

@@ -35,12 +35,12 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
private final DistributionSummary optimisticAttempts;
public OptimisticRocksDBColumn(OptimisticTransactionDB db,
DatabaseOptions databaseOptions,
boolean nettyDirect,
BufferAllocator alloc,
String databaseName,
ColumnFamilyHandle cfh,
MeterRegistry meterRegistry) {
super(db, databaseOptions, alloc, databaseName, cfh, meterRegistry);
super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry);
this.optimisticAttempts = DistributionSummary
.builder("db.optimistic.attempts.distribution")
.publishPercentiles(0.2, 0.5, 0.95)

View File

@@ -28,11 +28,11 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
private static final TransactionOptions DEFAULT_TX_OPTIONS = new TransactionOptions();
public PessimisticRocksDBColumn(TransactionDB db,
DatabaseOptions databaseOptions,
boolean nettyDirect,
BufferAllocator alloc,
String dbName,
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
super(db, databaseOptions, alloc, dbName, cfh, meterRegistry);
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry);
}
@Override

View File

@@ -15,7 +15,15 @@ public record RocksIteratorTuple(List<RocksObject> refs, @NotNull RocksDBIterato
@Override
public void close() {
refs.forEach(AbstractImmutableNativeReference::close);
for (RocksObject rocksObject : refs) {
if (rocksObject instanceof UnreleasableReadOptions) {
continue;
}
if (rocksObject instanceof UnreleasableWriteOptions) {
continue;
}
rocksObject.close();
}
iterator.close();
sliceMin.close();
sliceMax.close();

View File

@@ -23,11 +23,11 @@ import org.rocksdb.WriteOptions;
public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB> {
public StandardRocksDBColumn(RocksDB db,
DatabaseOptions databaseOptions,
boolean nettyDirect,
BufferAllocator alloc,
String dbName,
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
super(db, databaseOptions, alloc, dbName, cfh, meterRegistry);
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry);
}
@Override
@@ -111,24 +111,12 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (prevData != null) {
prevData.close();
}
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
}
case PREVIOUS -> {
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);

View File

@@ -6,9 +6,8 @@ import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
import it.cavallium.dbengine.rpc.current.data.Column;
@@ -34,7 +33,7 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
private final AtomicBoolean connected = new AtomicBoolean();
private final BufferAllocator allocator;
private final MeterRegistry meterRegistry;
private final AtomicReference<LLTempLMDBEnv> env = new AtomicReference<>();
private final AtomicReference<LLTempHugePqEnv> env = new AtomicReference<>();
public LLMemoryDatabaseConnection(BufferAllocator allocator, MeterRegistry meterRegistry) {
this.allocator = allocator;
@@ -58,7 +57,7 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
if (!connected.compareAndSet(false, true)) {
throw new IllegalStateException("Already connected");
}
var prev = env.getAndSet(new LLTempLMDBEnv());
var prev = env.getAndSet(new LLTempHugePqEnv());
if (prev != null) {
throw new IllegalStateException("Env was already set");
}

View File

@@ -1,12 +1,12 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
public class ByteArrayCodec implements LMDBCodec<byte[]> {
public class ByteArrayCodec implements HugePqCodec<byte[]> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, byte[] data) {
public Buffer serialize(Function<Integer, Buffer> allocator, byte[] data) {
var buf = allocator.apply(data.length + Integer.BYTES);
buf.writeInt(data.length);
buf.writeBytes(data);
@@ -14,10 +14,15 @@ public class ByteArrayCodec implements LMDBCodec<byte[]> {
}
@Override
public byte[] deserialize(ByteBuf b) {
public byte[] deserialize(Buffer b) {
var length = b.readInt();
byte[] data = new byte[length];
b.readBytes(data);
b.readBytes(data, 0, length);
return data;
}
@Override
public byte[] clone(byte[] obj) {
return obj.clone();
}
}
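A round-trip sketch for this codec, using an on-heap netty5 allocator (any `BufferAllocator` works; the codec only sees the `Function<Integer, Buffer>`):

```java
static byte[] roundTrip(byte[] input) {
	var alloc = BufferAllocator.onHeapUnpooled();
	var codec = new ByteArrayCodec();
	try (var buf = codec.serialize(alloc::allocate, input)) {
		return codec.deserialize(buf); // equal to `input`, element by element
	}
}
```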

View File

@@ -1,13 +1,13 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
import org.apache.lucene.util.BytesRef;
public class BytesRefCodec implements LMDBCodec<BytesRef> {
public class BytesRefCodec implements HugePqCodec<BytesRef> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, BytesRef data) {
public Buffer serialize(Function<Integer, Buffer> allocator, BytesRef data) {
var buf = allocator.apply(data.length + Integer.BYTES);
buf.writeInt(data.length);
buf.writeBytes(data.bytes, data.offset, data.length);
@@ -15,10 +15,15 @@ public class BytesRefCodec implements LMDBCodec<BytesRef> {
}
@Override
public BytesRef deserialize(ByteBuf b) {
public BytesRef deserialize(Buffer b) {
var length = b.readInt();
var bytes = new byte[length];
b.readBytes(bytes, 0, length);
return new BytesRef(bytes, 0, length);
}
@Override
public BytesRef clone(BytesRef obj) {
return obj.clone();
}
}

View File

@@ -1,17 +1,17 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
public class DoubleCodec implements LMDBCodec<Double> {
public class DoubleCodec implements HugePqCodec<Double> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Double data) {
public Buffer serialize(Function<Integer, Buffer> allocator, Double data) {
return allocator.apply(Double.BYTES).writeDouble(data);
}
@Override
public Double deserialize(ByteBuf b) {
public Double deserialize(Buffer b) {
return b.readDouble();
}
}

View File

@@ -1,17 +1,17 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
public class FloatCodec implements LMDBCodec<Float> {
public class FloatCodec implements HugePqCodec<Float> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Float data) {
public Buffer serialize(Function<Integer, Buffer> allocator, Float data) {
return allocator.apply(Float.BYTES).writeFloat(data);
}
@Override
public Float deserialize(ByteBuf b) {
public Float deserialize(Buffer b) {
return b.readFloat();
}
}

View File

@@ -0,0 +1,121 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.database.disk.HugePqEnv;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
public class HugePqArray<V> implements IArray<V>, SafeCloseable {
private final AtomicBoolean closed = new AtomicBoolean();
private final HugePqCodec<V> valueCodec;
private final LLTempHugePqEnv tempEnv;
private final HugePqEnv env;
private final int hugePqId;
private final StandardRocksDBColumn rocksDB;
private WriteOptions writeOptions;
private ReadOptions readOptions;
private final V defaultValue;
private final long virtualSize;
public HugePqArray(LLTempHugePqEnv env, HugePqCodec<V> codec, long size, @Nullable V defaultValue) {
this.valueCodec = codec;
this.tempEnv = env;
this.env = env.getEnv();
this.hugePqId = env.allocateDb(null);
this.rocksDB = this.env.openDb(hugePqId);
this.writeOptions = new WriteOptions().setDisableWAL(true).setSync(false);
this.readOptions = new ReadOptions().setVerifyChecksums(false);
this.defaultValue = defaultValue;
this.virtualSize = size;
}
public HugePqCodec<V> getValueCodec() {
return valueCodec;
}
private Buffer allocate(int size) {
return rocksDB.getAllocator().allocate(size);
}
private static void ensureThread() {
LLUtils.ensureBlocking();
}
@Override
public void set(long index, @Nullable V value) {
ensureBounds(index);
ensureThread();
var keyBuf = allocate(Long.BYTES);
keyBuf.writeLong(index);
try (var valueBuf = valueCodec.serialize(this::allocate, value); keyBuf) {
rocksDB.put(writeOptions, keyBuf, valueBuf);
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
@Override
public void reset(long index) {
ensureBounds(index);
ensureThread();
var keyBuf = allocate(Long.BYTES);
try (keyBuf) {
keyBuf.writeLong(index);
rocksDB.delete(writeOptions, keyBuf);
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
@Override
public @Nullable V get(long index) {
ensureBounds(index);
ensureThread();
var keyBuf = allocate(Long.BYTES);
try (keyBuf) {
keyBuf.writeLong(index);
try (var value = rocksDB.get(readOptions, keyBuf)) {
if (value == null) {
return defaultValue;
}
return valueCodec.deserialize(value);
}
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
private void ensureBounds(long index) {
if (index < 0 || index >= virtualSize) throw new IndexOutOfBoundsException();
}
@Override
public long size() {
ensureThread();
return virtualSize;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
readOptions.close();
writeOptions.close();
this.tempEnv.freeDb(hugePqId);
}
}
@Override
public String toString() {
return "huge_pq_array[" + virtualSize + "]";
}
}
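A hedged usage sketch: a disk-backed fixed-size array that only stores the cells that were actually set (assuming `SafeCloseable` extends `AutoCloseable`, as the try-with-resources here implies):

```java
static void hugeArrayExample(LLTempHugePqEnv tempEnv) {
	// One million virtual doubles; unset cells read back as the default 0.0.
	try (var array = new HugePqArray<>(tempEnv, new DoubleCodec(), 1_000_000, 0.0d)) {
		array.set(42, 3.14);
		Double value = array.get(42); // 3.14
	}
}
```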

View File

@@ -0,0 +1,105 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.NumericUtils;
import org.rocksdb.AbstractComparator;
public interface HugePqCodec<T> {
Buffer serialize(Function<Integer, Buffer> allocator, T data);
T deserialize(Buffer b);
default T clone(T obj) {
return obj;
}
default AbstractComparator getComparator() {
return null;
}
static int getLexInt(Buffer buffer, int offset, boolean invert) {
var data = new byte[Integer.BYTES];
buffer.copyInto(offset, data, 0, data.length);
var result = sortableBytesToInt(data, 0, invert);
return result;
}
static Buffer setLexInt(Buffer buffer, int offset, boolean invert, int value) {
var data = new byte[Integer.BYTES];
intToSortableBytes(value, data, 0, invert);
for (int i = 0; i < data.length; i++) {
buffer.setByte(offset + i, data[i]);
}
return buffer;
}
static float getLexFloat(Buffer buffer, int offset, boolean invert) {
return sortableIntToFloat(getLexInt(buffer, offset, false), invert);
}
static Buffer setLexFloat(Buffer buffer, int offset, boolean invert, float value) {
return setLexInt(buffer, offset, false, floatToSortableInt(value, invert));
}
/**
* Encodes an integer {@code value} such that unsigned byte order comparison is consistent with
* {@link Integer#compare(int, int)}
*
*/
public static void intToSortableBytes(int value, byte[] result, int offset, boolean invert) {
if (!invert) {
// Flip the sign bit, so negative ints sort before positive ints correctly:
value ^= 0x80000000;
} else {
value ^= 0x7FFFFFFF;
}
BitUtil.VH_BE_INT.set(result, offset, value);
}
/**
* Decodes an integer value previously written with {@link #intToSortableBytes}
*
*/
public static int sortableBytesToInt(byte[] encoded, int offset, boolean invert) {
int x = (int) BitUtil.VH_BE_INT.get(encoded, offset);
if (!invert) {
// Re-flip the sign bit to restore the original value:
return x ^ 0x80000000;
} else {
return x ^ 0x7FFFFFFF;
}
}
/**
* Converts a <code>float</code> value to a sortable signed <code>int</code>. The value is
* converted by getting their IEEE 754 floating-point &quot;float format&quot; bit layout and then
* some bits are swapped, to be able to compare the result as int. By this the precision is not
reduced, but the value can easily be used as an int. The sort order (including {@link Float#NaN})
* is defined by {@link Float#compareTo}; {@code NaN} is greater than positive infinity.
*
* @see #sortableIntToFloat
*/
public static int floatToSortableInt(float value, boolean invert) {
if (invert) {
return Float.floatToIntBits(value);
} else {
return NumericUtils.floatToSortableInt(value);
}
}
/**
* Converts a sortable <code>int</code> back to a <code>float</code>.
*
* @see #floatToSortableInt
*/
public static float sortableIntToFloat(int encoded, boolean invert) {
if (invert) {
return Float.intBitsToFloat(encoded);
} else {
return NumericUtils.sortableIntToFloat(encoded);
}
}
}
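A worked check of the sign-flip encoding: `-3 ^ 0x80000000 = 0x7FFFFFFD` and `5 ^ 0x80000000 = 0x80000005`, so unsigned byte comparison orders -3 before 5, exactly like `Integer.compare`. As a sketch (using `java.util.Arrays`):

```java
static void sortableIntCheck() {
	byte[] a = new byte[Integer.BYTES];
	byte[] b = new byte[Integer.BYTES];
	HugePqCodec.intToSortableBytes(-3, a, 0, false); // 0x7FFFFFFD, big-endian
	HugePqCodec.intToSortableBytes(5, b, 0, false);  // 0x80000005, big-endian
	assert Arrays.compareUnsigned(a, b) < 0;         // -3 sorts before 5
	assert HugePqCodec.sortableBytesToInt(a, 0, false) == -3; // round-trips
}
```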

View File

@@ -2,7 +2,7 @@ package it.cavallium.dbengine.lucene;
import static org.apache.lucene.search.SortField.STRING_LAST;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.comparators.DoubleComparator;
import it.cavallium.dbengine.lucene.comparators.FloatComparator;
import it.cavallium.dbengine.lucene.comparators.IntComparator;
@@ -18,11 +18,11 @@ import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.LeafFieldComparator;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSelector;
import org.apache.lucene.search.comparators.LMDBDocComparator;
import org.apache.lucene.search.comparators.HugePqDocComparator;
public class LMDBComparator {
public class HugePqComparator {
public static FieldComparator<?> getComparator(LLTempLMDBEnv env, SortField sortField,
public static FieldComparator<?> getComparator(LLTempHugePqEnv env, SortField sortField,
int numHits, int sortPos) {
var sortFieldClass = sortField.getClass();
if (sortFieldClass == org.apache.lucene.search.SortedNumericSortField.class) {
@@ -93,7 +93,7 @@ public class LMDBComparator {
var comparatorSource = sortField.getComparatorSource();
return switch (sortField.getType()) {
case SCORE -> new RelevanceComparator(env, numHits);
case DOC -> new LMDBDocComparator(env, numHits, reverse, sortPos);
case DOC -> new HugePqDocComparator(env, numHits, reverse, sortPos);
case INT -> new IntComparator(env, numHits, field, (Integer) missingValue,
reverse, sortPos);
case FLOAT -> new FloatComparator(env, numHits, field, (Float) missingValue,

View File

@@ -0,0 +1,346 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.database.disk.HugePqEnv;
import it.cavallium.dbengine.database.disk.RocksIteratorTuple;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.database.disk.UnreleasableReadOptions;
import it.cavallium.dbengine.database.disk.UnreleasableWriteOptions;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import reactor.core.publisher.Flux;
public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
static {
RocksDB.loadLibrary();
}
private final AtomicBoolean closed = new AtomicBoolean();
private final LLTempHugePqEnv tempEnv;
private final HugePqEnv env;
private final int hugePqId;
private final StandardRocksDBColumn rocksDB;
private static final UnreleasableWriteOptions writeOptions = new UnreleasableWriteOptions(new WriteOptions()
.setDisableWAL(true)
.setSync(false));
private static final UnreleasableReadOptions readOptions = new UnreleasableReadOptions(new ReadOptions()
.setVerifyChecksums(false).setTotalOrderSeek(true));
private final HugePqCodec<T> codec;
private long size = 0;
public HugePqPriorityQueue(LLTempHugePqEnv env, HugePqCodec<T> codec) {
this.tempEnv = env;
this.env = env.getEnv();
this.hugePqId = env.allocateDb(codec.getComparator());
this.rocksDB = this.env.openDb(hugePqId);
this.codec = codec;
}
private Buffer allocate(int size) {
return rocksDB.getAllocator().allocate(size);
}
private static void ensureThread() {
LLUtils.ensureBlocking();
}
private static void ensureItThread() {
ensureThread();
}
@Override
public void add(T element) {
ensureThread();
var keyBuf = serializeKey(element);
try (keyBuf) {
rocksDB.updateAtomic(readOptions, writeOptions, keyBuf, this::incrementOrAdd, UpdateAtomicResultMode.NOTHING);
++size;
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
private Buffer serializeKey(T element) {
return codec.serialize(this::allocate, element);
}
private T deserializeKey(Buffer keyBuf) {
return codec.deserialize(keyBuf.writerOffset(keyBuf.writerOffset()));
}
private Buffer serializeValue(int count) {
var keyBuf = allocate(Integer.BYTES);
keyBuf.writeInt(count);
return keyBuf;
}
private int deserializeValue(Buffer keyBuf) {
return keyBuf.readInt();
}
@Override
public T top() {
ensureThread();
return databaseTop();
}
private T databaseTop() {
try (var it = rocksDB.getRocksIterator(true, readOptions, LLRange.all(), false)) {
var rocksIterator = it.iterator();
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
var key = rocksIterator.key();
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
return deserializeKey(keyBuf);
}
} else {
return null;
}
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
@Override
public T pop() {
ensureThread();
try (var it = rocksDB.getRocksIterator(true, readOptions, LLRange.all(), false)) {
var rocksIterator = it.iterator();
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
var key = rocksIterator.key();
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
rocksDB.updateAtomic(readOptions, writeOptions, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING);
--size;
return deserializeKey(keyBuf);
}
} else {
return null;
}
} catch (RocksDBException | IOException e) {
throw new IllegalStateException(e);
}
}
private Buffer incrementOrAdd(@Nullable Buffer prev) {
if (prev == null) {
return serializeValue(1);
} else {
var prevCount = deserializeValue(prev);
assert prevCount > 0;
return serializeValue(prevCount + 1);
}
}
@Nullable
private Buffer reduceOrRemove(@Nullable Buffer prev) {
if (prev == null) {
return null;
}
var prevCount = deserializeValue(prev);
assert prevCount > 0;
if (prevCount == 1) {
return null;
} else {
return serializeValue(prevCount - 1);
}
}
@Override
public void replaceTop(T newTop) {
ensureThread();
this.pop();
this.add(newTop);
}
@Override
public long size() {
ensureThread();
return size;
}
@Override
public void clear() {
ensureThread();
try (var wb = new WriteBatch()) {
wb.deleteRange(rocksDB.getColumnFamilyHandle(), new byte[0], getBiggestKey());
size = 0;
rocksDB.write(writeOptions, wb);
} catch (RocksDBException e) {
throw new IllegalStateException(e);
}
}
private byte[] getBiggestKey() {
var biggestKey = new byte[4096];
Arrays.fill(biggestKey, (byte) 0xFF);
return biggestKey;
}
@Override
public boolean remove(@NotNull T element) {
ensureThread();
Objects.requireNonNull(element);
try (var keyBuf = serializeKey(element)) {
UpdateAtomicResultPrevious prev = (UpdateAtomicResultPrevious) rocksDB.updateAtomic(readOptions,
writeOptions,
keyBuf,
this::reduceOrRemove,
UpdateAtomicResultMode.PREVIOUS
);
try {
if (prev.previous() != null) {
--size;
return true;
} else {
return false;
}
} finally {
if (prev.previous() != null) {
prev.previous().close();
}
}
} catch (IOException ex) {
throw new IllegalStateException(ex);
}
}
public Flux<T> reverseIterate() {
return iterate(0, true);
}
@Override
public Flux<T> iterate() {
return iterate(0, false);
}
private Flux<T> iterate(long skips, boolean reverse) {
return Flux.<List<T>, RocksIteratorTuple>generate(() -> {
var it = rocksDB.getRocksIterator(true, readOptions, LLRange.all(), reverse);
var rocksIterator = it.iterator();
if (reverse) {
rocksIterator.seekToLast();
} else {
rocksIterator.seekToFirst();
}
long skipsDone = 0;
while (rocksIterator.isValid() && skipsDone < skips) {
if (reverse) {
rocksIterator.prev();
} else {
rocksIterator.next();
}
skipsDone++;
}
return it;
}, (itT, sink) -> {
var rocksIterator = itT.iterator();
if (rocksIterator.isValid()) {
try (var keyBuf = rocksDB.getAllocator().copyOf(rocksIterator.key());
var valBuf = rocksDB.getAllocator().copyOf(rocksIterator.value())) {
var count = deserializeValue(valBuf);
if (count == 0) {
sink.next(List.of());
} else {
var result = new ArrayList<T>(count);
T origKey = deserializeKey(keyBuf);
for (int i = 0; i < count; i++) {
if (i == 0) {
result.add(origKey);
} else {
result.add(codec.clone(origKey));
}
}
sink.next(result);
}
}
try {
if (reverse) {
rocksIterator.prev();
} else {
rocksIterator.next();
}
} catch (RocksDBException e) {
sink.error(e);
}
} else {
sink.complete();
}
return itT;
}, RocksIteratorTuple::close).concatMapIterable(item -> item);
}
@Override
public Flux<T> iterate(long skips) {
return iterate(skips, false);
}
public Flux<T> reverseIterate(long skips) {
return iterate(skips, true);
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
this.tempEnv.freeDb(hugePqId);
if (this.codec instanceof SafeCloseable closeable) {
closeable.close();
}
}
}
@Override
public String toString() {
return new StringJoiner(", ", HugePqPriorityQueue.class.getSimpleName() + "[", "]")
.add("size=" + size)
.toString();
}
@Override
public ReversableResourceIterable<T> reverse() {
return new ReversableResourceIterable<>() {
@Override
public void close() {
HugePqPriorityQueue.this.close();
}
@Override
public Flux<T> iterate() {
return reverseIterate();
}
@Override
public Flux<T> iterate(long skips) {
return reverseIterate(skips);
}
@Override
public ReversableResourceIterable<T> reverse() {
return HugePqPriorityQueue.this;
}
};
}
}
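A hedged usage sketch of the queue (non-negative ints only, since `IntCodec` supplies no custom comparator and its plain big-endian encoding is only order-preserving for non-negative values):

```java
static void queueExample(LLTempHugePqEnv tempEnv) {
	var pq = new HugePqPriorityQueue<Integer>(tempEnv, new IntCodec());
	try {
		pq.add(7);
		pq.add(7); // duplicates are reference-counted, not stored twice
		pq.add(2);
		Integer top = pq.pop();     // 2 (smallest key first)
		long remaining = pq.size(); // 2: the two sevens
	} finally {
		pq.close();
	}
}
```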

View File

@@ -1,17 +1,17 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
public class IntCodec implements LMDBCodec<Integer> {
public class IntCodec implements HugePqCodec<Integer> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Integer data) {
public Buffer serialize(Function<Integer, Buffer> allocator, Integer data) {
return allocator.apply(Integer.BYTES).writeInt(data);
}
@Override
public Integer deserialize(ByteBuf b) {
public Integer deserialize(Buffer b) {
return b.readInt();
}
}

View File

@@ -1,10 +1,10 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.ArrayList;
import java.util.function.Function;
public class LLFieldDocCodec implements LMDBSortedCodec<LLFieldDoc> {
public class LLFieldDocCodec implements HugePqCodec<LLFieldDoc> {
private enum FieldType {
FLOAT,
@@ -18,7 +18,7 @@ public class LLFieldDocCodec implements LMDBSortedCodec<LLFieldDoc> {
}
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLFieldDoc data) {
public Buffer serialize(Function<Integer, Buffer> allocator, LLFieldDoc data) {
int fieldsDataSize = 0;
byte[] fieldTypes = new byte[data.fields().size()];
int fieldId = 0;
@@ -47,7 +47,7 @@ public class LLFieldDocCodec implements LMDBSortedCodec<LLFieldDoc> {
setDoc(buf, data.doc());
setShardIndex(buf, data.shardIndex());
setFieldsCount(buf, data.fields().size());
buf.writerIndex(size);
buf.writerOffset(size);
fieldId = 0;
for (Object field : data.fields()) {
@@ -67,14 +67,14 @@ public class LLFieldDocCodec implements LMDBSortedCodec<LLFieldDoc> {
fieldId++;
}
assert buf.writableBytes() == 0;
return buf.asReadOnly();
return buf;
}
@Override
public LLFieldDoc deserialize(ByteBuf buf) {
public LLFieldDoc deserialize(Buffer buf) {
var fieldsCount = getFieldsCount(buf);
ArrayList<Object> fields = new ArrayList<>(fieldsCount);
buf.readerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES);
buf.readerOffset(Float.BYTES + Integer.BYTES + Integer.BYTES + Character.BYTES);
for (char i = 0; i < fieldsCount; i++) {
fields.add(switch (FieldType.values()[buf.readByte()]) {
case FLOAT -> buf.readFloat();
@@ -87,65 +87,40 @@ public class LLFieldDocCodec implements LMDBSortedCodec<LLFieldDoc> {
return new LLFieldDoc(getDoc(buf), getScore(buf), getShardIndex(buf), fields);
}
@Override
public int compare(LLFieldDoc hitA, LLFieldDoc hitB) {
if (hitA.score() == hitB.score()) {
if (hitA.doc() == hitB.doc()) {
return Integer.compare(hitA.shardIndex(), hitB.shardIndex());
} else {
return Integer.compare(hitB.doc(), hitA.doc());
}
} else {
return Float.compare(hitA.score(), hitB.score());
}
private static float getScore(Buffer hit) {
return HugePqCodec.getLexFloat(hit, 0, false);
}
@Override
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
var scoreA = getScore(hitA);
var scoreB = getScore(hitB);
if (scoreA == scoreB) {
var docA = getDoc(hitA);
var docB = getDoc(hitB);
if (docA == docB) {
return Integer.compare(getShardIndex(hitA), getShardIndex(hitB));
} else {
return Integer.compare(docB, docA);
}
} else {
return Float.compare(scoreA, scoreB);
}
private static int getDoc(Buffer hit) {
return HugePqCodec.getLexInt(hit, Float.BYTES, true);
}
private static float getScore(ByteBuf hit) {
return hit.getFloat(0);
private static int getShardIndex(Buffer hit) {
return HugePqCodec.getLexInt(hit, Float.BYTES + Integer.BYTES, false);
}
private static int getDoc(ByteBuf hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(ByteBuf hit) {
return hit.getInt(Float.BYTES + Integer.BYTES);
}
private char getFieldsCount(ByteBuf hit) {
private char getFieldsCount(Buffer hit) {
return hit.getChar(Float.BYTES + Integer.BYTES + Integer.BYTES);
}
private static void setScore(ByteBuf hit, float score) {
hit.setFloat(0, score);
private static void setScore(Buffer hit, float score) {
HugePqCodec.setLexFloat(hit, 0, false, score);
}
private static void setDoc(ByteBuf hit, int doc) {
hit.setInt(Float.BYTES, doc);
private static void setDoc(Buffer hit, int doc) {
HugePqCodec.setLexInt(hit, Float.BYTES, true, doc);
}
private static void setShardIndex(ByteBuf hit, int shardIndex) {
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
private static void setShardIndex(Buffer hit, int shardIndex) {
HugePqCodec.setLexInt(hit, Float.BYTES + Integer.BYTES, false, shardIndex);
}
private void setFieldsCount(ByteBuf hit, int size) {
private void setFieldsCount(Buffer hit, int size) {
hit.setChar(Float.BYTES + Integer.BYTES + Integer.BYTES, (char) size);
}
@Override
public LLFieldDoc clone(LLFieldDoc obj) {
return new LLFieldDoc(obj.doc(), obj.score(), obj.shardIndex(), obj.fields());
}
}

View File

@@ -1,76 +1,51 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
public class LLScoreDocCodec implements LMDBSortedCodec<LLScoreDoc> {
public class LLScoreDocCodec implements HugePqCodec<LLScoreDoc> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLScoreDoc data) {
public Buffer serialize(Function<Integer, Buffer> allocator, LLScoreDoc data) {
var buf = allocator.apply(Float.BYTES + Integer.BYTES + Integer.BYTES);
buf.writerOffset(Float.BYTES + Integer.BYTES + Integer.BYTES);
setScore(buf, data.score());
setDoc(buf, data.doc());
setShardIndex(buf, data.shardIndex());
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES);
return buf.asReadOnly();
return buf;
}
@Override
public LLScoreDoc deserialize(ByteBuf buf) {
public LLScoreDoc deserialize(Buffer buf) {
return new LLScoreDoc(getDoc(buf), getScore(buf), getShardIndex(buf));
}
@Override
public int compare(LLScoreDoc hitA, LLScoreDoc hitB) {
if (hitA.score() == hitB.score()) {
if (hitA.doc() == hitB.doc()) {
return Integer.compare(hitA.shardIndex(), hitB.shardIndex());
} else {
return Integer.compare(hitB.doc(), hitA.doc());
}
} else {
return Float.compare(hitA.score(), hitB.score());
}
private static float getScore(Buffer hit) {
return HugePqCodec.getLexFloat(hit, 0, false);
}
private static int getDoc(Buffer hit) {
return HugePqCodec.getLexInt(hit, Float.BYTES, true);
}
private static int getShardIndex(Buffer hit) {
return HugePqCodec.getLexInt(hit, Float.BYTES + Integer.BYTES, false);
}
private static void setScore(Buffer hit, float score) {
HugePqCodec.setLexFloat(hit, 0, false, score);
}
private static void setDoc(Buffer hit, int doc) {
HugePqCodec.setLexInt(hit, Float.BYTES, true, doc);
}
private static void setShardIndex(Buffer hit, int shardIndex) {
HugePqCodec.setLexInt(hit, Float.BYTES + Integer.BYTES, false, shardIndex);
}
@Override
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
var scoreA = getScore(hitA);
var scoreB = getScore(hitB);
if (scoreA == scoreB) {
var docA = getDoc(hitA);
var docB = getDoc(hitB);
if (docA == docB) {
return Integer.compare(getShardIndex(hitA), getShardIndex(hitB));
} else {
return Integer.compare(docB, docA);
}
} else {
return Float.compare(scoreA, scoreB);
}
}
private static float getScore(ByteBuf hit) {
return hit.getFloat(0);
}
private static int getDoc(ByteBuf hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(ByteBuf hit) {
return hit.getInt(Float.BYTES + Integer.BYTES);
}
private static void setScore(ByteBuf hit, float score) {
hit.setFloat(0, score);
}
private static void setDoc(ByteBuf hit, int doc) {
hit.setInt(Float.BYTES, doc);
}
private static void setShardIndex(ByteBuf hit, int shardIndex) {
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
public LLScoreDoc clone(LLScoreDoc obj) {
return new LLScoreDoc(obj.doc(), obj.score(), obj.shardIndex());
}
}
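With the lexicographic setters above, unsigned byte comparison of two serialized hits reproduces the ordering the removed `compare`/`compareDirect` pair implemented: lower score first, and among equal scores higher doc first (doc is stored inverted). A sanity-check sketch:

```java
static void scoreDocOrderCheck() {
	var alloc = BufferAllocator.onHeapUnpooled();
	var codec = new LLScoreDocCodec();
	// LLScoreDoc(doc, score, shardIndex), as reconstructed in deserialize().
	try (var lo = codec.serialize(alloc::allocate, new LLScoreDoc(5, 1.0f, 0));
			var hi = codec.serialize(alloc::allocate, new LLScoreDoc(3, 2.0f, 0))) {
		byte[] a = new byte[lo.readableBytes()];
		byte[] b = new byte[hi.readableBytes()];
		lo.copyInto(0, a, 0, a.length);
		hi.copyInto(0, b, 0, b.length);
		assert java.util.Arrays.compareUnsigned(a, b) < 0; // the 1.0f hit sorts first
	}
}
```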

View File

@@ -1,10 +1,10 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.Closeable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -15,15 +15,18 @@ import org.apache.lucene.search.LeafFieldComparator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ComparatorOptions;
public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHitQueue, SafeCloseable {
public class LLSlotDocCodec implements HugePqCodec<LLSlotDoc>, FieldValueHitQueue, SafeCloseable {
private final SortField[] fields;
protected final FieldComparator<?>[] comparators;
protected final int[] reverseMul;
private final AbstractComparator comparator;
public LLSlotDocCodec(LLTempLMDBEnv env, int numHits, SortField[] fields) {
public LLSlotDocCodec(LLTempHugePqEnv env, int numHits, SortField[] fields) {
// When we get here, fields.length is guaranteed to be > 0, therefore no
// need to check it again.
@@ -37,89 +40,92 @@ public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHit
for (int i = 0; i < numComparators; ++i) {
SortField field = fields[i];
reverseMul[i] = field.getReverse() ? -1 : 1;
comparators[i] = LMDBComparator.getComparator(env, field, numHits, i);
comparators[i] = HugePqComparator.getComparator(env, field, numHits, i);
}
comparator = new AbstractComparator(new ComparatorOptions()) {
@Override
public String name() {
return "slot-doc-codec-comparator";
}
@Override
public int compare(ByteBuffer hitA, ByteBuffer hitB) {
assert hitA != hitB;
assert getSlot(hitA) != getSlot(hitB);
int numComparators = comparators.length;
for (int i = 0; i < numComparators; ++i) {
final int c = reverseMul[i] * comparators[i].compare(getSlot(hitA), getSlot(hitB));
if (c != 0) {
// Short circuit
return -c;
}
}
// avoid random sort order that could lead to duplicates (bug #31241):
return Integer.compare(getDoc(hitB), getDoc(hitA));
}
};
}
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, LLSlotDoc data) {
public Buffer serialize(Function<Integer, Buffer> allocator, LLSlotDoc data) {
var buf = allocator.apply(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES);
setScore(buf, data.score());
setDoc(buf, data.doc());
setShardIndex(buf, data.shardIndex());
setSlot(buf, data.slot());
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES);
return buf.asReadOnly();
buf.writerOffset(Float.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES);
return buf;
}
@Override
public LLSlotDoc deserialize(ByteBuf buf) {
public LLSlotDoc deserialize(Buffer buf) {
return new LLSlotDoc(getDoc(buf), getScore(buf), getShardIndex(buf), getSlot(buf));
}
@Override
public int compare(LLSlotDoc hitA, LLSlotDoc hitB) {
int numComparators = comparators.length;
for (int i = 0; i < numComparators; ++i) {
final int c = reverseMul[i] * comparators[i].compare(hitA.slot(), hitB.slot());
if (c != 0) {
// Short circuit
return -c;
}
}
// avoid random sort order that could lead to duplicates (bug #31241):
return Integer.compare(hitB.doc(), hitA.doc());
public AbstractComparator getComparator() {
return comparator;
}
@Override
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
assert hitA != hitB;
assert getSlot(hitA) != getSlot(hitB);
int numComparators = comparators.length;
for (int i = 0; i < numComparators; ++i) {
final int c = reverseMul[i] * comparators[i].compare(getSlot(hitA), getSlot(hitB));
if (c != 0) {
// Short circuit
return -c;
}
}
// avoid random sort order that could lead to duplicates (bug #31241):
return Integer.compare(getDoc(hitB), getDoc(hitA));
}
private static float getScore(ByteBuf hit) {
private static float getScore(Buffer hit) {
return hit.getFloat(0);
}
private static int getDoc(ByteBuf hit) {
private static int getDoc(Buffer hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(ByteBuf hit) {
private static int getDoc(ByteBuffer hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(Buffer hit) {
return hit.getInt(Float.BYTES + Integer.BYTES);
}
private static int getSlot(ByteBuf hit) {
private static int getSlot(Buffer hit) {
return hit.getInt(Float.BYTES + Integer.BYTES + Integer.BYTES);
}
private static void setScore(ByteBuf hit, float score) {
private static int getSlot(ByteBuffer hit) {
return hit.getInt(Float.BYTES + Integer.BYTES + Integer.BYTES);
}
private static void setScore(Buffer hit, float score) {
hit.setFloat(0, score);
}
private static void setDoc(ByteBuf hit, int doc) {
private static void setDoc(Buffer hit, int doc) {
hit.setInt(Float.BYTES, doc);
}
private static void setShardIndex(ByteBuf hit, int shardIndex) {
private static void setShardIndex(Buffer hit, int shardIndex) {
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
}
private static void setSlot(ByteBuf hit, int slot) {
private static void setSlot(Buffer hit, int slot) {
hit.setInt(Float.BYTES + Integer.BYTES + Integer.BYTES, slot);
}
@@ -177,4 +183,9 @@ public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHit
}
}
}
@Override
public LLSlotDoc clone(LLSlotDoc obj) {
return new LLSlotDoc(obj.doc(), obj.score(), obj.shardIndex(), obj.slot());
}
}

View File

@@ -1,269 +0,0 @@
package it.cavallium.dbengine.lucene;
import static org.lmdbjava.DbiFlags.MDB_CREATE;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.Txn;
public class LMDBArray<V> implements IArray<V>, SafeCloseable {
private final AtomicBoolean closed = new AtomicBoolean();
private final LMDBCodec<V> valueCodec;
private final LLTempLMDBEnv tempEnv;
private final Env<ByteBuf> env;
private final int lmdbDbId;
private final Dbi<ByteBuf> lmdb;
private final V defaultValue;
private boolean writing;
private Txn<ByteBuf> readTxn;
private Txn<ByteBuf> rwTxn;
// Cache
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
private final Long2ObjectMap<ByteBuf> toWrite = new Long2ObjectOpenHashMap<>();
private long allocatedSize = 0;
private final long virtualSize;
public LMDBArray(LLTempLMDBEnv env, LMDBCodec<V> codec, long size, @Nullable V defaultValue) {
this.valueCodec = codec;
this.tempEnv = env;
this.env = env.getEnv();
this.lmdbDbId = env.allocateDb();
this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), MDB_CREATE);
this.defaultValue = defaultValue;
this.writing = true;
this.rwTxn = null;
this.readTxn = null;
this.virtualSize = size;
}
public LMDBCodec<V> getValueCodec() {
return valueCodec;
}
private ByteBuf allocate(int size) {
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
}
private void switchToMode(boolean write) {
if (toWrite.size() > 0) {
switchToModeUncached(true);
try {
toWrite.forEach((ki, v) -> {
var keyBuf = allocate(Long.BYTES);
keyBuf.writeLong(ki);
if (lmdb.put(rwTxn, keyBuf, v)) {
allocatedSize++;
}
});
} finally {
endMode();
for (ByteBuf value : toWrite.values()) {
value.release();
}
toWrite.clear();
}
}
switchToModeUncached(write);
}
private void switchToModeUncached(boolean write) {
if (write) {
if (!writing) {
writing = true;
readTxn.close();
readTxn = null;
assert rwTxn == null;
rwTxn = env.txnWrite();
} else if (rwTxn == null) {
assert readTxn == null;
rwTxn = env.txnWrite();
}
} else {
if (writing) {
writing = false;
if (rwTxn != null) {
rwTxn.commit();
rwTxn.close();
rwTxn = null;
}
assert readTxn == null;
readTxn = env.txnRead();
}
}
}
private void endMode() {
writing = true;
if (readTxn != null) {
readTxn.commit();
readTxn.close();
readTxn = null;
}
if (rwTxn != null) {
rwTxn.commit();
rwTxn.close();
rwTxn = null;
}
assert readTxn == null;
}
private static void ensureThread() {
LLUtils.ensureBlocking();
}
private static void ensureItThread() {
ensureThread();
//if (!(Thread.currentThread() instanceof LMDBThread)) {
// throw new IllegalStateException("Must run in LMDB scheduler");
//}
}
@Override
public void set(long index, @Nullable V value) {
ensureBounds(index);
ensureThread();
var valueBuf = valueCodec.serialize(this::allocate, value);
if (toWrite.size() < WRITE_QUEUE_MAX_BOUND) {
var prev = toWrite.put(index, valueBuf);
if (prev != null) {
prev.release();
}
} else {
var keyBuf = allocate(Long.BYTES);
keyBuf.writeLong(index);
switchToMode(true);
try {
if (lmdb.put(rwTxn, keyBuf, valueBuf)) {
allocatedSize++;
}
} finally {
endMode();
keyBuf.release();
valueBuf.release();
}
}
}
@Override
public void reset(long index) {
ensureBounds(index);
ensureThread();
switchToMode(true);
var keyBuf = allocate(Long.BYTES);
keyBuf.writeLong(index);
try {
if (lmdb.delete(rwTxn, keyBuf)) {
allocatedSize--;
}
} finally {
endMode();
keyBuf.release();
}
}
@Override
public @Nullable V get(long index) {
ensureBounds(index);
ensureThread();
if (!toWrite.isEmpty()) {
var v = toWrite.get(index);
if (v != null) {
var ri = v.readerIndex();
var wi = v.writerIndex();
var c = v.capacity();
try {
return valueCodec.deserialize(v);
} finally {
v.readerIndex(ri);
v.writerIndex(wi);
v.capacity(c);
}
}
}
var keyBuf = allocate(Long.BYTES);
keyBuf.writeLong(index);
try {
switchToModeUncached(false);
var value = lmdb.get(readTxn, keyBuf);
if (value != null) {
return valueCodec.deserialize(value);
} else {
return defaultValue;
}
} finally {
endMode();
keyBuf.release();
}
}
private void ensureBounds(long index) {
if (index < 0 || index >= virtualSize) throw new IndexOutOfBoundsException();
}
@Override
public long size() {
ensureThread();
return virtualSize;
}
public long allocatedSize() {
return allocatedSize;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
for (ByteBuf value : toWrite.values()) {
value.release();
}
toWrite.clear();
if (rwTxn != null) {
rwTxn.close();
}
if (readTxn != null) {
readTxn.close();
}
try (var txn = env.txnWrite()) {
lmdb.drop(txn, true);
txn.commit();
}
lmdb.close();
this.tempEnv.freeDb(lmdbDbId);
}
}
@Override
public String toString() {
return "lmdb_array[" + virtualSize + " (allocated=" + allocatedSize + ")]";
}
}
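For context, the removed LMDBArray behaved as a fixed-size virtual array over a temporary LMDB database, buffering up to WRITE_QUEUE_MAX_BOUND pending writes in memory and flushing them in a single write transaction once the bound was reached. A minimal usage sketch, reusing the IntCodec that appears in the comparator changes later in this commit (sizes are illustrative):

    var env = new LLTempLMDBEnv();
    var array = new LMDBArray<>(env, new IntCodec(), 1_000, 0);
    array.set(42, 7);           // buffered in toWrite; flushed to LMDB once the queue bound is hit
    int value = array.get(42);  // served from the write buffer when present, otherwise from LMDB
    array.reset(42);            // deletes the entry; subsequent gets fall back to the default value 0
    array.close();              // releases buffered values and frees the backing Dbi
    env.close();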

View File

@ -1,11 +0,0 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import java.util.function.Function;
public interface LMDBCodec<T> {
ByteBuf serialize(Function<Integer, ByteBuf> allocator, T data);
T deserialize(ByteBuf b);
}

View File

@ -1,567 +0,0 @@
package it.cavallium.dbengine.lucene;
import static org.lmdbjava.DbiFlags.*;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.NotNull;
import org.lmdbjava.Cursor;
import org.lmdbjava.CursorIterable;
import org.lmdbjava.CursorIterable.KeyVal;
import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.GetOp;
import org.lmdbjava.Txn;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
private static final AtomicLong NEXT_ITEM_UID = new AtomicLong(0);
private final AtomicBoolean closed = new AtomicBoolean();
private final LMDBSortedCodec<T> codec;
private final LLTempLMDBEnv tempEnv;
private final Env<ByteBuf> env;
private final int lmdbDbId;
private final Dbi<ByteBuf> lmdb;
private boolean writing;
private boolean iterating;
private Txn<ByteBuf> readTxn;
private Txn<ByteBuf> rwTxn;
private Cursor<ByteBuf> cur;
// Cache
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
private final Deque<ByteBuf> toWriteKeys = new ArrayDeque<>();
private final Deque<ByteBuf> toWriteValues = new ArrayDeque<>();
private boolean topValid = true;
private T top = null;
private long size = 0;
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBSortedCodec<T> codec) {
this.codec = codec;
this.tempEnv = env;
this.env = env.getEnv();
this.lmdbDbId = env.allocateDb();
this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED);
this.writing = true;
this.iterating = false;
this.rwTxn = null;
this.readTxn = null;
this.cur = null;
}
public LMDBSortedCodec<T> getCodec() {
return codec;
}
private ByteBuf allocate(int size) {
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
}
private void switchToMode(boolean write, boolean wantCursor) {
assert !closed.get() : "Closed";
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
if (toWriteKeys.size() > 0) {
switchToModeUncached(true, false);
try {
var ki = toWriteKeys.iterator();
var vi = toWriteValues.iterator();
while (ki.hasNext()) {
var k = ki.next();
var v = vi.next();
lmdb.put(rwTxn, k, v);
}
} finally {
endMode();
for (ByteBuf toWriteKey : toWriteKeys) {
toWriteKey.release();
}
for (ByteBuf toWriteValue : toWriteValues) {
toWriteValue.release();
}
toWriteKeys.clear();
toWriteValues.clear();
}
}
switchToModeUncached(write, wantCursor);
}
private void switchToModeUncached(boolean write, boolean wantCursor) {
assert !closed.get() : "Closed";
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
boolean changedMode = false;
if (write) {
if (!writing) {
changedMode = true;
writing = true;
if (cur != null) {
cur.close();
cur = null;
}
readTxn.close();
readTxn = null;
assert rwTxn == null;
rwTxn = env.txnWrite();
} else if (rwTxn == null) {
assert readTxn == null;
rwTxn = env.txnWrite();
}
} else {
if (writing) {
changedMode = true;
writing = false;
if (cur != null) {
cur.close();
cur = null;
}
if (rwTxn != null) {
rwTxn.commit();
rwTxn.close();
rwTxn = null;
}
assert readTxn == null;
readTxn = env.txnRead();
}
}
if (cur == null) {
if (wantCursor) {
cur = lmdb.openCursor(Objects.requireNonNull(writing ? rwTxn : readTxn));
}
} else {
if (changedMode) {
cur.close();
cur = null;
}
}
}
private void endMode() {
if (cur != null) {
cur.close();
cur = null;
}
writing = true;
if (readTxn != null) {
readTxn.commit();
readTxn.close();
readTxn = null;
}
if (rwTxn != null) {
rwTxn.commit();
rwTxn.close();
rwTxn = null;
}
assert cur == null;
assert readTxn == null;
}
private static void ensureThread() {
LLUtils.ensureBlocking();
}
private static void ensureItThread() {
ensureThread();
//if (!(Thread.currentThread() instanceof LMDBThread)) {
// throw new IllegalStateException("Must run in LMDB scheduler");
//}
}
@Override
public void add(T element) {
ensureThread();
var buf = codec.serialize(this::allocate, element);
var uid = allocate(Long.BYTES);
uid.writeLong(NEXT_ITEM_UID.getAndIncrement());
if (toWriteKeys.size() < WRITE_QUEUE_MAX_BOUND) {
toWriteKeys.add(buf);
toWriteValues.add(uid);
if (++size == 1) {
topValid = true;
top = element;
} else {
topValid = false;
}
} else {
switchToMode(true, false);
try {
if (lmdb.put(rwTxn, buf, uid)) {
if (++size == 1) {
topValid = true;
top = element;
} else {
topValid = false;
}
}
} finally {
endMode();
buf.release();
uid.release();
}
}
assert topSingleValid(element);
}
private boolean topSingleValid(T element) {
if (size == 1) {
var top = databaseTop();
return codec.compare(top, element) == 0;
} else {
return true;
}
}
@Override
public T top() {
ensureThread();
if (topValid) {
return top;
} else {
var top = databaseTop();
this.top = top;
topValid = true;
return top;
}
}
private T databaseTop() {
ensureThread();
switchToMode(false, true);
try {
if (cur.first()) {
return codec.deserialize(cur.key());
} else {
return null;
}
} finally {
endMode();
}
}
@Override
public T pop() {
ensureThread();
switchToMode(true, true);
try {
if (cur.first()) {
var data = codec.deserialize(cur.key());
if (--size == 0) {
topValid = true;
top = null;
} else {
topValid = false;
top = null;
}
cur.delete();
return data;
} else {
topValid = true;
top = null;
return null;
}
} finally {
endMode();
}
}
@Override
public void replaceTop(T newTop) {
ensureThread();
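// the backing store offers no in-place update of the minimum key, so replace-top is a pop followed by an add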
this.pop();
this.add(newTop);
}
@Override
public long size() {
ensureThread();
return size;
}
@Override
public void clear() {
ensureThread();
switchToMode(true, false);
try {
lmdb.drop(rwTxn);
topValid = true;
top = null;
size = 0;
} finally {
endMode();
}
}
@Override
public boolean remove(@NotNull T element) {
ensureThread();
Objects.requireNonNull(element);
switchToMode(true, true);
var buf = codec.serialize(this::allocate, element);
try {
var deletable = cur.get(buf, GetOp.MDB_SET);
if (deletable) {
cur.delete();
if (topValid && codec.compare(top, element) == 0) {
if (--size == 0) {
top = null;
}
} else {
if (--size == 0) {
topValid = true;
top = null;
} else {
topValid = false;
}
}
}
return deletable;
} finally {
endMode();
}
}
public Flux<T> reverseIterate() {
return Flux
.generate(() -> {
ensureItThread();
switchToMode(false, true);
iterating = true;
return true;
}, (isLastKey, sink) -> {
try {
ensureItThread();
boolean found;
if (isLastKey) {
found = cur.last();
} else {
found = cur.prev();
}
if (found) {
sink.next(codec.deserialize(cur.key()));
} else {
sink.complete();
}
return false;
} catch (Throwable ex) {
sink.error(ex);
return false;
}
}, t -> {
ensureItThread();
iterating = false;
endMode();
});
}
@Override
public Flux<T> iterate() {
return Flux
.<T, Tuple2<CursorIterable<ByteBuf>, Iterator<KeyVal<ByteBuf>>>>generate(() -> {
ensureItThread();
switchToMode(false, false);
iterating = true;
if (cur != null) {
cur.close();
cur = null;
}
CursorIterable<ByteBuf> cit = lmdb.iterate(readTxn);
var it = cit.iterator();
return Tuples.of(cit, it);
}, (t, sink) -> {
try {
ensureItThread();
var it = t.getT2();
if (it.hasNext()) {
sink.next(codec.deserialize(it.next().key()));
} else {
sink.complete();
}
return t;
} catch (Throwable ex) {
sink.error(ex);
return t;
}
}, t -> {
ensureItThread();
var cit = t.getT1();
cit.close();
iterating = false;
endMode();
});
}
@Override
public Flux<T> iterate(long skips) {
return Flux
.<T, Tuple3<CursorIterable<ByteBuf>, Iterator<KeyVal<ByteBuf>>, Long>>generate(() -> {
ensureItThread();
switchToMode(false, false);
iterating = true;
if (cur != null) {
cur.close();
cur = null;
}
CursorIterable<ByteBuf> cit = lmdb.iterate(readTxn);
var it = cit.iterator();
return Tuples.of(cit, it, skips);
}, (t, sink) -> {
ensureItThread();
var it = t.getT2();
var remainingSkips = t.getT3();
while (remainingSkips-- > 0 && it.hasNext()) {
it.next();
}
if (it.hasNext()) {
sink.next(codec.deserialize(it.next().key()));
} else {
sink.complete();
}
return t.getT3() == 0L ? t : t.mapT3(s -> 0L);
}, t -> {
ensureItThread();
var cit = t.getT1();
cit.close();
iterating = false;
endMode();
});
}
public Flux<T> reverseIterate(long skips) {
return Flux
.generate(() -> {
ensureItThread();
switchToMode(false, true);
iterating = true;
return true;
}, (isLastKey, sink) -> {
try {
ensureItThread();
boolean found;
if (isLastKey) {
found = cur.last();
} else {
found = cur.prev();
}
if (found) {
// Skip elements
if (isLastKey) {
long remainingSkips = skips;
while (remainingSkips > 0) {
if (cur.prev()) {
remainingSkips--;
} else {
sink.complete();
return false;
}
}
}
sink.next(codec.deserialize(cur.key()));
} else {
sink.complete();
}
return false;
} catch (Throwable ex) {
sink.error(ex);
return false;
}
}, t -> {
ensureItThread();
iterating = false;
endMode();
});
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
ensureThread();
for (ByteBuf toWriteKey : toWriteKeys) {
toWriteKey.release();
}
for (ByteBuf toWriteValue : toWriteValues) {
toWriteValue.release();
}
toWriteKeys.clear();
toWriteValues.clear();
if (cur != null) {
cur.close();
}
if (rwTxn != null) {
rwTxn.close();
}
if (readTxn != null) {
readTxn.close();
}
try (var txn = env.txnWrite()) {
lmdb.drop(txn, true);
txn.commit();
}
lmdb.close();
this.tempEnv.freeDb(lmdbDbId);
if (this.codec instanceof SafeCloseable closeable) {
closeable.close();
}
}
}
@Override
public String toString() {
return new StringJoiner(", ", LMDBPriorityQueue.class.getSimpleName() + "[", "]")
.add("size=" + size)
.toString();
}
@Override
public ReversableResourceIterable<T> reverse() {
return new ReversableResourceIterable<>() {
@Override
public void close() {
LMDBPriorityQueue.this.close();
}
@Override
public Flux<T> iterate() {
return reverseIterate();
}
@Override
public Flux<T> iterate(long skips) {
return reverseIterate(skips);
}
@Override
public ReversableResourceIterable<T> reverse() {
return LMDBPriorityQueue.this;
}
};
}
}
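Taken together, the removed queue exposed a blocking priority-queue API (add/top/pop/remove/clear) plus Flux-based forward and reverse iteration, batching writes through toWriteKeys/toWriteValues before they hit LMDB. A minimal usage sketch, assuming an LMDBSortedCodec<Integer> like the one defined in the tests at the end of this commit:

    var env = new LLTempLMDBEnv();
    var queue = new LMDBPriorityQueue<Integer>(env, integerCodec); // integerCodec: an LMDBSortedCodec<Integer>
    queue.add(3);
    queue.add(1);
    queue.add(2);
    Integer smallest = queue.top(); // 1: the codec's minimum, cached until the next mutation
    Integer popped = queue.pop();   // removes and returns 1 via a cursor positioned on the first key
    queue.iterate().subscribe(System.out::println); // 2, then 3, in ascending codec order
    queue.close();
    env.close();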

View File

@ -1,13 +0,0 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import java.util.Comparator;
import java.util.function.Function;
public interface LMDBSortedCodec<T> extends LMDBCodec<T> {
int compare(T o1, T o2);
int compareDirect(ByteBuf o1, ByteBuf o2);
}

View File

@ -1,8 +0,0 @@
package it.cavallium.dbengine.lucene;
public class LMDBThread extends Thread {
public LMDBThread(Runnable r) {
super(r);
}
}

View File

@ -1,17 +1,18 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import java.util.function.Function;
public class LongCodec implements LMDBCodec<Long> {
public class LongCodec implements HugePqCodec<Long> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Long data) {
public Buffer serialize(Function<Integer, Buffer> allocator, Long data) {
return allocator.apply(Long.BYTES).writeLong(data);
}
@Override
public Long deserialize(ByteBuf b) {
public Long deserialize(Buffer b) {
return b.readLong();
}
}
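This file survives the migration with only a type swap from ByteBuf to the netty5 Buffer API. Elsewhere in this commit the tests call HugePqCodec.setLexInt/getLexInt and setLexFloat/getLexFloat, which suggests numeric keys are encoded so that RocksDB's default bytewise comparator (raw unsigned-byte order) agrees with numeric order. A hedged sketch of what such a lexicographic int encoding typically looks like (the real HugePqCodec implementation is not shown in this diff):

    static Buffer setLexInt(Buffer buf, int offset, boolean invert, int value) {
        int lex = value ^ Integer.MIN_VALUE;     // flip the sign bit so negative ints sort before positive ones
        buf.setInt(offset, invert ? ~lex : lex); // optional bitwise inversion produces descending order
        return buf;
    }

    static int getLexInt(Buffer buf, int offset, boolean invert) {
        int lex = buf.getInt(offset);
        return (invert ? ~lex : lex) ^ Integer.MIN_VALUE; // undo the transform in reverse
    }

Floats need one extra step: negative values have all bits inverted (not just the sign bit) so that more-negative floats sort first.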

View File

@ -1,78 +0,0 @@
package it.cavallium.dbengine.lucene;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.util.function.Function;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
public class SortFieldCodec implements LMDBCodec<SortField> {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, SortField data) {
var out = new ByteBufDataOutput();
try {
var provider = data.getIndexSorter().getProviderName();
out.writeString(provider);
SortField.Provider.forName(provider).writeSortField(data, out);
} catch (IOException e) {
throw new RuntimeException(e);
}
return out.buf;
}
@Override
public SortField deserialize(ByteBuf b) {
var in = new ByteBufDataInput(b);
try {
return SortField.Provider.forName(in.readString()).readSortField(in);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static class ByteBufDataOutput extends DataOutput {
private final ByteBuf buf;
public ByteBufDataOutput() {
this.buf = PooledByteBufAllocator.DEFAULT.directBuffer();
}
@Override
public void writeByte(byte b) {
buf.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
buf.writeBytes(b, offset, length);
}
}
private static class ByteBufDataInput extends DataInput {
private final ByteBuf buf;
public ByteBufDataInput(ByteBuf b) {
this.buf = b;
}
@Override
public byte readByte() {
return buf.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) {
buf.readBytes(b, offset, len);
}
@Override
public void skipBytes(long numBytes) {
buf.skipBytes((int) numBytes);
}
}
}
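The removed SortFieldCodec bridged Lucene's DataOutput/DataInput abstractions onto Netty ByteBufs, persisting a SortField as its provider name followed by the provider-specific payload. A round-trip sketch (the allocator argument is accepted for interface compatibility but unused here, since serialize allocates its own output buffer):

    var codec = new SortFieldCodec();
    var sortField = new SortField("timestamp", SortField.Type.LONG);
    ByteBuf buf = codec.serialize(len -> PooledByteBufAllocator.DEFAULT.directBuffer(len, len), sortField);
    SortField restored = codec.deserialize(buf); // resolves the provider by name, then re-reads the field
    buf.release();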

View File

@ -18,12 +18,10 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.DoubleCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.SortFieldCodec;
import java.io.Closeable;
import it.cavallium.dbengine.lucene.HugePqArray;
import java.io.IOException;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.index.LeafReaderContext;
@ -40,10 +38,10 @@ public class DoubleComparator extends NumericComparator<Double> implements SafeC
protected double topValue;
protected double bottom;
public DoubleComparator(LLTempLMDBEnv env,
public DoubleComparator(LLTempHugePqEnv env,
int numHits, String field, Double missingValue, boolean reverse, int sortPos) {
super(field, missingValue != null ? missingValue : 0.0, reverse, sortPos, Double.BYTES);
values = new LMDBArray<>(env, new DoubleCodec(), numHits, 0d);
values = new HugePqArray<>(env, new DoubleCodec(), numHits, 0d);
}
@Override

View File

@ -18,12 +18,10 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.DoubleCodec;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.LMDBArray;
import java.io.Closeable;
import it.cavallium.dbengine.lucene.HugePqArray;
import java.io.IOException;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.index.LeafReaderContext;
@ -40,10 +38,10 @@ public class FloatComparator extends NumericComparator<Float> implements SafeClo
protected float topValue;
protected float bottom;
public FloatComparator(LLTempLMDBEnv env,
public FloatComparator(LLTempHugePqEnv env,
int numHits, String field, Float missingValue, boolean reverse, int sortPos) {
super(field, missingValue != null ? missingValue : 0.0f, reverse, sortPos, Float.BYTES);
values = new LMDBArray<>(env, new FloatCodec(), numHits, 0f);
values = new HugePqArray<>(env, new FloatCodec(), numHits, 0f);
}
@Override

View File

@ -18,12 +18,10 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import java.io.Closeable;
import it.cavallium.dbengine.lucene.HugePqArray;
import java.io.IOException;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.index.LeafReaderContext;
@ -40,10 +38,10 @@ public class IntComparator extends NumericComparator<Integer> implements SafeClo
protected int topValue;
protected int bottom;
public IntComparator(LLTempLMDBEnv env,
public IntComparator(LLTempHugePqEnv env,
int numHits, String field, Integer missingValue, boolean reverse, int sortPos) {
super(field, missingValue != null ? missingValue : 0, reverse, sortPos, Integer.BYTES);
values = new LMDBArray<>(env, new IntCodec(), numHits, 0);
values = new HugePqArray<>(env, new IntCodec(), numHits, 0);
}
@Override

View File

@ -18,12 +18,10 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.HugePqArray;
import it.cavallium.dbengine.lucene.LongCodec;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
@ -40,10 +38,10 @@ public class LongComparator extends NumericComparator<Long> implements SafeClose
protected long topValue;
protected long bottom;
public LongComparator(LLTempLMDBEnv env,
public LongComparator(LLTempHugePqEnv env,
int numHits, String field, Long missingValue, boolean reverse, int sortPos) {
super(field, missingValue != null ? missingValue : 0L, reverse, sortPos, Long.BYTES);
values = new LMDBArray<>(env, new LongCodec(), numHits, 0L);
values = new HugePqArray<>(env, new LongCodec(), numHits, 0L);
}
@Override

View File

@ -17,24 +17,17 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.LongCodec;
import java.io.Closeable;
import it.cavallium.dbengine.lucene.HugePqArray;
import java.io.IOException;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.LeafFieldComparator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreCachingWrappingScorer;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
/**
* Sorts by descending relevance. NOTE: if you are sorting only by descending relevance and then secondarily by
@ -52,8 +45,8 @@ public final class RelevanceComparator extends FieldComparator<Float> implements
/**
* Creates a new comparator based on relevance for {@code numHits}.
*/
public RelevanceComparator(LLTempLMDBEnv env, int numHits) {
scores = new LMDBArray<>(env, new FloatCodec(), numHits, 0f);
public RelevanceComparator(LLTempHugePqEnv env, int numHits) {
scores = new HugePqArray<>(env, new FloatCodec(), numHits, 0f);
}
@Override

View File

@ -1,14 +1,11 @@
package it.cavallium.dbengine.lucene.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.ByteArrayCodec;
import it.cavallium.dbengine.lucene.BytesRefCodec;
import it.cavallium.dbengine.lucene.FloatCodec;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import java.io.Closeable;
import it.cavallium.dbengine.lucene.HugePqArray;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.index.DocValues;
@ -18,7 +15,6 @@ import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.LeafFieldComparator;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
/**
* Sorts by field's natural Term sort order, using ordinals. This is functionally equivalent to
@ -88,7 +84,7 @@ public class TermOrdValComparator extends FieldComparator<BytesRef> implements L
final int missingOrd;
/** Creates this, sorting missing values first. */
public TermOrdValComparator(LLTempLMDBEnv env, int numHits, String field) {
public TermOrdValComparator(LLTempHugePqEnv env, int numHits, String field) {
this(env, numHits, field, false);
}
@ -96,10 +92,10 @@ public class TermOrdValComparator extends FieldComparator<BytesRef> implements L
* Creates this, with control over how missing values are sorted. Pass sortMissingLast=true to
* put missing values at the end.
*/
public TermOrdValComparator(LLTempLMDBEnv env, int numHits, String field, boolean sortMissingLast) {
ords = new LMDBArray<>(env, new IntCodec(), numHits, 0);
values = new LMDBArray<>(env, new ByteArrayCodec(), numHits, null);
readerGen = new LMDBArray<>(env, new IntCodec(), numHits, 0);
public TermOrdValComparator(LLTempHugePqEnv env, int numHits, String field, boolean sortMissingLast) {
ords = new HugePqArray<>(env, new IntCodec(), numHits, 0);
values = new HugePqArray<>(env, new ByteArrayCodec(), numHits, null);
readerGen = new HugePqArray<>(env, new IntCodec(), numHits, 0);
this.field = field;
if (sortMissingLast) {
missingSortCmp = 1;

View File

@ -7,7 +7,7 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import java.io.IOException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
@ -34,9 +34,9 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
@Nullable
private final SortedScoredFullMultiSearcher sortedScoredFull;
public AdaptiveLocalSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) {
sortedByScoreFull = useLMDB ? new SortedByScoreFullMultiSearcher(env) : null;
sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null;
public AdaptiveLocalSearcher(LLTempHugePqEnv env, boolean useHugePq, int maxInMemoryResultEntries) {
sortedByScoreFull = useHugePq ? new SortedByScoreFullMultiSearcher(env) : null;
sortedScoredFull = useHugePq ? new SortedScoredFullMultiSearcher(env) : null;
this.maxInMemoryResultEntries = maxInMemoryResultEntries;
}
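The renamed flag keeps its meaning: with useHugePq set to false, both sorted full searchers stay null and the adaptive searcher falls back to its in-memory strategies. A construction sketch (the entry limit is illustrative):

    var env = new LLTempHugePqEnv();
    var searcher = new AdaptiveLocalSearcher(env, true, 8192); // true enables the HugePq-backed sorted full searchers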

View File

@ -6,7 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRIT
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import java.io.IOException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
@ -33,9 +33,9 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
@Nullable
private final SortedScoredFullMultiSearcher sortedScoredFull;
public AdaptiveMultiSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) {
sortedByScoreFull = useLMDB ? new SortedByScoreFullMultiSearcher(env) : null;
sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null;
public AdaptiveMultiSearcher(LLTempHugePqEnv env, boolean useHugePq, int maxInMemoryResultEntries) {
sortedByScoreFull = useHugePq ? new SortedByScoreFullMultiSearcher(env) : null;
sortedScoredFull = useHugePq ? new SortedScoredFullMultiSearcher(env) : null;
this.maxInMemoryResultEntries = maxInMemoryResultEntries;
}

View File

@ -6,11 +6,11 @@ import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import org.apache.lucene.search.LMDBFullScoreDocCollector;
import org.apache.lucene.search.HugePqFullScoreDocCollector;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -24,9 +24,9 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
protected static final Logger logger = LogManager.getLogger(SortedByScoreFullMultiSearcher.class);
private final LLTempLMDBEnv env;
private final LLTempHugePqEnv env;
public SortedByScoreFullMultiSearcher(LLTempLMDBEnv env) {
public SortedByScoreFullMultiSearcher(LLTempHugePqEnv env) {
this.env = env;
}
@ -77,7 +77,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
.fromCallable(() -> {
LLUtils.ensureBlocking();
var totalHitsThreshold = queryParams.getTotalHitsThresholdLong();
return LMDBFullScoreDocCollector.createSharedManager(env, queryParams.limitLong(), totalHitsThreshold);
return HugePqFullScoreDocCollector.createSharedManager(env, queryParams.limitLong(), totalHitsThreshold);
})
.flatMap(sharedManager -> Flux
.fromIterable(indexSearchers)
@ -102,7 +102,7 @@ public class SortedByScoreFullMultiSearcher implements MultiSearcher {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
} catch (Throwable ex) {
for (LMDBFullScoreDocCollector collector : collectors) {
for (HugePqFullScoreDocCollector collector : collectors) {
collector.close();
}
throw ex;

View File

@ -6,11 +6,11 @@ import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LuceneUtils;
import org.apache.lucene.search.LMDBFullFieldDocCollector;
import org.apache.lucene.search.HugePqFullFieldDocCollector;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -24,9 +24,9 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
protected static final Logger logger = LogManager.getLogger(SortedScoredFullMultiSearcher.class);
private final LLTempLMDBEnv env;
private final LLTempHugePqEnv env;
public SortedScoredFullMultiSearcher(LLTempLMDBEnv env) {
public SortedScoredFullMultiSearcher(LLTempHugePqEnv env) {
this.env = env;
}
@ -70,7 +70,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
.fromCallable(() -> {
LLUtils.ensureBlocking();
var totalHitsThreshold = queryParams.getTotalHitsThresholdLong();
return LMDBFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(),
return HugePqFullFieldDocCollector.createSharedManager(env, queryParams.sort(), queryParams.limitInt(),
totalHitsThreshold);
})
.<FullDocs<LLFieldDoc>>flatMap(sharedManager -> Flux
@ -96,7 +96,7 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher {
LLUtils.ensureBlocking();
return sharedManager.reduce(collectors);
} catch (Throwable ex) {
for (LMDBFullFieldDocCollector collector : collectors) {
for (HugePqFullFieldDocCollector collector : collectors) {
collector.close();
}
throw ex;

View File

@ -17,13 +17,13 @@
package org.apache.lucene.search;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.FieldValueHitQueue;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LLSlotDoc;
import it.cavallium.dbengine.lucene.LLSlotDocCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.HugePqPriorityQueue;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.lucene.ResourceIterable;
@ -39,15 +39,15 @@ import reactor.core.publisher.Flux;
/**
* A {@link org.apache.lucene.search.Collector} that sorts by {@link SortField} using {@link FieldComparator}s.
*
* <p>See the {@link #create(LLTempLMDBEnv, Sort, int, int)} (org.apache.lucene.search.Sort, int, int)} method for instantiating a
* <p>See the {@link #create(LLTempHugePqEnv, Sort, int, int)} method for instantiating a
* TopFieldCollector.
*
* This class must mirror these changes:
* <a href="https://github.com/apache/lucene/commits/main/lucene/core/src/java/org/apache/lucene/search/TopFieldCollector.java">
* Lucene TopFieldCollector changes on GitHub</a>
*/
public abstract class LMDBFullFieldDocCollector extends
FullDocsCollector<LMDBPriorityQueue<LLSlotDoc>, LLSlotDoc, LLFieldDoc> {
public abstract class HugePqFullFieldDocCollector extends
FullDocsCollector<HugePqPriorityQueue<LLSlotDoc>, LLSlotDoc, LLFieldDoc> {
// TODO: one optimization we could do is to pre-fill
// the queue with sentinel value that guaranteed to
@ -185,14 +185,14 @@ public abstract class LMDBFullFieldDocCollector extends
* Implements a TopFieldCollector over one SortField criteria, with tracking
* document scores and maxScore.
*/
private static class SimpleFieldCollector extends LMDBFullFieldDocCollector {
private static class SimpleFieldCollector extends HugePqFullFieldDocCollector {
final Sort sort;
final PriorityQueue<LLSlotDoc> queue;
private final FieldValueHitQueue fieldValueHitQueue;
public SimpleFieldCollector(
Sort sort,
LMDBPriorityQueue<LLSlotDoc> queue,
HugePqPriorityQueue<LLSlotDoc> queue,
FieldValueHitQueue fieldValueHitQueue,
long numHits,
HitsThresholdChecker hitsThresholdChecker,
@ -268,8 +268,8 @@ public abstract class LMDBFullFieldDocCollector extends
// internal versions. If someone will define a constructor with any other
// visibility, then anyone will be able to extend the class, which is not what
// we want.
private LMDBFullFieldDocCollector(
LMDBPriorityQueue<LLSlotDoc> pq,
private HugePqFullFieldDocCollector(
HugePqPriorityQueue<LLSlotDoc> pq,
FieldValueHitQueue fieldValueHitQueue,
long numHits,
HitsThresholdChecker hitsThresholdChecker,
@ -335,7 +335,7 @@ public abstract class LMDBFullFieldDocCollector extends
}
/**
* Creates a new {@link LMDBFullFieldDocCollector} from the given arguments.
* Creates a new {@link HugePqFullFieldDocCollector} from the given arguments.
*
* <p><b>NOTE</b>: The instances returned by this method pre-allocate a full array of length
* <code>numHits</code>.
@ -347,9 +347,9 @@ public abstract class LMDBFullFieldDocCollector extends
* hand if the query matches less than or exactly {@code totalHitsThreshold} hits then the hit
* count of the result will be accurate. {@link Integer#MAX_VALUE} may be used to make the hit
* count accurate, but this will also make query processing slower.
* @return a {@link LMDBFullFieldDocCollector} instance which will sort the results by the sort criteria.
* @return a {@link HugePqFullFieldDocCollector} instance which will sort the results by the sort criteria.
*/
public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, Sort sort, int numHits, int totalHitsThreshold) {
public static HugePqFullFieldDocCollector create(LLTempHugePqEnv env, Sort sort, int numHits, int totalHitsThreshold) {
if (totalHitsThreshold < 0) {
throw new IllegalArgumentException(
"totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
@ -367,8 +367,8 @@ public abstract class LMDBFullFieldDocCollector extends
* Same as above with additional parameters to allow passing in the threshold checker and the max
* score accumulator.
*/
static LMDBFullFieldDocCollector create(
LLTempLMDBEnv env,
static HugePqFullFieldDocCollector create(
LLTempHugePqEnv env,
Sort sort,
int numHits,
HitsThresholdChecker hitsThresholdChecker,
@ -388,7 +388,7 @@ public abstract class LMDBFullFieldDocCollector extends
}
var fieldValueHitQueue = new LLSlotDocCodec(env, numHits, sort.getSort());
var queue = new LMDBPriorityQueue<>(env, fieldValueHitQueue);
var queue = new HugePqPriorityQueue<>(env, fieldValueHitQueue);
// inform a comparator that sort is based on this single field
// to enable some optimizations for skipping over non-competitive documents
@ -405,8 +405,8 @@ public abstract class LMDBFullFieldDocCollector extends
* shared {@link MaxScoreAccumulator} to propagate the minimum score across segments if the
* primary sort is by relevancy.
*/
public static CollectorManager<LMDBFullFieldDocCollector, FullFieldDocs<LLFieldDoc>> createSharedManager(
LLTempLMDBEnv env, Sort sort, int numHits, long totalHitsThreshold) {
public static CollectorManager<HugePqFullFieldDocCollector, FullFieldDocs<LLFieldDoc>> createSharedManager(
LLTempHugePqEnv env, Sort sort, int numHits, long totalHitsThreshold) {
return new CollectorManager<>() {
private final HitsThresholdChecker hitsThresholdChecker;
@ -422,18 +422,18 @@ public abstract class LMDBFullFieldDocCollector extends
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullFieldDocCollector newCollector() {
public HugePqFullFieldDocCollector newCollector() {
return create(env, sort, numHits, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullFieldDocs<LLFieldDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) {
public FullFieldDocs<LLFieldDoc> reduce(Collection<HugePqFullFieldDocCollector> collectors) {
return reduceShared(sort, collectors);
}
};
}
private static FullFieldDocs<LLFieldDoc> reduceShared(Sort sort, Collection<LMDBFullFieldDocCollector> collectors) {
private static FullFieldDocs<LLFieldDoc> reduceShared(Sort sort, Collection<HugePqFullFieldDocCollector> collectors) {
@SuppressWarnings("unchecked")
final FullDocs<LLFieldDoc>[] fullDocs = new FullDocs[collectors.size()];
int i = 0;

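The createSharedManager javadoc above describes the intended lifecycle: one collector per index searcher, then a single reduce over all of them. A hedged usage sketch (env, sort, and the per-slice search loop are assumed):

    var manager = HugePqFullFieldDocCollector.createSharedManager(env, sort, 100, 1000);
    var collector = manager.newCollector();            // one collector per searcher/slice
    // indexSearcher.search(query, collector);         // collect hits on each slice
    var fullDocs = manager.reduce(List.of(collector)); // merge per-slice results into FullFieldDocs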
View File

@ -16,11 +16,11 @@
*/
package org.apache.lucene.search;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.LLScoreDocCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.HugePqPriorityQueue;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import it.cavallium.dbengine.lucene.ResourceIterable;
import it.cavallium.dbengine.lucene.collector.FullDocsCollector;
@ -45,8 +45,8 @@ import org.jetbrains.annotations.Nullable;
* <a href="https://github.com/apache/lucene/commits/main/lucene/core/src/java/org/apache/lucene/search/TopScoreDocCollector.java">
* Lucene TopScoreDocCollector changes on GitHub</a>
*/
public abstract class LMDBFullScoreDocCollector extends
FullDocsCollector<LMDBPriorityQueue<LLScoreDoc>, LLScoreDoc, LLScoreDoc> {
public abstract class HugePqFullScoreDocCollector extends
FullDocsCollector<HugePqPriorityQueue<LLScoreDoc>, LLScoreDoc, LLScoreDoc> {
/** Scorable leaf collector */
public abstract static class ScorerLeafCollector implements LeafCollector {
@ -59,9 +59,9 @@ public abstract class LMDBFullScoreDocCollector extends
}
}
private static class SimpleLMDBFullScoreDocCollector extends LMDBFullScoreDocCollector {
private static class SimpleHugePqFullScoreDocCollector extends HugePqFullScoreDocCollector {
SimpleLMDBFullScoreDocCollector(LLTempLMDBEnv env, @Nullable Long limit,
SimpleHugePqFullScoreDocCollector(LLTempHugePqEnv env, @Nullable Long limit,
CustomHitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
super(env, limit, hitsThresholdChecker, minScoreAcc);
}
@ -133,7 +133,7 @@ public abstract class LMDBFullScoreDocCollector extends
}
/**
* Creates a new {@link LMDBFullScoreDocCollector} given the number of hits to collect and the number
* Creates a new {@link HugePqFullScoreDocCollector} given the number of hits to collect and the number
* of hits to count accurately.
*
* <p><b>NOTE</b>: If the total hit count of the top docs is less than or exactly {@code
@ -145,22 +145,22 @@ public abstract class LMDBFullScoreDocCollector extends
* <p><b>NOTE</b>: The instances returned by this method pre-allocate a full array of length
* <code>numHits</code>, and fill the array with sentinel objects.
*/
public static LMDBFullScoreDocCollector create(LLTempLMDBEnv env, long numHits, int totalHitsThreshold) {
public static HugePqFullScoreDocCollector create(LLTempHugePqEnv env, long numHits, int totalHitsThreshold) {
return create(env, numHits, CustomHitsThresholdChecker.create(totalHitsThreshold), null);
}
/**
* Creates a new {@link LMDBFullScoreDocCollector} given the number of hits to count accurately.
* Creates a new {@link HugePqFullScoreDocCollector} given the number of hits to count accurately.
*
* <p><b>NOTE</b>: A value of {@link Integer#MAX_VALUE} will make the hit count accurate
* but will also likely make query processing slower.
*/
public static LMDBFullScoreDocCollector create(LLTempLMDBEnv env, int totalHitsThreshold) {
public static HugePqFullScoreDocCollector create(LLTempHugePqEnv env, int totalHitsThreshold) {
return create(env, CustomHitsThresholdChecker.create(totalHitsThreshold), null);
}
static LMDBFullScoreDocCollector create(
LLTempLMDBEnv env,
static HugePqFullScoreDocCollector create(
LLTempHugePqEnv env,
CustomHitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
@ -168,11 +168,11 @@ public abstract class LMDBFullScoreDocCollector extends
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
}
return new SimpleLMDBFullScoreDocCollector(env, null, hitsThresholdChecker, minScoreAcc);
return new SimpleHugePqFullScoreDocCollector(env, null, hitsThresholdChecker, minScoreAcc);
}
static LMDBFullScoreDocCollector create(
LLTempLMDBEnv env,
static HugePqFullScoreDocCollector create(
LLTempHugePqEnv env,
@NotNull Long numHits,
CustomHitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
@ -181,7 +181,7 @@ public abstract class LMDBFullScoreDocCollector extends
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
}
return new SimpleLMDBFullScoreDocCollector(env,
return new SimpleHugePqFullScoreDocCollector(env,
(numHits < 0 || numHits >= 2147483630L) ? null : numHits,
hitsThresholdChecker,
minScoreAcc
@ -192,8 +192,8 @@ public abstract class LMDBFullScoreDocCollector extends
* Create a CollectorManager which uses a shared hit counter to maintain number of hits and a
* shared {@link MaxScoreAccumulator} to propagate the minimum score across segments
*/
public static CollectorManager<LMDBFullScoreDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempLMDBEnv env,
public static CollectorManager<HugePqFullScoreDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempHugePqEnv env,
long numHits,
long totalHitsThreshold) {
return new CollectorManager<>() {
@ -203,12 +203,12 @@ public abstract class LMDBFullScoreDocCollector extends
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullScoreDocCollector newCollector() {
return LMDBFullScoreDocCollector.create(env, numHits, hitsThresholdChecker, minScoreAcc);
public HugePqFullScoreDocCollector newCollector() {
return HugePqFullScoreDocCollector.create(env, numHits, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullScoreDocCollector> collectors) {
public FullDocs<LLScoreDoc> reduce(Collection<HugePqFullScoreDocCollector> collectors) {
return reduceShared(collectors);
}
};
@ -218,8 +218,8 @@ public abstract class LMDBFullScoreDocCollector extends
* Create a CollectorManager which uses a shared {@link MaxScoreAccumulator} to propagate
* the minimum score across segments
*/
public static CollectorManager<LMDBFullScoreDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempLMDBEnv env,
public static CollectorManager<HugePqFullScoreDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempHugePqEnv env,
long totalHitsThreshold) {
return new CollectorManager<>() {
@ -228,22 +228,22 @@ public abstract class LMDBFullScoreDocCollector extends
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullScoreDocCollector newCollector() {
return LMDBFullScoreDocCollector.create(env, hitsThresholdChecker, minScoreAcc);
public HugePqFullScoreDocCollector newCollector() {
return HugePqFullScoreDocCollector.create(env, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullScoreDocCollector> collectors) {
public FullDocs<LLScoreDoc> reduce(Collection<HugePqFullScoreDocCollector> collectors) {
return reduceShared(collectors);
}
};
}
private static FullDocs<LLScoreDoc> reduceShared(Collection<LMDBFullScoreDocCollector> collectors) {
private static FullDocs<LLScoreDoc> reduceShared(Collection<HugePqFullScoreDocCollector> collectors) {
@SuppressWarnings("unchecked")
final FullDocs<LLScoreDoc>[] fullDocs = new FullDocs[collectors.size()];
int i = 0;
for (LMDBFullScoreDocCollector collector : collectors) {
for (HugePqFullScoreDocCollector collector : collectors) {
fullDocs[i++] = collector.fullDocs();
}
return FullDocs.merge(null, fullDocs);
@ -256,9 +256,9 @@ public abstract class LMDBFullScoreDocCollector extends
float minCompetitiveScore;
// prevents instantiation
LMDBFullScoreDocCollector(LLTempLMDBEnv env, @Nullable Long limit,
HugePqFullScoreDocCollector(LLTempHugePqEnv env, @Nullable Long limit,
CustomHitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
super(new LMDBPriorityQueue<>(env, new LLScoreDocCodec()));
super(new HugePqPriorityQueue<>(env, new LLScoreDocCodec()));
assert hitsThresholdChecker != null;
this.limit = limit;
this.hitsThresholdChecker = hitsThresholdChecker;

View File

@ -18,22 +18,21 @@
package org.apache.lucene.search.comparators;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.IArray;
import it.cavallium.dbengine.lucene.IntCodec;
import it.cavallium.dbengine.lucene.LMDBArray;
import it.cavallium.dbengine.lucene.HugePqArray;
import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.LeafFieldComparator;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.comparators.MinDocIterator;
/**
* Comparator that sorts by asc _doc
* Based on {@link org.apache.lucene.search.comparators.DocComparator}
* */
public class LMDBDocComparator extends org.apache.lucene.search.comparators.DocComparator implements SafeCloseable {
public class HugePqDocComparator extends org.apache.lucene.search.comparators.DocComparator implements SafeCloseable {
private final IArray<Integer> docIDs;
private final boolean enableSkipping; // if skipping functionality should be enabled
private int bottom;
@ -43,9 +42,9 @@ public class LMDBDocComparator extends org.apache.lucene.search.comparators.DocC
private boolean hitsThresholdReached;
/** Creates a new comparator based on document ids for {@code numHits} */
public LMDBDocComparator(LLTempLMDBEnv env, int numHits, boolean reverse, int sortPost) {
public HugePqDocComparator(LLTempHugePqEnv env, int numHits, boolean reverse, int sortPost) {
super(0, reverse, sortPost);
this.docIDs = new LMDBArray<>(env, new IntCodec(), numHits, 0);
this.docIDs = new HugePqArray<>(env, new IntCodec(), numHits, 0);
// skipping functionality is enabled if we are sorting by _doc in asc order as a primary sort
this.enableSkipping = (!reverse && sortPost == 0);
}
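For context, the skipping that enableSkipping gates works by exposing a competitive iterator that jumps past document ids that can no longer enter the queue; a hedged sketch modeled on Lucene's DocComparator (the surrounding leaf-comparator plumbing is omitted):

    private DocIdSetIterator competitiveIterator(int maxDoc) {
        if (!enableSkipping || !hitsThresholdReached) {
            return null; // no skipping until the hit-count threshold is reached
        }
        return new MinDocIterator(bottom + 1, maxDoc); // only docs after the current bottom can compete
    }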

View File

@ -1,154 +0,0 @@
/*-
* #%L
* LmdbJava
* %%
* Copyright (C) 2016 - 2021 The LmdbJava Open Source Project
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package org.lmdbjava;
import static io.netty5.buffer.PooledByteBufAllocator.DEFAULT;
import static java.lang.Class.forName;
import static org.lmdbjava.UnsafeAccess.UNSAFE;
import io.netty5.buffer.ByteBuf;
import java.lang.reflect.Field;
import io.netty5.buffer.PooledByteBufAllocator;
import jnr.ffi.Pointer;
/**
* A buffer proxy backed by Netty's {@link ByteBuf}.
*
* <p>
* This class requires {@link UnsafeAccess} and netty-buffer must be in the
* classpath.
*/
public final class Net5ByteBufProxy extends BufferProxy<ByteBuf> {
/**
* A proxy for using Netty {@link ByteBuf}. Guaranteed to never be null,
* although a class initialization exception will occur if an attempt is made
* to access this field when Netty is unavailable.
*/
public static final BufferProxy<ByteBuf> PROXY_NETTY = new Net5ByteBufProxy();
private static final int BUFFER_RETRIES = 10;
private static final String FIELD_NAME_ADDRESS = "memoryAddress";
private static final String FIELD_NAME_LENGTH = "length";
private static final String NAME = "io.netty5.buffer.PooledUnsafeDirectByteBuf";
private final long lengthOffset;
private final long addressOffset;
private final PooledByteBufAllocator nettyAllocator;
private Net5ByteBufProxy() {
this(DEFAULT);
}
public Net5ByteBufProxy(final PooledByteBufAllocator allocator) {
this.nettyAllocator = allocator;
try {
final ByteBuf initBuf = this.allocate();
initBuf.release();
final Field address = findField(NAME, FIELD_NAME_ADDRESS);
final Field length = findField(NAME, FIELD_NAME_LENGTH);
addressOffset = UNSAFE.objectFieldOffset(address);
lengthOffset = UNSAFE.objectFieldOffset(length);
} catch (final SecurityException e) {
throw new LmdbException("Field access error", e);
}
}
static Field findField(final String c, final String name) {
Class<?> clazz;
try {
clazz = forName(c);
} catch (final ClassNotFoundException e) {
throw new LmdbException(c + " class unavailable", e);
}
do {
try {
final Field field = clazz.getDeclaredField(name);
field.setAccessible(true);
return field;
} catch (final NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
} while (clazz != null);
throw new LmdbException(name + " not found");
}
@Override
protected ByteBuf allocate() {
for (int i = 0; i < BUFFER_RETRIES; i++) {
final ByteBuf bb = nettyAllocator.directBuffer();
if (NAME.equals(bb.getClass().getName())) {
return bb;
} else {
bb.release();
}
}
throw new IllegalStateException("Netty buffer must be " + NAME);
}
@Override
protected int compare(final ByteBuf o1, final ByteBuf o2) {
return o1.compareTo(o2);
}
@Override
protected void deallocate(final ByteBuf buff) {
buff.release();
}
@Override
protected byte[] getBytes(final ByteBuf buffer) {
final byte[] dest = new byte[buffer.capacity()];
buffer.getBytes(0, dest);
return dest;
}
@Override
protected void in(final ByteBuf buffer, final Pointer ptr, final long ptrAddr) {
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE,
buffer.writerIndex() - buffer.readerIndex());
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA,
buffer.memoryAddress() + buffer.readerIndex());
}
@Override
protected void in(final ByteBuf buffer, final int size, final Pointer ptr,
final long ptrAddr) {
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE,
size);
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA,
buffer.memoryAddress() + buffer.readerIndex());
}
@Override
protected ByteBuf out(final ByteBuf buffer, final Pointer ptr,
final long ptrAddr) {
final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA);
final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE);
UNSAFE.putLong(buffer, addressOffset, addr);
UNSAFE.putInt(buffer, lengthOffset, (int) size);
buffer.writerIndex((int) size).readerIndex(0);
return buffer;
}
}
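This proxy existed only because LMDB's binding needs the raw address and length of each buffer, which in turn required Unsafe field offsets into Netty's buffer internals. The RocksDB replacement drops all of this reflection, since RocksDB's Java API accepts direct ByteBuffers natively. An illustrative call (db, cfh, and readOptions are assumed to be an open RocksDB instance, column family handle, and read options):

    ByteBuffer key = ByteBuffer.allocateDirect(Long.BYTES).putLong(42L).flip();
    ByteBuffer value = ByteBuffer.allocateDirect(128);
    int read = db.get(cfh, readOptions, key, value); // fills `value` in place, no Unsafe address poking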

View File

@ -2,10 +2,10 @@ package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty5.buffer.ByteBuf;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LMDBSortedCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.HugePqCodec;
import it.cavallium.dbengine.lucene.HugePqPriorityQueue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -15,33 +15,23 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class TestLMDB {
public class TestHugePq {
private LLTempLMDBEnv env;
private LMDBPriorityQueue<Integer> queue;
private LLTempHugePqEnv env;
private HugePqPriorityQueue<Integer> queue;
@BeforeEach
public void beforeEach() throws IOException {
this.env = new LLTempLMDBEnv();
this.queue = new LMDBPriorityQueue<>(env, new LMDBSortedCodec<Integer>() {
this.env = new LLTempHugePqEnv();
this.queue = new HugePqPriorityQueue<>(env, new HugePqCodec<Integer>() {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, Integer data) {
return allocator.apply(Integer.BYTES).writeInt(data).asReadOnly();
public Buffer serialize(Function<Integer, Buffer> allocator, Integer data) {
return HugePqCodec.setLexInt(allocator.apply(Integer.BYTES), 0, false, data);
}
@Override
public Integer deserialize(ByteBuf b) {
return b.getInt(0);
}
@Override
public int compare(Integer o1, Integer o2) {
return Integer.compare(o1, o2);
}
@Override
public int compareDirect(ByteBuf o1, ByteBuf o2) {
return Integer.compare(o1.getInt(0), o2.getInt(0));
public Integer deserialize(Buffer b) {
return HugePqCodec.getLexInt(b, 0, false);
}
});
}
@ -144,7 +134,6 @@ public class TestLMDB {
@AfterEach
public void afterEach() throws IOException {
queue.close();
assertEquals(0, env.countUsedDbs());
env.close();
}
}

View File

@ -1,16 +1,13 @@
package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.Lists;
import io.netty5.buffer.ByteBuf;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.LMDBSortedCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.HugePqCodec;
import it.cavallium.dbengine.lucene.HugePqPriorityQueue;
import it.cavallium.dbengine.lucene.PriorityQueue;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -19,18 +16,20 @@ import java.util.Random;
import java.util.function.Function;
import org.apache.lucene.search.HitQueue;
import org.apache.lucene.search.ScoreDoc;
import org.assertj.core.description.Description;
import org.assertj.core.description.TextDescription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
public class TestLMDBHitQueue {
public class TestHugePqHitQueue {
public static final int NUM_HITS = 1024;
private LLTempLMDBEnv env;
private SafeCloseable lmdbQueue;
private LLTempHugePqEnv env;
private SafeCloseable hugePqQueue;
private TestingPriorityQueue testingPriorityQueue;
@ -54,14 +53,14 @@ public class TestLMDBHitQueue {
}
}
private static void assertEqualsScoreDoc(ScoreDoc expected, ScoreDoc actual) {
Assertions.assertEquals(toLLScoreDoc(expected), toLLScoreDoc(actual));
private static void assertEqualsScoreDoc(Description description, ScoreDoc expected, ScoreDoc actual) {
org.assertj.core.api.Assertions.assertThat(toLLScoreDoc(actual)).as(description).isEqualTo(toLLScoreDoc(expected));
}
private static void assertEqualsScoreDoc(List<ScoreDoc> expected, List<ScoreDoc> actual) {
Assertions.assertEquals(expected.size(), actual.size());
var list1 = expected.iterator();
var list2 = actual.iterator();
Assertions.assertEquals(expected.size(), actual.size());
while (list1.hasNext() && list2.hasNext()) {
Assertions.assertFalse(lessThan(list1.next(), list2.next()));
}
@ -69,75 +68,58 @@ public class TestLMDBHitQueue {
@BeforeEach
public void beforeEach() throws IOException {
this.env = new LLTempLMDBEnv();
var lmdbQueue = new LMDBPriorityQueue<ScoreDoc>(env, new LMDBSortedCodec<>() {
this.env = new LLTempHugePqEnv();
var hugePqQueue = new HugePqPriorityQueue<ScoreDoc>(env, new HugePqCodec<>() {
@Override
public ByteBuf serialize(Function<Integer, ByteBuf> allocator, ScoreDoc data) {
public Buffer serialize(Function<Integer, Buffer> allocator, ScoreDoc data) {
var buf = allocator.apply(Float.BYTES + Integer.BYTES + Integer.BYTES);
buf.writerOffset(Float.BYTES + Integer.BYTES + Integer.BYTES);
setScore(buf, data.score);
setDoc(buf, data.doc);
setShardIndex(buf, data.shardIndex);
buf.writerIndex(Float.BYTES + Integer.BYTES + Integer.BYTES);
return buf.asReadOnly();
return buf;
}
@Override
public ScoreDoc deserialize(ByteBuf buf) {
public ScoreDoc deserialize(Buffer buf) {
return new ScoreDoc(getDoc(buf), getScore(buf), getShardIndex(buf));
}
@Override
public int compare(ScoreDoc hitA, ScoreDoc hitB) {
return compareScoreDoc(hitA, hitB);
private static float getScore(Buffer hit) {
return HugePqCodec.getLexFloat(hit, 0, false);
}
private static int getDoc(Buffer hit) {
return HugePqCodec.getLexInt(hit, Float.BYTES, true);
}
private static int getShardIndex(Buffer hit) {
return HugePqCodec.getLexInt(hit, Float.BYTES + Integer.BYTES, false);
}
private static void setScore(Buffer hit, float score) {
HugePqCodec.setLexFloat(hit, 0, false, score);
}
private static void setDoc(Buffer hit, int doc) {
HugePqCodec.setLexInt(hit, Float.BYTES, true, doc);
}
private static void setShardIndex(Buffer hit, int shardIndex) {
HugePqCodec.setLexInt(hit, Float.BYTES + Integer.BYTES, false, shardIndex);
}
@Override
public int compareDirect(ByteBuf hitA, ByteBuf hitB) {
var scoreA = getScore(hitA);
var scoreB = getScore(hitB);
if (scoreA == scoreB) {
var docA = getDoc(hitA);
var docB = getDoc(hitB);
if (docA == docB) {
return Integer.compare(getShardIndex(hitA), getShardIndex(hitB));
} else {
return Integer.compare(docB, docA);
}
} else {
return Float.compare(scoreA, scoreB);
}
}
private static float getScore(ByteBuf hit) {
return hit.getFloat(0);
}
private static int getDoc(ByteBuf hit) {
return hit.getInt(Float.BYTES);
}
private static int getShardIndex(ByteBuf hit) {
return hit.getInt(Float.BYTES + Integer.BYTES);
}
private static void setScore(ByteBuf hit, float score) {
hit.setFloat(0, score);
}
private static void setDoc(ByteBuf hit, int doc) {
hit.setInt(Float.BYTES, doc);
}
private static void setShardIndex(ByteBuf hit, int shardIndex) {
hit.setInt(Float.BYTES + Integer.BYTES, shardIndex);
public ScoreDoc clone(ScoreDoc obj) {
return new ScoreDoc(obj.doc, obj.score, obj.shardIndex);
}
});
this.lmdbQueue = lmdbQueue;
this.hugePqQueue = hugePqQueue;
PriorityQueueAdaptor<ScoreDoc> hitQueue = new PriorityQueueAdaptor<>(new HitQueue(NUM_HITS, false));
Assertions.assertEquals(0, lmdbQueue.size());
Assertions.assertEquals(0, hugePqQueue.size());
Assertions.assertEquals(0, hitQueue.size());
this.testingPriorityQueue = new TestingPriorityQueue(hitQueue, lmdbQueue);
this.testingPriorityQueue = new TestingPriorityQueue(hitQueue, hugePqQueue);
}
@Test
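
The codec now writes each field with HugePqCodec.setLexFloat/setLexInt instead of the raw setFloat/setInt calls it used under LMDB: RocksDB's default comparator orders keys by unsigned byte comparison, so every component has to be remapped into a form whose lexicographic order matches its numeric order, and the old compareDirect callback disappears entirely. A sketch of how such helpers are commonly written, assuming big-endian int accessors and that the boolean flag reverses a component's order; the real HugePqCodec may differ in detail:

// Hedged sketch, not the actual HugePqCodec implementation.
static void setLexInt(Buffer buf, int offset, boolean invert, int value) {
	int bits = value ^ Integer.MIN_VALUE; // flip sign bit: signed order -> unsigned byte order
	buf.setInt(offset, invert ? ~bits : bits); // ~bits reverses this component's order
}

static int getLexInt(Buffer buf, int offset, boolean invert) {
	int bits = buf.getInt(offset);
	if (invert) {
		bits = ~bits;
	}
	return bits ^ Integer.MIN_VALUE;
}

static void setLexFloat(Buffer buf, int offset, boolean invert, float value) {
	int bits = Float.floatToRawIntBits(value);
	// IEEE 754 total-order trick: flip all bits of negatives but only the
	// sign bit of non-negatives, so unsigned byte order matches numeric order.
	bits ^= (bits >> 31) | Integer.MIN_VALUE;
	buf.setInt(offset, invert ? ~bits : bits);
}

static float getLexFloat(Buffer buf, int offset, boolean invert) {
	int bits = buf.getInt(offset);
	if (invert) {
		bits = ~bits;
	}
	bits ^= (bits < 0) ? Integer.MIN_VALUE : -1; // undo the write-side remap
	return Float.intBitsToFloat(bits);
}

Read this way, the true flag passed for the doc component would reproduce the descending doc tie-break that the deleted compareDirect implemented by hand with Integer.compare(docB, docA), while score and shardIndex keep their natural ascending order.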
@ -154,7 +136,7 @@ public class TestLMDBHitQueue {
public void testAddSingle() {
var item = new ScoreDoc(0, 0, 0);
testingPriorityQueue.add(item);
assertEqualsScoreDoc(item, testingPriorityQueue.top());
assertEqualsScoreDoc(new TextDescription("top value of %s", testingPriorityQueue), item, testingPriorityQueue.top());
}
@Test
@ -163,7 +145,7 @@ public class TestLMDBHitQueue {
var item = new ScoreDoc(i, i >> 1, -1);
testingPriorityQueue.addUnsafe(item);
}
assertEqualsScoreDoc(new ScoreDoc(1, 0, -1), testingPriorityQueue.top());
assertEqualsScoreDoc(new TextDescription("top value of %s", testingPriorityQueue), new ScoreDoc(1, 0, -1), testingPriorityQueue.top());
}
@Test
@ -189,7 +171,7 @@ public class TestLMDBHitQueue {
var item = new ScoreDoc(0, 0, 0);
testingPriorityQueue.addUnsafe(item);
testingPriorityQueue.remove(new ScoreDoc(2, 0, 0));
assertEqualsScoreDoc(item, testingPriorityQueue.top());
assertEqualsScoreDoc(new TextDescription("top value of %s", testingPriorityQueue), item, testingPriorityQueue.top());
}
@Test
@ -206,7 +188,7 @@ public class TestLMDBHitQueue {
testingPriorityQueue.addUnsafe(item);
}
testingPriorityQueue.removeUnsafe(toRemove);
assertEqualsScoreDoc(top, testingPriorityQueue.top());
assertEqualsScoreDoc(new TextDescription("top value of %s", testingPriorityQueue), top, testingPriorityQueue.top());
}
@Test
@ -223,7 +205,7 @@ public class TestLMDBHitQueue {
testingPriorityQueue.addUnsafe(item);
}
testingPriorityQueue.removeUnsafe(new ScoreDoc(0, 0, -1));
assertEqualsScoreDoc(top, testingPriorityQueue.top());
assertEqualsScoreDoc(new TextDescription("top value of %s", testingPriorityQueue), top, testingPriorityQueue.top());
}
@Test
@ -232,89 +214,94 @@ public class TestLMDBHitQueue {
for (int i = 0; i < 1000; i++) {
sortedNumbers.add(new ScoreDoc(i, i >> 1, -1));
}
sortedNumbers.sort(TestLMDBHitQueue::compareScoreDoc);
sortedNumbers.sort(TestHugePqHitQueue::compareScoreDoc);
var shuffledNumbers = new ArrayList<>(sortedNumbers);
Collections.shuffle(shuffledNumbers, new Random(1000));
org.assertj.core.api.Assertions.assertThat(testingPriorityQueue.size()).isEqualTo(0);
for (ScoreDoc scoreDoc : shuffledNumbers) {
testingPriorityQueue.addUnsafe(scoreDoc);
}
org.assertj.core.api.Assertions.assertThat(testingPriorityQueue.size()).isEqualTo(sortedNumbers.size());
var newSortedNumbers = new ArrayList<ScoreDoc>();
ScoreDoc popped;
while ((popped = testingPriorityQueue.popUnsafe()) != null) {
newSortedNumbers.add(popped);
}
org.assertj.core.api.Assertions.assertThat(testingPriorityQueue.size()).isEqualTo(0);
assertEqualsScoreDoc(sortedNumbers, newSortedNumbers);
}
@AfterEach
public void afterEach() throws IOException {
lmdbQueue.close();
assertEquals(0, env.countUsedDbs());
hugePqQueue.close();
env.close();
}
private static class TestingPriorityQueue implements PriorityQueue<ScoreDoc> {
private final PriorityQueue<ScoreDoc> referenceQueue;
private final PriorityQueue<ScoreDoc> testQueue;
private final PriorityQueue<ScoreDoc> myQueue;
public TestingPriorityQueue(PriorityQueue<ScoreDoc> referenceQueue, PriorityQueue<ScoreDoc> testQueue) {
public TestingPriorityQueue(PriorityQueue<ScoreDoc> referenceQueue, PriorityQueue<ScoreDoc> myQueue) {
this.referenceQueue = referenceQueue;
this.testQueue = testQueue;
this.myQueue = myQueue;
}
@Override
public void add(ScoreDoc element) {
referenceQueue.add(element);
testQueue.add(element);
myQueue.add(element);
ensureEquality();
}
public void addUnsafe(ScoreDoc element) {
referenceQueue.add(element);
testQueue.add(element);
myQueue.add(element);
}
@Override
public ScoreDoc top() {
var top1 = referenceQueue.top();
var top2 = testQueue.top();
assertEqualsScoreDoc(top1, top2);
var top2 = myQueue.top();
assertEqualsScoreDoc(new TextDescription("top value of %s", myQueue), top1, top2);
return top2;
}
public ScoreDoc topUnsafe() {
var top1 = referenceQueue.top();
var top2 = testQueue.top();
var top2 = myQueue.top();
return top2;
}
@Override
public ScoreDoc pop() {
var top1 = referenceQueue.pop();
var top2 = testQueue.pop();
assertEqualsScoreDoc(top1, top2);
var top2 = myQueue.pop();
assertEqualsScoreDoc(new TextDescription("top value of %s", myQueue), top1, top2);
return top2;
}
public ScoreDoc popUnsafe() {
var top1 = referenceQueue.pop();
var top2 = testQueue.pop();
var top2 = myQueue.pop();
return top2;
}
@Override
public void replaceTop(ScoreDoc newTop) {
referenceQueue.replaceTop(newTop);
testQueue.replaceTop(newTop);
myQueue.replaceTop(newTop);
}
@Override
public long size() {
var size1 = referenceQueue.size();
var size2 = testQueue.size();
var size2 = myQueue.size();
Assertions.assertEquals(size1, size2);
return size2;
}
@ -322,20 +309,20 @@ public class TestLMDBHitQueue {
@Override
public void clear() {
referenceQueue.clear();
testQueue.clear();
myQueue.clear();
}
@Override
public boolean remove(ScoreDoc element) {
var removed1 = referenceQueue.remove(element);
var removed2 = testQueue.remove(element);
Assertions.assertEquals(removed1, removed2);
return removed2;
var removedRef = referenceQueue.remove(element);
var removedMy = myQueue.remove(element);
Assertions.assertEquals(removedRef, removedMy);
return removedMy;
}
public boolean removeUnsafe(ScoreDoc element) {
var removed1 = referenceQueue.remove(element);
var removed2 = testQueue.remove(element);
var removed2 = myQueue.remove(element);
return removed2;
}
@ -344,7 +331,7 @@ public class TestLMDBHitQueue {
//noinspection BlockingMethodInNonBlockingContext
var it1 = referenceQueue.iterate().collectList().blockOptional().orElseThrow();
//noinspection BlockingMethodInNonBlockingContext
var it2 = testQueue.iterate().collectList().blockOptional().orElseThrow();
var it2 = myQueue.iterate().collectList().blockOptional().orElseThrow();
assertEqualsScoreDoc(it1, it2);
return Flux.fromIterable(it2);
}
@ -352,18 +339,18 @@ public class TestLMDBHitQueue {
@Override
public void close() {
referenceQueue.close();
testQueue.close();
myQueue.close();
}
private void ensureEquality() {
Assertions.assertEquals(referenceQueue.size(), testQueue.size());
Assertions.assertEquals(referenceQueue.size(), myQueue.size());
var referenceQueueElements = Lists.newArrayList(referenceQueue
.iterate()
.map(TestLMDBHitQueue::toLLScoreDoc)
.map(TestHugePqHitQueue::toLLScoreDoc)
.toIterable());
var testQueueElements = Lists.newArrayList(testQueue
var testQueueElements = Lists.newArrayList(myQueue
.iterate()
.map(TestLMDBHitQueue::toLLScoreDoc)
.map(TestHugePqHitQueue::toLLScoreDoc)
.toIterable());
Assertions.assertEquals(referenceQueueElements, testQueueElements);
}
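
Both assertEqualsScoreDoc and ensureEquality compare hits only after mapping them through toLLScoreDoc: Lucene's ScoreDoc does not override equals/hashCode, so comparing the raw objects would test identity rather than contents. A minimal sketch of what such a mapping needs, assuming a record-shaped value type (the real LLScoreDoc may be defined differently):

// Hypothetical shape of the value type; structural equality is the point.
record LLScoreDoc(int doc, float score, int shardIndex) {}

static LLScoreDoc toLLScoreDoc(ScoreDoc scoreDoc) {
	if (scoreDoc == null) {
		return null; // top()/pop() legitimately return null on an empty queue
	}
	return new LLScoreDoc(scoreDoc.doc, scoreDoc.score, scoreDoc.shardIndex);
}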

View File

@ -15,7 +15,7 @@ import it.cavallium.dbengine.client.Sort;
import it.cavallium.dbengine.client.query.current.data.MatchAllDocsQuery;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher;
@ -42,7 +42,7 @@ import reactor.util.function.Tuples;
public class TestLuceneIndex {
private final Logger log = LogManager.getLogger(this.getClass());
private static LLTempLMDBEnv ENV;
private static LLTempHugePqEnv ENV;
private TestAllocator allocator;
private TempDb tempDb;
@ -55,7 +55,7 @@ public class TestLuceneIndex {
@BeforeAll
public static void beforeAll() throws IOException {
ENV = new LLTempLMDBEnv();
ENV = new LLTempHugePqEnv();
}
@BeforeEach

View File

@ -29,7 +29,7 @@ import it.cavallium.dbengine.client.query.current.data.Term;
import it.cavallium.dbengine.client.query.current.data.TermQuery;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher;
@ -70,7 +70,7 @@ import reactor.util.function.Tuples;
public class TestLuceneSearches {
private static final Logger log = LogManager.getLogger(TestLuceneSearches.class);
private static LLTempLMDBEnv ENV;
private static LLTempHugePqEnv ENV;
private static final MemoryTemporaryDbGenerator TEMP_DB_GENERATOR = new MemoryTemporaryDbGenerator();
private static TestAllocator allocator;
@ -112,8 +112,7 @@ public class TestLuceneSearches {
tempDb = Objects.requireNonNull(TEMP_DB_GENERATOR.openTempDb(allocator).block(), "TempDB");
luceneSingle = tempDb.luceneSingle();
luceneMulti = tempDb.luceneMulti();
ENV = new LLTempLMDBEnv();
assertEquals(0, ENV.countUsedDbs());
ENV = new LLTempHugePqEnv();
setUpIndex(true);
setUpIndex(false);
@ -206,18 +205,15 @@ public class TestLuceneSearches {
@BeforeEach
public void beforeEach() {
assertEquals(0, ENV.countUsedDbs());
}
@AfterEach
public void afterEach() {
assertEquals(0, ENV.countUsedDbs());
}
@AfterAll
public static void afterAll() throws IOException {
TEMP_DB_GENERATOR.closeTempDb(tempDb).block();
assertEquals(0, ENV.countUsedDbs());
ENV.close();
ensureNoLeaks(allocator.allocator(), true, false);
destroyAllocator(allocator);