package it.cavallium.dbengine.database.disk;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;

@NotAtomic
public class LLLocalDictionary implements LLDictionary {

	private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = true;
	static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
	static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
	static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
	static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);

	private static final byte[] NO_DATA = new byte[0];
	private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
	private static final List<byte[]> EMPTY_UNMODIFIABLE_LIST = List.of();

	private final RocksDB db;
	private final ColumnFamilyHandle cfh;
	private final String databaseName;
	private final Function<LLSnapshot, Snapshot> snapshotResolver;

	public LLLocalDictionary(@NotNull RocksDB db,
			@NotNull ColumnFamilyHandle columnFamilyHandle,
			String databaseName,
			Function<LLSnapshot, Snapshot> snapshotResolver) {
		Objects.requireNonNull(db);
		this.db = db;
		Objects.requireNonNull(columnFamilyHandle);
		this.cfh = columnFamilyHandle;
		this.databaseName = databaseName;
		this.snapshotResolver = snapshotResolver;
	}

	@Override
	public String getDatabaseName() {
		return databaseName;
	}

	private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
		if (snapshot != null) {
			return getReadOptions(snapshotResolver.apply(snapshot));
		} else {
			return EMPTY_READ_OPTIONS;
		}
	}

	private ReadOptions getReadOptions(Snapshot snapshot) {
		if (snapshot != null) {
			return new ReadOptions().setSnapshot(snapshot);
		} else {
			return EMPTY_READ_OPTIONS;
		}
	}

	@Override
	public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
		try {
			Holder<byte[]> data = new Holder<>();
			if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
				if (data.getValue() != null) {
					return Optional.of(data.getValue());
				} else {
					byte[] value = db.get(cfh, resolveSnapshot(snapshot), key);
					return Optional.ofNullable(value);
				}
			} else {
				return Optional.empty();
			}
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
	}

	@Override
	public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
		return contains_(snapshot, key);
	}

	private boolean contains_(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
		try {
			int size = RocksDB.NOT_FOUND;
			Holder<byte[]> data = new Holder<>();
			if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
				if (data.getValue() != null) {
					size = data.getValue().length;
				} else {
					size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA);
				}
			}
			return size != RocksDB.NOT_FOUND;
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
	}
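	// Implementation note: get() and contains_() above use keyMayExist() as a
	// cheap bloom-filter/memtable pre-check. A false result means the key is
	// definitely absent, so the full db.get() lookup is only issued when the
	// key may exist and its value was not already materialized in the Holder.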
	@Override
	public Optional<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) throws IOException {
		try {
			byte[] response = null;
			switch (resultType) {
				case VALUE_CHANGED:
					response = LLUtils.booleanToResponse(!contains_(null, key));
					break;
				case PREVIOUS_VALUE:
					var data = new Holder<byte[]>();
					if (db.keyMayExist(cfh, key, data)) {
						if (data.getValue() != null) {
							response = data.getValue();
						} else {
							response = db.get(cfh, key);
						}
					} else {
						response = null;
					}
					break;
			}
			db.put(cfh, key, value);
			return Optional.ofNullable(response);
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
	}

	@Override
	public void putMulti(byte[][] key,
			byte[][] value,
			LLDictionaryResultType resultType,
			Consumer<byte[]> responsesConsumer) throws IOException {
		if (key.length == value.length) {
			List<byte[]> responses;
			try (WriteBatch writeBatch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
				if (resultType == LLDictionaryResultType.VOID) {
					responses = EMPTY_UNMODIFIABLE_LIST;
				} else {
					responses = db.multiGetAsList(newCfhList(cfh, key.length), Arrays.asList(key));
				}
				for (int i = 0; i < key.length; i++) {
					writeBatch.put(cfh, key[i], value[i]);
				}
				db.write(BATCH_WRITE_OPTIONS, writeBatch);
			} catch (RocksDBException e) {
				throw new IOException(e);
			}
			for (byte[] response : responses) {
				responsesConsumer.accept(response);
			}
		} else {
			throw new IOException("Wrong parameters count");
		}
	}

	private static List<ColumnFamilyHandle> newCfhList(ColumnFamilyHandle cfh, int size) {
		var list = new ArrayList<ColumnFamilyHandle>(size);
		for (int i = 0; i < size; i++) {
			list.add(cfh);
		}
		return list;
	}

	@Override
	public Optional<byte[]> remove(byte[] key, LLDictionaryResultType resultType) throws IOException {
		try {
			byte[] response = null;
			switch (resultType) {
				case VALUE_CHANGED:
					response = LLUtils.booleanToResponse(contains_(null, key));
					break;
				case PREVIOUS_VALUE:
					var data = new Holder<byte[]>();
					if (db.keyMayExist(cfh, key, data)) {
						if (data.getValue() != null) {
							response = data.getValue();
						} else {
							response = db.get(cfh, key);
						}
					} else {
						response = null;
					}
					break;
			}
			db.delete(cfh, key);
			return Optional.ofNullable(response);
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
	}

	//todo: implement parallel forEach
	@Override
	public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], byte[]> consumer) {
		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
			iter.seekToFirst();
			while (iter.isValid()) {
				consumer.accept(iter.key(), iter.value());
				iter.next();
			}
		}
	}
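	// Implementation note: replaceAll() below accumulates every change in a
	// single CappedWriteBatch. When replaceKeys is true it reads through a
	// snapshot taken before the batch is written, so the delete pass and the
	// re-insert pass observe the same consistent view of the column family.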
	//todo: implement parallel replace
	@Override
	public void replaceAll(int parallelism,
			boolean replaceKeys,
			BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
		try {
			try (var snapshot = replaceKeys ? db.getSnapshot() : null) {
				try (RocksIterator iter = db.newIterator(cfh, getReadOptions(snapshot));
						CappedWriteBatch writeBatch = new CappedWriteBatch(db,
								CAPPED_WRITE_BATCH_CAP,
								RESERVED_WRITE_BATCH_SIZE,
								MAX_WRITE_BATCH_SIZE,
								BATCH_WRITE_OPTIONS)) {
					iter.seekToFirst();
					if (replaceKeys) {
						while (iter.isValid()) {
							writeBatch.delete(cfh, iter.key());
							iter.next();
						}
					}
					iter.seekToFirst();
					while (iter.isValid()) {
						var result = consumer.apply(iter.key(), iter.value());
						boolean keyDiffers = !Arrays.equals(iter.key(), result.getKey());
						if (!replaceKeys && keyDiffers) {
							throw new IOException("Tried to replace a key");
						}
						// Put if the value changed, or if keys may be swapped/replaced
						if (replaceKeys || !Arrays.equals(iter.value(), result.getValue())) {
							writeBatch.put(cfh, result.getKey(), result.getValue());
						}
						iter.next();
					}
					writeBatch.writeToDbAndClose();
				} finally {
					// Null-safe: RocksDB.releaseSnapshot ignores null snapshots
					db.releaseSnapshot(snapshot);
				}
			}
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
	}

	// This method is exactly the same as its counterpart in LLLocalDeepDictionary. Remember to keep the code equal
	@Override
	public void clear() throws IOException {
		try (RocksIterator iter = db.newIterator(cfh);
				CappedWriteBatch writeBatch = new CappedWriteBatch(db,
						CAPPED_WRITE_BATCH_CAP,
						RESERVED_WRITE_BATCH_SIZE,
						MAX_WRITE_BATCH_SIZE,
						BATCH_WRITE_OPTIONS)) {
			iter.seekToFirst();
			while (iter.isValid()) {
				writeBatch.delete(cfh, iter.key());
				iter.next();
			}
			writeBatch.writeToDbAndClose();

			// Compact range
			db.compactRange(cfh);

			db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
			db.flushWal(true);

			var finalSize = exactSize(null);
			if (finalSize != 0) {
				throw new IllegalStateException("The dictionary is not empty after calling clear()");
			}
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
	}

	@Override
	public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
		return fast ? fastSize(snapshot) : exactSize(snapshot);
	}

	public long fastSize(@Nullable LLSnapshot snapshot) {
		var rocksdbSnapshot = resolveSnapshot(snapshot);
		if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
			try {
				return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
			} catch (RocksDBException e) {
				e.printStackTrace();
				return 0;
			}
		} else {
			long count = 0;
			try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
				iter.seekToFirst();
				// If it's a fast size of a snapshot, count only up to 1000 elements
				while (iter.isValid() && count < 1000) {
					count++;
					iter.next();
				}
				return count;
			}
		}
	}

	public long exactSize(@Nullable LLSnapshot snapshot) {
		long count = 0;
		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
			iter.seekToFirst();
			while (iter.isValid()) {
				count++;
				iter.next();
			}
			return count;
		}
	}

	@Override
	public boolean isEmpty(@Nullable LLSnapshot snapshot) {
		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
			iter.seekToFirst();
			if (iter.isValid()) {
				return false;
			}
		}
		return true;
	}

	@Override
	public Optional<Entry<byte[], byte[]>> removeOne() throws IOException {
		try (RocksIterator iter = db.newIterator(cfh)) {
			iter.seekToFirst();
			if (iter.isValid()) {
				byte[] key = iter.key();
				byte[] value = iter.value();
				db.delete(cfh, key);
				return Optional.of(Map.entry(key, value));
			}
		} catch (RocksDBException e) {
			throw new IOException(e);
		}
		return Optional.empty();
	}
}
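// Minimal usage sketch (illustrative, not part of this repository): assumes a
// RocksDB instance and a column family handle opened elsewhere, and a
// snapshot resolver supplied by the owning database object.
//
//   LLDictionary dict = new LLLocalDictionary(db, cfh, "test-db", snap -> null);
//   dict.put("key".getBytes(), "value".getBytes(), LLDictionaryResultType.VOID);
//   Optional<byte[]> read = dict.get(null, "key".getBytes());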