strangedb/src/main/java/it/cavallium/strangedb/database/DatabaseBlocksMetadataCache...

106 lines
3.4 KiB
Java

package it.cavallium.strangedb.database;
import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import it.cavallium.strangedb.BlockInfo;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Bounded in-memory cache mapping block ids to their {@link BlockInfo}.
 *
 * <p>Entries are added with {@link #put(long, BlockInfo)}. When the number of cached entries
 * exceeds {@code MAX_CACHE_SIZE}, entries are handed to the
 * {@link DatabaseBlocksMetadataCacheFlusher} and removed, in the map's iteration order, until
 * only {@code GOOD_CACHE_SIZE} remain. {@link #close()} flushes and removes everything.
 *
 * <p>Thread-safety: every access to the underlying map and to the {@code closed} flag happens
 * while holding a single private monitor, so reads, writes, eviction and closing are mutually
 * exclusive.
 */
public class DatabaseBlocksMetadataCache {

    /** Size the cache is trimmed down to once it has overflowed. */
    private static final int GOOD_CACHE_SIZE = 70000;
    /** Size above which a {@code put} triggers an eviction pass. */
    private static final int MAX_CACHE_SIZE = 100000;
    /** Number of submitted flushes that are awaited together before more are submitted. */
    private static final int FLUSH_BATCH_SIZE = 1000;

    /** Cached entries; a linked map, so iteration starts at the first entry in its order. */
    private final Long2ObjectMap<BlockInfo> blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE);
    /** Guards {@link #blocks2Info} and {@link #closed}. */
    private final Object lock = new Object();
    private final DatabaseBlocksMetadataCacheFlusher flusher;
    /** Set once by {@link #close()}; only read or written while holding {@link #lock}. */
    private boolean closed;

    /**
     * Creates an empty cache.
     *
     * @param flusher receives every entry that is evicted or flushed on close
     */
    public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) {
        this.flusher = flusher;
    }

    /**
     * Looks up the cached metadata of a block.
     *
     * @param block block id
     * @return the cached info, or {@code DatabaseBlocksMetadata.ERROR_BLOCK_INFO} when the
     *         block is not in the cache
     * @throws IOException if the cache has been closed
     */
    public BlockInfo get(long block) throws IOException {
        synchronized (lock) {
            if (closed) {
                throw new IOException("Cache already closed!");
            }
            return blocks2Info.getOrDefault(block, DatabaseBlocksMetadata.ERROR_BLOCK_INFO);
        }
    }

    /**
     * Stores the metadata of a block and evicts entries if the cache has grown past
     * {@code MAX_CACHE_SIZE}. Does nothing when the cache has been closed.
     *
     * @param block block id
     * @param blockInfo metadata to cache
     * @throws IOException if flushing an evicted entry fails or the thread is interrupted
     *         while waiting for a flush
     */
    public void put(long block, BlockInfo blockInfo) throws IOException {
        synchronized (lock) {
            // The closed check is made under the lock so that an entry can never be inserted
            // after close() has already flushed the map.
            if (closed) {
                return;
            }
            blocks2Info.put(block, blockInfo);
            flush();
        }
    }

    /**
     * Trims the cache back to {@code GOOD_CACHE_SIZE} when it holds more than
     * {@code MAX_CACHE_SIZE} entries. Must be called while holding {@link #lock}; the size is
     * read under the lock so the number of entries to evict is never computed from a stale value.
     */
    private void flush() throws IOException {
        int currentSize = blocks2Info.size();
        if (currentSize > MAX_CACHE_SIZE) {
            flushAndRemove(currentSize - GOOD_CACHE_SIZE);
        }
    }

    /**
     * Flushes every cached entry and marks the cache as closed. Calling it again is a no-op.
     *
     * @throws IOException if flushing an entry fails or the thread is interrupted while
     *         waiting for a flush
     */
    public void close() throws IOException {
        synchronized (lock) {
            if (closed) {
                return;
            }
            closed = true;
            flushAndRemove(blocks2Info.size());
        }
    }

    /**
     * Submits up to {@code count} entries to the flusher, taking them in the map's iteration
     * order, and removes each one from the cache. Submitted flushes are awaited in batches of
     * {@code FLUSH_BATCH_SIZE}. Must be called while holding {@link #lock}: the lock stays held
     * until the flushes have completed, so no reader can observe a cache miss for an entry
     * whose flush is still in flight.
     */
    private void flushAndRemove(int count) throws IOException {
        ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entries = blocks2Info.long2ObjectEntrySet().iterator();
        List<Future<?>> pendingFlushes = new LinkedList<>();
        // hasNext() guards against asking for more entries than the map holds.
        for (int flushed = 0; flushed < count && entries.hasNext(); flushed++) {
            Long2ObjectMap.Entry<BlockInfo> entry = entries.next();
            BlockInfo blockInfo = entry.getValue();
            pendingFlushes.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
            entries.remove();
            if (pendingFlushes.size() >= FLUSH_BATCH_SIZE) {
                executeAsyncFlush(pendingFlushes);
            }
        }
        executeAsyncFlush(pendingFlushes);
    }

    /**
     * Waits for every future in the list to complete, then empties the list (also on failure).
     *
     * @param entriesToFlush futures to wait for; cleared before returning
     * @throws IOException wrapping the cause of a failed flush, or wrapping the
     *         {@link InterruptedException} if the waiting thread is interrupted
     */
    private void executeAsyncFlush(List<Future<?>> entriesToFlush) throws IOException {
        try {
            for (Future<?> entryToFlush : entriesToFlush) {
                entryToFlush.get();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        } finally {
            entriesToFlush.clear();
        }
    }
}