Load tests, fix sst writer configurations
This commit is contained in:
parent
47c3e31f75
commit
150bff9f09
pom.xml
src
fatjar/java
main/java/it/cavallium/rockserver/core
config
impl
resources
test/java/it/cavallium/rockserver/core/impl/test
4
pom.xml
4
pom.xml
@ -10,9 +10,9 @@
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>22</maven.compiler.source>
|
||||
<maven.compiler.target>22</maven.compiler.target>
|
||||
<maven.compiler.targe>22</maven.compiler.targe>
|
||||
<native.maven.plugin.version>0.9.28</native.maven.plugin.version>
|
||||
<gestalt.version>0.32.1</gestalt.version>
|
||||
<gestalt.version>0.32.2</gestalt.version>
|
||||
<rocksdb.version>9.0.0</rocksdb.version>
|
||||
<slf4j.version>2.0.12</slf4j.version>
|
||||
<imageName>rockserver-core</imageName>
|
||||
|
@ -27,8 +27,8 @@ module rockserver.core {
|
||||
requires io.netty.codec;
|
||||
requires io.netty.codec.http2;
|
||||
requires jdk.unsupported;
|
||||
requires io.netty.transport.classes.epoll;
|
||||
requires org.reactivestreams;
|
||||
requires io.netty.transport.classes.epoll;
|
||||
requires org.reactivestreams;
|
||||
requires io.netty.transport.unix.common;
|
||||
|
||||
exports it.cavallium.rockserver.core.client;
|
||||
|
@ -4,15 +4,25 @@ import it.cavallium.rockserver.core.common.RocksDBException;
|
||||
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
|
||||
import it.cavallium.rockserver.core.impl.DataSizeDecoder;
|
||||
import it.cavallium.rockserver.core.impl.DbCompressionDecoder;
|
||||
import it.cavallium.rockserver.core.resources.DefaultConfig;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import org.github.gestalt.config.builder.GestaltBuilder;
|
||||
import org.github.gestalt.config.builder.SourceBuilder;
|
||||
import org.github.gestalt.config.decoder.ProxyDecoderMode;
|
||||
import org.github.gestalt.config.exceptions.GestaltException;
|
||||
import org.github.gestalt.config.reload.ConfigReloadStrategy;
|
||||
import org.github.gestalt.config.source.ClassPathConfigSourceBuilder;
|
||||
import org.github.gestalt.config.source.ConfigSource;
|
||||
import org.github.gestalt.config.source.FileConfigSourceBuilder;
|
||||
import org.github.gestalt.config.source.InputStreamConfigSourceBuilder;
|
||||
import org.github.gestalt.config.tag.Tags;
|
||||
import org.github.gestalt.config.utils.Pair;
|
||||
|
||||
public class ConfigParser {
|
||||
|
||||
@ -33,7 +43,9 @@ public class ConfigParser {
|
||||
|
||||
public static DatabaseConfig parse(Path configPath) {
|
||||
var parser = new ConfigParser();
|
||||
parser.addSource(configPath);
|
||||
if (configPath != null) {
|
||||
parser.addSource(configPath);
|
||||
}
|
||||
return parser.parse();
|
||||
}
|
||||
|
||||
@ -51,8 +63,11 @@ public class ConfigParser {
|
||||
|
||||
public DatabaseConfig parse() {
|
||||
try {
|
||||
gsb.addSource(ClassPathConfigSourceBuilder
|
||||
.builder().setResource("it/cavallium/rockserver/core/resources/default.conf").build());
|
||||
gsb.addSource(InputStreamConfigSourceBuilder
|
||||
.builder()
|
||||
.setConfig(DefaultConfig.getDefaultConfig())
|
||||
.setFormat("conf")
|
||||
.build());
|
||||
for (SourceBuilder<?, ?> sourceBuilder : sourceBuilders) {
|
||||
gsb.addSource(sourceBuilder.build());
|
||||
}
|
||||
|
@ -65,6 +65,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
private final SafeShutdown ops;
|
||||
private final Object columnEditLock = new Object();
|
||||
private final DatabaseConfig config;
|
||||
private final RocksDBObjects refs;
|
||||
private final @Nullable Cache cache;
|
||||
private Path tempSSTsPath;
|
||||
|
||||
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
|
||||
@ -80,6 +82,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
var loadedDb = RocksDBLoader.load(path, config, logger);
|
||||
this.db = loadedDb.db();
|
||||
this.dbOptions = loadedDb.dbOptions();
|
||||
this.refs = loadedDb.refs();
|
||||
this.cache = loadedDb.cache();
|
||||
this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap();
|
||||
try {
|
||||
this.tempSSTsPath = config.global().tempSstPath();
|
||||
@ -204,6 +208,18 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
if (retries >= 5000) {
|
||||
throw new IllegalStateException("Can't find column in column names index: " + name);
|
||||
}
|
||||
|
||||
ColumnFamilyOptions columnConfig;
|
||||
while ((columnConfig = this.columnsConifg.remove(name)) == null && retries++ < 5_000) {
|
||||
Thread.yield();
|
||||
}
|
||||
if (columnConfig != null) {
|
||||
columnConfig.close();
|
||||
}
|
||||
if (retries >= 5000) {
|
||||
throw new IllegalStateException("Can't find column in column names index: " + name);
|
||||
}
|
||||
|
||||
return col;
|
||||
}
|
||||
|
||||
@ -214,6 +230,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
ops.closeAndWait(MAX_TRANSACTION_DURATION_MS);
|
||||
columnSchemasColumnDescriptorHandle.close();
|
||||
db.close();
|
||||
refs.close();
|
||||
if (path == null) {
|
||||
Utils.deleteDirectory(db.getPath());
|
||||
}
|
||||
@ -332,12 +349,19 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
var options = RocksDBLoader.getColumnOptions(name, this.config.global(),
|
||||
logger, this.refs, path == null, cache);
|
||||
var prev = columnsConifg.put(name, options);
|
||||
if (prev != null) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL,
|
||||
"ColumnsConfig already exists with name \"" + name + "\"");
|
||||
}
|
||||
byte[] key = name.getBytes(StandardCharsets.UTF_8);
|
||||
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(key));
|
||||
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(key, options));
|
||||
byte[] value = encodeColumnSchema(schema);
|
||||
db.get().put(columnSchemasColumnDescriptorHandle, key, value);
|
||||
return registerColumn(new ColumnInstance(cf, schema));
|
||||
} catch (RocksDBException e) {
|
||||
} catch (RocksDBException | GestaltException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
|
||||
}
|
||||
}
|
||||
@ -462,7 +486,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
yield wb;
|
||||
}
|
||||
case SST_INGESTION, SST_INGEST_BEHIND -> {
|
||||
var sstWriter = getSSTWriter(columnId, null, null, false, mode == PutBatchMode.SST_INGEST_BEHIND);
|
||||
var sstWriter = getSSTWriter(columnId, null, false, mode == PutBatchMode.SST_INGEST_BEHIND);
|
||||
refs.add(sstWriter);
|
||||
yield sstWriter;
|
||||
}
|
||||
@ -549,7 +573,6 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
@VisibleForTesting
|
||||
public SSTWriter getSSTWriter(long colId,
|
||||
@Nullable GlobalDatabaseConfig globalDatabaseConfigOverride,
|
||||
@Nullable FallbackColumnConfig columnConfigOverride,
|
||||
boolean forceNoOptions,
|
||||
boolean ingestBehind) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
try {
|
||||
@ -557,11 +580,16 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
ColumnFamilyOptions columnConifg;
|
||||
RocksDBObjects refs;
|
||||
if (!forceNoOptions) {
|
||||
if (columnConfigOverride != null) {
|
||||
refs = new RocksDBObjects();
|
||||
columnConifg = RocksDBLoader.getColumnOptions(globalDatabaseConfigOverride, columnConfigOverride, logger, refs, path == null, null);
|
||||
var name = new String(col.cfh().getName(), StandardCharsets.UTF_8);
|
||||
refs = new RocksDBObjects();
|
||||
if (globalDatabaseConfigOverride != null) {
|
||||
columnConifg = RocksDBLoader.getColumnOptions(name, globalDatabaseConfigOverride, logger, refs, false, null);
|
||||
} else {
|
||||
columnConifg = columnsConifg.get(new String(col.cfh().getName(), StandardCharsets.UTF_8));
|
||||
try {
|
||||
columnConifg = RocksDBLoader.getColumnOptions(name, this.config.global(), logger, refs, false, null);
|
||||
} catch (GestaltException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, e);
|
||||
}
|
||||
refs = null;
|
||||
}
|
||||
} else {
|
||||
@ -1002,4 +1030,13 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
return path;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public TransactionalDB getDb() {
|
||||
return db;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public DatabaseConfig getConfig() {
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
@ -100,18 +100,30 @@ public class RocksDBLoader {
|
||||
}
|
||||
|
||||
public record LoadedDb(TransactionalDB db, DBOptions dbOptions,
|
||||
Map<String, ColumnFamilyOptions> definitiveColumnFamilyOptionsMap) {}
|
||||
Map<String, ColumnFamilyOptions> definitiveColumnFamilyOptionsMap, RocksDBObjects refs,
|
||||
@Nullable Cache cache) {}
|
||||
|
||||
public static ColumnFamilyOptions getColumnOptions(
|
||||
GlobalDatabaseConfig globalDatabaseConfig,
|
||||
FallbackColumnConfig columnOptions, Logger logger,
|
||||
RocksDBObjects refs,
|
||||
boolean inMemory,
|
||||
@Nullable Cache cache) {
|
||||
public static ColumnFamilyOptions getColumnOptions(String name,
|
||||
GlobalDatabaseConfig globalDatabaseConfig,
|
||||
Logger logger,
|
||||
RocksDBObjects refs,
|
||||
boolean inMemory,
|
||||
@Nullable Cache cache) {
|
||||
try {
|
||||
var columnFamilyOptions = new ColumnFamilyOptions();
|
||||
refs.add(columnFamilyOptions);
|
||||
|
||||
FallbackColumnConfig columnOptions = null;
|
||||
for (NamedColumnConfig namedColumnConfig : globalDatabaseConfig.columnOptions()) {
|
||||
if (namedColumnConfig.name().equals(name)) {
|
||||
columnOptions = namedColumnConfig;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (columnOptions == null) {
|
||||
columnOptions = globalDatabaseConfig.fallbackColumnOptions();
|
||||
}
|
||||
|
||||
//noinspection ConstantConditions
|
||||
if (columnOptions.memtableMemoryBudgetBytes() != null) {
|
||||
// about 512MB of ram will be used for level style compaction
|
||||
@ -571,12 +583,7 @@ public class RocksDBLoader {
|
||||
|
||||
for (Map.Entry<String, FallbackColumnConfig> entry : columnConfigMap.entrySet()) {
|
||||
String name = entry.getKey();
|
||||
FallbackColumnConfig columnOptions = entry.getValue();
|
||||
if (columnOptions instanceof NamedColumnConfig namedColumnConfig && !namedColumnConfig.name().equals(name)) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name);
|
||||
}
|
||||
|
||||
var columnFamilyOptions = getColumnOptions(databaseOptions.global(), columnOptions,
|
||||
var columnFamilyOptions = getColumnOptions(name, databaseOptions.global(),
|
||||
logger, refs, path == null, optionsWithCache.standardCache());
|
||||
refs.add(columnFamilyOptions);
|
||||
|
||||
@ -617,7 +624,7 @@ public class RocksDBLoader {
|
||||
|
||||
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
|
||||
var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig);
|
||||
return new LoadedDb(TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks), rocksdbOptions, definitiveColumnFamilyOptionsMap);
|
||||
return new LoadedDb(TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks), rocksdbOptions, definitiveColumnFamilyOptionsMap, refs, optionsWithCache.standardCache());
|
||||
} catch (IOException | RocksDBException ex) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
|
||||
} catch (GestaltException e) {
|
||||
|
@ -61,8 +61,6 @@ public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInst
|
||||
.setBottommostCompressionOptions(cloneCompressionOptions(columnConifg.bottommostCompressionOptions()));
|
||||
if (columnConifg.memTableConfig() != null) {
|
||||
options.setMemTableConfig(columnConifg.memTableConfig());
|
||||
} else {
|
||||
options.setMemTableConfig(new SkipListMemTableConfig());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,18 @@
|
||||
package it.cavallium.rockserver.core.resources;
|
||||
|
||||
import it.cavallium.rockserver.core.common.RocksDBException;
|
||||
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
|
||||
import java.io.InputStream;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public class DefaultConfig {
|
||||
|
||||
@NotNull
|
||||
public static InputStream getDefaultConfig() {
|
||||
var stream = DefaultConfig.class.getResourceAsStream("default.conf");
|
||||
if (stream == null) {
|
||||
throw RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Missing default config resource: default.conf");
|
||||
}
|
||||
return stream;
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package it.cavallium.rockserver.core.impl.test;
|
||||
|
||||
import it.cavallium.rockserver.core.config.ConfigParser;
|
||||
import it.cavallium.rockserver.core.config.DataSize;
|
||||
import java.nio.file.Path;
|
||||
import org.github.gestalt.config.exceptions.GestaltException;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class DefaultConfigTest {
|
||||
|
||||
@Test
|
||||
public void test() throws GestaltException {
|
||||
var def = ConfigParser.parseDefault();
|
||||
var checksum = def.global().checksum();
|
||||
Assertions.assertTrue(checksum);
|
||||
var ingestBehind = def.global().ingestBehind();
|
||||
Assertions.assertFalse(ingestBehind);
|
||||
Assertions.assertEquals(Path.of("./volume"), def.global().volumes()[0].volumePath());
|
||||
Assertions.assertEquals(new DataSize("32KiB"), def.global().fallbackColumnOptions().levels()[6].maxDictBytes());
|
||||
}
|
||||
}
|
@ -14,6 +14,9 @@ import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.rocksdb.CompressionType;
|
||||
import org.rocksdb.Options;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.SstFileReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -44,7 +47,7 @@ public class TestSSTWriter {
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
LOG.info("Obtaining sst writer");
|
||||
try (var sstWriter = db.getSSTWriter(colId, null, null, true, false)) {
|
||||
try (var sstWriter = db.getSSTWriter(colId, null, true, false)) {
|
||||
LOG.info("Creating sst");
|
||||
var tl = ThreadLocalRandom.current();
|
||||
var bytes = new byte[1024];
|
||||
@ -61,6 +64,38 @@ public class TestSSTWriter {
|
||||
LOG.info("Done");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompression() throws IOException, RocksDBException, GestaltException {
|
||||
LOG.info("Obtaining sst writer");
|
||||
try (var sstWriter = db.getSSTWriter(colId, null, false, false)) {
|
||||
LOG.info("Creating sst");
|
||||
var tl = ThreadLocalRandom.current();
|
||||
var bytes = new byte[1024];
|
||||
long i = 0;
|
||||
while (i < 1_000) {
|
||||
var ib = Longs.toByteArray(i++);
|
||||
tl.nextBytes(bytes);
|
||||
sstWriter.put(ib, bytes);
|
||||
}
|
||||
LOG.info("Writing pending sst data");
|
||||
sstWriter.writePending();
|
||||
LOG.info("Done, closing");
|
||||
}
|
||||
var transactionalDB = db.getDb();
|
||||
var rocksDB = transactionalDB.get();
|
||||
var metadata = rocksDB.getLiveFilesMetaData();
|
||||
Assertions.assertEquals(1, metadata.size(), "There are more than one sst files");
|
||||
var sstMetadata = metadata.getFirst();
|
||||
var sstPath = Path.of(sstMetadata.path(), sstMetadata.fileName());
|
||||
Assertions.assertTrue(Files.exists(sstPath), "SST file does not exists");
|
||||
try (var options = new Options(); var sstReader = new SstFileReader(options)) {
|
||||
sstReader.open(sstPath.toString());
|
||||
var p = sstReader.getTableProperties();
|
||||
Assertions.assertNotEquals("snappy", p.getCompressionName().toLowerCase());
|
||||
}
|
||||
LOG.info("Done");
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() throws IOException {
|
||||
db.close();
|
||||
|
Loading…
x
Reference in New Issue
Block a user