diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java index f20537f..2a37500 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java @@ -13,6 +13,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; +import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; public class DatabaseBlocksIO implements IBlocksIO { @@ -29,6 +30,12 @@ public class DatabaseBlocksIO implements IBlocksIO { if (size == 0) { return EMPTY_BLOCK_ID; } + if (size < 0) { + throw new IOException("Trying to create a block with size " + size); + } + if (data.limit() < size) { + throw new IOException("Trying to create a block with size " + size + " but with a buffer of size " + data.limit()); + } long index = fileIO.writeAtEnd(size, data); return blocksMetadata.newBlock(index, size); } @@ -38,6 +45,12 @@ public class DatabaseBlocksIO implements IBlocksIO { if (blockId == EMPTY_BLOCK_ID) { return ByteBuffer.wrap(new byte[0]); } + if (blockId == ERROR_BLOCK_ID) { + throw new IOException("Errored block id"); + } + if (blockId < 0) { + throw new IOException("Block id " + blockId + " is not valid"); + } BlockInfo blockInfo = blocksMetadata.getBlockInfo(blockId); return fileIO.readAt(blockInfo.getIndex(), blockInfo.getSize()); } diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java index 636ec90..038190e 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java @@ -4,13 +4,12 @@ package it.cavallium.strangedb.database.blocks; import it.cavallium.strangedb.database.IBlocksMetadata; import java.io.IOException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE; @@ -21,11 +20,11 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { private final AsynchronousFileChannel metaFileChannel; private final DatabaseBlocksMetadataCache cache; - private long firstFreeBlock; + private AtomicLong firstFreeBlock; public DatabaseBlocksMetadata(Path metaFile) throws IOException { metaFileChannel = AsynchronousFileChannel.open(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); - firstFreeBlock = metaFileChannel.size() / BLOCK_META_BYTES_COUNT; + firstFreeBlock = new AtomicLong(metaFileChannel.size() / BLOCK_META_BYTES_COUNT); this.cache = new DatabaseBlocksMetadataCache(this::writeBlockToDisk); } @@ -34,14 +33,18 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { if (blockId == EMPTY_BLOCK_ID) { return EMPTY_BLOCK_INFO; } + if (blockId == ERROR_BLOCK_ID) { + throw new IOException("Errored block id"); + } BlockInfo blockInfo; if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) { return blockInfo; } long position = blockId * BLOCK_META_BYTES_COUNT; int size = BLOCK_META_READS_AT_EVERY_READ * BLOCK_META_BYTES_COUNT; - if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= firstFreeBlock) { - size = (int) ((firstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT); + long currentFirstFreeBlock = this.firstFreeBlock.get(); + if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= currentFirstFreeBlock) { + size = (int) ((currentFirstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT); } int blocksCount = size / BLOCK_META_BYTES_COUNT; @@ -68,6 +71,7 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { long[] allBlockIds = new long[blocksCount]; BlockInfo[] allBlockInfo = new BlockInfo[blocksCount]; + blockInfo = EMPTY_BLOCK_INFO; for (int delta = 0; delta < blocksCount; delta++) { long blockToLoad = blockId + delta; long blockIndex = buffer.getLong(); @@ -88,7 +92,7 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { if (size == 0) { return EMPTY_BLOCK_ID; } - long newBlockId = firstFreeBlock++; + long newBlockId = firstFreeBlock.getAndIncrement(); BlockInfo blockInfo = new BlockInfo(index, size); cache.put(newBlockId, blockInfo); return newBlockId; @@ -100,16 +104,22 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { metaFileChannel.close(); } - private Future writeBlockToDisk(long blockId, long index, int size) { + private void writeBlockToDisk(long blockId, long index, int size) throws IOException { ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); data.putLong(index); data.putInt(size); data.flip(); - return metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT); + try { + metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } } @Override public long getTotalBlocksCount() { - return firstFreeBlock; + return firstFreeBlock.get(); } } diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java index 2dc4aa3..3a176b4 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java @@ -1,10 +1,12 @@ package it.cavallium.strangedb.database.blocks; import it.unimi.dsi.fastutil.longs.*; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; @@ -19,13 +21,12 @@ public class DatabaseBlocksMetadataCache { private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; private static final int MAX_CACHE_SIZE = 400 * BLOCK_META_READS_AT_EVERY_READ; - private final Long2ObjectMap blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f); + private final Long2ObjectMap blocks2Info = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f)); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private final DatabaseBlocksMetadataCacheFlusher flusher; private volatile boolean closed; - private ExecutorService flushService = Executors.newSingleThreadExecutor(); - private volatile boolean flushing; + ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "Blocks Flush Thread")); public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) { this.flusher = flusher; @@ -41,127 +42,102 @@ public class DatabaseBlocksMetadataCache { } } - public void put(long block, BlockInfo blockInfo) { + public void put(long block, BlockInfo blockInfo) throws IOException { if (closed) return; lock.writeLock().lock(); try { blocks2Info.put(block, blockInfo); + flush(); } finally { lock.writeLock().unlock(); - flushAsync(); } } @SuppressWarnings("unchecked") - public void putAll(long[] blocks, BlockInfo[] blockInfos) { + public void putAll(long[] blocks, BlockInfo[] blockInfos) throws IOException { if (closed) return; lock.writeLock().lock(); try { Long2ObjectMap blocksInfosToAdd = new Long2ObjectLinkedOpenHashMap<>(blocks, blockInfos, 0.5f); blocks2Info.putAll(blocksInfosToAdd); + flush(); } finally { lock.writeLock().unlock(); - flushAsync(); - } - } - - private void flushAsync() { - if (blocks2Info.size() >= FLUSH_CACHE_SIZE && !flushing) { - flushing = true; - flushService.execute(() -> { - try { - flush(); - } catch (IOException e) { - throw new RejectedExecutionException(e); - } - }); } } private void flush() throws IOException { - if (closed) { - flushing = false; - return; - } + if (closed) return; int blocks2InfoSize = blocks2Info.size(); if (blocks2InfoSize >= FLUSH_CACHE_SIZE) { - lock.writeLock().lock(); - try { - ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + @SuppressWarnings("unchecked") + ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize - GOOD_CACHE_SIZE], blocks2InfoSize - GOOD_CACHE_SIZE); + for (int i = 0; i < blocks2InfoSize - GOOD_CACHE_SIZE; i++) { + Long2ObjectMap.Entry entry = entriesIterator.next(); + BlockInfo blockInfo = entry.getValue(); + long blockId = entry.getLongKey(); + long blockPosition = blockInfo.getIndex(); + int blockSize = blockInfo.getSize(); + entriesIterator.remove(); - List> entriesToFlush = new ArrayList<>(); - while (blocks2InfoSize >= GOOD_CACHE_SIZE) { - Long2ObjectMap.Entry entry = entriesIterator.next(); - BlockInfo blockInfo = entry.getValue(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); - entriesIterator.remove(); - - if (entriesToFlush.size() >= 1000) { - awaitFlushWriteEnd(entriesToFlush); + tasks.set(i, () -> { + try { + flusher.flush(blockId, blockPosition, blockSize); + } catch (IOException e) { + throw new CompletionException(e); } - blocks2InfoSize--; - } - awaitFlushWriteEnd(entriesToFlush); - } finally { - flushing = false; - lock.writeLock().unlock(); + return null; + }); + } + try { + flushExecutorService.invokeAll(tasks); + } catch (InterruptedException e) { + throw new IOException(e.getCause()); } - } else { - flushing = false; } } public void close() throws IOException { if (!closed) { closed = true; - stopFlushService(); lock.writeLock().lock(); try { int blocks2InfoSize = blocks2Info.size(); ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); - - List> entriesToFlush = new LinkedList<>(); - while (blocks2InfoSize > 0) { + @SuppressWarnings("unchecked") + ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize], blocks2InfoSize); + for (int i = 0; i < blocks2InfoSize; i++) { Long2ObjectMap.Entry entry = entriesIterator.next(); BlockInfo blockInfo = entry.getValue(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + long blockId = entry.getLongKey(); + long blockPosition = blockInfo.getIndex(); + int blockSize = blockInfo.getSize(); entriesIterator.remove(); - - if (entriesToFlush.size() >= 1000) { - awaitFlushWriteEnd(entriesToFlush); - } - blocks2InfoSize--; + tasks.set(i, () -> { + try { + flusher.flush(blockId, blockPosition, blockSize); + } catch (IOException e) { + throw new CompletionException(e); + } + return null; + }); + } + try { + flushExecutorService.invokeAll(tasks); + } catch (InterruptedException e) { + throw new IOException(e.getCause()); + } + flushExecutorService.shutdown(); + try { + if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) + flushExecutorService.shutdownNow(); + } catch (InterruptedException e) { + throw new IOException(e); } - awaitFlushWriteEnd(entriesToFlush); } finally { lock.writeLock().unlock(); } } } - - private void stopFlushService() { - flushService.shutdown(); - try { flushService.awaitTermination(180, TimeUnit.SECONDS); } catch (InterruptedException e) {} - if (!flushService.isTerminated()) { - flushService.shutdownNow(); - } - } - - private void awaitFlushWriteEnd(List> entriesToFlush) throws IOException { - try { - entriesToFlush.parallelStream().forEach((entry) -> { - try { - entry.get(); - } catch (InterruptedException e) { - throw new CompletionException(e); - } catch (ExecutionException e) { - throw new CompletionException(e.getCause()); - } - }); - } catch (CompletionException e) { - throw new IOException(e.getCause()); - } finally { - entriesToFlush.clear(); - } - } } diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java index 95865fb..d6d50dd 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java @@ -4,5 +4,5 @@ import java.io.IOException; import java.util.concurrent.Future; public interface DatabaseBlocksMetadataCacheFlusher { - Future flush(long key, long value1, int value2) throws IOException; + void flush(long key, long value1, int value2) throws IOException; } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java index d54579b..ef528d1 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java @@ -5,7 +5,6 @@ import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; @@ -45,12 +44,13 @@ public class DatabaseReferencesIO implements IReferencesIO { @Override public ByteBuffer readFromReference(long reference) throws IOException { + long blockId; lock.readLock().lock(); try { - long blockId = referencesMetadata.getReferenceBlockId(reference); - return blocksIO.readBlock(blockId); + blockId = referencesMetadata.getReferenceBlockId(reference); } finally { lock.readLock().unlock(); } + return blocksIO.readBlock(blockId); } } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java index b8b17be..b12cec1 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java @@ -7,11 +7,10 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; @@ -27,14 +26,14 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { private final AsynchronousFileChannel metaFileChannel; private final DatabaseReferencesMetadataCache cache; - private long firstFreeReference; - private long lastWrittenReference; + private AtomicLong firstFreeReference; + private AtomicLong lastWrittenReference; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); public DatabaseReferencesMetadata(Path refMetaFile) throws IOException { metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); - firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT; - lastWrittenReference = firstFreeReference - 1; + firstFreeReference = new AtomicLong(metaFileChannel.size() / REF_META_BYTES_COUNT); + lastWrittenReference = new AtomicLong(firstFreeReference.get() - 1); this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk); } @@ -49,6 +48,7 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { } private long getReferenceBlockId_(long reference) throws IOException { + long firstFreeReference = this.firstFreeReference.get(); if (reference >= firstFreeReference) { return EMPTY_BLOCK_ID; } @@ -108,18 +108,13 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { } } } - - for (int delta = 0; delta < referencesCount; delta++) { - if (allCleaners[delta] == 0) { - System.out.println("ro"); - } - } cache.putAll(allReferences, allCleaners, allBlocks); return block; } /** * This method is SLOW! Use this only for the cleaner + * * @param reference reference * @return * @throws IOException @@ -139,13 +134,14 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { @Override public long newReference(long blockId) throws IOException { lock.writeLock().lock(); + long newReference; try { - long newReference = firstFreeReference++; - cache.put(newReference, BLANK_DATA_CLEANER, blockId); - return newReference; + newReference = firstFreeReference.getAndIncrement(); } finally { lock.writeLock().unlock(); } + cache.put(newReference, BLANK_DATA_CLEANER, blockId); + return newReference; } @Override @@ -181,30 +177,55 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { @Override public long getFirstFreeReference() { - synchronized (lock.readLock()) { - return firstFreeReference; + lock.readLock().lock(); + try { + return firstFreeReference.get(); + } finally { + lock.readLock().unlock(); } } - private Future writeReferenceToDisk(long reference, byte cleanerId, long blockId) { + private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException { if (cleanerId == ERRORED_CLEANER) { - return CompletableFuture.failedFuture(new IOException("Passing a cleaner with the id of ERRORED_CLIENT")); + throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT"); } ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); data.putLong(blockId); data.putInt(cleanerId & 0xFF); data.flip(); - while (lastWrittenReference < reference - 1) { - ByteBuffer emptyData = ByteBuffer.allocate(REF_META_BYTES_COUNT); - emptyData.putLong(ERROR_BLOCK_ID); - emptyData.putInt(ERRORED_CLEANER & 0xFF); - emptyData.flip(); - metaFileChannel.write(emptyData, ++lastWrittenReference * REF_META_BYTES_COUNT); + try { + metaFileChannel.write(data, reference * REF_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); } - if (reference > lastWrittenReference) { - lastWrittenReference = reference; - } - return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT); } +/* + private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException { + if (cleanerId == ERRORED_CLEANER) { + throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT"); + } + long firstReferenceToWrite = 1 + lastWrittenReference.getAndUpdate((lastWrittenReferenceVal) -> reference > lastWrittenReferenceVal ? reference : lastWrittenReferenceVal); + if (firstReferenceToWrite > reference) { + firstReferenceToWrite = reference; + } + ByteBuffer data = ByteBuffer.allocate((int) ((reference + 1 - firstReferenceToWrite) * REF_META_BYTES_COUNT)); + for (long i = firstReferenceToWrite; i < reference - 1; i++) { + data.putLong(ERROR_BLOCK_ID); + data.putInt(ERRORED_CLEANER & 0xFF); + } + data.putLong(blockId); + data.putInt(cleanerId & 0xFF); + data.flip(); + try { + metaFileChannel.write(data, firstReferenceToWrite * REF_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } +*/ } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java index 19b2f3f..9785d2e 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java @@ -1,10 +1,14 @@ package it.cavallium.strangedb.database.references; +import it.cavallium.strangedb.VariableWrapper; import it.unimi.dsi.fastutil.longs.*; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectIterator; +import it.unimi.dsi.fastutil.objects.ObjectIterators; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; @@ -12,7 +16,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ; -import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.*; +import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO; +import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ; public class DatabaseReferencesMetadataCache { @@ -21,13 +26,12 @@ public class DatabaseReferencesMetadataCache { private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY; - private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f); - private final Long2ByteMap referencesCleaners = new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f); + private final Long2LongMap references2Blocks = Long2LongMaps.synchronize(new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f)); + private final Long2ByteMap referencesCleaners = Long2ByteMaps.synchronize(new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f)); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private final DatabaseReferencesMetadataCacheFlusher flusher; private volatile boolean closed; - private ExecutorService flushService = Executors.newSingleThreadExecutor(); - private volatile boolean flushing; + ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "References Flush Thread")); public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) { this.flusher = flusher; @@ -67,9 +71,9 @@ public class DatabaseReferencesMetadataCache { } references2Blocks.put(reference, blockId); referencesCleaners.put(reference, cleanerId); + flush(); } finally { lock.writeLock().unlock(); - flushAsync(); } } @@ -86,110 +90,88 @@ public class DatabaseReferencesMetadataCache { Long2ByteMap referencesCleanersToAdd = new Long2ByteLinkedOpenHashMap(references, cleanerIds, 0.5f); references2Blocks.putAll(referencesBlocksToAdd); referencesCleaners.putAll(referencesCleanersToAdd); + flush(); } finally { lock.writeLock().unlock(); - flushAsync(); - } - } - - private void flushAsync() { - if (references2Blocks.size() >= FLUSH_CACHE_SIZE && !flushing) { - flushing = true; - flushService.execute(() -> { - try { - flush(); - } catch (IOException e) { - throw new RejectedExecutionException(e); - } - }); } } private void flush() throws IOException { - if (closed) { - flushing = false; - return; - } + if (closed) return; int references2BlocksSize = references2Blocks.size(); if (references2BlocksSize >= FLUSH_CACHE_SIZE) { - lock.writeLock().lock(); - try { - ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); - ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); - List> entriesToFlush = new ArrayList<>(); - while (references2BlocksSize >= GOOD_CACHE_SIZE) { - Long2LongMap.Entry entry = entriesIterator.next(); - Long2ByteMap.Entry cleaner = cleanersIterator.next(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue())); - entriesIterator.remove(); - cleanersIterator.remove(); - if (entriesToFlush.size() >= 1000) { - awaitFlushWriteEnd(entriesToFlush); + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); + @SuppressWarnings("unchecked") + ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize - GOOD_CACHE_SIZE], references2BlocksSize - GOOD_CACHE_SIZE); + for (int i = 0; i < references2BlocksSize - GOOD_CACHE_SIZE; i++) { + Long2LongMap.Entry entry = entriesIterator.next(); + Long2ByteMap.Entry cleaner = cleanersIterator.next(); + long refId = entry.getLongKey(); + byte cleanerId = cleaner.getByteValue(); + long blockId = entry.getLongValue(); + entriesIterator.remove(); + cleanersIterator.remove(); + + tasks.set(i, () -> { + try { + flusher.flush(refId, cleanerId, blockId); + } catch (IOException e) { + throw new CompletionException(e); } - references2BlocksSize--; - } - awaitFlushWriteEnd(entriesToFlush); - } finally { - flushing = false; - lock.writeLock().unlock(); + return null; + }); + } + try { + flushExecutorService.invokeAll(tasks); + } catch (InterruptedException e) { + throw new IOException(e.getCause()); } - } else { - flushing = false; } } public void close() throws IOException { if (!closed) { closed = true; - stopFlushService(); lock.writeLock().lock(); try { int references2BlocksSize = references2Blocks.size(); ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); - List> entriesToFlush = new LinkedList<>(); - while (references2BlocksSize > 0) { + @SuppressWarnings("unchecked") + ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize], references2BlocksSize); + for (int i = 0; i < references2BlocksSize; i++) { Long2LongMap.Entry entry = entriesIterator.next(); Long2ByteMap.Entry cleaner = cleanersIterator.next(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue())); + long refId = entry.getLongKey(); + byte cleanerId = cleaner.getByteValue(); + long blockId = entry.getLongValue(); entriesIterator.remove(); cleanersIterator.remove(); - - if (entriesToFlush.size() >= 1000) { - awaitFlushWriteEnd(entriesToFlush); - } - references2BlocksSize--; + tasks.set(i, () -> { + try { + flusher.flush(refId, cleanerId, blockId); + } catch (IOException e) { + throw new CompletionException(e); + } + return null; + }); + } + try { + flushExecutorService.invokeAll(tasks); + } catch (InterruptedException e) { + throw new IOException(e.getCause()); + } + flushExecutorService.shutdown(); + try { + if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) + flushExecutorService.shutdownNow(); + } catch (InterruptedException e) { + throw new IOException(e); } - awaitFlushWriteEnd(entriesToFlush); } finally { lock.writeLock().unlock(); } } } - - private void stopFlushService() { - flushService.shutdown(); - try { flushService.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) {} - if (!flushService.isTerminated()) { - flushService.shutdownNow(); - } - } - - private void awaitFlushWriteEnd(List> entriesToFlush) throws IOException { - try { - entriesToFlush.parallelStream().forEach((entry) -> { - try { - entry.get(); - } catch (InterruptedException e) { - throw new CompletionException(e); - } catch (ExecutionException e) { - throw new CompletionException(e.getCause()); - } - }); - } catch (CompletionException e) { - throw new IOException(e.getCause()); - } finally { - entriesToFlush.clear(); - } - } } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java index 05d016a..3b9e2dd 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java @@ -4,5 +4,5 @@ import java.io.IOException; import java.util.concurrent.Future; public interface DatabaseReferencesMetadataCacheFlusher { - Future flush(long reference, byte cleanerId, long block) throws IOException; + void flush(long reference, byte cleanerId, long block) throws IOException; }