package it.cavallium.strangedb.database.references; import it.cavallium.strangedb.VariableWrapper; import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectIterator; import it.unimi.dsi.fastutil.objects.ObjectIterators; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ; import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO; import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ; public class DatabaseReferencesMetadataCache { private static final int BASE_QUANTITY = (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500); private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY; private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY; private final Long2LongMap references2Blocks = Long2LongMaps.synchronize(new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f)); private final Long2ByteMap referencesCleaners = Long2ByteMaps.synchronize(new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f)); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private final DatabaseReferencesMetadataCacheFlusher flusher; private volatile boolean closed; ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "References Flush Thread")); public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) { this.flusher = flusher; } public long getBlock(long reference) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.readLock().lock(); try { return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); } finally { lock.readLock().unlock(); } } public ReferenceInfo get(long reference) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.readLock().lock(); try { long blockId = references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); if (blockId == ERROR_BLOCK_ID) { return NONEXISTENT_REFERENCE_INFO; } byte cleanerId = referencesCleaners.get(reference); return new ReferenceInfo(cleanerId, blockId); } finally { lock.readLock().unlock(); } } public void put(long reference, byte cleanerId, long blockId) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.writeLock().lock(); try { if (cleanerId == 0) { throw new IOException("Null cleaner id"); } references2Blocks.put(reference, blockId); referencesCleaners.put(reference, cleanerId); flush(); } finally { lock.writeLock().unlock(); } } public void putAll(long[] references, byte[] cleanerIds, long[] blockIds) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.writeLock().lock(); try { Long2LongMap referencesBlocksToAdd = new Long2LongLinkedOpenHashMap(references, blockIds, 0.5f); Long2ByteMap referencesCleanersToAdd = new Long2ByteLinkedOpenHashMap(references, cleanerIds, 0.5f); for (int i = 0; i < references.length; i++) { if (cleanerIds[i] == 0) { referencesBlocksToAdd.remove(references[i]); referencesCleanersToAdd.remove(references[i]); } } references2Blocks.putAll(referencesBlocksToAdd); referencesCleaners.putAll(referencesCleanersToAdd); flush(); } finally { lock.writeLock().unlock(); } } private void flush() throws IOException { if (closed) return; int references2BlocksSize = references2Blocks.size(); if (references2BlocksSize >= FLUSH_CACHE_SIZE) { ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); @SuppressWarnings("unchecked") ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize - GOOD_CACHE_SIZE], references2BlocksSize - GOOD_CACHE_SIZE); for (int i = 0; i < references2BlocksSize - GOOD_CACHE_SIZE; i++) { Long2LongMap.Entry entry = entriesIterator.next(); Long2ByteMap.Entry cleaner = cleanersIterator.next(); long refId = entry.getLongKey(); byte cleanerId = cleaner.getByteValue(); long blockId = entry.getLongValue(); entriesIterator.remove(); cleanersIterator.remove(); tasks.set(i, () -> { try { flusher.flush(refId, cleanerId, blockId); } catch (IOException e) { throw new CompletionException(e); } return null; }); } try { flushExecutorService.invokeAll(tasks); } catch (InterruptedException e) { throw new IOException(e.getCause()); } } } public void close() throws IOException { if (!closed) { closed = true; lock.writeLock().lock(); try { int references2BlocksSize = references2Blocks.size(); ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); @SuppressWarnings("unchecked") ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize], references2BlocksSize); for (int i = 0; i < references2BlocksSize; i++) { Long2LongMap.Entry entry = entriesIterator.next(); Long2ByteMap.Entry cleaner = cleanersIterator.next(); long refId = entry.getLongKey(); byte cleanerId = cleaner.getByteValue(); long blockId = entry.getLongValue(); entriesIterator.remove(); cleanersIterator.remove(); tasks.set(i, () -> { try { flusher.flush(refId, cleanerId, blockId); } catch (IOException e) { throw new CompletionException(e); } return null; }); } try { flushExecutorService.invokeAll(tasks); } catch (InterruptedException e) { throw new IOException(e.getCause()); } flushExecutorService.shutdown(); try { if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) flushExecutorService.shutdownNow(); } catch (InterruptedException e) { throw new IOException(e); } } finally { lock.writeLock().unlock(); } } } }