Fix error printing, stable column names

This commit is contained in:
Andrea Cavalli 2024-10-24 02:07:12 +02:00
parent 8fa0a94b62
commit 16f8b8e43f
2 changed files with 55 additions and 42 deletions

View File

@ -176,19 +176,29 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
* Do not try to register a column that may already be registered * Do not try to register a column that may already be registered
*/ */
private long registerColumn(@NotNull ColumnInstance column) { private long registerColumn(@NotNull ColumnInstance column) {
try { synchronized (columnEditLock) {
var columnName = new String(column.cfh().getName(), StandardCharsets.UTF_8); try {
long id = FastRandomUtils.allocateNewValue(this.columns, column, 1, Long.MAX_VALUE); var columnName = new String(column.cfh().getName(), StandardCharsets.UTF_8);
Long previous = this.columnNamesIndex.putIfAbsent(columnName, id); long hashCode = columnName.hashCode();
if (previous != null) { long id;
//noinspection resource if (this.columnNamesIndex.get(columnName) == hashCode) {
this.columns.remove(id); id = hashCode;
throw new UnsupportedOperationException("Column already registered!"); } else if (this.columns.get(hashCode) == null) {
id = hashCode;
} else {
id = FastRandomUtils.allocateNewValue(this.columns, column, 1, Long.MAX_VALUE);
}
Long previous = this.columnNamesIndex.putIfAbsent(columnName, id);
if (previous != null) {
//noinspection resource
this.columns.remove(id);
throw new UnsupportedOperationException("Column already registered!");
}
logger.info("Registered column: " + column);
return id;
} catch (org.rocksdb.RocksDBException e) {
throw new RuntimeException(e);
} }
logger.info("Registered column: " + column);
return id;
} catch (org.rocksdb.RocksDBException e) {
throw new RuntimeException(e);
} }
} }
@ -197,35 +207,37 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
* Do not try to unregister a column that may already be unregistered, or that may not be registered * Do not try to unregister a column that may already be unregistered, or that may not be registered
*/ */
private ColumnInstance unregisterColumn(long id) { private ColumnInstance unregisterColumn(long id) {
var col = this.columns.remove(id); synchronized (columnEditLock) {
Objects.requireNonNull(col, () -> "Column does not exist: " + id); var col = this.columns.remove(id);
String name; Objects.requireNonNull(col, () -> "Column does not exist: " + id);
try { String name;
name = new String(col.cfh().getName(), StandardCharsets.UTF_8); try {
} catch (org.rocksdb.RocksDBException e) { name = new String(col.cfh().getName(), StandardCharsets.UTF_8);
throw new RuntimeException(e); } catch (org.rocksdb.RocksDBException e) {
} throw new RuntimeException(e);
// Unregister the column name from the index avoiding race conditions }
int retries = 0; // Unregister the column name from the index avoiding race conditions
while (this.columnNamesIndex.remove(name) == null && retries++ < 5_000) { int retries = 0;
Thread.yield(); while (this.columnNamesIndex.remove(name) == null && retries++ < 5_000) {
} Thread.yield();
if (retries >= 5000) { }
throw new IllegalStateException("Can't find column in column names index: " + name); if (retries >= 5000) {
} throw new IllegalStateException("Can't find column in column names index: " + name);
}
ColumnFamilyOptions columnConfig; ColumnFamilyOptions columnConfig;
while ((columnConfig = this.columnsConifg.remove(name)) == null && retries++ < 5_000) { while ((columnConfig = this.columnsConifg.remove(name)) == null && retries++ < 5_000) {
Thread.yield(); Thread.yield();
} }
if (columnConfig != null) { if (columnConfig != null) {
columnConfig.close(); columnConfig.close();
} }
if (retries >= 5000) { if (retries >= 5000) {
throw new IllegalStateException("Can't find column in column names index: " + name); throw new IllegalStateException("Can't find column in column names index: " + name);
} }
return col; return col;
}
} }
@Override @Override

View File

@ -542,8 +542,9 @@ public class GrpcServer extends Server {
private <T> Function<Flux<T>, Flux<T>> onErrorMapFluxWithRequestInfo(String requestName, Message request) { private <T> Function<Flux<T>, Flux<T>> onErrorMapFluxWithRequestInfo(String requestName, Message request) {
return flux -> flux.onErrorResume(throwable -> { return flux -> flux.onErrorResume(throwable -> {
var ex = handleError(throwable).asException(); var ex = handleError(throwable).asException();
if (ex.getStatus().getCode() == Code.INTERNAL) { if (ex.getStatus().getCode() == Code.INTERNAL && !(throwable instanceof RocksDBException)) {
LOG.error("Unexpected internal error during request: {}", request, ex); LOG.error("Unexpected internal error during request \"{}\": {}", requestName, request.toString(), ex);
return Mono.error(RocksDBException.of(RocksDBErrorType.INTERNAL_ERROR, ex.getCause()));
} }
return Mono.error(ex); return Mono.error(ex);
}); });
@ -552,7 +553,7 @@ public class GrpcServer extends Server {
private <T> Function<Mono<T>, Mono<T>> onErrorMapMonoWithRequestInfo(String requestName, Message request) { private <T> Function<Mono<T>, Mono<T>> onErrorMapMonoWithRequestInfo(String requestName, Message request) {
return flux -> flux.onErrorResume(throwable -> { return flux -> flux.onErrorResume(throwable -> {
var ex = handleError(throwable).asException(); var ex = handleError(throwable).asException();
if (ex.getStatus().getCode() == Code.INTERNAL) { if (ex.getStatus().getCode() == Code.INTERNAL && !(throwable instanceof RocksDBException)) {
LOG.error("Unexpected internal error during request \"{}\": {}", requestName, request.toString(), ex); LOG.error("Unexpected internal error during request \"{}\": {}", requestName, request.toString(), ex);
return Mono.error(RocksDBException.of(RocksDBErrorType.INTERNAL_ERROR, ex.getCause())); return Mono.error(RocksDBException.of(RocksDBErrorType.INTERNAL_ERROR, ex.getCause()));
} }