Various bugfixes

Andrea Cavalli 2022-03-11 17:59:46 +01:00
parent 16f6025b30
commit 4a2d143135
12 changed files with 229 additions and 27 deletions

View File

@@ -148,7 +148,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.35</version>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
@@ -565,7 +565,7 @@
<annotationProcessorPath>
<groupId>io.soabase.record-builder</groupId>
<artifactId>record-builder-processor</artifactId>
<version>1.19</version>
<version>32</version>
</annotationProcessorPath>
</annotationProcessorPaths>
<annotationProcessors>
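Aside from the slf4j bump, this hunk updates the record-builder annotation processor, which generates builder classes for annotated records at compile time. A minimal, hypothetical sketch of what the processor is used for, assuming the standard @RecordBuilder annotation from the io.soabase.record-builder project (the record and field names are illustrative, not taken from this repository):

import io.soabase.recordbuilder.core.RecordBuilder;

// The processor generates ExamplePointBuilder for this record at compile time.
@RecordBuilder
public record ExamplePoint(int x, int y) {}

// Typical generated-builder usage:
// ExamplePoint p = ExamplePointBuilder.builder().x(1).y(2).build();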

View File

@@ -1,10 +1,8 @@
package it.cavallium.dbengine.client;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
@@ -14,19 +12,17 @@ import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@@ -136,7 +132,13 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@Override
public Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query) {
return this
.search(ClientQueryParams.builder().snapshot(snapshot).query(query).limit(0).build())
.search(ClientQueryParams
.builder()
.snapshot(snapshot)
.query(query)
.timeout(Duration.ofSeconds(30))
.limit(0)
.build())
.single()
.map(searchResultKeys -> {
try (searchResultKeys) {
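The change above gives count() a fixed 30-second timeout instead of letting the underlying search run unbounded. A minimal sketch of building the same query params with a caller-chosen budget, reusing only the builder calls visible in this hunk; it assumes it sits inside a method where snapshot and query are already in scope:

// Sketch: same ClientQueryParams builder as above, with an explicit 5-second
// timeout in place of the 30-second default now used by count().
var countParams = ClientQueryParams
        .builder()
        .snapshot(snapshot)
        .query(query)
        .timeout(Duration.ofSeconds(5))
        .limit(0)
        .build();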

View File

@@ -27,6 +27,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
@@ -39,6 +42,9 @@ import reactor.util.function.Tuple2;
*/
public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> {
private static final Logger LOG = LogManager.getLogger(DatabaseMapDictionary.class);
private final AtomicLong totalZeroBytesErrors = new AtomicLong();
private final Serializer<U> valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary,
@@ -70,6 +76,20 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private void deserializeValue(Send<Buffer> valueToReceive, SynchronousSink<U> sink) {
try (var value = valueToReceive.receive()) {
sink.next(valueSerializer.deserialize(value));
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
LOG.error("Unexpected zero-bytes value in column "
+ dictionary.getDatabaseName() + ":" + dictionary.getColumnName()
+ " total=" + totalZeroBytesErrors
);
}
sink.complete();
} else {
sink.error(ex);
}
} catch (Throwable ex) {
sink.error(ex);
}
@@ -445,7 +465,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
assert suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix = deserializeSuffix(keyBuf);
assert serializedEntry.getValueUnsafe() != null;
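The new catch block turns a known zero-bytes corruption pattern into an empty result and rate-limits its logging: the first 512 occurrences are logged, then only every 10000th. The same idea as a standalone, hypothetical helper (the names below are not part of the commit):

// Hypothetical sketch of the rate-limited error counter used above:
// log the first occurrences in full, then only one out of every 10000.
private final AtomicLong zeroBytesErrors = new AtomicLong();

private void reportZeroBytesValue(String databaseName, String columnName) {
    long total = zeroBytesErrors.incrementAndGet();
    if (total < 512 || total % 10000 == 0) {
        LOG.error("Unexpected zero-bytes value in column {}:{} total={}", databaseName, columnName, total);
    }
}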

View File

@@ -17,9 +17,7 @@ import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.logging.log4j.LogManager;
@@ -34,7 +32,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
ResourceSupport<DatabaseStage<Object2ObjectSortedMap<T, U>>, DatabaseMapDictionaryDeep<T, U, US>> implements
DatabaseStageMap<T, U, US> {
private static final Logger logger = LogManager.getLogger(DatabaseMapDictionaryDeep.class);
private static final Logger LOG = LogManager.getLogger(DatabaseMapDictionaryDeep.class);
private static final Drop<DatabaseMapDictionaryDeep<?, ?, ?>> DROP = new Drop<>() {
@Override
@@ -44,21 +42,21 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
obj.range.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
LOG.error("Failed to close range", ex);
}
try {
if (obj.keyPrefix != null) {
obj.keyPrefix.close();
}
} catch (Throwable ex) {
logger.error("Failed to close keyPrefix", ex);
LOG.error("Failed to close keyPrefix", ex);
}
try {
if (obj.onClose != null) {
obj.onClose.run();
}
} catch (Throwable ex) {
logger.error("Failed to close onClose", ex);
LOG.error("Failed to close onClose", ex);
}
}

View File

@@ -18,6 +18,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -28,15 +29,16 @@ import reactor.core.publisher.SynchronousSink;
public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, DatabaseSingle<U>> implements
DatabaseStageEntry<U> {
private static final Logger logger = LogManager.getLogger(DatabaseSingle.class);
private static final Logger LOG = LogManager.getLogger(DatabaseSingle.class);
private final AtomicLong totalZeroBytesErrors = new AtomicLong();
private static final Drop<DatabaseSingle<?>> DROP = new Drop<>() {
@Override
public void drop(DatabaseSingle<?> obj) {
try {
obj.key.close();
} catch (Throwable ex) {
logger.error("Failed to close key", ex);
LOG.error("Failed to close key", ex);
}
if (obj.onClose != null) {
obj.onClose.run();
@@ -87,6 +89,15 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
deserializedValue = serializer.deserialize(valueBuf);
}
sink.next(deserializedValue);
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
LOG.error("Unexpected zero-bytes value in column "
+ dictionary.getDatabaseName() + ":" + dictionary.getColumnName());
sink.complete();
} else {
sink.error(ex);
}
} catch (SerializationException ex) {
sink.error(ex);
}
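Both deserialization paths run inside Reactor's handle() operator, where the SynchronousSink contract is: at most one next(), and either complete() (which yields an empty result) or error(). That is why the recoverable zero-bytes case can be dropped with sink.complete() while any other failure still propagates. A minimal, hypothetical sketch of the same contract, where rawValue stands for a Mono<byte[]> of the stored bytes:

// Sketch only: Mono.handle() turns a recoverable decode problem into an empty Mono
// via sink.complete(), and anything unexpected into an error via sink.error().
Mono<String> decoded = rawValue.handle((value, sink) -> {
    try {
        if (value.length == 0) {
            sink.complete();                 // treat the corrupted value as missing
        } else {
            sink.next(new String(value, java.nio.charset.StandardCharsets.UTF_8));
        }
    } catch (Throwable ex) {
        sink.error(ex);                      // unexpected failures still fail the Mono
    }
});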

View File

@@ -78,7 +78,7 @@ public class LLLocalDictionary implements LLDictionary {
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean PREFER_SEEK_TO_FIRST = true;
/**
* It used to be false,
* now it's true to avoid crashes during iterations on completely corrupted files

View File

@@ -41,7 +41,6 @@ import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AccessHint;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
@@ -51,7 +50,6 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
@@ -60,7 +58,6 @@ import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
@@ -545,10 +542,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
this.db = RocksDB.open(options, dbPathString, descriptors, handles);
for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) {
handles.add(db.createColumnFamily(columnFamilyDescriptor));
}
this.db = RocksDB.open(new DBOptions(options).setCreateMissingColumnFamilies(true),
dbPathString,
descriptors,
handles
);
flushAndCloseDb(db, handles);
this.db = null;
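Opening the database with setCreateMissingColumnFamilies(true) lets RocksDB create any listed descriptor that does not exist yet during open(), instead of creating the missing column families one by one after opening. A minimal, self-contained sketch of that pattern (path and column family name are illustrative only):

// Sketch: RocksDB creates any missing column family listed in `descriptors`
// when the DBOptions enable createMissingColumnFamilies.
// Needs org.rocksdb.*, java.util.*, java.nio.charset.StandardCharsets imports.
List<ColumnFamilyDescriptor> descriptors = List.of(
        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
        new ColumnFamilyDescriptor("example-column".getBytes(StandardCharsets.US_ASCII))
);
List<ColumnFamilyHandle> handles = new ArrayList<>();
DBOptions dbOptions = new DBOptions()
        .setCreateIfMissing(true)
        .setCreateMissingColumnFamilies(true);
try (RocksDB db = RocksDB.open(dbOptions, "/tmp/example-db", descriptors, handles)) {
    // use db and handles here
}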

View File

@@ -27,6 +27,7 @@ import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.directory.Lucene90CodecWithNoFieldCompression;
import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
@@ -133,7 +134,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.directory = LuceneUtils.createLuceneDirectory(luceneOptions.directoryOptions(),
LuceneUtils.getStandardName(clusterName, shardIndex),
rocksDBManager);
//boolean compressCodec = !luceneOptions.directoryOptions().isStorageCompressed();
boolean isFilesystemCompressed = LuceneUtils.getIsFilesystemCompressed(luceneOptions.directoryOptions());
this.shardName = clusterName;
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
@@ -173,6 +174,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
writerSchedulerMaxThreadCount = concurrentMergeScheduler.getMaxThreadCount();
mergeScheduler = concurrentMergeScheduler;
}
if (isFilesystemCompressed) {
indexWriterConfig.setUseCompoundFile(false);
indexWriterConfig.setCodec(new Lucene90CodecWithNoFieldCompression());
}
logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount);
indexWriterConfig.setMergeScheduler(mergeScheduler);
if (luceneOptions.indexWriterRAMBufferSizeMB().isPresent()) {
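When the directory reports itself as filesystem-compressed (see getIsFilesystemCompressed further down), compound files and Lucene's own stored-field compression are switched off so the data is not compressed twice. A short sketch of the same IndexWriterConfig wiring; analyzer, directory and the storageAlreadyCompresses flag are placeholders for whatever the surrounding code provides:

// Sketch: let the storage layer do the compressing and keep Lucene's
// stored fields uncompressed, mirroring the branch added above.
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
if (storageAlreadyCompresses) {
    iwc.setUseCompoundFile(false);
    iwc.setCodec(new Lucene90CodecWithNoFieldCompression());
}
IndexWriter writer = new IndexWriter(directory, iwc);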

View File

@@ -36,6 +36,7 @@ import it.cavallium.dbengine.rpc.current.data.MemoryMappedFSDirectory;
import it.cavallium.dbengine.rpc.current.data.NIOFSDirectory;
import it.cavallium.dbengine.rpc.current.data.NRTCachingDirectory;
import it.cavallium.dbengine.rpc.current.data.RocksDBSharedDirectory;
import it.cavallium.dbengine.rpc.current.data.RocksDBStandaloneDirectory;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
@@ -616,7 +617,15 @@ public class LuceneUtils {
directoryName,
rocksDBSharedDirectory.blockSize()
);
} else {
} else if (directoryOptions instanceof RocksDBStandaloneDirectory rocksDBStandaloneDirectory) {
var dbInstance = rocksDBManager.getOrCreate(rocksDBStandaloneDirectory.managedPath());
return new RocksdbDirectory(rocksDBManager.getAllocator(),
dbInstance.db(),
dbInstance.handles(),
directoryName,
rocksDBStandaloneDirectory.blockSize()
);
} else {
throw new UnsupportedOperationException("Unsupported directory: " + directoryName + ", " + directoryOptions);
}
}
@@ -632,6 +641,8 @@ public class LuceneUtils {
return Optional.of(niofsDirectory.managedPath());
} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
return getManagedPath(nrtCachingDirectory.delegate());
} else if (directoryOptions instanceof RocksDBStandaloneDirectory rocksDBStandaloneDirectory) {
return Optional.of(rocksDBStandaloneDirectory.managedPath());
} else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) {
return Optional.of(rocksDBSharedDirectory.managedPath());
} else {
@@ -639,6 +650,26 @@ public class LuceneUtils {
}
}
public static boolean getIsFilesystemCompressed(LuceneDirectoryOptions directoryOptions) {
if (directoryOptions instanceof ByteBuffersDirectory) {
return false;
} else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) {
return getIsFilesystemCompressed(directIOFSDirectory.delegate());
} else if (directoryOptions instanceof MemoryMappedFSDirectory) {
return false;
} else if (directoryOptions instanceof NIOFSDirectory) {
return false;
} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
return getIsFilesystemCompressed(nrtCachingDirectory.delegate());
} else if (directoryOptions instanceof RocksDBStandaloneDirectory) {
return true;
} else if (directoryOptions instanceof RocksDBSharedDirectory) {
return true;
} else {
throw new UnsupportedOperationException("Unsupported directory: " + directoryOptions);
}
}
public static IntList intListTo(int to) {
var il = new IntArrayList(to);
for (int i = 0; i < to; i++) {

View File

@@ -0,0 +1,21 @@
package it.cavallium.dbengine.lucene.directory;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.lucene90.Lucene90Codec;
import org.apache.lucene.codecs.lucene90.Lucene90StoredFieldsFormat;
public final class Lucene90CodecWithNoFieldCompression extends FilterCodec {
private final StoredFieldsFormat storedFieldsFormat;
public Lucene90CodecWithNoFieldCompression() {
super("Lucene410CodecWithNoFieldCompression", new Lucene90Codec());
storedFieldsFormat = new Lucene90NoCompressionStoredFieldsFormat();
}
@Override
public StoredFieldsFormat storedFieldsFormat() {
return storedFieldsFormat;
}
}

View File

@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package it.cavallium.dbengine.lucene.directory;
import java.io.IOException;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.codecs.compressing.Decompressor;
import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
public class Lucene90NoCompressionStoredFieldsFormat extends StoredFieldsFormat {
public static final CompressionMode DUMMY = new CompressionMode() {
@Override
public Compressor newCompressor() {
return DUMMY_COMPRESSOR;
}
@Override
public Decompressor newDecompressor() {
return DUMMY_DECOMPRESSOR;
}
@Override
public String toString() {
return "DUMMY";
}
};
private static final Decompressor DUMMY_DECOMPRESSOR = new Decompressor() {
@Override
public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes)
throws IOException {
assert offset + length <= originalLength;
if (bytes.bytes.length < originalLength) {
bytes.bytes = new byte[ArrayUtil.oversize(originalLength, 1)];
}
in.readBytes(bytes.bytes, 0, offset + length);
bytes.offset = offset;
bytes.length = length;
}
@Override
public Decompressor clone() {
return this;
}
};
private static final Compressor DUMMY_COMPRESSOR = new Compressor() {
@Override
public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
out.writeBytes(bytes, off, len);
}
@Override
public void close() {
}
};
public Lucene90NoCompressionStoredFieldsFormat() {
}
@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context)
throws IOException {
return impl().fieldsReader(directory, si, fn, context);
}
@Override
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException {
return impl().fieldsWriter(directory, si, context);
}
StoredFieldsFormat impl() {
return new Lucene90CompressingStoredFieldsFormat("Lucene90StoredFieldsFastData",
DUMMY,
BEST_SPEED_BLOCK_LENGTH,
1024,
10
);
}
// Shoot for 10 sub blocks of 8kB each.
private static final int BEST_SPEED_BLOCK_LENGTH = 10 * 8 * 1024;
}

View File

@@ -0,0 +1 @@
it.cavallium.dbengine.lucene.directory.Lucene90CodecWithNoFieldCompression
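This one-line file is presumably the service-provider registration that lets Lucene discover the codec when an index is reopened: the reader resolves it by the name passed to super() in the codec's constructor, so that registered name must stay stable across versions. A minimal reader-side lookup, assuming the jar containing this file is on the classpath:

// Sketch: resolve the codec by its registered name (the string passed to super()
// in Lucene90CodecWithNoFieldCompression), discovered through the SPI file above.
Codec codec = Codec.forName("Lucene410CodecWithNoFieldCompression");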