180 lines
6.2 KiB
Java
180 lines
6.2 KiB
Java
package it.cavallium.strangedb.database.references;
|
|
|
|
import it.unimi.dsi.fastutil.longs.*;
|
|
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
|
import it.unimi.dsi.fastutil.objects.ObjectIterator;
|
|
|
|
import java.io.IOException;
|
|
import java.util.HashSet;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.concurrent.*;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
|
|
import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ;
|
|
|
|
public class DatabaseReferencesMetadataCache {
|
|
|
|
private static final int BASE_QUANTITY = (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500);
|
|
private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY;
|
|
private static final int FLUSH_CACHE_SIZE = 300 * BASE_QUANTITY;
|
|
private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY;
|
|
private static final int MODIFIED_FLUSH_CACHE_SIZE = 160 * BASE_QUANTITY;
|
|
private static final int MODIFIED_MAX_CACHE_SIZE = 260 * BASE_QUANTITY;
|
|
|
|
private final Long2ObjectMap<ReferenceInfo> referencesInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f));
|
|
private final Long2ObjectMap<ReferenceInfo> modifiedInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MODIFIED_MAX_CACHE_SIZE, 0.5f));
|
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
|
|
private final DatabaseReferencesMetadataCacheFlusher flusher;
|
|
private volatile boolean closed;
|
|
ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "References Flush Thread"));
|
|
|
|
public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) {
|
|
this.flusher = flusher;
|
|
}
|
|
|
|
public ReferenceInfo getInfo(long reference) throws IOException {
|
|
if (closed) throw new IOException("Cache already closed!");
|
|
lock.readLock().lock();
|
|
try {
|
|
return referencesInfos.getOrDefault(reference, NONEXISTENT_REFERENCE_INFO);
|
|
} finally {
|
|
lock.readLock().unlock();
|
|
}
|
|
}
|
|
|
|
public void put(long reference, ReferenceInfo info, boolean modified) throws IOException {
|
|
if (closed) throw new IOException("Cache already closed!");
|
|
lock.writeLock().lock();
|
|
try {
|
|
if (info.getCleanerId() == 0) {
|
|
throw new IOException("Null cleaner id");
|
|
}
|
|
referencesInfos.put(reference, info);
|
|
if (modified) {
|
|
modifiedInfos.put(reference, info);
|
|
}
|
|
flush();
|
|
} finally {
|
|
lock.writeLock().unlock();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Put all items in cache
|
|
* @param references
|
|
* @param infos
|
|
* @throws IOException
|
|
*/
|
|
public void putAll(long[] references, ReferenceInfo[] infos) throws IOException {
|
|
if (closed) throw new IOException("Cache already closed!");
|
|
lock.writeLock().lock();
|
|
try {
|
|
Long2ObjectMap<ReferenceInfo> referencesBlocksToAdd = new Long2ObjectLinkedOpenHashMap<>(references.length, 0.5f);
|
|
for (int i = 0; i < references.length; i++) {
|
|
if (infos[i].getCleanerId() != 0) {
|
|
referencesBlocksToAdd.put(references[i], infos[i]);
|
|
}
|
|
}
|
|
referencesInfos.putAll(referencesBlocksToAdd);
|
|
flush();
|
|
} finally {
|
|
lock.writeLock().unlock();
|
|
}
|
|
}
|
|
|
|
private void flush() throws IOException {
|
|
if (closed) return;
|
|
int readOnlyInfoSize = referencesInfos.size();
|
|
int modifiedInfoSize = modifiedInfos.size();
|
|
if (readOnlyInfoSize >= FLUSH_CACHE_SIZE || modifiedInfoSize >= MODIFIED_FLUSH_CACHE_SIZE) {
|
|
ObjectIterator<Long2ObjectMap.Entry<ReferenceInfo>> entriesIterator = modifiedInfos.long2ObjectEntrySet().iterator();
|
|
long startId = -1;
|
|
long prevId = -1;
|
|
LinkedList<ReferenceInfo> prevInfos = new LinkedList<>();
|
|
for (int i = 0; i < modifiedInfoSize; i++) {
|
|
Long2ObjectMap.Entry<ReferenceInfo> entry = entriesIterator.next();
|
|
Long refId = entry.getLongKey();
|
|
ReferenceInfo info = entry.getValue();
|
|
|
|
if (info == null) {
|
|
throw new IOException("A reference is null");
|
|
}
|
|
|
|
if (startId == -1) {
|
|
startId = refId;
|
|
prevInfos.add(info);
|
|
} else if (prevId + 1 == refId) {
|
|
prevInfos.add(info);
|
|
} else {
|
|
flushAction(prevInfos, startId);
|
|
|
|
startId = refId;
|
|
prevInfos.clear();
|
|
prevInfos.add(info);
|
|
}
|
|
prevId = refId;
|
|
}
|
|
modifiedInfos.clear();
|
|
flushAction(prevInfos, startId);
|
|
}
|
|
if (readOnlyInfoSize >= FLUSH_CACHE_SIZE) {
|
|
ObjectIterator<Long2ObjectMap.Entry<ReferenceInfo>> entriesIterator = referencesInfos.long2ObjectEntrySet().iterator();
|
|
for (int i = 0; i < readOnlyInfoSize - GOOD_CACHE_SIZE; i++) {
|
|
entriesIterator.next();
|
|
entriesIterator.remove();
|
|
}
|
|
}
|
|
}
|
|
|
|
private void flushAction(LinkedList<ReferenceInfo> prevInfos, long startId) throws IOException {
|
|
if (prevInfos.size() > 1) {
|
|
flusher.flushMultiple(startId, prevInfos.toArray(new ReferenceInfo[0]));
|
|
} else if (prevInfos.size() == 1) {
|
|
flusher.flush(startId, prevInfos.getFirst(), false);
|
|
}
|
|
}
|
|
|
|
public void close() throws IOException {
|
|
if (!closed) {
|
|
closed = true;
|
|
lock.writeLock().lock();
|
|
try {
|
|
int references2BlocksSize = modifiedInfos.size();
|
|
ObjectIterator<Long2ObjectMap.Entry<ReferenceInfo>> entriesIterator = modifiedInfos.long2ObjectEntrySet().iterator();
|
|
@SuppressWarnings("unchecked")
|
|
ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize], references2BlocksSize);
|
|
for (int i = 0; i < references2BlocksSize; i++) {
|
|
Long2ObjectMap.Entry<ReferenceInfo> entry = entriesIterator.next();
|
|
Long refId = entry.getLongKey();
|
|
ReferenceInfo info = entry.getValue();
|
|
tasks.set(i, () -> {
|
|
try {
|
|
flusher.flush(refId, info, true);
|
|
} catch (IOException e) {
|
|
throw new CompletionException(e);
|
|
}
|
|
return null;
|
|
});
|
|
}
|
|
modifiedInfos.clear();
|
|
try {
|
|
flushExecutorService.invokeAll(tasks);
|
|
} catch (InterruptedException e) {
|
|
throw new IOException(e.getCause());
|
|
}
|
|
flushExecutorService.shutdown();
|
|
try {
|
|
if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS))
|
|
flushExecutorService.shutdownNow();
|
|
} catch (InterruptedException e) {
|
|
throw new IOException(e);
|
|
}
|
|
} finally {
|
|
lock.writeLock().unlock();
|
|
}
|
|
}
|
|
}
|
|
}
|