Unify read options

This commit is contained in:
Andrea Cavalli 2022-03-19 16:36:59 +01:00
parent bbc77df56b
commit e866241ff1
4 changed files with 37 additions and 19 deletions

View File

@ -9,16 +9,12 @@ import io.netty5.buffer.api.AllocatorControl;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.CompositeBuffer; import io.netty5.buffer.api.CompositeBuffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.WritableComponent;
import io.netty5.buffer.api.bytebuffer.ByteBufferMemoryManager;
import io.netty5.buffer.api.internal.Statics; import io.netty5.buffer.api.internal.Statics;
import io.netty5.buffer.api.unsafe.UnsafeMemoryManager;
import io.netty5.util.IllegalReferenceCountException; import io.netty5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta; import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta;
@ -28,7 +24,6 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField; import it.cavallium.dbengine.lucene.RandomSortField;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -38,7 +33,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.ToIntFunction; import java.util.function.ToIntFunction;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -67,6 +61,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks; import reactor.core.publisher.Hooks;
@ -717,6 +712,33 @@ public class LLUtils {
} }
} }
public static boolean isClosedRange(LLRange rangeShared) {
return rangeShared.hasMin() && rangeShared.hasMax();
}
/**
* Generate a copy of the passed ReadOptions, with some parameters modified to help with bulk iterations
* @param readOptions the read options to copy
* @param canFillCache true to fill the cache. If closedRange is false, this field will be ignored
* @param closedRange true if the range is closed
* @return a new instance of ReadOptions
*/
public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions, boolean canFillCache, boolean closedRange) {
if (readOptions != null) {
readOptions = new ReadOptions(readOptions);
} else {
readOptions = new ReadOptions();
}
if (!closedRange) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(false);
readOptions.setVerifyChecksums(false);
} else {
readOptions.setFillCache(canFillCache);
}
return readOptions;
}
@Deprecated @Deprecated
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {} public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}

View File

@ -1,6 +1,8 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
import static it.cavallium.dbengine.database.LLUtils.isClosedRange;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Drop;
@ -86,8 +88,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
public final Flux<List<T>> flux() { public final Flux<List<T>> flux() {
return Flux return Flux
.generate(() -> { .generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = generateCustomReadOptions(this.readOptions, true, isClosedRange(range));
readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax());
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
} }

View File

@ -1,6 +1,8 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
import static it.cavallium.dbengine.database.LLUtils.isClosedRange;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Drop;
@ -78,11 +80,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
public Flux<Send<Buffer>> flux() { public Flux<Send<Buffer>> flux() {
return Flux.generate(() -> { return Flux.generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = generateCustomReadOptions(this.readOptions, canFillCache, isClosedRange(rangeShared));
if (!rangeShared.hasMin() || !rangeShared.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache);
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
} }

View File

@ -1,6 +1,8 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
import static it.cavallium.dbengine.database.LLUtils.isClosedRange;
import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator; import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
@ -77,12 +79,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
public final Flux<T> flux() { public final Flux<T> flux() {
return Flux.generate(() -> { return Flux.generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = generateCustomReadOptions(this.readOptions, true, isClosedRange(rangeShared));
if (!rangeShared.hasMin() || !rangeShared.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(false);
readOptions.setVerifyChecksums(false);
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
} }