Avoid crashes during sst errors

This commit is contained in:
Andrea Cavalli 2024-09-25 14:58:38 +02:00
parent 77f3e012ce
commit 4e14ae77db
2 changed files with 5 additions and 2 deletions

View File

@ -440,6 +440,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try { try {
var cf = new CompletableFuture<Void>(); var cf = new CompletableFuture<Void>();
batchPublisher.subscribe(new Subscriber<>() { batchPublisher.subscribe(new Subscriber<>() {
private boolean stopped;
private Subscription subscription; private Subscription subscription;
private ColumnInstance col; private ColumnInstance col;
private ArrayList<AutoCloseable> refs; private ArrayList<AutoCloseable> refs;
@ -476,6 +477,9 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
@Override @Override
public void onNext(KVBatch kvBatch) { public void onNext(KVBatch kvBatch) {
if (stopped) {
return;
}
var keyIt = kvBatch.keys().iterator(); var keyIt = kvBatch.keys().iterator();
var valueIt = kvBatch.values().iterator(); var valueIt = kvBatch.values().iterator();
try (var arena = Arena.ofConfined()) { try (var arena = Arena.ofConfined()) {
@ -516,6 +520,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
private void doFinally() { private void doFinally() {
stopped = true;
for (int i = refs.size() - 1; i >= 0; i--) { for (int i = refs.size() - 1; i >= 0; i--) {
try { try {
var c = refs.get(i); var c = refs.get(i);

View File

@ -85,7 +85,6 @@ public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInst
public void put(byte[] key, byte[] value) throws RocksDBException { public void put(byte[] key, byte[] value) throws RocksDBException {
try { try {
checkOwningHandle();
sstFileWriter.put(key, value); sstFileWriter.put(key, value);
} catch (org.rocksdb.RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e); throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e);
@ -94,7 +93,6 @@ public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInst
public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException { public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
try { try {
checkOwningHandle();
sstFileWriter.put(key, value); sstFileWriter.put(key, value);
} catch (org.rocksdb.RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e); throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e);