Add temp sst path

This commit is contained in:
Andrea Cavalli 2024-09-25 12:53:05 +02:00
parent d4f6f1097b
commit 77f3e012ce
5 changed files with 38 additions and 151 deletions

View File

@ -33,6 +33,9 @@ public interface GlobalDatabaseConfig {
@Nullable @Nullable
Path walPath() throws GestaltException; Path walPath() throws GestaltException;
@Nullable
Path tempSstPath() throws GestaltException;
@Nullable @Nullable
Duration delayWalFlushDuration() throws GestaltException; Duration delayWalFlushDuration() throws GestaltException;

View File

@ -24,6 +24,7 @@ import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -64,6 +65,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private final SafeShutdown ops; private final SafeShutdown ops;
private final Object columnEditLock = new Object(); private final Object columnEditLock = new Object();
private final DatabaseConfig config; private final DatabaseConfig config;
private Path tempSSTsPath;
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException { public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
this.path = path; this.path = path;
@ -79,7 +81,12 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.db = loadedDb.db(); this.db = loadedDb.db();
this.dbOptions = loadedDb.dbOptions(); this.dbOptions = loadedDb.dbOptions();
this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap(); this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap();
var existingColumnSchemasColumnDescriptorOptional = db try {
this.tempSSTsPath = config.global().tempSstPath();
} catch (GestaltException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Can't get wal path");
}
var existingColumnSchemasColumnDescriptorOptional = db
.getStartupColumns() .getStartupColumns()
.entrySet() .entrySet()
.stream() .stream()
@ -556,7 +563,10 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
columnConifg = null; columnConifg = null;
refs = null; refs = null;
} }
return SSTWriter.open(db, col, columnConifg, forceNoOptions, ingestBehind, refs); if (Files.notExists(tempSSTsPath)) {
Files.createDirectories(tempSSTsPath);
}
return SSTWriter.open(tempSSTsPath, db, col, columnConifg, forceNoOptions, ingestBehind, refs);
} catch (IOException ex) { } catch (IOException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex);
} catch (RocksDBException ex) { } catch (RocksDBException ex) {

View File

@ -20,7 +20,7 @@ public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInst
private static final Logger LOG = LoggerFactory.getLogger(SSTWriter.class); private static final Logger LOG = LoggerFactory.getLogger(SSTWriter.class);
public static SSTWriter open(TransactionalDB db, ColumnInstance col, ColumnFamilyOptions columnConifg, boolean forceNoOptions, boolean ingestBehind, RocksDBObjects refs) throws IOException, org.rocksdb.RocksDBException { public static SSTWriter open(Path tempSSTsPath, TransactionalDB db, ColumnInstance col, ColumnFamilyOptions columnConifg, boolean forceNoOptions, boolean ingestBehind, RocksDBObjects refs) throws IOException, org.rocksdb.RocksDBException {
if (refs == null) { if (refs == null) {
refs = new RocksDBObjects(); refs = new RocksDBObjects();
} }
@ -56,15 +56,19 @@ public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInst
.setCompressionPerLevel(columnConifg.compressionPerLevel()) .setCompressionPerLevel(columnConifg.compressionPerLevel())
.setNumLevels(columnConifg.numLevels()) .setNumLevels(columnConifg.numLevels())
.setTableFormatConfig(columnConifg.tableFormatConfig()) .setTableFormatConfig(columnConifg.tableFormatConfig())
.setMemTableConfig(columnConifg.memTableConfig())
.setTargetFileSizeBase(columnConifg.targetFileSizeBase()) .setTargetFileSizeBase(columnConifg.targetFileSizeBase())
.setTargetFileSizeMultiplier(columnConifg.targetFileSizeMultiplier()) .setTargetFileSizeMultiplier(columnConifg.targetFileSizeMultiplier())
.setMaxOpenFiles(-1); .setMaxOpenFiles(-1);
if (columnConifg.memTableConfig() != null) {
options.setMemTableConfig(columnConifg.memTableConfig());
} else {
options.setMemTableConfig(new SkipListMemTableConfig());
}
} }
} }
Path tempFile; Path tempFile;
try { try {
var tempDir = Path.of(db.getPath()).resolve(".temp_sst"); var tempDir = tempSSTsPath;
if (Files.notExists(tempDir)) { if (Files.notExists(tempDir)) {
Files.createDirectories(tempDir); Files.createDirectories(tempDir);
} }

View File

@ -70,6 +70,8 @@ database: {
log-path: ./logs log-path: ./logs
# Write-Ahead-Log data path # Write-Ahead-Log data path
wal-path: ./wal wal-path: ./wal
# Write-Ahead-Log data path
temp-sst-path: ./temp_sst
# If set and greater than zero, the WAL will not be flushed on every write, but every x seconds # If set and greater than zero, the WAL will not be flushed on every write, but every x seconds
delay-wal-flush-duration: PT5S delay-wal-flush-duration: PT5S
fallback-column-options: { fallback-column-options: {

View File

@ -18,8 +18,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -29,162 +32,19 @@ public class TestSSTWriter {
private EmbeddedDB db; private EmbeddedDB db;
private long colId; private long colId;
private Path tempSstPath;
@BeforeEach @BeforeEach
public void setUp() throws IOException { public void setUp() throws IOException {
db = new EmbeddedDB(null, "test", null); db = new EmbeddedDB(null, "test", null);
this.colId = db.createColumn("test", ColumnSchema.of(IntList.of(Long.BYTES), ObjectList.of(), true)); this.colId = db.createColumn("test", ColumnSchema.of(IntList.of(Long.BYTES), ObjectList.of(), true));
this.tempSstPath = Files.createTempDirectory("tempssts");
} }
@Test @Test
public void test() throws IOException { public void test() throws IOException {
LOG.info("Obtaining sst writer"); LOG.info("Obtaining sst writer");
var globalDatabaseConfigOverride = new GlobalDatabaseConfig() { try (var sstWriter = db.getSSTWriter(colId, null, null, true, false)) {
@Override
public boolean spinning() {
return false;
}
@Override
public boolean checksum() {
return false;
}
@Override
public boolean useDirectIo() {
return false;
}
@Override
public boolean allowRocksdbMemoryMapping() {
return true;
}
@Override
public @Nullable Integer maximumOpenFiles() {
return -1;
}
@Override
public boolean optimistic() {
return true;
}
@Override
public @Nullable DataSize blockCache() {
return new DataSize("10MiB");
}
@Override
public @Nullable DataSize writeBufferManager() {
return new DataSize("1MiB");
}
@Override
public @Nullable Path logPath() {
return null;
}
@Override
public @Nullable Path walPath() {
return null;
}
@Override
public @Nullable Duration delayWalFlushDuration() {
return null;
}
@Override
public boolean absoluteConsistency() {
return false;
}
@Override
public boolean ingestBehind() {
return true;
}
@Override
public boolean unorderedWrite() {
return false;
}
@Override
public VolumeConfig[] volumes() {
return new VolumeConfig[0];
}
@Override
public FallbackColumnConfig fallbackColumnOptions() {
return null;
}
@Override
public NamedColumnConfig[] columnOptions() {
return new NamedColumnConfig[0];
}
};
var fallbackColumnConfig = new FallbackColumnConfig() {
@Override
public ColumnLevelConfig[] levels() {
return new ColumnLevelConfig[] {
new ColumnLevelConfig() {
@Override
public CompressionType compression() {
return CompressionType.NO_COMPRESSION;
}
@Override
public DataSize maxDictBytes() {
return DataSize.ZERO;
}
}
};
}
@Override
public @Nullable DataSize memtableMemoryBudgetBytes() {
return new DataSize("1MiB");
}
@Override
public @Nullable Boolean cacheIndexAndFilterBlocks() {
return true;
}
@Override
public @Nullable Boolean partitionFilters() {
return false;
}
@Override
public @Nullable BloomFilterConfig bloomFilter() {
return new BloomFilterConfig() {
@Override
public int bitsPerKey() {
return 10;
}
@Override
public @Nullable Boolean optimizeForHits() {
return true;
}
};
}
@Override
public @Nullable DataSize blockSize() {
return new DataSize("128KiB");
}
@Override
public @Nullable DataSize writeBufferSize() {
return new DataSize("1MiB");
}
};
try (var sstWriter = db.getSSTWriter(colId, globalDatabaseConfigOverride, fallbackColumnConfig, false, true)) {
LOG.info("Creating sst"); LOG.info("Creating sst");
var tl = ThreadLocalRandom.current(); var tl = ThreadLocalRandom.current();
var bytes = new byte[1024]; var bytes = new byte[1024];
@ -204,5 +64,13 @@ public class TestSSTWriter {
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
db.close(); db.close();
Files.walkFileTree(tempSstPath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.deleteIfExists(file);
return FileVisitResult.CONTINUE;
}
});
Files.deleteIfExists(tempSstPath);
} }
} }