Execute all events in a dedicated executor
This commit is contained in:
parent
4d46a32bb4
commit
215ba242fd
@ -102,8 +102,8 @@ public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInst
|
||||
@Override
|
||||
public void writePending() throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
try {
|
||||
checkOwningHandle();
|
||||
try (this) {
|
||||
checkOwningHandle();
|
||||
sstFileWriter.finish();
|
||||
try (var ingestOptions = new IngestExternalFileOptions()) {
|
||||
ingestOptions
|
||||
|
@ -290,25 +290,29 @@ public class GrpcServer extends Server {
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
state = State.RECEIVED_ALL;
|
||||
doFinally();
|
||||
if (putBatchInputsSubscriber != null) {
|
||||
putBatchInputsSubscriber.onError(throwable);
|
||||
} else {
|
||||
serverCallStreamObserver.onError(throwable);
|
||||
}
|
||||
sstExecutor.execute(() -> {
|
||||
state = State.RECEIVED_ALL;
|
||||
doFinally();
|
||||
if (putBatchInputsSubscriber != null) {
|
||||
putBatchInputsSubscriber.onError(throwable);
|
||||
} else {
|
||||
serverCallStreamObserver.onError(throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (state == State.BEFORE_INITIAL_REQUEST) {
|
||||
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
|
||||
} else if (state == State.RECEIVING_DATA) {
|
||||
state = State.RECEIVED_ALL;
|
||||
checkCompleted(false);
|
||||
} else {
|
||||
putBatchInputsSubscriber.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, "Unknown state during onComplete: " + state));
|
||||
}
|
||||
sstExecutor.execute(() -> {
|
||||
if (state == State.BEFORE_INITIAL_REQUEST) {
|
||||
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
|
||||
} else if (state == State.RECEIVING_DATA) {
|
||||
state = State.RECEIVED_ALL;
|
||||
checkCompleted(false);
|
||||
} else {
|
||||
putBatchInputsSubscriber.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, "Unknown state during onComplete: " + state));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void checkCompleted(boolean requestDone) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user