This commit is contained in:
Andrea Cavalli 2021-05-03 12:29:15 +02:00
parent 04df5f4a36
commit 7588cd3219
3 changed files with 30 additions and 10 deletions

View File

@ -127,11 +127,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
ByteBuf keySuffixBuf = serializeSuffix(keySuffix); ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
ByteBuf keyBuf = toKey(keySuffixBuf.retain()); ByteBuf keyBuf = toKey(keySuffixBuf.retain());
ByteBuf valueBuf = serialize(value); ByteBuf valueBuf = serialize(value);
return dictionary.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID).doFinally(s -> { return dictionary
.put(keyBuf.retain(), valueBuf.retain(), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.doFinally(s -> {
keyBuf.release(); keyBuf.release();
keySuffixBuf.release(); keySuffixBuf.release();
valueBuf.release(); valueBuf.release();
}).then(); })
.then();
} }
@Override @Override
@ -201,7 +205,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return Mono return Mono
.using( .using(
() -> toKey(serializeSuffix(keySuffix)), () -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary.remove(keyBuf.retain(), LLDictionaryResultType.VOID).then(), keyBuf -> dictionary
.remove(keyBuf.retain(), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then(),
ReferenceCounted::release ReferenceCounted::release
); );
} }
@ -335,6 +342,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} else if (range.isSingle()) { } else if (range.isSingle()) {
return dictionary return dictionary
.remove(range.getSingle().retain(), LLDictionaryResultType.VOID) .remove(range.getSingle().retain(), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then(); .then();
} else { } else {
return dictionary return dictionary

View File

@ -537,6 +537,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return Mono return Mono
.defer(() -> dictionary .defer(() -> dictionary
.remove(range.getSingle().retain(), LLDictionaryResultType.VOID) .remove(range.getSingle().retain(), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
) )
.then(); .then();
} else { } else {

View File

@ -81,11 +81,19 @@ public class LLLocalDictionary implements LLDictionary {
/** /**
* Default: true. Use false to debug problems with write batches. * Default: true. Use false to debug problems with write batches.
*/ */
static final boolean USE_WRITE_BATCHES_IN_SET_RANGE = false; static final boolean USE_WRITE_BATCHES_IN_PUT_MULTI = true;
/**
* Default: true. Use false to debug problems with write batches.
*/
static final boolean USE_WRITE_BATCHES_IN_SET_RANGE = true;
/** /**
* Default: true. Use false to debug problems with capped write batches. * Default: true. Use false to debug problems with capped write batches.
*/ */
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = true; static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = true;
/**
* Default: true. Use false to debug problems with write batches deletes.
*/
static final boolean USE_WRITE_BATCH_IN_SET_RANGE_DELETE = false;
static final boolean PARALLEL_EXACT_SIZE = true; static final boolean PARALLEL_EXACT_SIZE = true;
private static final int STRIPES = 512; private static final int STRIPES = 512;
@ -774,7 +782,7 @@ public class LLLocalDictionary implements LLDictionary {
stamps = null; stamps = null;
} }
try { try {
if (USE_WRITE_BATCHES_IN_SET_RANGE) { if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db, var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP, CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE, RESERVED_WRITE_BATCH_SIZE,
@ -1003,7 +1011,7 @@ public class LLLocalDictionary implements LLDictionary {
if (USE_WINDOW_IN_SET_RANGE) { if (USE_WINDOW_IN_SET_RANGE) {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) { if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
var opts = new ReadOptions(EMPTY_READ_OPTIONS); var opts = new ReadOptions(EMPTY_READ_OPTIONS);
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
@ -1149,6 +1157,7 @@ public class LLLocalDictionary implements LLDictionary {
) )
.then(entries .then(entries
.flatMap(entry -> this.put(entry.getKey(), entry.getValue(), LLDictionaryResultType.VOID)) .flatMap(entry -> this.put(entry.getKey(), entry.getValue(), LLDictionaryResultType.VOID))
.doOnNext(ReferenceCounted::release)
.then(Mono.<Void>empty()) .then(Mono.<Void>empty())
) )
.onErrorMap(cause -> new IOException("Failed to write range", cause)) .onErrorMap(cause -> new IOException("Failed to write range", cause))
@ -1156,6 +1165,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
//todo: this is broken, check why
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range) private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
throws RocksDBException { throws RocksDBException {
try { try {
@ -1181,7 +1191,8 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
} }
while (rocksIterator.isValid()) { while (rocksIterator.isValid()) {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)); var b = LLUtils.toArray(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
writeBatch.delete(cfh, b);
rocksIterator.next(); rocksIterator.next();
} }
} finally { } finally {