This commit is contained in:
Andrea Cavalli 2021-05-03 02:57:08 +02:00
parent 4aa18fcd60
commit 04df5f4a36

View File

@ -1005,22 +1005,34 @@ public class LLLocalDictionary implements LLDictionary {
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) { if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
var opts = new ReadOptions(EMPTY_READ_OPTIONS); var opts = new ReadOptions(EMPTY_READ_OPTIONS);
ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
setIterateBound(opts, IterateBound.LOWER, range.getMin().retain()); minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain());
} else {
minBound = emptyReleasableSlice();
} }
if (range.hasMax()) { try {
setIterateBound(opts, IterateBound.UPPER, range.getMax().retain()); ReleasableSlice maxBound;
} if (range.hasMax()) {
try (RocksIterator it = db.newIterator(cfh, opts)) { maxBound = setIterateBound(opts, IterateBound.UPPER, range.getMax().retain());
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(it, range.getMin().retain());
} else { } else {
it.seekToFirst(); maxBound = emptyReleasableSlice();
} }
while (it.isValid()) { try (RocksIterator it = db.newIterator(cfh, opts)) {
db.delete(cfh, it.key()); if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
it.next(); rocksIterSeekTo(it, range.getMin().retain());
} else {
it.seekToFirst();
}
while (it.isValid()) {
db.delete(cfh, it.key());
it.next();
}
} finally {
maxBound.release();
} }
} finally {
minBound.release();
} }
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db, try (var batch = new CappedWriteBatch(db,
@ -1146,72 +1158,84 @@ public class LLLocalDictionary implements LLDictionary {
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range) private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
throws RocksDBException { throws RocksDBException {
var readOpts = getReadOptions(null); try {
readOpts.setFillCache(false); var readOpts = getReadOptions(null);
ReleasableSlice minBound; readOpts.setFillCache(false);
if (range.hasMin()) { ReleasableSlice minBound;
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); if (range.hasMin()) {
} else { minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else { } else {
rocksIterator.seekToFirst(); minBound = emptyReleasableSlice();
} }
while (rocksIterator.isValid()) { try {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)); ReleasableSlice maxBound;
rocksIterator.next(); if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
rocksIterator.next();
}
} finally {
maxBound.release();
}
} finally {
minBound.release();
} }
} finally { } finally {
minBound.release();
maxBound.release();
range.release(); range.release();
} }
} }
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, LLRange range) private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, LLRange range)
throws RocksDBException { throws RocksDBException {
var readOpts = getReadOptions(null); try {
readOpts.setFillCache(false); var readOpts = getReadOptions(null);
ReleasableSlice minBound; readOpts.setFillCache(false);
if (range.hasMin()) { ReleasableSlice minBound;
var arr = LLUtils.toArray(range.getMin()); if (range.hasMin()) {
var minSlice = new Slice(arr); var arr = LLUtils.toArray(range.getMin());
readOpts.setIterateLowerBound(minSlice); var minSlice = new Slice(arr);
minBound = new ReleasableSlice(minSlice, null, arr); readOpts.setIterateLowerBound(minSlice);
} else { minBound = new ReleasableSlice(minSlice, null, arr);
minBound = emptyReleasableSlice();
}
ReleasableSlice maxBound;
if (range.hasMax()) {
var arr = LLUtils.toArray(range.getMax());
var maxSlice = new Slice(arr);
readOpts.setIterateUpperBound(maxSlice);
maxBound = new ReleasableSlice(maxSlice, null, arr);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(LLUtils.toArray(range.getMin()));
} else { } else {
rocksIterator.seekToFirst(); minBound = emptyReleasableSlice();
} }
while (rocksIterator.isValid()) { try {
writeBatch.delete(cfh, rocksIterator.key()); ReleasableSlice maxBound;
rocksIterator.next(); if (range.hasMax()) {
var arr = LLUtils.toArray(range.getMax());
var maxSlice = new Slice(arr);
readOpts.setIterateUpperBound(maxSlice);
maxBound = new ReleasableSlice(maxSlice, null, arr);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(LLUtils.toArray(range.getMin()));
} else {
rocksIterator.seekToFirst();
}
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
}
} finally {
maxBound.release();
}
} finally {
minBound.release();
} }
} finally { } finally {
minBound.release();
maxBound.release();
range.release(); range.release();
} }
} }
@ -1350,33 +1374,36 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = emptyReleasableSlice();
}
try { try {
if (fast) { ReleasableSlice maxBound;
readOpts.setIgnoreRangeDeletions(true); if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
} else {
maxBound = emptyReleasableSlice();
} }
try (var rocksIterator = db.newIterator(cfh, readOpts)) { try {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (fast) {
rocksIterSeekTo(rocksIterator, range.getMin().retain()); readOpts.setIgnoreRangeDeletions(true);
} else {
rocksIterator.seekToFirst();
} }
long i = 0; try (var rocksIterator = db.newIterator(cfh, readOpts)) {
while (rocksIterator.isValid()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.next(); rocksIterSeekTo(rocksIterator, range.getMin().retain());
i++; } else {
rocksIterator.seekToFirst();
}
long i = 0;
while (rocksIterator.isValid()) {
rocksIterator.next();
i++;
}
return i;
} }
return i; } finally {
maxBound.release();
} }
} finally { } finally {
minBound.release(); minBound.release();
maxBound.release();
} }
}) })
.onErrorMap(cause -> new IOException("Failed to get size of range " .onErrorMap(cause -> new IOException("Failed to get size of range "
@ -1397,36 +1424,39 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
ReleasableSlice maxBound; try {
if (range.hasMax()) { ReleasableSlice maxBound;
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); if (range.hasMax()) {
} else { maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else { } else {
rocksIterator.seekToFirst(); maxBound = emptyReleasableSlice();
} }
if (rocksIterator.isValid()) { try (var rocksIterator = db.newIterator(cfh, readOpts)) {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
try { rocksIterSeekTo(rocksIterator, range.getMin().retain());
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); } else {
try { rocksIterator.seekToFirst();
return Map.entry(key.retain(), value.retain());
} finally {
value.release();
}
} finally {
key.release();
} }
} else { if (rocksIterator.isValid()) {
return null; ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
try {
return Map.entry(key.retain(), value.retain());
} finally {
value.release();
}
} finally {
key.release();
}
} else {
return null;
}
} finally {
maxBound.release();
} }
} finally { } finally {
minBound.release(); minBound.release();
maxBound.release();
} }
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
@ -1444,28 +1474,31 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
ReleasableSlice maxBound; try {
if (range.hasMax()) { ReleasableSlice maxBound;
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); if (range.hasMax()) {
} else { maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else { } else {
rocksIterator.seekToFirst(); maxBound = emptyReleasableSlice();
} }
ByteBuf key; try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (rocksIterator.isValid()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); rocksIterSeekTo(rocksIterator, range.getMin().retain());
return key; } else {
} else { rocksIterator.seekToFirst();
return null; }
ByteBuf key;
if (rocksIterator.isValid()) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
return key;
} else {
return null;
}
} finally {
maxBound.release();
} }
} finally { } finally {
minBound.release(); minBound.release();
maxBound.release();
} }
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
@ -1584,28 +1617,31 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
ReleasableSlice maxBound; try {
if (range.hasMax()) { ReleasableSlice maxBound;
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); if (range.hasMax()) {
} else { maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = emptyReleasableSlice();
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else { } else {
rocksIterator.seekToFirst(); maxBound = emptyReleasableSlice();
} }
if (!rocksIterator.isValid()) { try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
return null; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
if (!rocksIterator.isValid()) {
return null;
}
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
dbDelete(cfh, null, key);
return Map.entry(key, value);
} finally {
maxBound.release();
} }
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
dbDelete(cfh, null, key);
return Map.entry(key, value);
} finally { } finally {
minBound.release(); minBound.release();
maxBound.release();
} }
}) })
.onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))