Fix crashes

This commit is contained in:
Andrea Cavalli 2024-10-24 03:25:42 +02:00
parent 4e2e1e8c8e
commit 4f4533d434

View File

@ -1135,27 +1135,27 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
throw ex;
}
}, res -> Flux.<T, RocksIterator>generate(() -> {
if (!reverse) {
res.it.seekToFirst();
} else {
res.it.seekToLast();
}
return res.it;
if (!reverse) {
res.it.seekToFirst();
} else {
res.it.seekToLast();
}
return res.it;
}, (it, sink) -> {
if (!it.isValid()) {
sink.complete();
} else {
var calculatedKey = toMemorySegment(arena, it.key());
var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
//noinspection unchecked
sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue));
if (!reverse) {
res.it.next();
} else {
res.it.prev();
}
}
return it;
if (!it.isValid()) {
sink.complete();
} else {
var calculatedKey = toMemorySegment(arena, it.key());
var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL;
if (!reverse) {
res.it.next();
} else {
res.it.prev();
}
//noinspection unchecked
sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue));
}
return it;
}), Resources::close)
.subscribeOn(Schedulers.boundedElastic())
.doFirst(ops::beginOp)