strangedb-core/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache...

package it.cavallium.strangedb.database.blocks;

import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectIterator;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ;
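
/**
 * Write-back cache of block metadata ({@link BlockInfo}), keyed by block id.
 * Entries are kept in insertion order; once the cache grows to {@code FLUSH_CACHE_SIZE} entries,
 * the oldest ones are written back through the {@link DatabaseBlocksMetadataCacheFlusher}
 * until only {@code GOOD_CACHE_SIZE} entries remain. {@link #close()} flushes everything left.
 */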
public class DatabaseBlocksMetadataCache {

    /** Base unit for the cache thresholds: BLOCK_META_READS_AT_EVERY_READ, capped at 500. */
    private static final int BASE_QUANTITY = (BLOCK_META_READS_AT_EVERY_READ < 500 ? BLOCK_META_READS_AT_EVERY_READ : 500);
    /** Target size the cache is shrunk back to after a flush. */
    private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY;
    /** Size at which a write triggers a flush of the oldest entries. */
    private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ;
    /** Expected maximum number of cached entries; used to pre-size the backing map. */
    private static final int MAX_CACHE_SIZE = 400 * BLOCK_META_READS_AT_EVERY_READ;

    /** Block id -> block info, in insertion order so the oldest entries are flushed first; wrapped in a synchronized view. */
    private final Long2ObjectMap<BlockInfo> blocks2Info = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f));
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private final DatabaseBlocksMetadataCacheFlusher flusher;
    /** Set once {@link #close()} has started; afterwards reads fail and writes are ignored. */
    private volatile boolean closed;
    /** Thread pool on which evicted entries are written back; sized to the common pool parallelism. */
    ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "Blocks Flush Thread"));
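
    /**
     * @param flusher callback used to persist evicted block metadata
     */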
    public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) {
        this.flusher = flusher;
    }
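
    /**
     * Returns the cached {@link BlockInfo} for the given block id, or
     * {@code DatabaseBlocksMetadata.ERROR_BLOCK_INFO} if the block is not cached.
     */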
    public BlockInfo get(long block) throws IOException {
        if (closed) throw new IOException("Cache already closed!");
        lock.readLock().lock();
        try {
            return blocks2Info.getOrDefault(block, DatabaseBlocksMetadata.ERROR_BLOCK_INFO);
        } finally {
            lock.readLock().unlock();
        }
    }
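
    /**
     * Caches the metadata of a single block, flushing the oldest entries if the cache grew too large.
     * Calls after {@link #close()} are silently ignored.
     */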
    public void put(long block, BlockInfo blockInfo) throws IOException {
        if (closed) return;
        lock.writeLock().lock();
        try {
            blocks2Info.put(block, blockInfo);
            flush();
        } finally {
            lock.writeLock().unlock();
        }
    }
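
    /**
     * Caches the metadata of multiple blocks at once; {@code blocks[i]} is mapped to {@code blockInfos[i]}.
     * Calls after {@link #close()} are silently ignored.
     */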
    public void putAll(long[] blocks, BlockInfo[] blockInfos) throws IOException {
        if (closed) return;
        lock.writeLock().lock();
        try {
            Long2ObjectMap<BlockInfo> blocksInfosToAdd = new Long2ObjectLinkedOpenHashMap<>(blocks, blockInfos, 0.5f);
            blocks2Info.putAll(blocksInfosToAdd);
            flush();
        } finally {
            lock.writeLock().unlock();
        }
    }
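
    /**
     * If the cache grew to {@code FLUSH_CACHE_SIZE} entries, writes the oldest ones back through the
     * flusher (in parallel on {@code flushExecutorService}) until {@code GOOD_CACHE_SIZE} entries remain.
     * Callers must hold the write lock.
     */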
    private void flush() throws IOException {
        if (closed) return;
        int blocks2InfoSize = blocks2Info.size();
        if (blocks2InfoSize >= FLUSH_CACHE_SIZE) {
            // Evict the oldest entries (the map keeps insertion order) until GOOD_CACHE_SIZE entries
            // remain, writing each evicted entry back through the flusher on the flush thread pool.
            int entriesToFlush = blocks2InfoSize - GOOD_CACHE_SIZE;
            ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
            @SuppressWarnings("unchecked")
            ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[entriesToFlush], entriesToFlush);
            for (int i = 0; i < entriesToFlush; i++) {
                Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
                BlockInfo blockInfo = entry.getValue();
                long blockId = entry.getLongKey();
                long blockPosition = blockInfo.getIndex();
                int blockSize = blockInfo.getSize();
                entriesIterator.remove();
                tasks.set(i, () -> {
                    flusher.flush(blockId, blockPosition, blockSize);
                    return null;
                });
            }
            try {
                // invokeAll waits for all tasks but keeps their failures inside the returned futures,
                // so inspect each future to surface a failed flush as an IOException.
                for (Future<Void> task : flushExecutorService.invokeAll(tasks)) {
                    task.get();
                }
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }
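
    /**
     * Flushes every remaining cached entry and shuts down the flush thread pool.
     * Afterwards writes are ignored and reads fail with an {@link IOException}.
     */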
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            lock.writeLock().lock();
            try {
                // Write back every remaining entry before shutting down the pool.
                int blocks2InfoSize = blocks2Info.size();
                ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
                @SuppressWarnings("unchecked")
                ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize], blocks2InfoSize);
                for (int i = 0; i < blocks2InfoSize; i++) {
                    Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
                    BlockInfo blockInfo = entry.getValue();
                    long blockId = entry.getLongKey();
                    long blockPosition = blockInfo.getIndex();
                    int blockSize = blockInfo.getSize();
                    entriesIterator.remove();
                    tasks.set(i, () -> {
                        flusher.flush(blockId, blockPosition, blockSize);
                        return null;
                    });
                }
                try {
                    // As in flush(), check every future so a failed write is not silently dropped.
                    for (Future<Void> task : flushExecutorService.invokeAll(tasks)) {
                        task.get();
                    }
                } catch (ExecutionException e) {
                    throw new IOException(e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
                flushExecutorService.shutdown();
                try {
                    if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS))
                        flushExecutorService.shutdownNow();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}