package it.cavallium.strangedb.database.references; import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; public class DatabaseReferencesMetadataCache { private static final int GOOD_CACHE_SIZE = 70000; private static final int MAX_CACHE_SIZE = 100000; private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE); private final Object readAccessLock = new Object(); private final Object writeAccessLock = new Object(); private final DatabaseReferencesMetadataCacheFlusher flusher; private volatile boolean closed; public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) { this.flusher = flusher; } public long get(long reference) throws IOException { synchronized (readAccessLock) { if (closed) throw new IOException("Cache already closed!"); return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); } } public void put(long reference, long blockId) throws IOException { synchronized (readAccessLock) { synchronized (writeAccessLock) { if (closed) return; references2Blocks.put(reference, blockId); } } this.flush(); } private void flush() throws IOException { synchronized (readAccessLock) { synchronized (writeAccessLock) { if (closed) return; int references2BlocksSize = references2Blocks.size(); if (references2BlocksSize > MAX_CACHE_SIZE) { synchronized (writeAccessLock) { ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); List> entriesToFlush = new LinkedList<>(); while (references2BlocksSize > GOOD_CACHE_SIZE) { Long2LongMap.Entry entry = entriesIterator.next(); entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); entriesIterator.remove(); if (entriesToFlush.size() >= 1000) { executeAsyncFlush(entriesToFlush); } references2BlocksSize--; } executeAsyncFlush(entriesToFlush); } } } } } public void close() throws IOException { synchronized (readAccessLock) { synchronized (writeAccessLock) { if (!closed) { closed = true; int references2BlocksSize = references2Blocks.size(); ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); List> entriesToFlush = new LinkedList<>(); while (references2BlocksSize > 0) { Long2LongMap.Entry entry = entriesIterator.next(); entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); entriesIterator.remove(); if (entriesToFlush.size() >= 1000) { executeAsyncFlush(entriesToFlush); } references2BlocksSize--; } executeAsyncFlush(entriesToFlush); } } } } private void executeAsyncFlush(List> entriesToFlush) throws IOException { synchronized (readAccessLock) { synchronized (writeAccessLock) { try { for (Future entryToFlush : entriesToFlush) { entryToFlush.get(); } } catch (InterruptedException e) { throw new IOException(e); } catch (ExecutionException e) { throw new IOException(e.getCause()); } finally { entriesToFlush.clear(); } } } } }